传说中的一天,在听说了某公司的内网员工由于访问了恶意网站,导致机器中毒后,领导过来问了下。
领导:能知道我们的内网用户访问是安全的么?
我的内心旁白: 作为一个资深的工程师,当然不能说不能呀。
我:当然可以啊,只要把所有用户访问过一下安全检查,就知道了呀。
领导:嗯,那就你来弄这个事情吧。能发现出来,发个微信报警,顺便存下档方便以后分析。这样就行了。
我的内心旁白:我只是吹了个牛呀,用什么数据去检测呢,每天用户访问量那么大,该怎么弄啊。
我(想了下刚才吹的牛):好的领导。这会是一个非常有意义的项目。
最近整理了一套适合2019年学习的Java\大数据资料,从基础的Java、大数据面向对象到进阶的框架知识都有整理哦,可以来我的主页免费领取哦。
架构简述
经过若干时间的思考,我打算这样做。架构简单粗暴,就是找安全组要了一个能监测恶意域名的接口,
然后把用户访问的域名都过一下这个接口。异常的信息发一份到微信,发一份到es。完美!!!
环境搭建
说干就干,首先你需要一台服务器。,然后你需要安装配置若干软件。
#cat/etc/redhat-releaseCentOSLinuxrelease7.6.1810(Core)CPU:Intel(R)Xeon(R)CPUE5-24200@1.90GHzmemory:32Gdisk:500G
packetbeat
安装,
sudoyuminstalllibpcapsudorpm-vipacketbeat-7.0.0-x86_64.rpm
主要配置
packetbeat 这里只要注意下 ,不支持同时输出到es和logstash,要是有多输出需求,起多个实例。
# 网络接口配置一把。vim /etc/packetbeat/packetbeat.ymlpacketbeat.interfaces.device: em2packetbeat.interfaces.snaplen:1514packetbeat.interfaces.type: af_packetpacketbeat.interfaces.buffer_size_mb:2048# 其它的注释掉,只留这个。- type:dns# Configure the ports where to listen for DNS traffic. You can disable# the DNS protocol by commenting out the list of ports. ports:[53]# 输出到Logstash#----------------------------- Logstash output --------------------------------output.logstash:# The Logstash hosts hosts:["localhost:5044"]
logstash
安装
rpm-ivhjdk-8u202-linux-x64.rpm#java环境变量配置一下。rpm-ivhlogstash-7.0.0.rpm
主要配置
# output 部分是对数据的筛减,后面会详解下。vim /etc/logstash/conf.d/logstash.confinput { beats { port =>5044}}output {if[client_ip]notin['10.100.2.2','10.100.2.1','10.100.2.3']{if[resource]notin['www.baidu.com.','www.baidu2.com.','www.baidu3.com.']{ kafka { codec => json topic_id =>"mytopic"} } }}
logstash 优化和监控
# 这个地方不用多,2G够用,后面的监控可以看到vim/etc/logstash/jvm.options-Xms2g-Xmx2g
# pipeline.workers 参考CPU核心数,pipeline.batch.size 可以参考监控调试一下。vim/etc/logstash/logstash.yml# pipeline.workers: 2pipeline.workers:6## How many events to retrieve from inputs before sending to filters+workers## pipeline.batch.size: 125pipeline.batch.size:2048# 新版本福利,可见即所得。# X-Pack Monitoringxpack.monitoring.enabled:truexpack.monitoring.elasticsearch.hosts: ["http://127.0.0.1:9200",]
数据筛减
先让我们来看看 这张图。可以看到logstash 每秒 input output 7k+的数据。
我心里大概想了下,要是每秒钟进账7k 人名币,美滋滋。
查看了12小时内top:5的用户ip,一看都是服务器的IP,而且访问量还这么大,就用logstash把你给排除掉吧。
查看了12小时内top:5的用户访问的域名,对于可信任的域名也给排除掉。
可以看到 经过删减后,到kafka的数据 大概在1.54K/s
kafka
kafka 是个神器,应该多多学习它。
安装
# wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz # kafka-python 支持列表kafka1.1> tar -xzf kafka_2.12-2.2.0.tgz# mv kafka_2.12-2.2.0 /usr/local/kafka
启动服务
# Kafka使用ZooKeeper,因此如果您还没有ZooKeeper服务器,则需要先启动它# bin/zookeeper-server-start.sh config/zookeeper.properties# bin/kafka-server-start.sh config/server.properties
后台运行
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-server.log2>&1&nohup bin/kafka-server-start.sh config/server.properties > kafka-server.log2>&1&
ss -ntlp 看到9092,2181 端口就表示 启动完成了。
kafka-eagle
kafka-eagle 是kfka的监控工具,推荐新手使用,图形化显示,挺好用的。
在topic-list中我们就能看到Logstash 中指定的topic
Consumers
logstash 就是我们kafka的 producer,
我们用Python来做consumer. 推荐使用下面的方式 安装kafka-python,
git clone https://github.com/dpkp/kafka-pythonpip install ./kafka-python
这个地方算是个难点, python的线程池了解一下。
解释下,在这个案例中 瓶颈在 网络I/O时间,多线程是比较合适的。
如果是密集计算任务,多进程是比较合适的。
python脚本会持续优化更新,有兴趣的小伙伴加QQ群 一起讨论:752774493
\#!/usr/bin/env python# -*- coding: utf-8 -*-importjsonimportloggingimportrequestsfromdatetimeimportdatetimefromkafkaimportKafkaConsumerfromelasticsearchimportElasticsearchfromconcurrent.futuresimportThreadPoolExecutorfromwechatimportsendwechatlogging.basicConfig(level=logging.INFO, filename='/var/log/security-check/dns.log')logging.basicConfig(level=logging.INFO)logger = logging.getLogger('dns')defcheck_dns(message):value = json.loads(message.value.decode('utf-8')) resource = value['resource'][:-1] client_ip = value['client_ip'] now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") url ="http://127.0.0.1:8080/dns"querystring = {"token":"TOKEN","param": resource}try: response = requests.request("GET", url, params=querystring)ifresponse.json()['code'] ==0: body = response.json()['result'][0]['data'][0] body['@timestamp'] = now body['client_ip'] = client_ip index ='security_check_url-{}'.format(datetime.now().strftime("%Y.%m.%d")) es = Elasticsearch("http://127.0.0.1", http_auth=('admin','admin')) result = es.index(index=index, doc_type="doc", body=body) sendwechat(jobnumber, client_ip, resource) logger.info(result)exceptExceptionase: logger.error('this is a error log {}'.format(e))defsecurity_check_dns():executor = ThreadPoolExecutor(max_workers=20)try: consumer = KafkaConsumer('mytopic', bootstrap_servers=['127.0.0.1:9092'], group_id='MY_GROUP1', consumer_timeout_ms=1000)formessageinconsumer: executor.submit(check_dns, message)exceptExceptionase: logger.error('this is a error log {}'.format(e))if__name__ =="__main__": security_check_dns()
查看Consumers
kafka 真是个好东西。谁用谁知道。
查看效果
你要是发现某个IP一直在报,那就要主动去了解下。查杀下电脑了。
查看比较详细的信息,kibana 当仁不让啊。