本文重点描述如何利用Netty实现一个代理扫描服务, 对Netty的一些基本概念只简单介绍.
1.Netty中几个比较重要的概念
1.0 Bootstrap
Bootstrap其实就是Netty服务的启动器, 服务端使用的是ServerBootstrap, 客户端使用的是Bootstrap, 我们可以通过配置Bootstrap来配置Netty使用哪种Channel, Group, Handler和Encoder, Decoder……
1.1 EventLoopGroup
reactor线程模型中处理IO事件的线程组.
1.2 Channel
这里的Channel的概念和NIO中Channel的概念是一样的, 相当于一个Socket连接.
1.3 ChannelFuture
这点在官方的Guide中也有提到, 在Netty中, 所有的处理都是异步的, 因此需要一个Future对象, 可以注册监听在异步线程处理完以后进行一些处理.
1.4 Handler
Handler其实就是事件的处理器, Netty通过Channel读入请求内容后会分配给Handler进行事件处理, Handler能够处理的事件包括:数据接收, 异常处理, 数据转换, 编码解码等问题, 其中包含两个非常重要的接口ChannelInboundHandler(Decoder实际上就是实现了这个接口), ChannelOutboundHandler(Encoder实际上就是实现了这个接口), 前者负责处理客户端发送到服务端的请求, 后者反之. 关于Handler执行顺序的一些介绍可以看一看这篇文章, handler的执行顺序.
2.利用Netty处理网络通信
2.0 定义Bootstrap
this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new Decoder(),
new Encoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler()
);
}
});
其中, eventLoopGroupWorker为处理创建channel使用的线程租, defaultEventExecutorGroup为执行ChannelHandler中的方法使用的线程租. Encoder和Encoder分别为解码器和编码器, IdleStateHandler为处理空闲线程的处理器.NettyConnectManageHandler用于管理各个连接, NettyClientHandler用于处理一下特殊的需求.
下面看源码.
Encoder不对数据做任何处理,只是将byte[]类型的数据写入ByteBuf.
public class Encoder extends MessageToByteEncoder<byte[]> {
@Override
protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
out.writeBytes(msg);
}
}
同样, Decoder也不对数据做任何处理, 只是将数据写入到对象数组中.
public class Decoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int len = in.readableBytes();
byte[] dst = new byte[len];
in.readBytes(dst);
out.add(dst);
}
}
最后在NettyClientHandler中处理数据.
class NettyClientHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收响应数据");
processMessageReceived(ctx, msg); //在processMessageReceived中处理详细的业务逻辑
}
}
2.1 发送数据
数据发送分为创建channel和传输数据两个部分.
public void invokeAsync(String addr, byte[] request, long timeoutMillis)
throws RemotingSendRequestException, RemotingConnectException, InterruptedException, RemotingTimeoutException {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
this.invokeAsyncImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
其中, getAndCreateChannel负责创建channel, invokeAsyncImpl负责发送数据.
private Channel getAndCreateChannel(final String addr) throws InterruptedException {
if (null == addr) {
return null;
}
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
return this.createChannel(addr);
}
public void invokeAsyncImpl(final Channel channel, final byte[] request, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
/**
* 表示数据已经发送完毕, 可以记录一些其他事情,比如日志
* @param future
* @throws Exception
*/
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("finish");
//future.channel().re
//future.channel().read();
}
});
}
3.扫描http代理接口
要某段某个地址是否http代理, 只需将http请求发送到这个地址, 然后看是否得到想要的页面, 即可作出判断.
例如,某地8888端口开了http代理, 只需将一个http请求发送到这个地址, 看是否得到正确的响应即可.
3.0 组织请求数据
http请求格式本文不做讨论, http请求如下:
GET http://www.qq.com/404/search_children.js HTTP/1.1
Host: www.qq.com
Accept: */*
Pragma: no-cache
User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.114 Safari/537.36
把这段字符串转换成字节数组, 然后发送到本地8888端口, 代码如下:
invokeAsync("127.0.0.1:8888", REQ_HTTP, 30000)
其中REQ_HTTP就是http请求的字节数组.
请求发送后, 需要在processMessageReceived中判断接收到的响应是否符合预期.
public void processMessageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
String html = new String((byte[])msg);
if (HttpProxy.isProxy(html)) {
System.out.println("发现http代理:" + ctx.channel().remoteAddress());
} else {
System.out.println("未发现http代理:" + ctx.channel().remoteAddress());
}
}
//验证响应中是否包"qzone.qq.com"这个字符串, 2018.3.12, 这个页面是包含下面的字符串的
private final static String ZONE = "qzone.qq.com";
public static boolean isProxy(String response) {
if (StringUtils.isNotBlank(response) && StringUtils.contains(response, ZONE)) {
return true;
}
return false;
}
4.支持多协议扩展
上文的代码示例是以http协议为示例的, 然而如果要支持多协议的扫描, 必须在channel中携带参数-协议, 只有这样, 在接收到数据以后才能根据协议去解码, 下面以pptp协议为了进行说明.
首先,在invokeAsync函数中增加一个protocol参数, 如下:
void invokeAsync(final String addr, final byte[] request, final String protocol, final long timeoutMillis)
throws RemotingSendRequestException, RemotingConnectException, InterruptedException,RemotingTimeoutException;
然后用AttributeMap的方式,将参数写入channel,代码如下:
public static AttributeKey<String> CHANNEL_PROTOCOL = AttributeKey.valueOf("protocol");
@Override
public void invokeAsync(String addr, byte[] request, final String protocol, long timeoutMillis)
throws RemotingSendRequestException, RemotingConnectException, InterruptedException, RemotingTimeoutException {
final Channel channel = this.getAndCreateChannel(addr);
Attribute<String> protocolAttribute = channel.attr(CHANNEL_PROTOCOL);
protocolAttribute.setIfAbsent(protocol);
if (channel != null && channel.isActive()) {
try {
this.invokeAsyncImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
//获取协议
@Override
public String getProtocol(Channel channel) {
Attribute<String> protocol = channel.attr(CHANNEL_PROTOCOL);
return protocol.get();
}
接着在processMessageReceived函数中根据协议来分别处理http协议和PPTP协议
public void processMessageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
String html = new String((byte[])msg);
String protocol = getProtocol(ctx.channel());
if ("http".equals(protocol)) {
if (HttpProxy.isProxy(html)) {
System.out.println("发现http代理:" + ctx.channel().remoteAddress());
} else {
System.out.println("未发现http代理:" + ctx.channel().remoteAddress());
}
} else if ("pptp".equals(protocol)) {
if (PPTPProxy.isProxy((byte[])msg)) {
System.out.println("发现pptp代理:" + ctx.channel().remoteAddress());
} else {
System.out.println("未发现pptp代理:" + ctx.channel().remoteAddress());
}
}
}
如此,只要在调用的时候加入协议即可.
invokeAsync(ip, REQ_HTTP, "pptp", 30000);
5.PPTP协议
PPTP(Point to Point Tunneling Protocol, VPN协议的一种), 即点对点隧道协议. 该协议是在PPP协议的基础上开发的一种新的增强型安全协议, 支持多协议虚拟专用网(VPN), 可以通过密码验证协议(PAP), 可扩展认证协议(EAP)等方法增强安全性. 可以使远程用户通过拨入ISP, 通过直接连接Internet或其他网络安全地访问企业网.
PPTP协议建立在TCP协议之上, 双方建立TCP连接之后, 只要客户端发送一个PPTP协议的握手包给服务器端, 如果服务器端返回正确的PPTP协议的响应包,那么对应就是PPTP代理.
PPTP的握手数据包如下(通过wireshark抓包获得, 16进制):
0x00, 0x9c, 0x00, 0x01, 0x1a, 0x2b, 0x3c,
0x4d, 0x00, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x03,
0xff, 0xff, 0x00, 0x01, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63,
0x61, 0x6e, 0x61, 0x6e, 0x69, 0x61, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
PPTP的握手响应数据包如下(通过wireshark抓包获得, 16进制):
00 9c 00 01 1a 2b 3c 4d 00 02 00 00 01 00 01 00
00 00 00 00 00 00 00 00 00 01 00 01 6c 6f 63 61
6c 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 6c 69 6e 75
78 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00
得到响应包后验证PPTP的函数如下:
public static boolean isProxy(byte[] data) {
// match 'PPTP'
byte b2 = data[2];
byte b3 = data[3];
if (b2 == 0x00 && b3 == 0x01) {
byte b8 = data[8];
byte b9 = data[9];
if (b8 == 00 && b9 == 02) {
return true;
}
}
return false;
}
关于PPTP协议的数据格式,本文不做详细解释,有兴趣的自行百度~
最后, 由于本文中涉及的数据包都非常小, 在实际使用中斌哥没有遇到沾包问题, 所以本文没有涉及粘包问题, 如有粘包问题的需要, 可以利用LineBasedFrameDecoder, FixedLengthFrameDecoder等工具加以解决.