python: kafka to mongo

Straight to the code:

import configparser
import datetime
import logging
import os

from bson import json_util
from confluent_kafka import Consumer
from pymongo import MongoClient

class KafkaToMongo:
    def __init__(self):
        self.config = configparser.RawConfigParser()
        self.config.read('./config.cfg')

        if not os.path.exists('./Log'):
            os.makedirs('./Log')
        self.logger = logging.getLogger('writeMongo')
        self.logger.setLevel(logging.INFO)
        fh = logging.FileHandler('Log/writeMongo.log')
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

        self.gateway_name = 'Test_Kafka'
        self.mongo_client = MongoClient(self.config.get('MongoDB', 'uri'))

    def connect_kafka(self):
        epoch_start = datetime.datetime(1970, 1, 1)
        count = 0
        topic_buffer = {}
        try:
            consumer = Consumer({
                'bootstrap.servers': self.config.get('Source_Kafka', 'kafkaservers'),
                'group.id': self.config.get('Source_Kafka', 'groupID'),
                'auto.offset.reset': self.config.get('Source_Kafka', 'offsetReset'),
                'session.timeout.ms': 6000,
            }, logger=self.logger)
            topics = self.config.get('Source_Kafka', 'topics').split(',')
            # Decode UUIDs strictly when parsing extended JSON.
            json_util.DEFAULT_JSON_OPTIONS.strict_uuid = True
            consumer.subscribe(topics)
            print('Kafka connected successfully!')
        except Exception as inst:
            # Without a consumer there is nothing to poll, so return early
            # instead of crashing on consumer.poll() below.
            print('Source Kafka consumer init failed')
            print(inst)
            self.logger.error('Source Kafka consumer init failed: %s', inst)
            return

        mongodb = self.mongo_client['Kafka']  # one collection per topic, inside the 'Kafka' database

        def flush_buffers():
            # Insert each topic's buffered documents into the matching
            # collection, clearing the buffer only on success.
            for topic, docs in topic_buffer.items():
                if not docs:
                    continue
                try:
                    mongodb[topic].insert_many(docs)
                    topic_buffer[topic] = []
                except Exception as inst:
                    print('Mongo write failed')
                    print(inst)
                    self.logger.error('Mongo write failed: %s', inst)
            
        try:
            while True:
                msg = consumer.poll(1)
                if msg is None:
                    continue
                if msg.error():
                    print('Consumer error: {}'.format(msg.error()))
                    self.logger.error('Consumer error: %s', msg.error())
                    continue

                try:
                    # Deserialization happens here, not in poll(): a plain
                    # Consumer hands back raw bytes, and json_util.loads
                    # parses them as MongoDB extended JSON.
                    data = json_util.loads(msg.value())
                except ValueError as e:
                    print('Message deserialization failed for message at {} [{}] offset {}: {}'.format(
                        msg.topic(), msg.partition(), msg.offset(), e))
                    self.logger.error('Message deserialization failed for message at %s [%s] offset %s: %s',
                                      msg.topic(), msg.partition(), msg.offset(), e)
                    continue

                # evt_dt is epoch time in milliseconds; derive a UTC datetime
                # and a UTC+8 (China Standard Time) datetime from it.
                if data.get('evt_dt') is not None:
                    delta = datetime.timedelta(milliseconds=data['evt_dt'])
                    data['evt_dt2'] = epoch_start + delta
                    data['evt_dt2_China'] = epoch_start + delta + datetime.timedelta(hours=8)

                count += 1
                print(data)
                topic_buffer.setdefault(msg.topic(), []).append(data)

                # Flush the buffers to MongoDB every 100 messages.
                if count % 100 == 0:
                    print('Message {0} is processed at {1}'.format(
                        count, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
                    self.logger.info('Message %s is processed at %s',
                                     count, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                    flush_buffers()
                # Reset the counter periodically so it does not grow unbounded.
                if count % 10000 == 0:
                    count = 0
        except KeyboardInterrupt:
            print('KeyboardInterrupt')
        except Exception as inst:
            print(inst)
            self.logger.error('Unexpected error: %s', inst)

        # Flush whatever is still buffered before shutting down.
        if count % 100 != 0:
            print('Message {0} is processed at {1}'.format(
                count, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
            self.logger.info('Message %s is processed at %s',
                             count, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
            flush_buffers()

        consumer.close()
        print('{0} finished'.format(self.gateway_name))
        self.logger.info('%s finished', self.gateway_name)

if __name__ == '__main__':
    app = KafkaToMongo()
    app.connect_kafka()
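
To try the pipeline end to end, a minimal producer sketch can feed it test messages. This is an illustration only, not part of the original post: the broker address, topic name, and payload fields are placeholders, except evt_dt, which the consumer expects in epoch milliseconds.

import json
import time

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder brokers

# evt_dt matches what the consumer expects: epoch time in milliseconds.
event = {
    'device': 'sensor-01',  # placeholder field
    'evt_dt': int(time.time() * 1000),
}
producer.produce('test_topic', json.dumps(event).encode('utf-8'))  # placeholder topic
producer.flush()

Once 100 messages have arrived (or on Ctrl+C), the consumer writes them into the 'Kafka' database, one collection per topic.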

The config file, config.cfg:

[Source_Kafka]
kafkaservers = 
groupID =  
offsetReset = earliest
topics = 

[MongoDB]
uri = mongodb://XXX:XXX
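
For reference, a hypothetical filled-in version; every value below is a placeholder for illustration. Note that topics is split on ',' so list them comma-separated without spaces:

[Source_Kafka]
kafkaservers = localhost:9092
groupID = kafka_to_mongo
offsetReset = earliest
topics = topic_a,topic_b

[MongoDB]
uri = mongodb://user:password@localhost:27017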
Thanks!