什么是kafka
- Apache Kafka是一个分布式的流处理平台,由Scala写成,并由Apache软件基金会开发的一个开源消息系统项目,该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
- Kafka是一个分布式消息中间件。
kafka的整体架构和主要概念
Producer, 向队列中发送消息。Produce负责选择将一条记录添加到topic中的某个partition中。
Consumer,从队列中消费消息。每个Consumer都要有一个Consumer group。
Broker,每一个kafka实例就是一个broker,一个broker可以有多个topic。
-
topic,用于分类消息。topic其本质是一个目录,可将同一主题消息归类到同一个目录。生产者向topic中写入数据时的详情如下:
partition:一个topic可以有1到n个分区,可以在创建topic时,指定topic的分区数,单机版的kafka的topic默认只有一个分区。每个分区中的消息的顺序是有序、且不可改变的。
-
offset:分区中的每条消息被标上一个有序的数字,这个数字就是offset。offset可以用来标识每条消息的位置。
consumer group, 多个Consumer组成的一个分组,应用可以并发的消费一个topic下多个分区中的消息,并发的消费者数量由topic的分区数决定。如下图的topic中包含的4个分区,被Consumer Group A和Consumner GroupB同时消费。同一组中多个Consumer之间是竞争关系,共享一个offset,一条消息被一个Consumer消费之后就不能再被另一个Consumer消费。不同组的Consumer之间,各自有自己的offset,假设GroupA此时的offset为10,而GroupB的offset可能是5,二者之间不相互影响。
对于同一个topic,同一组的Consumer同时消费,可以实现负载均衡,不同组的Consumer,可以实现对消息的重复消费。
- high-level API和low-level API。Kafka提供了一个high-level的Consumer API,它可以实现consumer group和自动容错,但是不能支持一些更复杂的使用场景,同也提供了一套simple的low-level API的Consumer API,提供更全面、更细粒度的控制,但是这种Consumer需要开发者自己设计容错机制。
Kafka Python实践
kafka的python包主要有kafka(pip install kafka)和kafka-python(pip install kafka-python)两种,建议使用kafka-python,kafka-python的API更丰富一些,下面的实践代码基于kafka-python。
1 简单的生产和消费
本文中servers的地址为:
servers = ['192.168.2.152:9092', '192.168.2.153:9092', '192.168.2.154:9092']
生产者
from kafka import KafkaProducer
def producer_message():
producer = KafkaProducer(bootstrap_servers=servers)
for i in range(100):
msg = "some_message_bytes " + str(i)
producer.send('kafka-topic', bytes(msg.encode("utf-8")))
print(msg)
消费者
from kafka import KafkaConsumer
from kafka import TopicPartition
# 消费方式1,默认消费所有分区的消息
def consumer_message1():
consumer = KafkaConsumer('kafka-topic',
ootstrap_servers=servers,
group_id="kafka-group-id")
# consumer = KafkaConsumer('kafka-topic', bootstrap_servers=servers)
for msg in consumer:
print(msg)
# 消费方式2, 指定消费分区
def consumer_message2():
consumer = KafkaConsumer(bootstrap_servers=servers,
group_id="kafka-group-id")
consumer.assign([TopicPartition('kafka-topic', 0)])
for msg in consumer:
print(msg)
# 消费方式3,手动commit,生产中建议使用这种方式
def consumer_message3():
consumer = KafkaConsumer(bootstrap_servers=servers,
consumer_timeout_ms=1000,
group_id="kafka-group-id",
enable_auto_commit=False)
consumer.assign([TopicPartition('kafka-topic', 0)])
for msg in consumer:
print(msg)
consumer.commit()
2 简单的管理接口
获取所有的topic
kafka-python获取所有的topic接口是在KafkaConsumer类中实现的。笔者觉得这个方法放到KafkaAdminClient中可能更合适。
from kafka import KafkaConsumer
# 获取topic列表以及topic的分区列表
def retrieve_topics():
consumer = KafkaConsumer(bootstrap_servers=servers)
print(consumer.topics())
# 获取topic的分区列表
def retrieve_partitions(topic):
consumer = KafkaConsumer(bootstrap_servers=servers)
print(consumer.partitions_for_topic(topic))
# 获取Consumer Group对应的分区的当前偏移量
def retrieve_partition_offset():
consumer = KafkaConsumer(bootstrap_servers=servers,
group_id='kafka-group-id')
tp = TopicPartition('kafka-topic', 0)
consumer.assign([tp])
print("starting offset is ", consumer.position(tp))
创建、删除topic
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
admin = KafkaAdminClient(bootstrap_servers=servers)
# 创建topic
def create_topic():
try:
new_topic = NewTopic("create-topic", 8, 3)
admin.create_topics([new_topic])
except TopicAlreadyExistsError as e:
print(e.message)
# 删除topic
def delete_topic():
admin.delete_topics(["create-topic"])
获取消费组信息
from kafka import KafkaAdminClient
admin = KafkaAdminClient(bootstrap_servers=servers)
# 获取消费组信息
def get_consumer_group():
# 显示所有的消费组
print(admin.list_consumer_groups())
# 显示消费组的offsets
print(admin.list_consumer_group_offsets("kafka-group-id"))
获取topic配置信息
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType
admin = KafkaAdminClient(bootstrap_servers=servers)
# 获取topic的配置信息
def get_topic_config():
resource_config = ConfigResource(ConfigResourceType.TOPIC, "create-topic")
config_entries = admin.describe_configs([resource_config])
print(config_entries.resources)
kafka-python KafkaAdminClient中的bug
kafka-python包中的kafka\admin\client.py的第335行,
for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
修改为
for topic, error_code, *a in getattr(response, "topic_errors", response.topic_error_codes):
总结
- 对于同一个topic,不同组之间的Consumer各自维持一个offset,同一组内的Consumer公用同一个offset。
- 当一个topic需要被不同的应用消费时,这些应用应设置不同的group_id,从而各自维持一个自己的offset。
- 若组内有多个Consumer并发消费,最好创建topic时指定topic的分区数量,topic的分区数量决定了同时能有多少个Consumer并发消费。