Seata 使用 Netty 完成 TM、RM 与 TC 之间的通信。若不熟悉 Netty 的用法,阅读 Seata 的源码会比较困难。本文实现了一个简单的 Netty 通信 demo,帮助读者入门 Netty 通信。
1. 引入依赖
<dependencies>
    <!-- Netty -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.30.Final</version>
    </dependency>
    <!-- SLF4J binding (Log4j 1.2 backend) -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <!-- Lombok: supplies the @Slf4j annotation used by NettyServerHandler -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
        <scope>provided</scope>
    </dependency>
    <!-- Protostuff -->
    <dependency>
        <groupId>com.dyuproject.protostuff</groupId>
        <artifactId>protostuff-core</artifactId>
        <version>1.0.9</version>
    </dependency>
    <dependency>
        <groupId>com.dyuproject.protostuff</groupId>
        <artifactId>protostuff-runtime</artifactId>
        <version>1.0.9</version>
    </dependency>
    <!-- Objenesis -->
    <dependency>
        <groupId>org.objenesis</groupId>
        <artifactId>objenesis</artifactId>
        <version>2.1</version>
    </dependency>
    <!-- fastjson: use the last 1.2.x release, which contains the autoType deserialization fixes -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>
2. 定义基类对象
请求对象:
/**
 * Request message body: an identifier plus an arbitrary parameter payload.
 *
 * <p>Plain mutable bean with a default constructor and getter/setter pairs.
 */
public class Request {

    /** Identifier of this request. */
    private String requestId;

    /** Payload carried by this request. */
    private Object parameter;

    /** @return the identifier of this request, or {@code null} if none was set */
    public String getRequestId() {
        return this.requestId;
    }

    /** @return the payload of this request, or {@code null} if none was set */
    public Object getParameter() {
        return this.parameter;
    }

    /** @param requestId the identifier to assign to this request */
    public void setRequestId(final String requestId) {
        this.requestId = requestId;
    }

    /** @param parameter the payload to attach to this request */
    public void setParameter(final Object parameter) {
        this.parameter = parameter;
    }
}
响应对象:
/**
 * Response message body: the identifier of the request being answered plus an arbitrary result.
 *
 * <p>Plain mutable bean with a default constructor and getter/setter pairs.
 */
public class Response {

    /** Identifier of the request this response belongs to. */
    private String requestId;

    /** Result carried by this response. */
    private Object result;

    /** @return the request identifier, or {@code null} if none was set */
    public String getRequestId() {
        return this.requestId;
    }

    /** @return the result value, or {@code null} if none was set */
    public Object getResult() {
        return this.result;
    }

    /** @param requestId the identifier of the request being answered */
    public void setRequestId(final String requestId) {
        this.requestId = requestId;
    }

    /** @param result the result value to return to the caller */
    public void setResult(final Object result) {
        this.result = result;
    }
}
3. 自定义传输协议
Netty 支持自定义传输协议。下面的编码器和解码器实现了一个"4 字节长度头 + JSON 消息体"的简单协议:
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Outbound encoder for the demo wire protocol.
 *
 * <p>Frame layout: a 4-byte length header (written with {@code writeInt}) followed by the
 * fastjson serialization of the message.
 *
 * <p>Only instances of the configured target class are encoded. Any other outbound message is
 * declined in {@link #acceptOutboundMessage(Object)} and therefore forwarded untouched to the
 * next outbound handler, instead of being silently replaced by an empty buffer.
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    /** Type of message this encoder is responsible for. */
    private final Class<?> target;

    /**
     * @param target the message class this encoder should serialize
     */
    public RpcEncoder(Class<?> target) {
        this.target = target;
    }

    /**
     * Restricts this encoder to instances of the target class.
     *
     * @param msg the outbound message
     * @return {@code true} if {@code msg} is an instance of the target class
     */
    @Override
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return target.isInstance(msg);
    }

    /**
     * Writes one frame for {@code msg}: length header first, then the JSON body.
     *
     * @param ctx the handler context
     * @param msg the message to encode; already checked by {@link #acceptOutboundMessage(Object)}
     * @param out the buffer the frame is written into
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        byte[] data = JSON.toJSONBytes(msg); // serialize the object to bytes with fastjson
        out.writeInt(data.length);           // header: length of the body
        out.writeBytes(data);                // body: the serialized message
    }
}
/**
 * Inbound decoder for the demo wire protocol: a 4-byte length header followed by a JSON body
 * that is parsed by fastjson into the configured target class.
 *
 * <p>The length header comes from the network and is validated before any buffer is allocated.
 *
 * <p>NOTE(review): the body is deserialized with fastjson from untrusted bytes; confirm that
 * fastjson's autoType feature stays disabled for the target types.
 */
public class RpcDecoder extends ByteToMessageDecoder {

    /** Size in bytes of the length header that precedes every frame body. */
    private static final int LENGTH_FIELD_BYTES = 4;

    /** Type the JSON body is parsed into. */
    private final Class<?> target;

    /** Largest body length accepted; longer frames are rejected. */
    private final int maxFrameLength;

    /**
     * Creates a decoder without an upper bound on the frame length.
     *
     * @param target the class each frame body is parsed into
     */
    public RpcDecoder(Class<?> target) {
        this(target, Integer.MAX_VALUE);
    }

