分布式消息发布和订阅系统
提供了类似JMS的特性
对用户行为收集
日志收集
Broker:一个kafka服务
Producer
consumer
topic: kafka集群的类别,一类数据的集合
partition: 每一个topic中具体的物理分区
consumer group:每一个consumer都有一个对应的group 对应一个topic,达到发布订阅的功能
官网
博客
视频
LMS、AQMP消息模型
- JMS(java消息服务)
- 点对点(一对一)
- Quene
- 点对点(一对一)
- AMQP(高级消息队列协议)
- 队列
- 信箱
-
绑定
image.png
创建多broker集群
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
因为原文档是在一台机器上搭建的为分布式,所以在多台机器上搭建的时候只用更改brokerid以及对应的zookeeper节点即可
然后我们启动各个机器上的broker
多节点多broker
创建一个新的拥有备份的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看一下这个topic的描述
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
第一行介绍了所有的分区,其他的行,每一行都介绍了一个分区的信息,因为我们只创建了一个分区所以这里只有一行数据
- ‘leader’就是该分区所属的broker,负责这个分区的一些读写操作。
- replicas 就是这个分区的日志备份brokers,无论他们是否是leader还是是否alive
- isr 这个记录了被leader捕获并且还活着的上面replicas的子集
使用kafka connect 来导入/导出数据
我们经常需要用kafka导入别的数据源,或者导出到别的系统。所以kafka提供了个工具叫做kafka connect.
Kafka Connect是Kafka附带的工具,用于向Kafka导入和导出数据。 它是一个可扩展的工具,运行连接器,实现与外部系统交互的自定义逻辑。
官方文档给的案例是一个kafak connect,它实现了从文件的导入和导出,producer可以从文件1读取数据进入kafka, consumer则读取数据并写入文件2中,实现了在文件系统中的发布订阅。
python kafka
安装
pip install confluent-kafka
官方文档
github
Admin API
kafka的控制端,创建、浏览、改变、删除主题和资源
class confluent_kafka.admin.AdminClient(conf)
AdminClient 为kafka 的brokers提供一些控制操作,topics、groups、以及其他borker支持的资源类型。
Admin API方法是异步的,并返回由实体键入的concurrent.futures.Future对象的dict。
实体是一个topic 名字供create_topics(), delete_topics(), create_partitions()调用,并且一个ConfigResource 供alter_configs(), describe_configs()调用。
查看使用案例 examples/adminapi.py
下面是可以调用的函数:
alter_configs(resources, **kwargs)
改变配置
create_partitions(new_partitions, **kwargs)
为给定的topic创建新分区
create_topics(new_topics, **kwargs)
集群创建新topic
delete_topics(topics, **kwargs)
删除topic
describe_configs(resources, **kwargs)
查看某个特定资源的配置
class confluent_kafka.admin.BrokerMetadata
包含kafka broker 信息的类
class confluent_kafka.admin.ClusterMetadata
包含kafka 集群、brokers、topics信息的对象
class confluent_kafka.admin.ConfigEntry(name, value, source=<ConfigSource.UNKNOWN_CONFIG: 0>, \
is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[])
describe_configs()的返回对象
class confluent_kafka.admin.ConfigResource(restype, name, set_config=None, described_configs=None, error=None)
展示拥有配置的资源,通过资源类型和名字进行实例化。
ConfigResource.Type 描绘了kafka 资源的type
ConfigResource.set_config(name, value, overwrite=True)
设置或者重写配置参数
class confluent_kafka.admin.ConfigSource
Config sources returned in ConfigEntry by describe_configs().
class confluent_kafka.admin.PartitionMetadata
| Variables: |
- id (int) – Partition id.
- leader (int) – Current leader broker for this partition, or -1.
- replicas (list(int)) – List of replica broker ids for this partition.
- isrs (list(int)) – List of in-sync-replica broker ids for this partition.
- error (KafkaError) – Partition error, or None. Value is a KafkaError object.
class confluent_kafka.admin.TopicMetadata
Variables:
topic (str) – Topic name.
partitions (dict) – Map of partitions indexed by partition id. Value is PartitionMetadata object.
error (KafkaError) – Topic error, or None. Value is a KafkaError object.
Producer API
- 采用异步发送消息
image.png - 采用同步发送消息
- 批量发送消息
- 消息大小
- 延迟时间
- acks
生产者发送消息后,服务器的回执- 0
生产者不等待服务器,消息发送到缓冲区就ok了 - 1
broker收到就ok, 不管follower是否备份 - -1/all
等到broker收到, follower备份
- 0
- retries
当消息发送失败重复的次数,默认为0 - 至多一次
acks=0 或acks=1 - 至少一次
acks =-1或all
retries >0 - 精确一次与幂等
enable.idempotence = true
//retries = integer.MAX_VALUE
acts = all - 事务
消息要么全部成功,要么全部失败
class confluent_kafka.Producer
kafka 异步 producer
Producer(config)
config(dict)—参数属性,至少需要设置bootstrap.severs
len()
返回等待传递给broker的消息数和请求数, type:int
flush([timeout])
等待producer队列中的所有消息被发送。timeout是最大的堵塞时间,返回仍在队列中的消息数
poll([timeout])
轮询生产者的事件并调用相应的回调(已注册的)
- on_delivery :produce()的回调
参数:timeout-最大堵塞时间(秒)
返回:被处理的事件数(调用回调)(int)
list_topics([topic=None][,timeout=-1])
从集群中请求元数据。这个方法提供了listTopics(), describeTopics() and describeCluster() 在java客户端中同样的信息
参数
- topic(str) - 如果提供了这个参数,那么仅仅显示有关这个topic的信息,兜着返回所有集群中的topic信息。
- timeout 最大的响应时间在超时之间, -1是无限timeout
Return type:ClusterMetadata
Raises: KafkaException
produce()
produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
向topic发送消息。这是一个异步操作,一个应用可能会使用回调(别名 on_delivery)参数来传递一个函数或者匿名函数,当消息被成功发送或者永久失败时,就会被poll()调用。
参数
- topic(str)
- value(str|bytes)- 消息负载
- key(str|bytes)-消息key
- partition(int)- 发送消息的分区,否则使用默认的
- on_delivery(err,msg) (func) -被调用的报告回调函数
- timestamp(int)-消息时间戳
-
dict|list (headers)
消息的消息头部。头部的key必须是一个字符串,值必须是二进制,unicode或者None。 Accepts a list of (key,value) or a dict。
Return type:None
Consumer API
class confluent_kafka.Consumer
Consumer(config)
Parameters: config (dict) – 配置参数。至少得设置bootstrap.servers和group.id
创建一个新的消费端
特殊参数:on_commit:当一个commit request 成功或失败时调用的回调
on_commit(err, partitions)
参数:
- consumer (Consumer) –consumer实例
- err (KafkaError) – commit error object
- partitions (list(TopicPartition)) –分区列表包括他们的 committed offsets or per-partition errors.
assign()
assign(partitions)
将消费的消息分区设置为提供的TopicPartition列表并开始消费
参数
- partitions (list(TopicPartition)) – 主题+分区的列表以及可选择的消费的初始offset
assignment()
返回目前的分区设置情况
close()
关闭消费者
进行的操作:
- 结束消费
- 提交offsets 除非消费者属性'enable.auto.commit'设置为False
- 离开用户组
commit()
commit([message=None][, offsets=None][, asynchronous=True])
提交一个信息或者offsets列表
消息和偏移是互斥的,如果两者都没有设置,则使用当前分区分配的偏移。 如果您将'enable.auto.commit'设置为False,则consumer依赖于您使用此方法
committed()
committed(partitions[, timeout=None])
检索分区中已经提交的offsets
consume()
consume([num_messages=1][, timeout=-1])
消费消息,调用回调函数以及返回消息列表。应用程序必须检查返回的Message对象的Message.error()方法,以区分正确的消息(error()返回None),或列表中每个Message的事件或错误(请参阅error()。code()以获取详细信息)。
get_watermark_offsets()
get_watermark_offsets(partition[, timeout=None][, cached=False])
检索分区的low and high offsets
cached (bool) – 不是查询broker所使用的内存信息。缓存值:定期更新低偏移量(如果设置了statistics.interval.ms),同时在从此分区的broker获取的每条消息上更新高偏移量。
Return : Tuple of (low,high) on success or None on timeout.
list_topics()
list_topics([topic=None][, timeout=-1])
返回元数据
offsets_for_times()
offsets_for_times(partitions[, timeout=None])
offsets_for_times按给定分区的时间戳查找偏移量。
每个分区的返回偏移量是最早的偏移量,其时间戳大于或等于相应分区中的给定时间戳。
pause()
pause(partitions)
暂停该分区的消费
poll()
poll([timeout=None])
使用消息,调用回调并返回事件。
position()
position(partitions[, timeout=None])
检索给定分区的当前偏移量。
resume()
resume(partitions)
恢复提供的分区列表的消耗。
seek(partition)
将分区的消耗位置设置为偏移量。 偏移可以是绝对(> = 0)或逻辑偏移(OFFSET_BEGINNING等)。
seek()可以仅用于更新主动消耗的分区的消耗偏移(即,在assign()之后),以设置未被消耗的分区的起始偏移,而不是在assign()调用中传递偏移。
store_offsets()
store_offsets([message=None][, offsets=None])
存储消息或偏移列表的偏移量。
消息和偏移是互斥的。 存储的偏移量将根据'auto.commit.interval.ms'或手动无偏移提交()进行提交。 请注意,使用此API时,“enable.auto.offset.store”必须设置为False。
subscribe()
subscribe(topics[, listener=None])
设置订阅提供的主题列表这将替换以前的订阅。
可以通过正则化进行订阅:
consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
回调函数
on_assign(consumer, partitions)
on_revoke(consumer, partitions)
unassign()
删除当前分区设置并停止消费。
unsubscribe()
移除当前订阅
TopicPartition API
class confluent_kafka.TopicPartition
TopicPartition是一种通用类型,用于保存单个分区及其各种信息。
它通常用于为各种操作提供主题或分区列表,例如Consumer.assign()。
TopicPartition(topic[, partition][, offset])
实例化一个topicpartition对象
参数
- topic (string) – Topic name
- partition (int) – Partition id
- offset (int) – Initial partition offset
属性:
error
offset
partition
topic
Message API
Message对象表示单个消费或生成的消息,或者一个错误事件(error()不是None)。
应用程序必须检查error()以查看对象是否是正确的消息(error()返回None)或错误/事件。
这个对象不需要用户初始化
方法:
-
len()
返回消息大小 -
error()
Return type: None or KafkaError 用来检查是否消息是错误事件 -
headers()
检索消息的头部。每个头部都是一个键值对。注意消息头的key是有序且可重复的 -
key()
Returns: message key or None if not available. -
offset()
Returns: message offset or None if not available. -
partition()
Returns: partition number or None if not available. -
set_headers()
Set the field ‘Message.headers’ with new value. -
set_key()
Set the field ‘Message.key’ with new value. -
set_value()
Set the field ‘Message.value’ with new value. -
timestamp()
从消息中检索时间戳类型和时间戳。 时间戳类型是以下之一:- TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker
- TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time)
- TIMESTAMP_LOG_APPEND_TIME - Broker receive time
-
topic()
Returns: topic name or None if not available. -
value()
Returns: message value (payload) or None if not available.
Offset API
逻辑offset常量:
- OFFSET_BEGINNING - Beginning of partition (oldest offset)
- OFFSET_END - End of partition (next offset)
- OFFSET_STORED - Use stored/committed offset
- OFFSET_INVALID - Invalid/Default offset