背景
项目中需要创建分区(partitions)数不同的Topic。在server.properties中可以配置默认的Topic分区数量,但是不能在需要的时候任意改变。(使用Producer API会自动创建Topic)
简单的Solution
翻遍了Kafka-python的文档,没有发现kafka-python提供了类似client.create_topic(name='test', num_partitions=3)
这样简单的API。只能往底层探索了,果然发现了两个关键信息。
在KafkaClient API中有这样一个方法:
在kafka.protocol.admin — kafka-python 1.3.4.dev documentation中:
显然,我们只需要构建一个CreateTopicsRequest的请求,然后通过KafkaClient的send()方法发送给控制节点(由于本小白也不太清楚Kafka的机制,测试的时候,不是控制节点,会报错。也不清楚各个版本的区别,下面代码用的是v0版本。🤣)
原理就是这样,还是很简单的。
糟糕的Code
def create_topic(self, topic='topic', num_partitions=3, configs=None, timeout_ms=3000, brokers=['localhost:9290'], no_partition_change=True):
client = KafkaClient(bootstrap_servers=brokers)
if topic not in client.cluster.topics(exclude_internal_topics=True): # Topic不存在
request = admin.CreateTopicsRequest_v0(
create_topic_requests=[(
topic,
num_partitions,
-1, # replication unset.
[], # Partition assignment.
[(key, value) for key, value in configs.items()], # Configs
)],
timeout=timeout_ms
)
future = client.send(2, request) # 2是Controller,发送给其他Node都创建失败。
client.poll(timeout_ms=timeout_ms, future=future, sleep=False) # 这里
result = future.value
# error_code = result.topic_error_codes[0][1]
print("CREATE TOPIC RESPONSE: ", result) # 0 success, 41 NOT_CONTROLLER, 36 ALREADY_EXISTS
client.close()
else: # Topic已经存在
print("Topic already exists!")
return