    /**
     * Creates a decoder that rejects frames whose body is longer than {@code maxFrameLength}.
     *
     * @param target         the class each frame body is parsed into
     * @param maxFrameLength the maximum accepted body length in bytes; must not be negative
     * @throws IllegalArgumentException if {@code maxFrameLength} is negative
     */
    public RpcDecoder(Class<?> target, int maxFrameLength) {
        if (maxFrameLength < 0) {
            throw new IllegalArgumentException("maxFrameLength must be >= 0: " + maxFrameLength);
        }
        this.target = target;
        this.maxFrameLength = maxFrameLength;
    }

    /**
     * Decodes at most one complete frame from {@code in}.
     *
     * <p>If the header or body is not fully available yet, the reader index is left where it was
     * and nothing is emitted, so the method can be retried once more bytes have arrived.
     *
     * @param ctx the handler context
     * @param in  the accumulated inbound bytes
     * @param out receives the decoded object
     * @throws IllegalStateException if the length header is negative or above the configured
     *                               maximum, or if the body parses to {@code null}
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Wait until the whole length header is available.
        if (in.readableBytes() < LENGTH_FIELD_BYTES) {
            return;
        }
        // Remember the current reader index so an incomplete frame can be rewound.
        in.markReaderIndex();
        // readInt() advances the reader index by 4.
        int dataLength = in.readInt();
        // Reject corrupt or hostile headers before allocating or buffering anything.
        if (dataLength < 0 || dataLength > maxFrameLength) {
            throw new IllegalStateException(
                    "Invalid frame length " + dataLength + " (max " + maxFrameLength + ")");
        }
        // Body not complete yet: rewind to the mark and wait for more bytes.
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        Object obj = JSON.parseObject(data, target); // parse the bytes into the target type
        if (obj == null) {
            // The output list does not accept null elements; fail with a clear message instead.
            throw new IllegalStateException(
                    "Frame of " + dataLength + " bytes decoded to null for " + target.getName());
        }
        out.add(obj);
    }
}
4. 定义消息处理器
channel 收到消息后,先经过解码器(decode)解码,再交由消息处理器进行处理:
/**
* 消息处理器
*/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request) msg;
log.info("=========》》》Client Data:" + JSON.toJSONString(request));
Response response = new Response();
response.setRequestId(request.getRequestId());
response.setResult("Hello Client !");
// client接收到信息后主动关闭掉连接
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
注意:使用 ctx.writeAndFlush(response) 完成响应消息的发送;发送完成后,服务端通过 ChannelFutureListener.CLOSE 主动关闭连接。
5. 定义netty服务器端
/**
* netty的服务端
*/
public class NettyServer {
private String ip;
private int port;
public NettyServer(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void server() throws Exception {
/**
* Server端的EventLoopGroup分为两个
* workerGroup作为处理请求,
* bossGroup作为接收请求。
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* ChannelOption四个常量作为TCP连接中的属性。
*
*
*/
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32 * 1024)
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
.addLast(new RpcEncoder(Response.class));
//注册消息处理器
socketChannel.pipeline().addLast(new NettyServerHandler());
// .addLast(new NettyServerHandler());
}
});
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 开启长连接
ChannelFuture future = serverBootstrap.bind(ip, port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new NettyServer("127.0.0.1", 20000).server();
}
}
启动该类后,server 会一直监听 127.0.0.1 的 20000 端口,等待客户端连接。
6. 定义netty客户端
客户端继承了 SimpleChannelInboundHandler<Response> 类,并重写了 channelRead0 方法,因此它本身也是消息处理器。
/**
 * Netty client for the demo.
 *
 * <p>The class extends {@link SimpleChannelInboundHandler}, so the client instance itself is
 * installed in the pipeline and receives the decoded {@link Response}.
 *
 * <p>The handler callbacks run on a Netty event-loop thread while {@link #client(Request)} runs
 * on the caller's thread, so the fields shared between them are {@code volatile}.
 */
public class NettyClient extends SimpleChannelInboundHandler<Response> {

    private final String ip;
    private final int port;

    /** Response received from the server; written by the event loop, read by the caller. */
    private volatile Response response;

    /** Last exception seen in the pipeline, kept so that a failed call can report its cause. */
    private volatile Throwable failure;

    /**
     * @param ip   the server address to connect to
     * @param port the server port to connect to
     */
    public NettyClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * Remembers the failure and closes the connection, which unblocks {@link #client(Request)}.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        this.failure = cause;
        ctx.close();
    }

    /** Stores the decoded response so that {@link #client(Request)} can return it. */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
        this.response = response;
    }

    /**
     * Connects to the server, sends {@code request}, waits until the connection is closed, and
     * returns the response received in the meantime.
     *
     * @param request the request to send
     * @return the response received, or {@code null} if the connection was closed without a
     *         response and without a pipeline error
     * @throws IllegalStateException if a pipeline error occurred and no response was received
     * @throws Exception if connecting or writing fails, or the calling thread is interrupted
     */
    public Response client(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Create and configure the client Bootstrap.
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast(new RpcDecoder(Response.class));
                    pipeline.addLast(new RpcEncoder(Request.class));
                    pipeline.addLast(NettyClient.this);
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            // Connect to the server.
            ChannelFuture future = bootstrap.connect(ip, port).sync();
            // Send the request, then wait until the connection is closed.
            Channel channel = future.channel();
            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();

            Response received = this.response;
            Throwable error = this.failure;
            if (received == null && error != null) {
                // Report the failure instead of handing back an unexplained null.
                throw new IllegalStateException("Request to " + ip + ":" + port + " failed", error);
            }
            return received;
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        Request request = new Request();
        request.setRequestId(UUID.randomUUID().toString());
        request.setParameter("Hello Server !");
        System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 20000).client(request)));
    }
}