python三方库之kafka

更详细的可以参考:https://zhuanlan.zhihu.com/p/279784873
https://blog.csdn.net/weixin_35688430/article/details/111292744

生产者:producer
生产者kafka有对应的三方库可以支撑去进行信息发送。

from kafka import KafkaProducer
import json
def kafka_sendMsg(topic,bootstrap_servers,key,msg,header) :
        producer = KafkaProducer(value_serializer= lambda v :json.dumps(v).encode('utf-8'),bootstrap_servers=['127.0.0.1:9092'])

消费者:consumer
消费者,同样可以通过topic和服务器名称,去获取对应的数据。

from kafka import KafkaconsumerMsg(topic,bootstrap_servers)
import json
def kafka_sendMsg(topic,bootstrap_servers,key,msg,header) :
        consumer= KafkaConsumer(topic,bootstrap_servers=bootstrap_servers)
        for msg in consumer:
              recv ="%s:%d%d:  key=%s value=%s" %(msg.topic,msg.partition,msg.offset,msg.key, msg.value)

使用kafka

def kafka():
      topic = "AA1.reaply.info"  #kafka的主题名称
      bootstrap_servers = "127.0.0.1"  #kafka连接
      kafkaMsg = kafka_msg_mode.getModel()  #字典,获取对应的取值
      key  = bytes(kafkaMsg["msgkey"]+self.feature.loginUserNo,encoding='utf-8')  #kakfa的key
      msg = json.loads(topic_str_info) #kafka的value
      header =["Headers"]
      kafka_sendMsg(topic,bootstrap_servers,key,msg,header)  #向kafka中间件发送消息
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容