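Preface
Let's continue with ELK today. Earlier we covered ELK's basic operations and log search. This time we look at how to configure email notifications for critical log events. As a programmer you can't watch the ELK log dashboard all the time (log visualization will be covered in a later post), but error logs in ELK still need prompt attention so they don't cause unnecessary damage. The examples below send an email notification about 503 errors every 10 minutes.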
I. Using Kibana's management tools: Elasticsearch Watcher
1.1. Edit /etc/elasticsearch/elasticsearch.yml and append the email sender settings at the end:
```yaml
xpack.notification.email.account:
  outlook_account:
    profile: outlook
    smtp:
      auth: true
      starttls.enable: true
      host: smtp.office365.com
      port: 587
      user: xxx@outlook.com
      password: xxx
```
1.2. Create a custom watch in Kibana (or add the watch directly with a curl command):
Kibana => Management => Elasticsearch => Watcher => Create new watch => Advanced Watch
```json
{
  "trigger": {
    "schedule": { "cron": "0 0/10 * * * ?" }
  },
  "input": {
    "search": {
      "request": {
        "indices": [
          "test-qa-access*"
        ],
        "body": {
          "query": {
            "bool": {
              "must": {
                "match": {
                  "response": 503
                }
              },
              "filter": {
                "range": {
                  "@timestamp": {
                    "gte": "{{ctx.trigger.scheduled_time}}||-10m",
                    "lte": "{{ctx.trigger.triggered_time}}"
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": { "ctx.payload.hits.total": { "gt": 0 } }
  },
  "actions": {
    "email_admin": {
      "email": {
        "from": "xxx@outlook.com",
        "to": "xxx@outlook.com",
        "subject": "TEST-QA-ACCESS-LOG - Encountered 503 errors - {{ctx.payload.hits.total}} times",
        "body": "Body test"
      }
    }
  }
}
```
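To test it, run the watch with the action mode set to "execute"; if the condition is met, a real email is sent to your mailbox. Note that Watcher cron expressions use Quartz syntax, whose first field is seconds: "0 0/10 * * * ?" fires every 10 minutes, whereas "*/10 * * * * ?" would fire every 10 seconds.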
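The same watch can be registered without Kibana through the Watcher REST API, which is what the curl alternative mentioned in 1.2 does. The sketch below mirrors the watch above as a Python dict and prepares the PUT request using only the standard library. Assumptions: the cluster listens on http://localhost:9200 without authentication, the watch id qa-503-alert is a hypothetical name, and the path /_watcher/watch/<id> applies to Elasticsearch 7.x and later (6.x uses /_xpack/watcher/watch/<id>).

```python
import json
import urllib.request


def build_watch(index_pattern, sender, recipient):
    """Watch that emails when 503 responses appear; runs every 10 minutes (Quartz cron)."""
    return {
        "trigger": {"schedule": {"cron": "0 0/10 * * * ?"}},
        "input": {"search": {"request": {
            "indices": [index_pattern],
            "body": {"query": {"bool": {
                "must": {"match": {"response": 503}},
                "filter": {"range": {"@timestamp": {
                    "gte": "{{ctx.trigger.scheduled_time}}||-10m",
                    "lte": "{{ctx.trigger.triggered_time}}",
                }}},
            }}},
        }}},
        "condition": {"compare": {"ctx.payload.hits.total": {"gt": 0}}},
        "actions": {"email_admin": {"email": {
            "from": sender,
            "to": recipient,
            "subject": "TEST-QA-ACCESS-LOG - Encountered 503 errors - {{ctx.payload.hits.total}} times",
            "body": "Body test",
        }}},
    }


def put_watch_request(base_url, watch_id, watch):
    """Build (but do not send) the PUT request for the 7.x+ Watcher API path."""
    url = "%s/_watcher/watch/%s" % (base_url.rstrip("/"), watch_id)
    return urllib.request.Request(
        url,
        data=json.dumps(watch).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def register_watch(base_url, watch_id, watch, timeout=10):
    """Send the PUT request and return the parsed JSON response."""
    request = put_watch_request(base_url, watch_id, watch)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Calling register_watch("http://localhost:9200", "qa-503-alert", build_watch("test-qa-access*", "xxx@outlook.com", "xxx@outlook.com")) sends the request. Building the request separately from sending it keeps the watch definition easy to inspect before it reaches the cluster.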
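II. Running an Elasticsearch query from a cron job
2.1. Create a script, alert.py, that checks whether any 503 errors occurred in the last 10 minutes. If so, it sends an alert email whose body includes some of the 503 error records.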
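```python
#!/usr/bin/env python3
# alert.py: send an alert email if any 503 responses were logged in the last 10 minutes
import json
import smtplib
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Logstash names daily indices after the UTC date
datestr = datetime.now(timezone.utc).strftime("%Y.%m.%d")
searchidx = "test-qa-access-logs-cw-" + datestr
print(searchidx)

query = {
    "query": {
        "bool": {
            "must": [
                {"match": {"response": 503}},
                {"range": {"@timestamp": {"gte": "now-10m", "lt": "now"}}},
            ]
        }
    }
}
# The search returns at most 10 hits by default, i.e. a sample of the 503 errors.
# ignore_unavailable avoids an error when today's index does not exist yet.
res = es.search(index=searchidx, body=query, ignore_unavailable=True)

hitstotal = res["hits"]["total"]
if isinstance(hitstotal, dict):  # Elasticsearch 7+ returns {"value": n, "relation": ...}
    hitstotal = hitstotal["value"]
print("%d documents found" % hitstotal)

if hitstotal > 0:
    fromaddr = "xxx@outlook.com"
    toaddr = "xxx@outlook.com"
    msg = MIMEMultipart()
    msg["From"] = fromaddr
    msg["To"] = toaddr
    msg["Subject"] = "503 ALERT Test"
    # Put the matching log documents into the mail body
    body = json.dumps(res["hits"]["hits"], indent=2)
    msg.attach(MIMEText(body, "plain"))

    server = smtplib.SMTP("smtp.office365.com", 587)
    server.starttls()  # upgrade the connection to TLS before logging in
    server.login(fromaddr, "xxxxxx")
    server.sendmail(fromaddr, toaddr, msg.as_string())
    server.quit()
else:
    print("no hit")
```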
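Dumping the raw hits with json.dumps produces a long mail that is hard to scan. As a sketch of a more readable body, the function below keeps one line per hit from a limited sample, using only the standard library's email.message.EmailMessage. The field names clientip and request are assumptions based on Logstash's COMBINEDAPACHELOG grok pattern (the same pattern that yields the response field queried above), so adjust them to your own mapping.

```python
from email.message import EmailMessage


def build_alert_email(hits, total, sender, recipient, max_lines=10):
    """Summarise a sample of 503 hits into a short plain-text alert email."""
    lines = ["%d request(s) returned 503 in the last 10 minutes." % total, ""]
    for hit in hits[:max_lines]:
        src = hit.get("_source", {})
        # Assumed field names from a COMBINEDAPACHELOG-style grok pattern
        lines.append("%s %s %s" % (src.get("@timestamp", "-"),
                                   src.get("clientip", "-"),
                                   src.get("request", "-")))
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = "503 ALERT: %d hit(s)" % total
    msg.set_content("\n".join(lines))
    return msg
```

In alert.py the message could then be built with build_alert_email(res["hits"]["hits"], hitstotal, fromaddr, toaddr) and sent with server.send_message(msg) in place of the sendmail call.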
2.2. Set up the cron job
```
*/10 * * * * python3 /app/errorlogs/alert.py
```
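The script's now-10m to now window depends on the exact moment the query runs. Cron starts the script a little after each 10-minute mark, so consecutive windows can overlap slightly or leave small gaps. One option, sketched below as an assumption rather than part of the original setup, is to align the window to whole 10-minute slots in UTC and pass explicit timestamps to the range query.

```python
from datetime import datetime, timedelta, timezone


def aligned_window(now, minutes=10):
    """Return (start, end) of the last complete slot before `now`; `minutes` should divide 60."""
    end = now.replace(second=0, microsecond=0)
    end -= timedelta(minutes=end.minute % minutes)
    return end - timedelta(minutes=minutes), end


def range_clause(start, end):
    """Range filter on @timestamp with explicit ISO-8601 bounds (end exclusive)."""
    return {"range": {"@timestamp": {"gte": start.isoformat(), "lt": end.isoformat()}}}
```

Replacing the range clause in alert.py with range_clause(*aligned_window(datetime.now(timezone.utc))) makes each run cover the preceding complete 10-minute slot; a run at 01:20:03 covers 01:10:00 to 01:20:00. Log events indexed late can still be missed with either approach.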
III. Using AWS CloudWatch
3.1. Enable remote access to Elasticsearch
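Open /etc/elasticsearch/elasticsearch.yml (for example with vim) and set network.host: 0.0.0.0. Then restart the Elasticsearch service so the change takes effect (for example, sudo systemctl restart elasticsearch on systemd-based hosts).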
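Before writing the metrics script, it helps to confirm that the node is reachable from the machine that will run it. Elasticsearch's root endpoint (GET /) returns a small JSON document that includes the cluster name and version number; the version also tells you which client major version to install. The sketch below uses only the Python standard library, assumes no authentication, and uses the address 192.168.0.100:9200 that the metrics script in 3.2 connects to.

```python
import json
import urllib.request


def parse_version(payload):
    """Extract (cluster_name, version number) from the root endpoint's JSON response."""
    info = json.loads(payload)
    return info["cluster_name"], info["version"]["number"]


def fetch_version(base_url="http://192.168.0.100:9200", timeout=5):
    """GET / on the node; raises URLError if the node is not reachable."""
    with urllib.request.urlopen(base_url, timeout=timeout) as resp:
        return parse_version(resp.read().decode("utf-8"))
```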
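3.2. Install the elasticsearch-py client on the server, together with boto3, which the script below uses to talk to CloudWatch. Choose the elasticsearch package major version that matches your cluster (for example elasticsearch>=6,<7 for a 6.x cluster).
```shell
pip install elasticsearch boto3
```
Then create a script, logs-httpcode-metrics.py, that counts HTTP error codes and publishes the counts to AWS CloudWatch: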
```python
#!/usr/bin/env python3
# logs-httpcode-metrics.py: count HTTP status codes logged in the last 10 minutes
# and publish the counts to AWS CloudWatch as custom metrics
import datetime

import boto3
from elasticsearch import Elasticsearch

es = Elasticsearch("http://192.168.0.100:9200")
# Credentials come from boto3's default chain (IAM role, environment variables
# or ~/.aws/credentials); hard-coding access keys here is not recommended
cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")


def getHitTotal(responseCode, searchIndicesPrefix):
    """Count documents with the given response code in today's index over the last 10 minutes."""
    # Logstash names daily indices after the UTC date
    datestr = datetime.datetime.now(datetime.timezone.utc).strftime("%Y.%m.%d")
    searchidx = searchIndicesPrefix + "-" + datestr
    query = {"query": {"bool": {"must": [
        {"match": {"response": responseCode}},
        {"range": {"@timestamp": {"gte": "now-10m", "lt": "now"}}},
    ]}}}
    # ignore=404: if today's index does not exist yet, return the error body instead of raising
    countresult = es.count(index=searchidx, body=query, ignore=404)
    print(searchIndicesPrefix + ":")
    if "count" in countresult:
        hitstotal = countresult["count"]
        print("  %d - %d documents found" % (responseCode, hitstotal))
    else:
        hitstotal = 0
        print(countresult)
    return hitstotal


def put_metric(responseCode, searchIndicesPrefix):
    """Publish the count for one status code and index prefix (only when it is > 0)."""
    metricName = "Logs_HTTPCode_" + str(responseCode) + "_Count"
    hittotal = getHitTotal(responseCode, searchIndicesPrefix)
    if hittotal > 0:
        cloudwatch.put_metric_data(
            Namespace="ELK/HTTPErrorCode",
            MetricData=[{
                "MetricName": metricName,
                "Dimensions": [
                    {"Name": "Elasticsearch Log Indices", "Value": searchIndicesPrefix}
                ],
                # Timezone-aware UTC timestamp; a naive local-time string is ambiguous
                "Timestamp": datetime.datetime.now(datetime.timezone.utc),
                "Unit": "Count",
                "Value": hittotal,
            }],
        )


def runTask():
    listCodes = [499, 502, 503, 401, 403, 429]
    listPrefixes = ["test-qa-access-logs-cw", "test1-qa-access-logs-cw"]
    for currPrefix in listPrefixes:
        for code in listCodes:
            put_metric(code, currPrefix)


if __name__ == "__main__":
    runTask()
```
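The script above makes one PutMetricData call per index prefix and status code, and skips codes with no hits, so CloudWatch receives no data point during quiet periods. As an alternative design (an assumption, not the original approach), the sketch below builds all data points for one prefix first, including zero counts, and sends them in batches. The batch size of 20 is a conservative choice rather than a documented limit, and the cloudwatch argument is any boto3 CloudWatch client.

```python
import datetime


def build_metric_data(prefix, counts, timestamp):
    """One CloudWatch datum per HTTP code, zero counts included, in the script's naming scheme."""
    return [
        {
            "MetricName": "Logs_HTTPCode_%d_Count" % code,
            "Dimensions": [{"Name": "Elasticsearch Log Indices", "Value": prefix}],
            "Timestamp": timestamp,
            "Unit": "Count",
            "Value": float(count),
        }
        for code, count in sorted(counts.items())
    ]


def chunked(items, size=20):
    """Split a list into consecutive batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def publish(cloudwatch, prefix, counts):
    """Send all data points for one index prefix, one put_metric_data call per batch."""
    now = datetime.datetime.now(datetime.timezone.utc)
    for batch in chunked(build_metric_data(prefix, counts, now)):
        cloudwatch.put_metric_data(Namespace="ELK/HTTPErrorCode", MetricData=batch)
```

With the original helpers this would be called inside runTask as publish(cloudwatch, currPrefix, {code: getHitTotal(code, currPrefix) for code in listCodes}). Publishing zeros gives alarms a continuous series instead of relying on missing-data handling.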
3.3. Set up the cron job
```
*/10 * * * * python3 /home/ubuntu/workarea/tools/logs-httpcode-metrics.py
```
3.4. Create AWS CloudWatch alarms on the custom metrics published in step 3.2 (namespace ELK/HTTPErrorCode).
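The alarm can be created in the CloudWatch console or through the API. The sketch below builds the parameters for boto3's put_metric_alarm for one index prefix and status code, matching the namespace, metric name and dimension used by logs-httpcode-metrics.py. Assumptions: the alarm name format and the SNS topic ARN that receives the notifications are hypothetical, and the alarm fires when the sum over a 10-minute period is above the threshold. Because the original script only publishes a data point when the count is positive, missing data is treated as not breaching.

```python
def build_alarm(prefix, code, sns_topic_arn, threshold=0):
    """Keyword arguments for CloudWatch put_metric_alarm on one published metric."""
    return {
        "AlarmName": "%s-http-%d" % (prefix, code),  # hypothetical naming scheme
        "Namespace": "ELK/HTTPErrorCode",
        "MetricName": "Logs_HTTPCode_%d_Count" % code,
        "Dimensions": [{"Name": "Elasticsearch Log Indices", "Value": prefix}],
        "Statistic": "Sum",
        "Period": 600,  # matches the 10-minute cron interval
        "EvaluationPeriods": 1,
        "Threshold": float(threshold),
        "ComparisonOperator": "GreaterThanThreshold",
        # No data point means the script found no errors in that period
        "TreatMissingData": "notBreaching",
        "AlarmActions": [sns_topic_arn],
    }


def create_alarm(params, region_name="ap-northeast-1"):
    """Create or update the alarm; boto3 is imported here so build_alarm works without it."""
    import boto3
    boto3.client("cloudwatch", region_name=region_name).put_metric_alarm(**params)
```

For example, create_alarm(build_alarm("test-qa-access-logs-cw", 503, "arn:aws:sns:ap-northeast-1:123456789012:elk-alerts")) would alarm on 503 errors for the first index prefix; the ARN shown is a placeholder.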