kafka相关概念及应用实战(Python代码)

什么是kafka

  • Apache Kafka是一个分布式的流处理平台,由Scala写成,并由Apache软件基金会开发的一个开源消息系统项目,该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
  • Kafka是一个分布式消息中间件。

kafka的整体架构和主要概念

kafka架构.png
  • Producer, 向队列中发送消息。Produce负责选择将一条记录添加到topic中的某个partition中。

  • Consumer,从队列中消费消息。每个Consumer都要有一个Consumer group。

  • Broker,每一个kafka实例就是一个broker,一个broker可以有多个topic。

  • topic,用于分类消息。topic其本质是一个目录,可将同一主题消息归类到同一个目录。生产者向topic中写入数据时的详情如下:


    log_anatomy.png
  • partition:一个topic可以有1到n个分区,可以在创建topic时,指定topic的分区数,单机版的kafka的topic默认只有一个分区。每个分区中的消息的顺序是有序、且不可改变的。

  • offset:分区中的每条消息被标上一个有序的数字,这个数字就是offset。offset可以用来标识每条消息的位置。


    offset
  • consumer group, 多个Consumer组成的一个分组,应用可以并发的消费一个topic下多个分区中的消息,并发的消费者数量由topic的分区数决定。如下图的topic中包含的4个分区,被Consumer Group A和Consumner GroupB同时消费。同一组中多个Consumer之间是竞争关系,共享一个offset,一条消息被一个Consumer消费之后就不能再被另一个Consumer消费。不同组的Consumer之间,各自有自己的offset,假设GroupA此时的offset为10,而GroupB的offset可能是5,二者之间不相互影响。

consumer-groups

对于同一个topic,同一组的Consumer同时消费,可以实现负载均衡,不同组的Consumer,可以实现对消息的重复消费。

  • high-level API和low-level API。Kafka提供了一个high-level的Consumer API,它可以实现consumer group和自动容错,但是不能支持一些更复杂的使用场景,同也提供了一套simple的low-level API的Consumer API,提供更全面、更细粒度的控制,但是这种Consumer需要开发者自己设计容错机制。

Kafka Python实践

kafka的python包主要有kafka(pip install kafka)和kafka-python(pip install kafka-python)两种,建议使用kafka-python,kafka-python的API更丰富一些,下面的实践代码基于kafka-python。

1 简单的生产和消费

本文中servers的地址为:

servers = ['192.168.2.152:9092', '192.168.2.153:9092', '192.168.2.154:9092']

生产者

from kafka import KafkaProducer
def producer_message():
    producer = KafkaProducer(bootstrap_servers=servers)
    for i in range(100):
        msg = "some_message_bytes " + str(i)
        producer.send('kafka-topic', bytes(msg.encode("utf-8")))
        print(msg)

消费者

from kafka import KafkaConsumer
from kafka import TopicPartition

# 消费方式1,默认消费所有分区的消息
def consumer_message1():
    consumer = KafkaConsumer('kafka-topic', 
                             ootstrap_servers=servers, 
                             group_id="kafka-group-id")
    # consumer = KafkaConsumer('kafka-topic', bootstrap_servers=servers)
    for msg in consumer:
        print(msg)

# 消费方式2, 指定消费分区
def consumer_message2():
    consumer = KafkaConsumer(bootstrap_servers=servers, 
                             group_id="kafka-group-id")
    consumer.assign([TopicPartition('kafka-topic', 0)])
    for msg in consumer:
        print(msg)

# 消费方式3,手动commit,生产中建议使用这种方式
def consumer_message3():
    consumer = KafkaConsumer(bootstrap_servers=servers,
                             consumer_timeout_ms=1000,
                             group_id="kafka-group-id",
                             enable_auto_commit=False)
    consumer.assign([TopicPartition('kafka-topic', 0)])
    for msg in consumer:
        print(msg)
        consumer.commit()

2 简单的管理接口

获取所有的topic

kafka-python获取所有的topic接口是在KafkaConsumer类中实现的。笔者觉得这个方法放到KafkaAdminClient中可能更合适。

from kafka import KafkaConsumer

# 获取topic列表以及topic的分区列表
def retrieve_topics():
    consumer = KafkaConsumer(bootstrap_servers=servers)
    print(consumer.topics())

# 获取topic的分区列表
def retrieve_partitions(topic):
    consumer = KafkaConsumer(bootstrap_servers=servers)
    print(consumer.partitions_for_topic(topic))

# 获取Consumer Group对应的分区的当前偏移量
def retrieve_partition_offset():
    consumer = KafkaConsumer(bootstrap_servers=servers,
                             group_id='kafka-group-id')
    tp = TopicPartition('kafka-topic', 0)
    consumer.assign([tp])
    print("starting offset is ", consumer.position(tp))

创建、删除topic

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers=servers)

# 创建topic
def create_topic():
    try:
        new_topic = NewTopic("create-topic", 8, 3)
        admin.create_topics([new_topic])
    except TopicAlreadyExistsError as e:
        print(e.message)

# 删除topic
def delete_topic():
    admin.delete_topics(["create-topic"])

获取消费组信息

from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=servers)
# 获取消费组信息    
def get_consumer_group():
    # 显示所有的消费组
    print(admin.list_consumer_groups())

    # 显示消费组的offsets
    print(admin.list_consumer_group_offsets("kafka-group-id"))

获取topic配置信息

from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers=servers)
# 获取topic的配置信息
def get_topic_config():
    resource_config = ConfigResource(ConfigResourceType.TOPIC, "create-topic")
    config_entries = admin.describe_configs([resource_config])
    print(config_entries.resources)

kafka-python KafkaAdminClient中的bug

kafka-python包中的kafka\admin\client.py的第335行,

for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):

修改为

for topic, error_code, *a in getattr(response, "topic_errors", response.topic_error_codes):
image.png

总结

  • 对于同一个topic,不同组之间的Consumer各自维持一个offset,同一组内的Consumer公用同一个offset。
  • 当一个topic需要被不同的应用消费时,这些应用应设置不同的group_id,从而各自维持一个自己的offset。
  • 若组内有多个Consumer并发消费,最好创建topic时指定topic的分区数量,topic的分区数量决定了同时能有多少个Consumer并发消费。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,797评论 13 425
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 10,742评论 1 15
  • Kafka提供的主要功能 生产者 ——>消息队列 <——消费者 所谓消息对象,本质上就是由生产者向消息队列不断发送...
    leofight阅读 5,586评论 0 5
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 13,691评论 4 54
  • 2018 ― 05 ― 19 星期二 小雨 奶奶89岁了,家在河南的一个农村,姥姥86岁了,是和女孩在同一个...
    沉秋a阅读 3,345评论 3 11