Straight to the code:
import configparser
import datetime
import logging
import os

from bson import json_util
from confluent_kafka import Consumer
from confluent_kafka.avro.serializer import SerializerError
from pymongo import MongoClient
class kafkaconsumer():
    def __init__(self):
        # Read settings from config.cfg (see the [Source_Kafka] and [MongoDB] sections below)
        self.config = configparser.RawConfigParser()
        self.config.read('./config.cfg')
        # Write logs to ./Log/writeMysql.log
        if not os.path.exists('./Log'):
            os.makedirs('./Log')
        self.logger = logging.getLogger('writeMysql')
        self.logger.setLevel(logging.INFO)
        fh = logging.FileHandler('Log/writeMysql.log')
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)
        self.GatewayName = 'Test_Kafka'
        self.MongoClient = MongoClient(self.config.get('MongoDB', 'uri'))
    def connect_kafka(self):
        epoch_start = datetime.datetime(1970, 1, 1)
        Count = 0
        TopicBuffer = {}  # per-topic batches of decoded messages awaiting a bulk insert
        try:
            consumer = Consumer({
                'bootstrap.servers': self.config.get('Source_Kafka', 'kafkaservers'),
                'group.id': self.config.get('Source_Kafka', 'groupID'),
                'auto.offset.reset': self.config.get('Source_Kafka', 'offsetReset'),
                'session.timeout.ms': 6000
            }, logger=self.logger)
            Source_Kafka_Topics = self.config.get('Source_Kafka', 'topics').split(',')
            # Use strict UUID handling when decoding extended JSON
            json_util.DEFAULT_JSON_OPTIONS.strict_uuid = True
            consumer.subscribe(Source_Kafka_Topics)
            print('Kafka connected successfully!')
        except Exception as inst:
            # Without a consumer there is nothing to poll, so bail out here
            print('Source Kafka Consumer init failed')
            print(inst)
            return
        mongodb = self.MongoClient['Kafka']
        def flush_buffers():
            # Bulk-insert each topic's buffered batch into a collection of the same name
            for topic in TopicBuffer:
                if TopicBuffer[topic]:
                    try:
                        mongodb[topic].insert_many(TopicBuffer[topic])
                        TopicBuffer[topic] = []
                    except Exception as inst:
                        print('Mongo write failed')
                        print(inst)

        try:
            while True:
                try:
                    msg = consumer.poll(1)
                except SerializerError as e:
                    print('Message deserialization failed: {}'.format(e))
                    self.logger.error('Message deserialization failed: {}'.format(e))
                    continue
                if msg is None:
                    continue
                if msg.error():
                    print('Consumer error: {}'.format(msg.error()))
                    continue
                # Decode the payload as MongoDB extended JSON (handles BSON types)
                data = json_util.loads(msg.value())
                # evt_dt is a Unix epoch in milliseconds; derive UTC and UTC+8 datetimes
                if data.get('evt_dt') is not None:
                    delta = datetime.timedelta(milliseconds=data['evt_dt'])
                    data['evt_dt2'] = epoch_start + delta
                    data['evt_dt2_China'] = epoch_start + delta + datetime.timedelta(hours=8)
                Count += 1
                print(data)
                # Buffer per topic and flush to MongoDB every 100 messages
                if TopicBuffer.get(msg.topic()) is None:
                    TopicBuffer[msg.topic()] = []
                TopicBuffer[msg.topic()].append(data)
                if Count % 100 == 0:
                    print('Message {0} processed at {1}'.format(Count, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
                    self.logger.info('Message {0} processed at {1}'.format(Count, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
                    flush_buffers()
                if Count % 10000 == 0:
                    Count = 0
        except KeyboardInterrupt:
            print('KeyboardInterrupt')
        except Exception as inst:
            print(inst)
        # Flush whatever is still buffered, then shut down cleanly
        if Count % 100 != 0:
            print('Message {0} processed at {1}'.format(Count, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
            self.logger.info('Message {0} processed at {1}'.format(Count, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
        flush_buffers()
        consumer.unsubscribe()
        consumer.close()
        print('{0} Finish'.format(self.GatewayName))
        self.logger.info('{0} Finish'.format(self.GatewayName))
if __name__ == '__main__':
    commonObj = kafkaconsumer()
    commonObj.connect_kafka()
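
After the consumer has run for a while, it is worth sanity-checking that documents actually landed in MongoDB. A minimal sketch, reusing the same config.cfg; 'test_topic' is a placeholder name, substitute one of your subscribed topics:

import configparser
from pymongo import MongoClient

config = configparser.RawConfigParser()
config.read('./config.cfg')
client = MongoClient(config.get('MongoDB', 'uri'))

# Each Kafka topic becomes a collection in the 'Kafka' database
col = client['Kafka']['test_topic']  # placeholder topic/collection name
print(col.estimated_document_count())
print(col.find_one())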
And the matching config.cfg:
[Source_Kafka]
kafkaservers =
groupID =
offsetReset = earliest
topics =
[MongoDB]
uri = mongodb://XXX:XXX
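
For reference, a filled-in example with placeholder values only, assuming a local broker and MongoDB instance; adjust everything to your environment:

[Source_Kafka]
kafkaservers = localhost:9092
groupID = mongo-sink-group
offsetReset = earliest
topics = test_topic

[MongoDB]
uri = mongodb://user:password@localhost:27017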