集群配置
vim flink-conf.yaml
# Checkpoint/state settings, written as flat "key: value" entries for flink-conf.yaml.
# State backend selection: the file-system backend.
state.backend: filesystem
# Directory where checkpoints are written. The URI names no host, so it is
# presumably resolved against the Hadoop default file system - confirm.
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
# Directory where savepoints are written (same HDFS root as the checkpoints).
state.savepoints.dir: hdfs:///flink/flink-savepoints
# Number of completed checkpoints to keep.
state.checkpoints.num-retained: 10
# Checkpoint interval: 10 seconds.
execution.checkpointing.interval: 10s
# Turns the leaked-classloader check off.
# NOTE(review): presumably a workaround for classloader errors when Hadoop
# dependencies are on the classpath - confirm it is still needed.
classloader.check-leaked-classloader: false
# Shell command (not a flink-conf.yaml entry): export the output of
# `hadoop classpath` as HADOOP_CLASSPATH.
export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
SQL语法
-- Date/time arithmetic: convert the value to a Unix timestamp, add an offset
-- (here +1500, i.e. 1500 seconds per the Flink function docs), and format the
-- result as 'yyyy-MM-dd HH:mm:ss'. 日期 is a placeholder for the date value.
FROM_UNIXTIME(UNIX_TIMESTAMP(日期) + 1500, 'yyyy-MM-dd HH:mm:ss')
-- connector:
---- upsert-kafka
-- Upsert-Kafka table keyed by user_region; rows are read from and written to
-- the 'pageviews_per_region' topic.
-- pv / uv are presumably page-view and unique-visitor counts - confirm.
CREATE TABLE pageviews_per_region (
    user_region STRING,
    pv          BIGINT,
    uv          BIGINT,
    -- Declared NOT ENFORCED: the key is stated in the schema but not validated.
    PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
    'connector'                    = 'upsert-kafka',
    'topic'                        = 'pageviews_per_region',
    -- '...' is a placeholder; replace it with the real broker list.
    'properties.bootstrap.servers' = '...',
    -- Key and value are both serialized as Avro.
    'key.format'                   = 'avro',
    'value.format'                 = 'avro'
);