使用Netty5实现客户端与服务器之间的通信,采用了JBoss Marshalling外部依赖实现编解码功能,可以在客户端与服务器之间实现简单Java对象的传输。
public class Server {
private int port;
public Server(int port) { this.port = port; }
public void start() {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.group(boosGroup, workerGroup)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.MINUTES));
ch.pipeline().addLast("encoder", MarshallingCodeCFactory2.buildMarshallingEncoder());
ch.pipeline().addLast("decoder", MarshallingCodeCFactory2.buildMarshallingDecoder());
ch.pipeline().addLast(new ServerHandler()); //ServerHandler实现了业务逻辑
.option(ChannelOption.SO_BACKLOG, 128)
// 可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = sb.bind(port).sync(); //绑定服务器,等待绑定完成,调用sync()的原因是当前线程阻塞
System.out.println("Server start listen at " + port);
future.channel().closeFuture().sync(); //关闭channel和块,直到它被关闭
} catch (Exception e) {
boosGroup.shutdownGracefully(); //关闭EventLoopGroup,释放所有资源(包括所有创建的线程)
public static void main(String[] args) throws Exception {
int port;
if(args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
new Server(port).start();
public class Client {
public void connect(int port, String host) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(0,5,0, TimeUnit.SECONDS));
ch.pipeline().addLast("encoder", MarshallingCodeCFactory2.buildMarshallingEncoder());
ch.pipeline().addLast("decoder", MarshallingCodeCFactory2.buildMarshallingDecoder());
ch.pipeline().addLast(new ClientHandler());
ChannelFuture future = b.connect(host, port).sync();
} finally {
public static void main(String[] args) throws Exception {
int port = 8080;
String host = "";
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
new Client().connect(port, host);
public class ServerHandler extends ChannelHandlerAdapter {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
} else {
super.userEventTriggered(ctx, evt);
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server channelRead..");
//System.out.println(ctx.channel().remoteAddress() + " received message: " + msg.toString());
ClientMessage cmsg = (ClientMessage) msg;
System.out.println("received message: " + cmsg.getClientName() + ", " + cmsg.getClientAddress() + ", " + cmsg.getMessage());
ServerMessage smsg = new ServerMessage("server", ctx.channel().remoteAddress().toString());
//ctx.channel().writeAndFlush("server " + ctx.channel().localAddress() + " has received the message.");
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
public class ClientHandler extends ChannelHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("CilentHandler ChannelActive. time: " + new Date());
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ClinetHandler ChannelInactive. time: " + new Date());
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
ClientMessage cmsg = new ClientMessage("client", ctx.channel().localAddress().toString());
//ctx.channel().writeAndFlush("userEventTriggered test from client.");
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ServerMessage smsg = (ServerMessage) msg;
System.out.println("Message received: " + smsg.getServerName() + ", " + smsg.getClientAddress());
//System.out.println("Message received: " + message.toString());
MarshallingCodeCFactory2 .java
public final class MarshallingCodeCFactory2 {
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
public static MarshallingDecoder buildMarshallingDecoder() {
// 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling
// 创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
// 根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(
marshallerFactory, configuration);
// 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider,
1024 * 1024 * 1);
return decoder;
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling
final MarshallingConfiguration configuration = new MarshallingConfiguration();
MarshallerProvider provider = new DefaultMarshallerProvider(
marshallerFactory, configuration);
// 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
ClientMessage .java
public class ClientMessage implements Serializable {
private String clientName;
private String clientAddress;
private String message = "Client to Server.";
public ClientMessage(String clientName, String clientAddress) {
this.clientName = clientName;
this.clientAddress = clientAddress;
public String getClientName() { return clientName; }
public void setClientName(String clientName) { this.clientName = clientName; }
public String getClientAddress() { return clientAddress; }
public void setClientAddress(String clientAddress) { this.clientAddress = clientAddress; }
public String getMessage() { return message; }
public String toString() {
return getClientName() + getClientAddress();
public class ServerMessage implements Serializable {
private String serverName;
private String clientAddress;
private String message = "Server to Client.";
public ServerMessage(String serverName, String clientAddress) {
this.serverName = serverName;
this.clientAddress = clientAddress;
public String getServerName() { return serverName; }
public void setServerName(String serverName) { this.serverName = serverName; }
public String getClientAddress() { return clientAddress; }
public void setClientAddress(String clientAddress) { this.clientAddress = clientAddress; }
public String getMessage() { return message; }
public String toString(){
return "{ " + this.serverName + " " + this.clientAddress + " }";