use aio kafka in sanic
一. producer
1. install
pip install aiokafka
2. initialization producer
eg:
@app.listener('before_server_start')
async def server_init(app, loop):
app.producer = AIOKafkaProducer(loop=loop, value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=kafka_host)
await app.producer.start()
3. use
await app.producer.send("topic_name", dict)
二. consumer
1. initialization consumer
eg:
async def process(consumer):
async for msg in consumer:
await func(msg) # msg 处理函数 必须使用协程
@app.listener("after_server_start") # 必须 after_server_start
async def after_server(app, loop):
app.consumer = AIOKafkaConsumer(
'user',
loop=loop, bootstrap_servers=kafka_host,
group_id="my-group4343")
await app.consumer.start()
await process(app.consumer)
2. use
自定义协程 func
function