注释：1. MySQL 表中的字段类型需与 ES 映射中的字段类型一一对应；
2. 日期需转为字符串，且其中的 “-” 需要替换为 “/”（以匹配 ES 映射中的 yyyy/MM/dd 日期格式）。
# -*- coding: utf-8 -*-
"""
Create by Mr.Hao on 2019/5/28.
"""
import sys
# Python 2 only: reload(sys) makes sys.setdefaultencoding reachable again so the
# implicit str<->unicode conversions used below (e.g. str() on unicode values)
# decode/encode as UTF-8 instead of ASCII. This is a process-wide hack; neither
# reload() as a builtin nor sys.setdefaultencoding exist on Python 3.
reload(sys)
sys.setdefaultencoding('utf8')
import pymysql
from pymysql.cursors import DictCursor
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class FinanceStorage(object):
    """Read rows of ``media_consume_table`` out of a MySQL database.

    Connection settings are class attributes so a subclass (or an instance
    attribute assignment) can point the reader at another server without
    editing this class.
    """

    host = '127.0.0.1'
    user = 'root'
    passwd = ""
    db = 'bdp_spider'

    def __init__(self, *args, **kwargs):
        super(FinanceStorage, self).__init__(*args, **kwargs)

    def connection(self):
        """Open and return a new pymysql connection from the class settings."""
        return pymysql.connect(host=self.host,
                               user=self.user,
                               passwd=self.passwd,
                               db=self.db)

    def get_data(self, sql="SELECT * FROM media_consume_table;"):
        """Run *sql* and return all result rows as dicts (column -> value).

        :param sql: query to execute verbatim; never build it from untrusted
            input, it is not parameterized.
        :return: the tuple/list returned by ``cursor.fetchall()`` with a
            ``DictCursor``.

        The cursor and the connection are closed even if the query raises,
        so a failed run does not leak a MySQL connection.
        """
        conn = self.connection()
        try:
            cursor = conn.cursor(DictCursor)
            try:
                cursor.execute(sql)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()
