准备工作:mysql库的安装。
python中mysql库用的是mysql-connector,安装执行如下命令:
pip install mysql-connector
第一步:连接mysql,读取数据。
通过执行sql语句,读取mysql数据。
# mysql读取数据
import pandas as pd
import datetime
def get_rawdata():
import mysql.connector
config = {
'host' : '172.10.3.165',
'user' : 'root',
'password' : '!QAZxcvfr432',
'port' : 3306,
'database' : 'passenger_flow',
'charset' : 'gb2312'
}
cnn = mysql.connector.connect(**config) # 建立MySQL连接
cursor = cnn.cursor() # 获得游标
sql = "SELECT cast(TRAN_DATE AS DATE) as tran_date, cast(CONCAT(TRAN_TIME_MIN,'00') AS TIME) as tran_time, cast(PASSENGER_NUM AS DECIMAL (12,2)) as passenger_num, cast(DEVICE_ID AS CHAR(8)) as device_id " \
"FROM passenger_flow.pass_flow_info_his" # SQL语句
raw_data = pd.read_sql(sql,cnn)
cursor.close() # 关闭游标
cnn.close() # 关闭连接
return raw_data
至此,获得mysql的原始数据raw_data 。接下来对数据进行预处理,按日期进行分组聚合,然后重命名行和列名,得到dataFrame格式的数据。
raw_data = get_rawdata()
# 按日期分组
date_flow = date_flow.rename_axis('date').reset_index(name='counts')
第二步:连接ES。
这步没有太多的可解释的地方,就是配置信息。
# 连接ES
def connect_es(es_ip, es_port):
from elasticsearch import Elasticsearch
es_ip = es_ip
es_port = es_port
es = Elasticsearch(
[es_ip]
#,http_auth=('elastic', 'passwd')
,port=es_port
)
return es
第三步:ES主键加密。
这步的目的是为了保持主键唯一性,防止重复写入。用的方法是md5加密。
# md5 加密
def md5(string):
import hashlib
# 对要加密的字符串进行指定编码
string = string.encode(encoding ='UTF-8')
# md5加密
return hashlib.md5(string).hexdigest()
第四步:写入ES
至此,一切的准备工作都做好了,数据也有了,主键加密也做了,就开始写入了。
def write_to_es():
from elasticsearch import helpers
# 写入es
actions = []
for index, row in date_flow.iterrows():
day = datetime.datetime.strftime(row[0], '%Y-%m-%d')
action = {
"_index": 'pass_flow_index',
"_id": md5(day),
"_source": {
"TRAN_DATE": day,
"DATE_FLOW": row[1]
}
}
actions.append(action)
helpers.bulk(es, actions)
write_to_es()
用main方法执行以上方法:
if __name__ == "__main__":
raw_data = get_rawdata()
date_flow = date_flow.rename_axis('date').reset_index(name='counts')
es = connect_es('172.10.3.10', 10200)
write_to_es()
最后查看一下ES写的是否成功,用查询方法
# 查询es
body = {
"query":{
"match_all":{}
}
}
es.search(index="pass_flow_index" ,body=body)
如果返回以下信息,说明ES里成功插入了数据。
另外,ES删除索引的操作:
# 删除索引
es.indices.delete('pass_flow_index')