nginx+lua+kafka实现访问ip实时上传并消费存库分析一

最近自己的博客系统发现经常有有一些ip攻击,所以,想做一个实现动态封禁攻击ip的功能,最初想的是使用redis实现,目前刚好在学习kafka,所以,本人使用了kafka实时发送访问的ip到后台,然后后台入库处理,并做归属地查询,然后分析出此ip是否存在攻击功能,使用脚本,将攻击者的ip动态封禁,实现网站保护的一个基本功能,由于是自己的网站使用,没有做太多的优化,今天,分享下第一个步骤,将访问ip入库话不多说

我们一起来看下,具体怎么操作的,首先,安装openstry,这个是支持lua脚本语言的nginx.大家可以上网搜索相关资料,进行安装,这个不是今天的重点,这里就不在叙述openstry的安装了,我们从安装kafka开始,

一:安装zookeeper

因为安装kafka需要使用zookeeper,当然,kafka内部自带zookeeper,小伙伴们也可以使用自带的zoookeeper,我今天演示的案例使用的是外部的zookeeper,

首先,在linux下载zookeeper安装包,执行解压,然后执行配置文件的配置,启动,就可以了,过程比较简单,我们大概看下就行

我的安装包是zookeeper-3.4.14,安装完成后,启动可以,启动命令,./zkServer.sh start

我的已经启动了,我们使用命令./zkServer.是status,可以看到如下信息的话,就证明已经启动成功了,我的是单机版,大家注意

二:安装kafka

    kafka的官网是:[http://kafka.apache.org/](http://kafka.apache.org/),下载最新的版本就可以,用wget在线安装

    我的安装版本是kafka_2.12-2.5.0.tgz,

    执行解压,解压命令:tar -zxvf kafka_2.12-2.5.0.tgz

    进入kafka的配置文件目录,执行配置文件的配置,配置文件名称是server.properties
主要配置下listeners,advertised.listeners,zookeeper,其他都按照默认配置即可,以下是我的配置文件配置,我删除了一部分注释信息
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://你自己的ip地址:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://你自己的ip地址:9092
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/software/kafka/logs/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0

接下来就是启动了,一定记得,先启动zk,再启动kafka,

启动命令,当然你也可以使用nohup执行后台启动,

后台启动命令

启动成功后,执行jps命令,会看到kafka的进程,说明启动成功了,如果启动失败,一般会报错内存不够的话,需要改一下移动的内存,kafka默认的都是1g,我们改成512M就可以,

修改内存大小,将Xmx和Xms由原来的的1G修改为512M,注意这里不要修改的太小,否则kafka启动会很慢,
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
fi

修改完内存,然后在执行启动,没啥别的问题,就启动成功了,这个时候,我们就可以创建主题了,以下是我的创建以及运行测试命令,如果已经创建过,执行创建命令的时候,会提示已经存在,创建成功的话,会提示created 表示已经创建成功,然后可以执行启动第二条命令,执行生产者,再打开一个窗口,执行第三条命令

启动消费者,这个时候,你就可以测试发送消息,然后另一个窗口就会消费到消息,比较简单,我这里就不做演示了

创建主题:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
启动生产者:
./kafka-console-producer.sh --broker-list 自己的ip:9092 --topic test
启动消费者:
./kafka-console-consumer.sh --bootstrap-server 自己的ip:9092 --topic test --from-beginning

三:springboot集成kafka,

    我们的目的是kafka发送消息,然后后台接受消息,拿到ip后,执行归属地查询,入库操作,

所以,先看看如何集成到spring项目中

引入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.4.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>

配置kafka

spring.kafka.bootstrap-servers=你的ip:9092
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=123
spring.kafka.producer.buffer-memory=1234567889
#生产的编码解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 指定默认消费者group id
spring.kafka.consumer.group-id=blog_app

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true

# 消费者的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

这样,就基本配置完成了,现在就是写生产者发送消息,消费者消费消息,使用spring提供的Kafkaemplete就可以操作了,

我们今天的任务是,实时采集nginx日志的访问ip,然后入库,我们需要写一个消费者类,我的代码如下,获取消息是模板代码,在注解Kafkalisteners中定义你自己的topic,然后获取转换,就可以,代码比较加单,

@Component
public class KafkaReceiver {

    private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);

    private Gson gson = new GsonBuilder().create();

    @Autowired
    private NingxRequestLogService ningxRequestLogService;

    @Autowired
    private RedisTemplate redisTemplate;

    @KafkaListener(topics = {"access-log"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            String nginxLog = String.valueOf(kafkaMessage.get());
            logger.info("------------------ 当前访问的信息为 =" + nginxLog);
            try {
                NginxLogParam nginxLogParam = gson.fromJson(nginxLog, NginxLogParam.class);
                if (null != nginxLogParam && StringUtils.isNoneBlank(nginxLogParam.getClientIP())){

                    //归属地查询
                    IpAddr ipAddr = checkAttribution(nginxLogParam);
                    NingxRequestLog ningxRequestLog = new NingxRequestLog();
                    ningxRequestLog.setRequestIp(nginxLogParam.getClientIP());
                    ningxRequestLog.setRequestMethod(nginxLogParam.getMethod());
                    ningxRequestLog.setRequestHttpVersion("1.1");
                    ningxRequestLog.setCreateTime(new Date());
                    ningxRequestLog.setRequestCity(ipAddr.getCity());
                    ningxRequestLog.setRequestProvince(ipAddr.getProvince());

                    this.ningxRequestLogService.add(ningxRequestLog);
                    logger.info("访问ip[{}]入库successed:",ningxRequestLog.getRequestIp());
                }else{
                    logger.info("当前访问的信息未成功获取到ip");
                }
            } catch (Exception e) {
                logger.error("nginx 访问 ip 入库错误,原因为[{}]",e.getMessage());
            }

        }

    }

    private IpAddr checkAttribution(NginxLogParam nginxLogParam) {
        String key = RedisCommonUtils.REQUEST_IP_PREIFIX + nginxLogParam.getClientIP();
        Boolean exitIp = this.redisTemplate.hasKey(key);
        IpAddr ipAddr = new IpAddr();
        if (exitIp){
            ipAddr = (IpAddr)this.redisTemplate.opsForValue().get(key);
            logger.info("当前访问的ip:[{}]在缓存中存在归属地信息,从缓存中返回成功",nginxLogParam.getClientIP());
            return ipAddr;
        }

        //归属地获取
        HashMap<String, String> regionParamMap = new HashMap<>();
        regionParamMap.put("lang","zh-CN");
        String s = null;
        try {
            s = HttpUtil.sendGet(CommonUrls.XINLANG_REGIN_URL+nginxLogParam.getClientIP(), regionParamMap);
        } catch (UnsupportedEncodingException e) {
            logger.error("归属地查询失败,原因为[{}]",e);
            ipAddr.setProvince(StringUtils.EMPTY);
            ipAddr.setCity(StringUtils.EMPTY);

        }
        IpRegion ipRegion = JSON.parseObject(s, IpRegion.class);
        ipAddr.setProvince(StringUtils.isBlank(ipRegion.getRegionName()) ? StringUtils.EMPTY : ipRegion.getRegionName());
        ipAddr.setCity(ipRegion.getCity());
        this.redisTemplate.opsForValue().set(key,ipAddr,24*60,TimeUnit.MINUTES);

        return ipAddr;

    }

    private static class IpAddr {

        public IpAddr() {}

        private String city;
        private String province;

        public String getCity() {
            return city;
        }

        public void setCity(String city) {
            this.city = city;
        }

        public String getProvince() {
            return province;
        }

        public void setProvince(String province) {
            this.province = province;
        }
    }
}

四:使用lua脚本实时发送nginx访问日志到kafka,

    接下来就是如何将nginx消息发送出去的问题,我们采用kafka的lua脚本执行发送,由于kafka已经存在lua标准库,所以我们得下载kafka支持的lua脚本让入到

openstry文件下,然后编写lua脚本执行就可以,一起来看看吧

第一步:下载kafka的lua脚本并加压后复制到指定的文件夹下 ,注意自己的路径就可以 
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip

yum install -y unzip

unzip lua-resty-kafka-master.zip

cp -rf /usr/local/lua-resty-kafka-master/lib/resty /usr/hello/lualib
第二步:编写kafka脚本
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by renxiaole.
--- DateTime: 2020/8/1 09:48
---
local cjson = require("cjson")
local producer = require("resty.kafka.producer")

local broker_list = {
    { host = "你自己的ip", port = 9092 }
}
--定义一个本地变量
local log_json = {}
--获取headers
local headers=ngx.req.get_headers()
--获取ip
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"
log_json["ip"]=ip
--对发送的消息执行编码
local message = cjson.encode(log_json);
local productId = ngx.req.get_uri_args()["productId"]
--创建producer对象
local async_producer = producer:new(broker_list, { producer_type = "async" })
--执行发送
local ok, err = async_producer:send("access-log", productId, message)

if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
end
第三部:部署到nginx的配置文件中
access_by_lua_file /usr/local/software/openresty/openresty-1.13.6.1/nginx/conf/lua/kafka_log.lua;
第四步:重新启动nginx

然后大功告成,接下来,就是提交代码,执行部署项目,就可以看到数据正常入库了,我们去数据库看看,有没有数据,我们看到已经正常入库!

459 203.208.60.71   GET 1.1 2020-08-01 19:19:46 北京市 海淀
460 183.136.225.56  GET 1.1 2020-08-01 20:05:07 浙江省 诸暨
461 47.100.64.86    GET 1.1 2020-08-01 20:07:45 浙江省 西湖
462 150.137.27.107  GET 1.1 2020-08-01 20:49:27 夏威夷州    檀香山 
464 7.62.153.81 GET 1.1 2020-08-01 20:49:33 俄亥俄州    Columbus
465 47.101.198.15   GET 1.1 2020-08-01 21:18:06 浙江省 西湖
466 47.101.198.126  GET 1.1 2020-08-01 21:26:50 浙江省 西湖

本期分享了通过kafka+lua实时获取网站ip,其实方法很多,下一期,我们一起来分享如何计算这些ip的访问评率,然后动态封禁他们,本期就到这里,本人水平有限,如果有不妥之处,请留言,我会虚心接受意见和建议,谢谢,
大家也可以访问我的网站任小乐技术博客,目前主要会在本人的技术网站发布最新的文章,谢谢!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,039评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,426评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,417评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,868评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,892评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,692评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,416评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,326评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,782评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,957评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,102评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,790评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,442评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,996评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,113评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,332评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,044评论 2 355