在sanic 中使用 aio kafka

use aio kafka in sanic

一. producer

1. install

pip install aiokafka

2. initialization producer

eg:

@app.listener('before_server_start')
async def server_init(app, loop):
    app.producer = AIOKafkaProducer(loop=loop, value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                    bootstrap_servers=kafka_host)
    await app.producer.start()

3. use

await app.producer.send("topic_name", dict)

二. consumer

1. initialization consumer

eg:

async def process(consumer):
    async for msg in consumer:
        await func(msg)  # msg 处理函数 必须使用协程

@app.listener("after_server_start")  # 必须 after_server_start
async def after_server(app, loop):
    app.consumer = AIOKafkaConsumer(
        'user',
        loop=loop, bootstrap_servers=kafka_host,
        group_id="my-group4343")
    await app.consumer.start()
    await process(app.consumer)

2. use

自定义协程 func function

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,303评论 19 139
  • 这个连接器提供了对由Apache Kafka提供的事件流的访问。 Flink 提供了特殊的Kafka Connec...
    写Bug的张小天阅读 21,604评论 2 17
  • 目标 高吞吐量来支持高容量的事件流处理 支持从离线系统加载数据 低延迟的消息系统 持久化 依赖文件系统,持久化到本...
    jiangmo阅读 1,444评论 0 4
  • Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O(1)的方...
    Alukar阅读 3,138评论 0 43
  • 记不清楚今天是来到深圳的第几个日夜,更想不起自从来深圳后,多少次晚十点后拖着累成狗一样的驱壳爬上住所的七楼... ...
    文娱先生阅读 414评论 1 1

友情链接更多精彩内容