导包时,向环境变量添加路径
import sys
import os.path
_basedir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _basedir not in sys.path:
sys.path.insert(0, _basedir)
MySQL简易封装?
Elasticsearch kibana之初体验
from elasticsearch import Elasticsearch
es = Elasticsearch(['server-1', 'server-2', 't-elk3'], http_auth=(user, passwd), port=yourport)
# kibana查询参数
params = {
# 聚合,1.建桶 2.给桶命名 3.
"aggs": {
"user_id": {
"cardinality": {
"field": "user_id"
}
}
},
# 查询 1.选过滤器 2.过滤条件
"query": {
"match": {
"_type": {
"query": "user_sign",
"type": "phrase"
}
}
},
# 显示源
"_source": {
"include": ["user_id"]
}
}
page = es.search(
index=yourlog, # 选择要查询目标
scroll='2m', # 过期时间2分钟
size=10000, # 接收数据Buffer大小
body=params
)
# 返回值?
sid = page['_scroll_id']
scroll_size = page['hits']['total']
with open("user_signs.csv", "w") as f:
#f.write("用户ID,历史总签到数,历史连签最高,到10/16号连签数,昨天是否签到\n")
f.write("用户ID,历史总签到数,历史连签最高,到10/16号连签数\n")
# 用户ID集合用于去重
user_ids = set()
# Start scrolling
total_size = 0
while (scroll_size > 0):
print "Scrolling..."
page = es.scroll(scroll_id=sid, scroll='2m')
for source in page['hits']['hits']:
user_id = int(source['_source']['user_id'])
if user_id not in user_ids:
user_ids.add(user_id)
print "-------------%s" % user_id
# 查询signs数据库,获得用户签到数据
user_signs = get_user_sign_info(user_id)
# 写入csv文件
write_csv(user_signs)
sid = page['_scroll_id']
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
total_size += scroll_size
print "scroll size: " + str(scroll_size)
print "total_size: " + str(total_size)
# Do something with the obtained page