1.以json形式传递消息
不包含嵌套 json 格式;嵌套 json 的处理可参考文末"相关资料"中 clickhouse 与 kafka 集成的相关文章
1.1.节点1操作
--建立kafka连接
-- Kafka engine table: consumes JSONEachRow messages from topic 'kafka_to_ck_json'.
-- Note: a Kafka engine table cannot be queried directly in a useful way;
-- it is read through the materialized view below.
CREATE TABLE kafka_to_ck_json
(
    name       String,
    age        Int16,
    createtime DateTime64(3, 'Asia/Shanghai')
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'node95:9092',
    kafka_topic_list  = 'kafka_to_ck_json',
    kafka_group_name  = 'consumer_json',
    kafka_format      = 'JSONEachRow';
--本地表(这里我使用 on cluster 去建表,会被锁住,还没找到解决方案)
-- Local storage table (MergeTree), one per node; partitioned by day.
CREATE TABLE kafka_sink_ck_json
(
    name       String,
    age        Int16,
    createtime DateTime64(3, 'Asia/Shanghai')
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(createtime)
ORDER BY name;
--分布式表
-- Distributed table over cluster 'ck_cluster': routes writes to the local
-- tables on each node using a random sharding key.
CREATE TABLE kafka_sink_ck_all_json AS kafka_sink_ck_json
ENGINE = Distributed(ck_cluster, default, kafka_sink_ck_json, rand());
--物化视图(相当于把kafka的数据写入ck中)
-- Materialized view: continuously moves rows consumed by the Kafka engine
-- table into the Distributed table (and thus into the cluster's local tables).
-- Fix: replaced SELECT * with an explicit column list so a schema change on
-- the Kafka source table cannot silently break or misalign the insert.
CREATE MATERIALIZED VIEW kafka_sink_ck_view_json TO kafka_sink_ck_all_json
AS
SELECT
    name,
    age,
    createtime
FROM kafka_to_ck_json;
1.2.其他2个节点操作
--本地表(这里我使用 on cluster 去建表,会被锁住,还没找到解决方案)
-- Local storage table (MergeTree) for the remaining nodes; must match the
-- definition used on node 1.
CREATE TABLE kafka_sink_ck_json
(
    name       String,
    age        Int16,
    createtime DateTime64(3, 'Asia/Shanghai')
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(createtime)
ORDER BY name;
--分布式表
-- Distributed table for the remaining nodes; same definition as on node 1.
CREATE TABLE kafka_sink_ck_all_json AS kafka_sink_ck_json
ENGINE = Distributed(ck_cluster, default, kafka_sink_ck_json, rand());
1.3.kafka操作
- 创建topic
sh kafka-topics.sh --create --topic kafka_to_ck_json --bootstrap-server localhost:9092
- 发送消息
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka_to_ck_json
- json数据
{"name":"张三","age":16,"createtime":1651121660000}
{"name":"李四","age":18,"createtime":1651111660000}
{"name":"王老五","age":18,"createtime":1651211660020}
1.4.三个节点去验证
--分布式表
select * from kafka_sink_ck_all_json;
--本地表
select * from kafka_sink_ck_json;
--物化视图(节点1中)
select * from kafka_sink_ck_view_json;
2.以csv形式传递消息
2.1.更改格式为kafka_format = 'CSV';
2.2.消息形式以逗号分隔,例如:王老五,18,1651211660020
相关资料
clickhouse与kafka集成
Kafka引擎表消费CSV/JSON/AVRO类型数据
输入/输出格式 | ClickHouse Docs