KStream和KTable是Kafka Streams里内建的两个最重要的抽象,分别对应数据流和数据库。Kafka Streams作为流处理技术的一大卖点,即是很好地将存储状态的表(table)和作为记录的流(stream)无缝地结合在了一起。
KStream
数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件(stream of events),这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
KStream的构建方法:
builder.stream()
KTable
传统数据库,包含了各种存储了大量状态(state)的表格。
KTable负责抽象的,就是表状数据。每一次操作,都是更新插入(upsert)。
KTable的构建方法:
builder.table()
KStream和KTable之间的关系
事务日志记录了所有对数据库的更改。数据库保存了日志中最新的记录。数据库就是日志子集的一个缓存,记录了最新数据的子集。
KStream可以看作是KTable的更新日志(changlog),数据流中的每一个记录对应数据库中的每一次更新。
KTable 则可以看作KStream在某一时间点,每一个key对应的最新value的快照(snapshot)。
KStream和KTable之间的相互转换
将KTable转换成KStream
toStream() 方法
KStream<byte[], String> stream = table.toStream();
将KStream转换成KTable
方法1: groupByKey() + aggregation操作
KTable<String, Long> table = stream.groupByKey()
.count();
方法2: 将KStream写回Kafka,再按KTable的格式读出
stream.to("topic0");
KTable<String, String> table = builder.table("topic0");
KStream和KTable不同的使用场景
将topic中数据经过aggregation操作后 ,用KTable来存储结果。
- KStream - 每个新数据都包含了部分信息。
- KTable - 每次更新都合并到原记录上。
KTable与日志压缩(log compaction)
日志压缩可以作为性能提升的一种方式。
删除旧的key value 因为不需要了,只保留每个key的最后一次更新。
带来的优势是:可以快速得到最终状态 而不是每次更新 --- 崩溃后也只需恢复少量数据。
只应对KTable使用,不该对KStream使用。KStream中的每条数据都包含了一部分信息,删除会将这部分信息丢失。
需要手动在创建时对某个topic开启日志压缩: --config cleanup.policy=compact
删除不是立刻进行的,需要等待一个delete.retention.ms
周期(默认为24小时)。
是一个单独的后台压缩线程,需要一定的内存开销。