Kafka 是一个分布式流媒体平台,常被用做MQ
kafka组件:
- Producers 消息生产者
- Consumers 消息消费者
-
Broker 消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
kafka概念:
- Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
- Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列(即Partition内有序)
- Partition Leader:
Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster. - Replicate:副本,一个Partition的副本Partition,用于高可用。
- Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。
- offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.
kafka Partition和副本Partition
每个Partition会有多个副本Partition,分布在不同的Broker上。
Partition存储
topic和partition都是逻辑上的概念
每个partition对应一个目录:{topic}-{partition}
比如图中topic car_data,partition 0的目录时 /car_data-0
/car_data-0中存的时segment(逻辑上的),对应实际的是.index文件和.log文件
index文件通过mmap在内存中
index和log文件
index存储的是稀疏索引(稀疏索引占用空间少)
index和log文件名称和文件的内容中第一个offset有关。
kafka消费者组
kafka消费者Offset保存
新版本中offset由broker维护,offset信息由一个特殊的topic “ __consumer_offsets”来保存,offset以消息形式发送到该topic并保存在broker中。这样consumer提交offset时,只需连接到broker,不用访问zk,避免了zk节点更新瓶颈。
broker消息保存目录在配置文件server.properties中
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/var/lib/kafka-logs
#ls /usr/local/var/lib/kafka-log
__consumer_offsets-0 __consumer_offsets-22 __consumer_offsets-36 __consumer_offsets-5
__consumer_offsets-1 __consumer_offsets-23 __consumer_offsets-37
其中numPartitions由offsets.topic.num.partitions参数决定,默认值即50
key是group.id+topic+分区号,而 value 就是 offset 的值
kafka消息投递语义
Kafka 性能
- 顺序读写
磁盘读写的快慢取决于你怎么使用它,也就是顺序读写或者随机读写。在顺序读写的情况下,磁盘的顺序读写速度和内存持平。因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机 I/O,最喜欢顺序 I/O。为了提高读写硬盘的速度,Kafka 就是使用顺序 I/O - Memory Mapped Files
即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以 Kafka 的数据并不是实时的写入硬盘 ,它充分利用了现代操作系统分页存储来利用内存提高 I/O 效率。
Memory Mapped Files(后面简称 mmap)也被翻译成 内存映射文件 ,在 64 位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。
完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
通过 mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。
使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销(调用文件的 read 会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)
但也有一个很明显的缺陷——不可靠,写到 mmap 中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用 flush 的时候才把数据真正的写到硬盘。
Kafka 提供了一个参数——producer.type 来控制是不是主动 flush,如果 Kafka 写入到 mmap 之后就立即 flush 然后再返回 Producer 叫 同步 (sync);写入 mmap 之后立即返回 Producer 不调用 flush 叫异步 (async)。 - 消息压缩
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
-
分批发送
生产者,消费者 sample
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
producer.send(record)
private static final String TOPIC_NAME = "car";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
String message = null;
try {
do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));
for (ConsumerRecord<String, String> record : records) {
message = record.value();
System.out.println(message);
}
} while (true);
} catch(Exception e) {
// exception
} finally {
consumer.close();
}
}