一、概念&原理
二、Netty Client
创建并连接到netty-server端
/**
 * Configures the given bootstrap for the object-echo client and starts an
 * asynchronous connect to {@code serverHost:serverPort}.
 *
 * <p>The connect result is observed by a {@link ConnectionListener}, which
 * schedules another call to this method when the attempt fails.
 *
 * @param bootstrap bootstrap to configure; when {@code null} nothing is
 *                  configured and {@code null} is returned
 * @param eventLoop event loop (group) that will drive the new channel
 * @return the {@code bootstrap} argument
 * @throws SSLException if the client SSL context cannot be built
 */
public Bootstrap createBootstrap(Bootstrap bootstrap, EventLoopGroup eventLoop) throws SSLException {
    // Resolve the local address the client socket will be bound to.
    try {
        localHost = InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
        // Keep whatever value localHost already had; the bind below is skipped when it is null.
        LogConstant.runLog.error("Failed to resolve local host address: " + e);
    }

    // Configure SSL.
    // NOTE(review): InsecureTrustManagerFactory accepts any server certificate;
    // confirm this is never enabled outside test environments.
    final SslContext sslCtx;
    if (SSL) {
        sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();
    } else {
        sslCtx = null;
    }

    if (bootstrap == null) {
        return null;
    }

    // One handler instance of each kind per bootstrap, i.e. per connection attempt.
    final LoginAuthClientHandler handler1 = new LoginAuthClientHandler(this);
    final HeartBeatReqHandler handler2 = new HeartBeatReqHandler(this);
    final ObjectEchoClientHandler handler3 = new ObjectEchoClientHandler(this);

    bootstrap.group(eventLoop);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    // The type argument is required: without it initChannel(SocketChannel) does not override anything.
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            if (sslCtx != null) {
                p.addLast(sslCtx.newHandler(ch.alloc(), serverHost, serverPort));
            }
            // Encoder, decoder and the business handlers form one ordered processing chain.
            // NOTE(review): ObjectDecoder performs Java deserialization of data received
            // from the network; only use it with trusted peers.
            p.addLast(
                    new ObjectEncoder(),
                    new ObjectDecoder(10240 * 10240,
                            ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())),
                    handler1, handler2, handler3);
        }
    });

    // Passing a null host to localAddress would fail, so only bind explicitly when it is known.
    if (localHost != null) {
        bootstrap.localAddress(localHost, localPort);
    }
    bootstrap.connect(serverHost, serverPort).addListener(new ConnectionListener(this));
    return bootstrap;
}
连接监听器:
public class ConnectionListener implements ChannelFutureListener {
private ObjectEchoClient client;
public ConnectionListener(ObjectEchoClient client) {
this.client = client;
}
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!channelFuture.isSuccess()) {
LogConstant.runLog.info("Reconnect");
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {
try {
client.createBootstrap(new Bootstrap(), loop);
} catch (SSLException e) {
e.printStackTrace();
}
}
}, 1L, TimeUnit.SECONDS);
}
}
}
监听器负责监听连接是否成功;如果连接失败,会在1秒后使用新的Bootstrap自动发起重连。
Netty-Client端Handler,举例:
/**
 * Called when the channel becomes active: logs the event and immediately
 * sends the message produced by {@code buildAuthMessage()} to the server.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) {
    LogConstant.runLog.info("LoginAuthReqHandler is active!!! ");
    // Queue the message and flush it in a single call.
    ctx.writeAndFlush(buildAuthMessage());
}
/**
 * Called when the channel becomes inactive. Schedules a reconnect one second
 * later on the channel's event loop, then forwards the event to the next handler.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    final EventLoop eventLoop = ctx.channel().eventLoop();
    eventLoop.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                // A new Bootstrap is used for every attempt; the event loop is reused.
                client.createBootstrap(new Bootstrap(), eventLoop);
            } catch (SSLException e) {
                // Report through the project logger instead of stderr.
                LogConstant.runLog.error("Reconnect failed, SSL context could not be built: " + e);
            }
        }
    }, 1L, TimeUnit.SECONDS);
    super.channelInactive(ctx);
}
/**
 * Inspects every inbound message and decides whether it travels further down
 * the pipeline.
 *
 * <ul>
 *   <li>Objects that are not a {@code NettyMessage}, or that lack a header or
 *       header type, are logged and then ignored.</li>
 *   <li>{@code LOGIN_RESP}, {@code NORMAL} and {@code HEART_BEAT_RESP} messages
 *       are forwarded to the next handler.</li>
 *   <li>Any other type is treated as illegal and the channel is closed.</li>
 * </ul>
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final String text = msg.toString();
    LogConstant.runLog.info("Received msg from server:" + text);

    if (!(msg instanceof NettyMessage)) {
        return;
    }
    final NettyMessage incoming = (NettyMessage) msg;
    if (incoming.getHeader() == null || incoming.getHeader().getType() == null) {
        return;
    }

    if (incoming.getHeader().getType().equals(MessageType.LOGIN_RESP)) {
        // Login/auth response from the server: hand over to the heartbeat handler.
        LogConstant.runLog.info("Auth message:[" + text + "]");
        LogConstant.runLog.info("Enter into HeartBeatReqHandler handler!");
        ctx.fireChannelRead(msg);
        return;
    }
    if (incoming.getHeader().getType().equals(MessageType.NORMAL)) {
        // Ordinary business message: pass it on to the next handler.
        LogConstant.runLog.info("enter into ObjectEchoClientHandler, message:[" + text + "]");
        ctx.fireChannelRead(msg);
        return;
    }
    if (incoming.getHeader().getType().equals(MessageType.HEART_BEAT_RESP)) {
        LogConstant.runLog.info("enter into HeartBeatReqHandler, message:[" + text + "]");
        ctx.fireChannelRead(msg);
        return;
    }

    // Unknown message type: log it and close the connection.
    LogConstant.runLog.error("message illegal! closed!" + text);
    ctx.close();
}
/**
 * Called when the current read batch has been fully delivered. Flushes pending
 * outbound data and then forwards the event, so that handlers placed after
 * this one in the pipeline are also notified.
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    ctx.flush();
    // Overriding this callback stops propagation unless the event is re-fired explicitly.
    ctx.fireChannelReadComplete();
}
/**
 * Does not handle the exception itself; passes {@code cause} on to the next
 * handler in the pipeline.
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}
三、Netty Server
Netty Server配置类:
@Configuration
public class NettyConfig {
@Value("${netty.boss.thread.count:2}")
private int bossCount;
@Value("${netty.worker.thread.count:2}")
private int workerCount;
@Value("${netty.tcp.port:8090}")
private int tcpPort;
@Value("${netty.so.keepalive:true}")
private boolean keepAlive;
@Value("${netty.so.backlog:100}")
private int backlog;
@Autowired
@Qualifier("objectChannelInitializer")
private ObjectChannelInitializer objectChannelInitializer;
@SuppressWarnings("unchecked")
@Bean(name = "serverBootstrap")
public ServerBootstrap bootstrap() {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.childHandler(objectChannelInitializer);
Map, Object> tcpChannelOptions = tcpChannelOptions();
Set> keySet = tcpChannelOptions.keySet();
for (@SuppressWarnings("rawtypes")
ChannelOption option : keySet) {
b.option(option, tcpChannelOptions.get(option));
}
return b;
}
@Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup bossGroup() {
return new NioEventLoopGroup(bossCount);
}
@Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(workerCount);
}
@Bean(name = "tcpSocketAddress")
public InetSocketAddress tcpPort() {
return new InetSocketAddress(tcpPort);
}
@Bean(name = "tcpChannelOptions")
public Map, Object> tcpChannelOptions() {
Map, Object> options = new HashMap, Object>();
options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
options.put(ChannelOption.SO_BACKLOG, backlog);
return options;
}
@Bean(name = "stringEncoder")
public StringEncoder stringEncoder() {
return new StringEncoder();
}
@Bean(name = "stringDecoder")
public StringDecoder stringDecoder() {
return new StringDecoder();
}
@Bean
public static PropertySourcesPlaceholderConfigurer propertyPlaceholderConfigurer() {
return new PropertySourcesPlaceholderConfigurer();
}
}
Netty Server初始化类:
/**
 * Builds the pipeline of every accepted connection: object encoder, object
 * decoder, then the three injected business handlers.
 *
 * <p>The injected handlers are added to the pipeline of every channel this
 * initializer sees, so each of them must be annotated
 * {@code @ChannelHandler.Sharable}.
 */
