上篇文章讲解了客户端与服务端通信示例,本篇来讲解下多客户端之间是如何通信的,我们以一个聊天室的程序为例。
具体需求:
客户端1、2、3(通过remoteAddress来标识),当客户端1上线后,发送一条消息给服务端,当客户端2上线后,通知客户端1:“客户端2已经上线”,当客户端3上线后,通知客户端1和客户端2:“客户端3已经上线”。
按照Netty服务构建步骤进行,可以参见Netty构建服务的基本步骤文章来了解构建过程以及具体说明。
- 首先构建聊天室服务端入口,多个客户端通信都是经由服务端作为传输和通信载体。
示例代码:
public class MyCatServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyChatServerInitializer());
ChannelFuture channelFuture = bootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- 创建MyChatClientInitializer,添加编解码处理器ChannelPipeline.
示例代码:
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
// 添加基于\r \n界定符的解码器
channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 添加字符串解码器
channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 添加字符串编码器
channelPipeline.addLast(new MyChatServerHandler()); // 自定义处理器
}
}
- 创建自定义处理器MyChatServerHandler, 这里涉及到一个重要的组件ChannelGroup,它是线程安全的,ChannelGroup存储了已连接的Channel,Channel关闭会自动从ChannelGroup中移除,无需担心Channel生命周期。同时,可以对这些Channel做各种批量操作,可以以广播的形式发送一条消息给所有的Channels,调用它的writeAndFlush方法来实现。
ChannelGroup可以进一步理解为设计模式中的发布-订阅模型,其底层是通过ConcurrentHashMap进行存储所有Channel的。
示例代码:
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 这里要区分下是否是自己发的消息
Channel channel = ctx.channel();
// 这里使用了Java8的lambda表达式
channelGroup.forEach(ch -> {
if (ch == channel) { // 两个channel对象地址相同
System.out.println("服务器端转发聊天消息:【自己】发送的消息, 内容:" + msg + "\n");
ch.writeAndFlush("【自己】发送的消息, 内容:" + msg + "\n");
} else {
System.out.println("服务器端转发聊天消息:"+ ch.remoteAddress() + "发送的消息,内容:" + msg + "\n");
ch.writeAndFlush(ch.remoteAddress() + "发送的消息,内容:" + msg + "\n");
}
});
}
// -----------以下覆写的方法是ChannelInboundHandlerAdapter中的方法---------------
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + " 加入了\n");
// 先写入到客户端,最后再将自己添加到ChannelGroup中
channelGroup.add(channel);
}
@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + " 离开了\n");
// 这里channelGroup会自动进行调用,所以这行代码不写也是可以的。
channelGroup.remove(channel);
}
/**
* 只要有客户端连接就会执行
*
* @param ctx
* @throws Exception
*/
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了\n");
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 下线了\n");
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
- 构建客户端入口,连接服务端8899端口,示例中通过控制台输入形式给服务端发送消息。
示例代码:
public class MyCatClient {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyChatClientInitalizer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
// channelFuture.channel().closeFuture().sync();
// 从控制台不断的读取输入
boolean running = true;
try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){
while (running) {
channelFuture.channel().writeAndFlush(br.readLine() + "\r\n");
}
} catch (IOException e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
- 创建客户端MyChatClientInitializer,跟服务端基本类似的处理器。
示例代码:
public class MyChatClientInitalizer extends ChannelInitializer<SocketChannel> {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
channelPipeline.addLast(new MyChatClientHandler());
}
}
- 创建自定义处理器MyChatClientHandler, 很简单,只是输出一条消息。
示例代码:
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}
以上几个步骤就完成了聊天室程序的编码工作,下面运行服务端和客户端程序,运行结果如下:
MyChatClient(1) | MyChatClient(2) | MyChatClient(3) | MyChatServer |
---|---|---|---|
Run | None | None | /127.0.0.1:51049 上线了 |
[服务器] - /127.0.0.1:51055 加入了 | Run | None | /127.0.0.1:51055 上线了 |
[服务器] - /127.0.0.1:51114 加入了 | [服务器] - /127.0.0.1:51114 加入了 | Run | /127.0.0.1:51114 上线了 |
None:表示未运行 Run:表示运行