后续也会提供 Service Mesh 的简单代码实现。
Netty 通信和 Socket 通信大致类似,它是在 Socket 的基础上进行的封装。当然,你也可以自己实现 Netty 的功能,但是我想给你一句话。
为什么要用 Netty 呢?官方给出了这样的解释。确实,Netty 是一个很不错的框架,我们可以基于 Netty 来实现简单的 RPC 调用。
package org.gfu.base.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import java.util.Date;
/**
 * Minimal Netty client.
 *
 * <p>Connects to {@code host:port}, installs UTF-8 string codecs plus a
 * {@link NettyClientHandler} carrying the configured message, and then blocks
 * the calling thread until the channel is closed.
 *
 * @author 719383495@qq.com |719383495qq@gmail.com |gfu
 * @date 2019/9/27
 */
class NettyClient {

    /** Remote host to connect to. */
    private final String host;

    /** Remote port to connect to. */
    private final int port;

    /** Message handed to the channel handler; set via {@link #setMessage(String)}. */
    private String jsonStr;

    /**
     * Creates a client for the given endpoint.
     *
     * @param host remote host name or address
     * @param port remote port
     */
    NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Sets the message that is passed to the handler of the next connection.
     *
     * @param jsonStr message text (passed through unchanged; not validated here)
     * @return this client, to allow call chaining
     */
    NettyClient setMessage(String jsonStr) {
        this.jsonStr = jsonStr;
        return this;
    }

    /**
     * Connects to the remote endpoint and blocks until the channel closes.
     *
     * <p>Failures are reported on the console and not rethrown. If the waiting
     * thread is interrupted, its interrupt status is restored before returning
     * so callers can still observe the interruption. The event loop group is
     * always asked to shut down on exit.
     */
    void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                            // The message is read from the field at channel-initialisation time.
                            ch.pipeline().addLast(new NettyClientHandler(jsonStr));
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(
                    f -> {
                        if (f.isSuccess()) {
                            System.out.println("连接成功:" + host + ":" + port);
                        } else {
                            System.out.println(new Date() + "-- 连接失败:" + host + ":" + port);
                        }
                    }
            ).sync();

            // Block until the channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag for the caller.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
package org.gfu.base.netty;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Scanner;
/**
 * Netty client handler: writes a fixed message to the server as soon as the
 * channel becomes active.
 *
 * @author 719383495@qq.com |719383495qq@gmail.com |gfu
 * @date 2019/9/27
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /** Message written to the channel when it becomes active. */
    private final String jsonStr;

    /**
     * Creates a handler that sends the given message.
     *
     * @param jsonStr message to write once the channel is active
     */
    public NettyClientHandler(String jsonStr) {
        this.jsonStr = jsonStr;
    }

    /**
     * Writes and flushes the configured message on the newly active channel.
     *
     * @param ctx context of the channel that became active
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(jsonStr);
    }
}
package org.gfu.base.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.sctp.nio.NioSctpServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import java.util.Date;
/**
 * Minimal Netty server.
 *
 * <p>Binds to {@code host:port}, installs UTF-8 string codecs plus a
 * {@link NettyServerHandler} on every accepted connection, and then blocks the
 * calling thread until the server channel is closed.
 *
 * @author 719383495@qq.com |719383495qq@gmail.com |gfu
 * @date 2019/9/27
 */
class NettyServer {

    /** Local host name or address to bind to. */
    private final String host;

    /** Local port to bind to. */
    private final int port;

    /**
     * Creates a server for the given bind address.
     *
     * @param host local host name or address
     * @param port local port
     */
    NettyServer(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Binds the server socket and blocks until the server channel closes.
     *
     * <p>Failures are reported on the console and not rethrown. If the waiting
     * thread is interrupted, its interrupt status is restored before returning
     * so callers can still observe the interruption. Both event loop groups
     * are always asked to shut down on exit.
     */
    void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture channelFuture = serverBootstrap.bind(host, port)
                    .addListener(f -> {
                        if (f.isSuccess()) {
                            System.out.println("绑定成功:" + host + ":" + port);
                        } else {
                            System.out.println(new Date() + "--绑定失败:" + host + ":" + port);
                        }
                    }).sync();

            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag for the caller.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package org.gfu.base.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
/**
 * Netty server handler: prints every received message and answers it with the
 * message text followed by {@code "server accept success"}.
 *
 * @author 719383495@qq.com |719383495qq@gmail.com |gfu
 * @date 2019/9/27
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the received message and sends the acknowledgement back.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message; its {@code toString()} form is echoed
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg);
        // Reply from here with the message just read, instead of routing through
        // channelActive with shared mutable state.
        ctx.writeAndFlush(msg.toString() + "server accept success");
    }

    /**
     * Forwards the exception to the default handling of the superclass.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    /**
     * Called when a connection becomes active. Nothing has been received at
     * this point, so no acknowledgement is sent (previously this emitted the
     * literal text {@code "nullserver accept success"}).
     *
     * @param ctx context of the channel that became active
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
}