大数据之恶意域名监测

传说中的一天,在听说了某公司的内网员工由于访问了恶意网站,导致机器中毒后,领导过来问了下。

领导:能知道我们的内网用户访问是安全的么?

我的内心旁白: 作为一个资深的工程师,当然不能说不能呀。

我:当然可以啊,只要把所有用户访问过一下安全检查,就知道了呀。

领导:嗯,那就你来弄这个事情吧。能发现出来,发个微信报警,顺便存下档方便以后分析。这样就行了。

我的内心旁白:我只是吹了个牛呀,用什么数据去检测呢,每天用户访问量那么大,该怎么弄啊。

我(想了下刚才吹的牛):好的领导。这会是一个非常有意义的项目。

最近整理了一套适合2019年学习的Java\大数据资料,从基础的Java、大数据面向对象到进阶的框架知识都有整理哦,可以来我的主页免费领取哦。

架构简述

经过若干时间的思考,我打算这样做。架构简单粗暴,就是找安全组要了一个能监测恶意域名的接口,

然后把用户访问的域名都过一下这个接口。异常的信息发一份到微信,发一份到es。完美!!!

环境搭建

说干就干,首先你需要一台服务器。,然后你需要安装配置若干软件。

#cat/etc/redhat-releaseCentOSLinuxrelease7.6.1810(Core)CPU:Intel(R)Xeon(R)CPUE5-24200@1.90GHzmemory:32Gdisk:500G

packetbeat

安装,

sudoyuminstalllibpcapsudorpm-vipacketbeat-7.0.0-x86_64.rpm

主要配置

packetbeat 这里只要注意下 ,不支持同时输出到es和logstash,要是有多输出需求,起多个实例。

# 网络接口配置一把。vim /etc/packetbeat/packetbeat.ymlpacketbeat.interfaces.device: em2packetbeat.interfaces.snaplen:1514packetbeat.interfaces.type: af_packetpacketbeat.interfaces.buffer_size_mb:2048# 其它的注释掉,只留这个。- type:dns# Configure the ports where to listen for DNS traffic. You can disable# the DNS protocol by commenting out the list of ports.    ports:[53]# 输出到Logstash#----------------------------- Logstash output --------------------------------output.logstash:# The Logstash hosts    hosts:["localhost:5044"]

logstash

安装

rpm-ivhjdk-8u202-linux-x64.rpm#java环境变量配置一下。rpm-ivhlogstash-7.0.0.rpm

主要配置

# output 部分是对数据的筛减,后面会详解下。vim /etc/logstash/conf.d/logstash.confinput {    beats {    port =>5044}}output {if[client_ip]notin['10.100.2.2','10.100.2.1','10.100.2.3']{if[resource]notin['www.baidu.com.','www.baidu2.com.','www.baidu3.com.']{        kafka {            codec => json            topic_id =>"mytopic"}        }    }}

logstash 优化和监控

# 这个地方不用多,2G够用,后面的监控可以看到vim/etc/logstash/jvm.options-Xms2g-Xmx2g

# pipeline.workers 参考CPU核心数,pipeline.batch.size 可以参考监控调试一下。vim/etc/logstash/logstash.yml# pipeline.workers: 2pipeline.workers:6## How many events to retrieve from inputs before sending to filters+workers## pipeline.batch.size: 125pipeline.batch.size:2048# 新版本福利,可见即所得。# X-Pack Monitoringxpack.monitoring.enabled:truexpack.monitoring.elasticsearch.hosts: ["http://127.0.0.1:9200",]

数据筛减

先让我们来看看  这张图。可以看到logstash 每秒 input output 7k+的数据。

我心里大概想了下,要是每秒钟进账7k 人名币,美滋滋。

查看了12小时内top:5的用户ip,一看都是服务器的IP,而且访问量还这么大,就用logstash把你给排除掉吧。

查看了12小时内top:5的用户访问的域名,对于可信任的域名也给排除掉。

可以看到 经过删减后,到kafka的数据 大概在1.54K/s

kafka

kafka 是个神器,应该多多学习它。

安装

# wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz # kafka-python 支持列表kafka1.1> tar -xzf kafka_2.12-2.2.0.tgz# mv kafka_2.12-2.2.0 /usr/local/kafka

启动服务

# Kafka使用ZooKeeper,因此如果您还没有ZooKeeper服务器,则需要先启动它# bin/zookeeper-server-start.sh config/zookeeper.properties# bin/kafka-server-start.sh config/server.properties

后台运行

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-server.log2>&1&nohup bin/kafka-server-start.sh config/server.properties > kafka-server.log2>&1&

ss -ntlp  看到9092,2181 端口就表示 启动完成了。

kafka-eagle

kafka-eagle 是kfka的监控工具,推荐新手使用,图形化显示,挺好用的。

在topic-list中我们就能看到Logstash 中指定的topic

Consumers

logstash 就是我们kafka的 producer,

我们用Python来做consumer.  推荐使用下面的方式 安装kafka-python,

git clone https://github.com/dpkp/kafka-pythonpip install ./kafka-python

这个地方算是个难点, python的线程池了解一下。

解释下,在这个案例中 瓶颈在 网络I/O时间,多线程是比较合适的。

如果是密集计算任务,多进程是比较合适的。

python脚本会持续优化更新,有兴趣的小伙伴加QQ群 一起讨论:752774493

\#!/usr/bin/env python# -*- coding: utf-8 -*-importjsonimportloggingimportrequestsfromdatetimeimportdatetimefromkafkaimportKafkaConsumerfromelasticsearchimportElasticsearchfromconcurrent.futuresimportThreadPoolExecutorfromwechatimportsendwechatlogging.basicConfig(level=logging.INFO, filename='/var/log/security-check/dns.log')logging.basicConfig(level=logging.INFO)logger = logging.getLogger('dns')defcheck_dns(message):value = json.loads(message.value.decode('utf-8'))    resource = value['resource'][:-1]    client_ip = value['client_ip']    now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")    url ="http://127.0.0.1:8080/dns"querystring = {"token":"TOKEN","param": resource}try:        response = requests.request("GET", url, params=querystring)ifresponse.json()['code'] ==0:            body = response.json()['result'][0]['data'][0]            body['@timestamp'] = now            body['client_ip'] = client_ip            index ='security_check_url-{}'.format(datetime.now().strftime("%Y.%m.%d"))            es = Elasticsearch("http://127.0.0.1", http_auth=('admin','admin'))            result = es.index(index=index, doc_type="doc", body=body)            sendwechat(jobnumber, client_ip, resource)            logger.info(result)exceptExceptionase:        logger.error('this is a error log {}'.format(e))defsecurity_check_dns():executor = ThreadPoolExecutor(max_workers=20)try:        consumer = KafkaConsumer('mytopic',                                bootstrap_servers=['127.0.0.1:9092'],                                group_id='MY_GROUP1',                                consumer_timeout_ms=1000)formessageinconsumer:        executor.submit(check_dns, message)exceptExceptionase:        logger.error('this is a error log {}'.format(e))if__name__ =="__main__":    security_check_dns()

查看Consumers

kafka 真是个好东西。谁用谁知道。

查看效果

你要是发现某个IP一直在报,那就要主动去了解下。查杀下电脑了。

查看比较详细的信息,kibana 当仁不让啊。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容