class ElasticObj(object):
    """Create one Elasticsearch index and bulk-load MySQL rows into it.

    Rows come from ``FinanceStorage().get_data()`` and are written under
    ``self.index_name`` / ``self.index_type``.
    """

    # Date pattern shared by every ``date`` field in the mapping; the strings
    # produced by ``_to_es_date`` match its first two alternatives.
    DATE_FORMAT = "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis"

    def __init__(self, index_name, index_type, ip="127.0.0.1",
                 http_auth=('elastic', 'password'), port=9200):
        """
        :param index_name: name of the index documents are written to
        :param index_type: mapping type of the documents
        :param ip: Elasticsearch host
        :param http_auth: (user, password) tuple handed to the client
        :param port: Elasticsearch HTTP port
        """
        self.index_name = index_name
        self.index_type = index_type
        self.es = Elasticsearch([ip], http_auth=http_auth, port=port)

    def create_index(self, index_name='data', index_type="media_consume_table"):
        """Create *index_name* with an explicit mapping unless it already exists.

        :param index_name: index to create
        :param index_type: mapping type (the equivalent of a table name)

        NOTE(review): these defaults do not follow ``self.index_name`` /
        ``self.index_type``; pass them explicitly when the object was built
        for a different index.
        """
        date_format = self.DATE_FORMAT
        _index_mappings = {
            "mappings": {
                index_type: {  # equivalent of the table name in a database
                    "properties": {
                        "id": {"type": "long", "store": True},
                        "dt": {"type": "date", "store": True,
                               "format": date_format},
                        "product": {"type": "keyword", "index": True},
                        # Must match the "media" key written by bulk_Index_Data.
                        "media": {"type": "keyword", "index": True},
                        # Spelling kept on purpose: documents are indexed under
                        # exactly this field name.
                        "meida_sub_categorey": {"type": "keyword", "index": True},
                        "account": {"type": "keyword", "index": True},
                        "ad_campaign": {
                            "type": "text",
                            # "analyzer": "ik_max_word",
                            # "search_analyzer": "ik_max_word"
                        },
                        "ad_type": {"type": "keyword", "index": True},
                        "consume": {"type": "keyword", "store": True},
                        "shows": {"type": "keyword", "store": True},
                        "clicks": {"type": "keyword", "store": True},
                        # NOTE(review): declared here but never populated by
                        # bulk_Index_Data -- confirm whether the column exists.
                        "pay_downloads": {"type": "keyword", "store": True},
                        "publish_time": {"type": "date", "store": True,
                                         "format": date_format},
                        "update_time": {"type": "date", "store": True,
                                        "format": date_format},
                        # The legacy "string" type is rejected by
                        # Elasticsearch 6+; "text" is its analyzed replacement.
                        "ad_group": {"type": "text", "store": True},
                        "ad_case": {"type": "text", "store": True},
                    }
                }
            }
        }
        # Plain truthiness instead of "is not True": newer clients return a
        # truthy response object rather than the bool singleton.
        if not self.es.indices.exists(index=index_name):
            res = self.es.indices.create(index=index_name, body=_index_mappings)
            print(res)

    @staticmethod
    def _to_text(value):
        """Return *value* as text: UTF-8 bytes are decoded, None stays None.

        Replaces the old unconditional ``.encode('utf-8')``, which raised on
        NULL columns and produced non-JSON-serializable bytes on Python 3.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    @staticmethod
    def _to_es_date(value):
        """Render a DB date value in the mapping's ``yyyy/MM/dd[ HH:mm:ss]`` form.

        None is kept as None (indexed as a missing value) instead of becoming
        the string "None". Microseconds are dropped because DATE_FORMAT has no
        fractional-second part. Other values fall back to ``str()`` with
        "-" replaced by "/".
        """
        import datetime
        if value is None:
            return None
        # datetime must be tested before date: it is a subclass of date.
        if isinstance(value, datetime.datetime):
            # Manual formatting: Python 2 strftime rejects years before 1900.
            return '%04d/%02d/%02d %02d:%02d:%02d' % (
                value.year, value.month, value.day,
                value.hour, value.minute, value.second)
        if isinstance(value, datetime.date):
            return '%04d/%02d/%02d' % (value.year, value.month, value.day)
        return str(value).replace("-", "/")

    @classmethod
    def _row_to_source(cls, row):
        """Map one ``media_consume_table`` row dict to an ES ``_source`` dict."""
        text = cls._to_text
        date = cls._to_es_date
        return {
            "id": row['id'],
            "dt": date(row['dt']),
            "product": text(row['product']),
            "media": text(row['media']),
            "meida_sub_categorey": text(row['media_sub_category']),
            "account": row['account'],
            "ad_type": text(row['ad_type']),
            "ad_campaign": text(row.get('ad_campaign')),
            "consume": row.get('consume'),
            "shows": row.get('shows'),
            "clicks": row.get('clicks'),
            "publish_time": date(row['publish_time']),
            "update_time": date(row['update_time']),
            "ad_case": text(row['ad_case']),
            "ad_group": text(row['ad_group']),
        }

    def bulk_Index_Data(self):
        """Bulk-store every row returned by FinanceStorage into Elasticsearch.

        ``_id`` values are positional (1, 2, ...) in the order MySQL returns
        the rows. They are not taken from the row's own ``id`` column.
        ``bulk`` raises on the first failed action (``raise_on_error=True``).
        """
        rows = FinanceStorage().get_data()
        # Generator: bulk() consumes actions lazily, so no second full copy
        # of the table is held in memory.
        actions = (
            {
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": doc_id,
                "_source": self._row_to_source(row),
            }
            for doc_id, row in enumerate(rows, start=1)
        )
        success, _ = bulk(self.es, actions, index=self.index_name,
                          raise_on_error=True)
        print('Performed %d actions' % success)
if __name__ == '__main__':
    obj = ElasticObj("data", "media_consume_table")
    # Apply the explicit mapping before loading. Otherwise, by default,
    # Elasticsearch auto-creates the index on the first bulk write with
    # dynamically guessed field types. create_index checks exists() first,
    # so this is a no-op when the index is already there.
    obj.create_index(obj.index_name, obj.index_type)
    obj.bulk_Index_Data()
结果: