Netty学习笔记-使用Netty搭建一个代理扫描服务

本文重点描述如何利用Netty实现一个代理扫描服务, 对Netty的一些基本概念只简单介绍.

1.Netty中几个比较重要的概念

1.0 Bootstrap

Bootstrap其实就是Netty服务的启动器, 服务端使用的是ServerBootstrap, 客户端使用的是Bootstrap, 我们可以通过配置Bootstrap来配置Netty使用哪种Channel, Group, Handler和Encoder, Decoder……

1.1 EventLoopGroup

reactor线程模型中处理IO事件的线程组.

1.2 Channel

这里的Channel的概念和NIO中Channel的概念是一样的, 相当于一个Socket连接.

1.3 ChannelFuture

这点在官方的Guide中也有提到, 在Netty中, 所有的处理都是异步的, 因此需要一个Future对象, 可以注册监听在异步线程处理完以后进行一些处理.

1.4 Handler

Handler其实就是事件的处理器, Netty通过Channel读入请求内容后会分配给Handler进行事件处理, Handler能够处理的事件包括:数据接收, 异常处理, 数据转换, 编码解码等问题, 其中包含两个非常重要的接口ChannelInboundHandler(Decoder实际上就是实现了这个接口), ChannelOutboundHandler(Encoder实际上就是实现了这个接口), 前者负责处理客户端发送到服务端的请求, 后者反之. 关于Handler执行顺序的一些介绍可以看一看这篇文章, handler的执行顺序.

2.利用Netty处理网络通信

2.0 定义Bootstrap

this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
    .option(ChannelOption.TCP_NODELAY, true)
    .option(ChannelOption.SO_KEEPALIVE, false)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
    .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
    .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
    .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(
                defaultEventExecutorGroup,
                new Decoder(),
                new Encoder(),
                new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                new NettyConnectManageHandler(),
                new NettyClientHandler()
            );
        }
    });

其中, eventLoopGroupWorker为处理创建channel使用的线程租, defaultEventExecutorGroup为执行ChannelHandler中的方法使用的线程租. Encoder和Encoder分别为解码器和编码器, IdleStateHandler为处理空闲线程的处理器.NettyConnectManageHandler用于管理各个连接, NettyClientHandler用于处理一下特殊的需求.
下面看源码.
Encoder不对数据做任何处理,只是将byte[]类型的数据写入ByteBuf.

public class Encoder extends MessageToByteEncoder<byte[]> {
    @Override
    protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
        out.writeBytes(msg);
    }
}

同样, Decoder也不对数据做任何处理, 只是将数据写入到对象数组中.

public class Decoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int len = in.readableBytes();
        byte[] dst = new byte[len];
        in.readBytes(dst);
        out.add(dst);
    }
}

最后在NettyClientHandler中处理数据.

class NettyClientHandler extends SimpleChannelInboundHandler<Object> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("接收响应数据");
            processMessageReceived(ctx, msg); //在processMessageReceived中处理详细的业务逻辑
        }
    }

2.1 发送数据

数据发送分为创建channel和传输数据两个部分.

public void invokeAsync(String addr, byte[] request, long timeoutMillis)
        throws RemotingSendRequestException, RemotingConnectException, InterruptedException, RemotingTimeoutException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            this.invokeAsyncImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

其中, getAndCreateChannel负责创建channel, invokeAsyncImpl负责发送数据.

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
  if (null == addr) {
        return null;
    }

    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    return this.createChannel(addr);
}
public void invokeAsyncImpl(final Channel channel, final byte[] request, final long timeoutMillis)
        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
    channel.writeAndFlush(request).addListener(new ChannelFutureListener() {

        /**
         * 表示数据已经发送完毕, 可以记录一些其他事情,比如日志
         * @param future
         * @throws Exception
         */
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            System.out.println("finish");
            //future.channel().re
            //future.channel().read();
        }
    });
}

3.扫描http代理接口

要某段某个地址是否http代理, 只需将http请求发送到这个地址, 然后看是否得到想要的页面, 即可作出判断.
例如,某地8888端口开了http代理, 只需将一个http请求发送到这个地址, 看是否得到正确的响应即可.

3.0 组织请求数据

http请求格式本文不做讨论, http请求如下:

