# -*- coding: utf-8 -*-
# __author__ = '小红帽'
# Date: 2020-05-11
"""Naval Fate.
Usage:
py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> --groupId=<groupId> --topic=<topic_name> --es-servers=<host:port> --index=<schema> --type=<doc> --id=<order_id>
py_kafka_protobuf_consume.py -h | --help
py_kafka_protobuf_consume.py --version
Options:
-h --help 打印帮助信息.
--bootstrap_servers=<host:port,host2:port2..> kafka servers
--groupId=<groupId> kafka消费组
--topic=<topic_name> topic名称
--es-servers=<host:port> ES 地址
--index=<index_name> ES 索引
--type=<doc> ES type
--id=<order_id> 指定id主键,快速更新
"""
import json

from kafka import KafkaConsumer
from docopt import docopt
from elasticsearch import Elasticsearch, helpers


class KafkaToEsConsumer:
    """Reads JSON messages from a Kafka topic and bulk-indexes them into Elasticsearch."""

    def __init__(self, args):
        self.topic = args['--topic']
        self.bootstrap_servers = args['--bootstrap-servers']
        self.group_id = args['--groupId']
        self.id_field = args['--id']
        es_host, es_port = args['--es-servers'].split(':')
        self.es_host = es_host
        self.es_port = int(es_port)  # the Elasticsearch client expects a numeric port
        self.es_index = args['--index']
        self.es_type = args['--type']
        self.consumer = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.group_id,
            # Offsets are committed automatically every 5 seconds.
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
            # If no message arrives within 5 seconds the iterator exits,
            # which lets consume_data_es() flush any buffered actions.
            consumer_timeout_ms=5000,
        )
    def consume_data_es(self):
        # Create the ES client and subscribe once, instead of on every retry.
        es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)
        self.consumer.subscribe([self.topic])
        while True:
            try:
                actions = []
                for message in self.consumer:
                    if message is None:
                        continue
                    # Each message is expected to carry a JSON body of the form
                    # {"data": [{...record fields...}]}; the record's id field
                    # becomes the Elasticsearch document _id.
                    record = json.loads(message.value)['data'][0]
                    actions.append({
                        "_index": self.es_index,
                        "_type": self.es_type,
                        "_id": record[self.id_field],
                        "_source": record,
                    })
                    # Flush in batches rather than one bulk call per message.
                    if len(actions) > 50:
                        helpers.bulk(client=es, actions=actions)
                        print("Indexed %s documents into ES" % len(actions))
                        actions = []
                # The iterator exits after consumer_timeout_ms of silence;
                # flush whatever is left in the buffer.
                if actions:
                    helpers.bulk(client=es, actions=actions)
                    print("Consumer timed out; flushed %s documents into ES" % len(actions))
            except Exception as e:
                print(e)
if __name__ == '__main__':
    arguments = docopt(__doc__, version='sbin 1.0')
    consumer = KafkaToEsConsumer(arguments)
    consumer.consume_data_es()
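# For reference, a producer-side sketch of the message shape this consumer
# expects. The {"data": [...]} envelope is an assumption inferred from the
# parsing code above (json.loads(message.value)['data'][0]); the topic and
# field names are hypothetical:
#
#   from kafka import KafkaProducer
#   import json
#
#   producer = KafkaProducer(bootstrap_servers='kafka1:9092')
#   payload = {"data": [{"order_id": "10001", "status": "PAID"}]}
#   producer.send('orders', json.dumps(payload).encode('utf-8'))
#   producer.flush()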