Netty学习笔记-使用Netty搭建一个代理扫描服务

本文重点描述如何利用Netty实现一个代理扫描服务, 对Netty的一些基本概念只简单介绍.

1.Netty中几个比较重要的概念

1.0 Bootstrap

Bootstrap其实就是Netty服务的启动器, 服务端使用的是ServerBootstrap, 客户端使用的是Bootstrap, 我们可以通过配置Bootstrap来配置Netty使用哪种Channel, Group, Handler和Encoder, Decoder……

1.1 EventLoopGroup

reactor线程模型中处理IO事件的线程组.

1.2 Channel

这里的Channel的概念和NIO中Channel的概念是一样的, 相当于一个Socket连接.

1.3 ChannelFuture

这点在官方的Guide中也有提到, 在Netty中, 所有的处理都是异步的, 因此需要一个Future对象, 可以注册监听在异步线程处理完以后进行一些处理.

1.4 Handler

Handler其实就是事件的处理器, Netty通过Channel读入请求内容后会分配给Handler进行事件处理, Handler能够处理的事件包括:数据接收, 异常处理, 数据转换, 编码解码等问题, 其中包含两个非常重要的接口ChannelInboundHandler(Decoder实际上就是实现了这个接口), ChannelOutboundHandler(Encoder实际上就是实现了这个接口), 前者负责处理客户端发送到服务端的请求, 后者反之. 关于Handler执行顺序的一些介绍可以看一看这篇文章, handler的执行顺序.

2.利用Netty处理网络通信

2.0 定义Bootstrap

this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
    .option(ChannelOption.TCP_NODELAY, true)
    .option(ChannelOption.SO_KEEPALIVE, false)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
    .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
    .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
    .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(
                defaultEventExecutorGroup,
                new Decoder(),
                new Encoder(),
                new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                new NettyConnectManageHandler(),
                new NettyClientHandler()
            );
        }
    });

其中, eventLoopGroupWorker为处理创建channel使用的线程租, defaultEventExecutorGroup为执行ChannelHandler中的方法使用的线程租. Encoder和Encoder分别为解码器和编码器, IdleStateHandler为处理空闲线程的处理器.NettyConnectManageHandler用于管理各个连接, NettyClientHandler用于处理一下特殊的需求.
下面看源码.
Encoder不对数据做任何处理,只是将byte[]类型的数据写入ByteBuf.

public class Encoder extends MessageToByteEncoder<byte[]> {
    @Override
    protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
        out.writeBytes(msg);
    }
}

同样, Decoder也不对数据做任何处理, 只是将数据写入到对象数组中.

public class Decoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int len = in.readableBytes();
        byte[] dst = new byte[len];
        in.readBytes(dst);
        out.add(dst);
    }
}

最后在NettyClientHandler中处理数据.

class NettyClientHandler extends SimpleChannelInboundHandler<Object> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("接收响应数据");
            processMessageReceived(ctx, msg); //在processMessageReceived中处理详细的业务逻辑
        }
    }

2.1 发送数据

数据发送分为创建channel和传输数据两个部分.

public void invokeAsync(String addr, byte[] request, long timeoutMillis)
        throws RemotingSendRequestException, RemotingConnectException, InterruptedException, RemotingTimeoutException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            this.invokeAsyncImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

其中, getAndCreateChannel负责创建channel, invokeAsyncImpl负责发送数据.

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
  if (null == addr) {
        return null;
    }

    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    return this.createChannel(addr);
}
public void invokeAsyncImpl(final Channel channel, final byte[] request, final long timeoutMillis)
        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
    channel.writeAndFlush(request).addListener(new ChannelFutureListener() {

        /**
         * 表示数据已经发送完毕, 可以记录一些其他事情,比如日志
         * @param future
         * @throws Exception
         */
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            System.out.println("finish");
            //future.channel().re
            //future.channel().read();
        }
    });
}

3.扫描http代理接口

要某段某个地址是否http代理, 只需将http请求发送到这个地址, 然后看是否得到想要的页面, 即可作出判断.
例如,某地8888端口开了http代理, 只需将一个http请求发送到这个地址, 看是否得到正确的响应即可.

3.0 组织请求数据

http请求格式本文不做讨论, http请求如下:

GET http://www.qq.com/404/search_children.js HTTP/1.1
Host: www.qq.com
Accept: */*
Pragma: no-cache
User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.114 Safari/537.36

把这段字符串转换成字节数组, 然后发送到本地8888端口, 代码如下:

invokeAsync("127.0.0.1:8888", REQ_HTTP, 30000)

其中REQ_HTTP就是http请求的字节数组.
请求发送后, 需要在processMessageReceived中判断接收到的响应是否符合预期.

public void processMessageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
    String html = new String((byte[])msg);
    if (HttpProxy.isProxy(html)) {
        System.out.println("发现http代理:" + ctx.channel().remoteAddress());
    } else {
        System.out.println("未发现http代理:" + ctx.channel().remoteAddress());
    }
}
//验证响应中是否包"qzone.qq.com"这个字符串, 2018.3.12, 这个页面是包含下面的字符串的
private final static String ZONE = "qzone.qq.com";
public static boolean isProxy(String response) {
    if (StringUtils.isNotBlank(response) && StringUtils.contains(response, ZONE)) {
        return true;
    }
    return false;
}

4.支持多协议扩展

上文的代码示例是以http协议为示例的, 然而如果要支持多协议的扫描, 必须在channel中携带参数-协议, 只有这样, 在接收到数据以后才能根据协议去解码, 下面以pptp协议为了进行说明.
首先,在invokeAsync函数中增加一个protocol参数, 如下:

void invokeAsync(final String addr, final byte[] request, final String protocol, final long timeoutMillis)
            throws RemotingSendRequestException, RemotingConnectException, InterruptedException,RemotingTimeoutException;

然后用AttributeMap的方式,将参数写入channel,代码如下:

public static AttributeKey<String> CHANNEL_PROTOCOL = AttributeKey.valueOf("protocol");
@Override
public void invokeAsync(String addr, byte[] request, final String protocol, long timeoutMillis)
        throws RemotingSendRequestException, RemotingConnectException, InterruptedException, RemotingTimeoutException {
    final Channel channel = this.getAndCreateChannel(addr);
    Attribute<String> protocolAttribute = channel.attr(CHANNEL_PROTOCOL);
    protocolAttribute.setIfAbsent(protocol);
    if (channel != null && channel.isActive()) {
        try {
            this.invokeAsyncImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
//获取协议
@Override
public String getProtocol(Channel channel) {
    Attribute<String> protocol = channel.attr(CHANNEL_PROTOCOL);
    return protocol.get();
}

接着在processMessageReceived函数中根据协议来分别处理http协议和PPTP协议

public void processMessageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
    String html = new String((byte[])msg);
    String protocol = getProtocol(ctx.channel());
    if ("http".equals(protocol)) {
        if (HttpProxy.isProxy(html)) {
            System.out.println("发现http代理:" + ctx.channel().remoteAddress());
        } else {
            System.out.println("未发现http代理:" + ctx.channel().remoteAddress());
        }
    } else if ("pptp".equals(protocol)) {
        if (PPTPProxy.isProxy((byte[])msg)) {
            System.out.println("发现pptp代理:" + ctx.channel().remoteAddress());
        } else {
            System.out.println("未发现pptp代理:" + ctx.channel().remoteAddress());
        }
    }
}

如此,只要在调用的时候加入协议即可.

invokeAsync(ip, REQ_HTTP, "pptp", 30000);

5.PPTP协议

PPTP(Point to Point Tunneling Protocol, VPN协议的一种), 即点对点隧道协议. 该协议是在PPP协议的基础上开发的一种新的增强型安全协议, 支持多协议虚拟专用网(VPN), 可以通过密码验证协议(PAP), 可扩展认证协议(EAP)等方法增强安全性. 可以使远程用户通过拨入ISP, 通过直接连接Internet或其他网络安全地访问企业网.
PPTP协议建立在TCP协议之上, 双方建立TCP连接之后, 只要客户端发送一个PPTP协议的握手包给服务器端, 如果服务器端返回正确的PPTP协议的响应包,那么对应就是PPTP代理.
PPTP的握手数据包如下(通过wireshark抓包获得, 16进制):

0x00, 0x9c, 0x00, 0x01, 0x1a, 0x2b, 0x3c,
        0x4d, 0x00, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x03,
        0xff, 0xff, 0x00, 0x01, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63,
        0x61, 0x6e, 0x61, 0x6e, 0x69, 0x61, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00  

PPTP的握手响应数据包如下(通过wireshark抓包获得, 16进制):

00 9c 00 01 1a 2b 3c 4d 00 02 00 00 01 00 01 00
00 00 00 00 00 00 00 00 00 01 00 01 6c 6f 63 61
6c 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 6c 69 6e 75
78 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00        

得到响应包后验证PPTP的函数如下:

public static boolean isProxy(byte[] data) {
    // match 'PPTP'
    byte b2 = data[2];
    byte b3 = data[3];
    if (b2 == 0x00 && b3 == 0x01) {
        byte b8 = data[8];
        byte b9 = data[9];
        if (b8 == 00 && b9 == 02) {
            return true;
        }
    }
    return false;
}

关于PPTP协议的数据格式,本文不做详细解释,有兴趣的自行百度~

最后, 由于本文中涉及的数据包都非常小, 在实际使用中斌哥没有遇到沾包问题, 所以本文没有涉及粘包问题, 如有粘包问题的需要, 可以利用LineBasedFrameDecoder, FixedLengthFrameDecoder等工具加以解决.

本文源代码地址

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容

  • netty常用API学习 netty简介 Netty是基于Java NIO的网络应用框架. Netty是一个NIO...
    花丶小伟阅读 6,000评论 0 20
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 该文章为转载,原文章请点击 1. 背景 1.1. Netty 3.X系列版本现状 根据对Netty社区部分用户的调...
    Pramyness阅读 1,971评论 1 14
  • Netty是一个高性能事件驱动的异步的非堵塞的IO(NIO)框架,用于建立TCP等底层的连接,基于Netty可以建...
    我是解忧鸭铺鸭阅读 1,311评论 0 2
  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 综述 netty通...
    jiangmo阅读 5,848评论 0 13