@Component
@Qualifier("objectChannelInitializer")
public class ObjectChannelInitializer extends ChannelInitializer<SocketChannel> {

    /** Upper bound, in bytes, for one serialized object (10240 * 10240 = 104,857,600). */
    private static final int MAX_OBJECT_SIZE = 10240 * 10240;

    @Autowired
    LoginAuthServerHandler loginAuthServerHandler;

    @Autowired
    HeartBeatRespHandler heartBeatRespHandler;

    @Autowired
    ObjectEchoServerHandler objectEchoServerHandler;

    /**
     * Populates the pipeline of a newly accepted channel.
     *
     * @param socketChannel the channel being initialised
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // NOTE(review): ObjectDecoder performs Java deserialization of data received
        // from the network; only expose this server to trusted clients.
        socketChannel.pipeline().addLast(
                new ObjectEncoder(),
                new ObjectDecoder(MAX_OBJECT_SIZE,
                        ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())),
                loginAuthServerHandler, heartBeatRespHandler, objectEchoServerHandler);
    }
}
将各个Handler注入并加入处理链:入站事件按加入的先后顺序依次经过各Handler,出站事件则按相反顺序经过。由于这些Handler实例会被加入每一个连接的pipeline,它们必须标注@ChannelHandler.Sharable。
各Handler实现:
/**
 * Server-side inbound handler that writes each received {@code NettyMessage}
 * back to the peer and then passes it on to the next handler. The actual
 * business logic is omitted in this example.
 */
@Component
@Qualifier("heartBeatRespHandler")
@ChannelHandler.Sharable
public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {

    // NOTE(review): this handler is @Sharable, so this field is shared by every
    // connection; per-connection state should not be kept here.
    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * Closes the channel and forwards the inactive event so that handlers
     * later in the pipeline are notified as well.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
        super.channelInactive(ctx);
    }

    /**
     * Writes a {@code NettyMessage} back to the channel and forwards it.
     * Any other object is forwarded untouched instead of failing on a cast.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof NettyMessage)) {
            ctx.fireChannelRead(msg);
            return;
        }
        NettyMessage message = (NettyMessage) msg;
        // Business logic omitted here.
        // writeAndFlush writes the message to the channel.
        ctx.writeAndFlush(message);
        // fireChannelRead hands the message to the next handler in the chain.
        ctx.fireChannelRead(message);
    }

    /**
     * Flushes pending outbound data and forwards the read-complete event.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
        ctx.fireChannelReadComplete();
    }
}