GET http://www.qq.com/404/search_children.js HTTP/1.1
Host: www.qq.com
Accept: */*
Pragma: no-cache
User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.114 Safari/537.36

把这段字符串转换成字节数组, 然后发送到本地8888端口, 代码如下:

invokeAsync("127.0.0.1:8888", REQ_HTTP, 30000)

其中REQ_HTTP就是http请求的字节数组.
请求发送后, 需要在processMessageReceived中判断接收到的响应是否符合预期.

public void processMessageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
    String html = new String((byte[])msg);
    if (HttpProxy.isProxy(html)) {
        System.out.println("发现http代理:" + ctx.channel().remoteAddress());
    } else {
        System.out.println("未发现http代理:" + ctx.channel().remoteAddress());
    }
}
//验证响应中是否包"qzone.qq.com"这个字符串, 2018.3.12, 这个页面是包含下面的字符串的
private final static String ZONE = "qzone.qq.com";
public static boolean isProxy(String response) {
    if (StringUtils.isNotBlank(response) && StringUtils.contains(response, ZONE)) {
        return true;
    }
    return false;
}

4.支持多协议扩展

上文的代码示例是以http协议为示例的, 然而如果要支持多协议的扫描, 必须在channel中携带参数-协议, 只有这样, 在接收到数据以后才能根据协议去解码, 下面以pptp协议为了进行说明.
首先,在invokeAsync函数中增加一个protocol参数, 如下:

void invokeAsync(final String addr, final byte[] request, final String protocol, final long timeoutMillis)
            throws RemotingSendRequestException, RemotingConnectException, InterruptedException,RemotingTimeoutException;

然后用AttributeMap的方式,将参数写入channel,代码如下:

public static AttributeKey<String> CHANNEL_PROTOCOL = AttributeKey.valueOf("protocol");
@Override
public void invokeAsync(String addr, byte[] request, final String protocol, long timeoutMillis)
        throws RemotingSendRequestException, RemotingConnectException, InterruptedException, RemotingTimeoutException {
    final Channel channel = this.getAndCreateChannel(addr);
    Attribute<String> protocolAttribute = channel.attr(CHANNEL_PROTOCOL);
    protocolAttribute.setIfAbsent(protocol);
    if (channel != null && channel.isActive()) {
        try {
            this.invokeAsyncImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
//获取协议
@Override
public String getProtocol(Channel channel) {
    Attribute<String> protocol = channel.attr(CHANNEL_PROTOCOL);
    return protocol.get();
}

接着在processMessageReceived函数中根据协议来分别处理http协议和PPTP协议

public void processMessageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
    String html = new String((byte[])msg);
    String protocol = getProtocol(ctx.channel());
    if ("http".equals(protocol)) {
        if (HttpProxy.isProxy(html)) {
            System.out.println("发现http代理:" + ctx.channel().remoteAddress());
        } else {
            System.out.println("未发现http代理:" + ctx.channel().remoteAddress());
        }
    } else if ("pptp".equals(protocol)) {
        if (PPTPProxy.isProxy((byte[])msg)) {
            System.out.println("发现pptp代理:" + ctx.channel().remoteAddress());
        } else {
            System.out.println("未发现pptp代理:" + ctx.channel().remoteAddress());
        }
    }
}

如此,只要在调用的时候加入协议即可.

invokeAsync(ip, REQ_HTTP, "pptp", 30000);

5.PPTP协议

PPTP(Point to Point Tunneling Protocol, VPN协议的一种), 即点对点隧道协议. 该协议是在PPP协议的基础上开发的一种新的增强型安全协议, 支持多协议虚拟专用网(VPN), 可以通过密码验证协议(PAP), 可扩展认证协议(EAP)等方法增强安全性. 可以使远程用户通过拨入ISP, 通过直接连接Internet或其他网络安全地访问企业网.
PPTP协议建立在TCP协议之上, 双方建立TCP连接之后, 只要客户端发送一个PPTP协议的握手包给服务器端, 如果服务器端返回正确的PPTP协议的响应包,那么对应就是PPTP代理.
PPTP的握手数据包如下(通过wireshark抓包获得, 16进制):

0x00, 0x9c, 0x00, 0x01, 0x1a, 0x2b, 0x3c,
        0x4d, 0x00, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x03,
        0xff, 0xff, 0x00, 0x01, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63,
        0x61, 0x6e, 0x61, 0x6e, 0x69, 0x61, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00  

PPTP的握手响应数据包如下(通过wireshark抓包获得, 16进制):

00 9c 00 01 1a 2b 3c 4d 00 02 00 00 01 00 01 00
00 00 00 00 00 00 00 00 00 01 00 01 6c 6f 63 61
6c 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 6c 69 6e 75
78 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00        

得到响应包后验证PPTP的函数如下:

public static boolean isProxy(byte[] data) {
    // match 'PPTP'
    byte b2 = data[2];
    byte b3 = data[3];
    if (b2 == 0x00 && b3 == 0x01) {
        byte b8 = data[8];
        byte b9 = data[9];
        if (b8 == 00 && b9 == 02) {
            return true;
        }
    }
    return false;
}

关于PPTP协议的数据格式,本文不做详细解释,有兴趣的自行百度~

最后, 由于本文中涉及的数据包都非常小, 在实际使用中斌哥没有遇到沾包问题, 所以本文没有涉及粘包问题, 如有粘包问题的需要, 可以利用LineBasedFrameDecoder, FixedLengthFrameDecoder等工具加以解决.

本文源代码地址

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • netty常用API学习 netty简介 Netty是基于Java NIO的网络应用框架. Netty是一个NIO...
    花丶小伟阅读 6,134评论 0 20
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,695评论 19 139
  • 该文章为转载,原文章请点击 1. 背景 1.1. Netty 3.X系列版本现状 根据对Netty社区部分用户的调...
    Pramyness阅读 2,157评论 1 14
  • Netty是一个高性能事件驱动的异步的非堵塞的IO(NIO)框架,用于建立TCP等底层的连接,基于Netty可以建...
    我是解忧鸭铺鸭阅读 1,475评论 0 2
  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 综述 netty通...
    jiangmo阅读 6,224评论 0 13

友情链接更多精彩内容