clickhouse支持与多种存储引擎集成,可以从集成的引擎里面读取消息,然后写到真正的数据存储表里。
clickhouse批量写入的性能比较好,我们的业务场景下会大批量的产生数据,如果使用clickhouse-jdbc去写的,写入时机和每批次写入的数量不好把控,最终选择了先将消息写入kafka,然后由clickhouse从kafka消费数据,clickhouse server消费到数据之后写入真正的数据表。
clickhouse集成kafka引擎见官方文档:
https://clickhouse.com/docs/zh/engines/table-engines/integrations/kafka/
下面的介绍会与官方文档有重复,然后补充一些集成过程中遇到的坑。
下面介绍clickhouse与kafka集成的步骤,clickhouse版本是22.1.3.7
集成kafka
参数解释
必要参数
- kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
- kafka_topic_list – topic 列表 (my_topic)。
- kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
- kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。了解详细信息,请参考 Formats 部分。
可选参数
kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。例如,普罗托船长 需要 schema 文件路径以及根对象
schema.capnp:Message
的名字。kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。
关于必选参数中的kafka_format参数,参见Formats部分,format具体解释如下
https://clickhouse.com/docs/zh/interfaces/formats/。
JSONEachRow, JSONStringsEachRow, JSONCompactEachRow, JSONCompactStringsEachRow
这几种格式,ClickHouse会将行输出为用换行符分隔的JSON值,这些输出数据作为一个整体时,由于没有分隔符(,)因而不是有效的JSON文档。
官方文档给了一些示例。
{"some_int":42,"some_str":"hello","some_tuple":[1,"a"]} // JSONEachRow
[42,"hello",[1,"a"]] // JSONCompactEachRow
["42","hello","(2,'a')"] // JSONCompactStringsEachRow
由于我的真实的数据表,有一个字段是json类型的字符串,但是一开始设置kafka_format的类型为JSONEachRow时,从kafka消费数据会报错,所以kafka_format格式设置成了JSONAsString,具体的错误后面贴出来。
创建引擎表
创建kafka引擎表,用于从kafka消费数据
CREATE TABLE msg_json_source (
msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic_test',
kafka_group_name = 'topic_test_consumer',
kafka_format = 'JSONAsString'
由于我的数据结构里有嵌套json,如果使用JSONEachRow,有个字段是json类型的字符串,带转义字符,导致clickhouse解析失败,没找到解决办法,所以使用了JSONAsString格式。
创建真实数据表
CREATE TABLE msg_target
(
biz Nullable(String),
sender_id String,
msg_id UInt64,
status String,
status_time UInt64,
content String,
event_time DateTime Default now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (msg_id, status_time)
TTL event_time + INTERVAl 1 YEAR
一个简单的MergeTree引擎的表,其中content是json格式的字符串。
创建物化视图
CREATE MATERIALIZED VIEW msg_json_source_consumer TO msg_target
AS SELECT
JSONExtractString(msg,'biz') as biz,
JSONExtractString(msg,'sender_id') as sender_id,
JSONExtractUInt(msg,'msg_id') as msg_id,
JSONExtractString(msg,'status') as status,
JSONExtractUInt(msg,'status_time') as status_time,
JSONExtractString(msg,'content') as content
FROM msg_json_source
创建的物化视图用于把从kafka消费到的数据,写到真实的数据表里,在这个例子里,msg_json_source从kafka消费到数据,然后通过物化视图msg_json_source_consumer将消费到的数据写到真实的数据表msg_target中。
由于从kafka消费到的数据就是一个json字符串,在这里使用JSONExtractString等json字段提取工具,提取msg里的字段,比如biz,sender_id,content等字段。
status_time原本计划用DatTime64类型的,但是这个时间格式有坑,最终选择了使用UInt64存毫秒级时间戳,具体的问题下面再介绍。
往kafka写消息
在clickhouse创建好3张表之后(kafka引擎表,真实数据表,物化视图表),往kafka发消息
本地安装一个简易的kafka服务端,然后创建topic
/opt/dev/confluent-7.0.1/bin/
./kafka-topics --create --topic topic_test --bootstrap-server localhost:9092
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic topic_test.
创建好topic之后,使用Java客户端往kafka发消息,使用confluent client发也可以。
添加kafka依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
实体类,使用fastjson的@JSONField注解,实体类转字符串的时候,将驼峰转换为下划线
import com.alibaba.fastjson.annotation.JSONField;
public class MsgJsonSource {
private String biz;
@JSONField(name = "sender_id")
private String senderId;
@JSONField(name = "msg_id")
private Long msgId;
private String status;
@JSONField(name = "status_time")
private Long statusTime;
private Map<String,Object> content;
//省略getter/setter
}
测试类
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fc.model.MsgJsonSource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;
public class KafkaSendTest {
public static void main(String[] args) {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
System.out.println("开始发送数据");
// 4. 调用 send 方法,发送消息
String topic = "topic_test";
for (int i = 0; i < 10; i++) {
msgJsonSource.setContent(content);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send(new ProducerRecord<>(topic, JSON.toJSONString(msgJsonSource)));
System.out.println("当前消息序号: " + i + ", 发送结果: " + JSON.toJSONString(recordMetadataFuture));
}
// 5. 关闭资源
kafkaProducer.close();
}
}
最终发送完,我们查看一下clickhouse里的数据表的数据,可以发现我们发送到kakfa里的数据,已经成功的消费,并且写入到真实的数据表里了。
遇到的问题
版本问题
当时测试环境部署的版本是21.9,但是这个版本有问题,不推荐安装,建议直接部署22以上的clickhouse
JSONEachRow
我一开始就是使用的JSONEachRow格式,但是我的消息体里还有嵌套的json,类似下面这种格式,里面有个字段还是个json,转行成字符串带转义字符。
然后消息体的string字符串贴一条在这里
{"biz":"biz","content":"{\"current\":1648817159914,\"abc\":\"111\",\"text\":\"gggg\",\"req\":{\"nested\":0}}","msg_id":7561248342312669573,"sender_id":"test_0","status":"status_0","status_time":1648817159914}
然后clickhouse解析消息体报错,当时的错找不到了,现在复现不出来了,非常的难顶。。。。
后来因为赶版本的原因把kafka_format换成了JSONAsString。
时间格式问题
clickhouse是支持DateTime64格式的,可以到毫秒级,但是实际使用过程中却有些坑在,
首先是有的客户端解析毫秒字符串有问题,其次是使用JSONExtract*的方法,会有差异,再然后是jdbc查询的时候,也会导致时间查询有问题。
拿毫秒时间戳和秒级时间戳做试验,clickhouse-server版本是22.3.1.1
把上面的kafka引擎表拿出来改一下
#kafka引擎
CREATE TABLE msg_json_source_004 (
msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic_test_004',
kafka_group_name = 'topic_test_004_consumer',
kafka_format = 'JSONAsString'
#真实数据表
CREATE TABLE msg_target_004
(
biz Nullable(String),
sender_id String,
msg_id UInt64,
status String,
status_time DateTime64(3, 'Asia/Shanghai'),
content String,
event_time DateTime Default now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (msg_id, status_time)
TTL event_time + INTERVAl 1 YEAR
#物化视图
CREATE MATERIALIZED VIEW msg_json_source_004_consumer TO msg_target_004
AS SELECT
JSONExtractString(msg,'biz') as biz,
JSONExtractString(msg,'sender_id') as sender_id,
JSONExtractUInt(msg,'msg_id') as msg_id,
JSONExtractString(msg,'status') as status,
JSONExtractUInt(msg,'status_time') as status_time,
JSONExtractString(msg,'content') as content
FROM msg_json_source_004
其中status_time这个字段的类型改成DateTime64(3, 'Asia/Shanghai'),使用JSONExtractUInt提取时间,看下效果
首先发条数据,数据内容如下
{"biz":"biz","content":"1111","msg_id":7253917707485514799,"sender_id":"test_0","status":"status_0","status_time":1648901022355}
传入的是毫秒级时间戳,然后数据表存储的时候就变成了2282年
然后如果传入秒级的时间戳,真实的数据是这样
{"biz":"biz_3","content":"00003","msg_id":3053111163555706035,"sender_id":"test_3","status":"status_3","status_time":1648901508}
clickhouse存储的时候看着时间正常了,但是毫秒丢失了
然后修改一下物化视图的字段提取方式,之前是 JSONExtractUInt(msg,'status_time') as status_time,现在改成使用 JSONExtractString(msg,'status_time') as status_time提取时间
会发现时间类型又正常了。
这一条数据内容如下
{"biz":"biz_1","content":"2222","msg_id":9151431369051819265,"sender_id":"test_1","status":"status_1","status_time":1648901194855}
最终使用JSONExtractString提取毫秒时间戳,得到了正确的DateTime64的时间,非常的神奇
最终我决定来了个釜底抽薪的方法,时间直接用UInt64存,因为我发送出去的数据是毫秒级时间戳,最终存时间戳,查询时间范围的时候直接用long类型的数据between好了。
这也是无奈之举,万一哪天server更新版本,导致时间出现问题,那就完蛋了,希望后面时间可以稳定一点吧。