一、 组件介绍
-
Channel
在 Netty 中, Channel 是一个 Socket 连接的抽象, 它为用户提供了关于底层 Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作。
服务器连接监听的channel ,也叫parent channel
。 对应每一个客户端连接读写的channel,叫child channel
。 -
EventLoopGroup
Netty的调度模块称为EvenLoopGroup,它包含一组EventLoop,Channel通过注册到EventLoop中执行操作。默认EventLoop个数为cpu核数的两倍
BossGroup(boss线程组):相当于mainReactor,负责建立连接并且把连接注册到WorkGroup中。
WorkerGroup(worker线程组):相当于subReactor,WorkGroup负责处理连接对应的读写事件。
boss group和worker goup相当于多Reactor多线程
设计模式。
-
Channel类型
NioDatagramChannel
,表示异步的UDP连接;
NioSocketChannel
表示异步的TCP连接
NioServerSocketChannel
表示异步的TCP Socket连接,对每一个新进来的连接都会创建一个SocketChannel。 -
Bootstrap和ServerBootstrap
启动器,完成Netty客户端或服务端的初始化;
因为UDP是无连接的,所有直接使用Bootstrap;Http则使用ServerBootstrap
启动流程:
二、主要逻辑
udp服务启动相关逻辑:
public void startUdpServer(){
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) //配置一个线程组
.channel(NioDatagramChannel.class) //设置channel类型为UPD
.option(ChannelOption.SO_BROADCAST, true) //支持广播
.option(ChannelOption.SO_RCVBUF, 2048 * 1024)// 设置channel读缓冲区大小
.option(ChannelOption.SO_SNDBUF, 2048 * 1024)// 设置channel写缓冲区大小
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception { //装配handler流水线
ChannelPipeline pipeline = ch.pipeline(); //Handler将按pipeline中添加的顺序执行
pipeline.addLast(new UdpServerHandler(serviceFactory)); //自定义的处理器
}
});
//绑定端口(默认是异步的,可以加ChannelFunture的监听事件),sync()同步阻塞等待连接成功;客户端使用.connect(host, port)连接
ChannelFuture channelFuture = bootstrap.bind(port).sync();
log.info("udp服务器启动,端口为"+port);
nettyUtil.setChannel(channelFuture.channel());
//sync()同步阻塞等待channel关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
//关闭资源
group.shutdownGracefully();
}
}
udp自定义handler相关逻辑,注意这里泛型为DatagramPacket,表示接收处理UDP报文
@Slf4j
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private ServiceFactory serviceFactory;
//创建可缓存线程池
ExecutorService executorService = Executors.newCachedThreadPool();
public ServerHandler(ServiceFactory serviceFactory){
this.serviceFactory = serviceFactory;
}
//监听channel的消息,注意此时的handler为单线程处理,可以把请求加到线程池中提升效率
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
DatagramPacket p = packet.copy();
//将请求放在线程池处理
executorService.execute(new UdpHandlerRunnable(p));
}
class UdpHandlerRunnable implements Runnable{
DatagramPacket packet;
UdpHandlerRunnable(DatagramPacket packet){
this.packet = packet;
}
public void run(){
ByteBuf byteBuf = packet.content();
byteBuf.retain(); // byteBuf引用计数加1,避免报引用为0异常
String content = new String(ByteBufUtil.getBytes(byteBuf));
log.info("得到来自 "+packet.sender()+" 的请求, content = " + content);
// 业务逻辑
// ...
byteBuf.release(); //注意释放byteBuf和packet
packet.release();
}
}
}
http服务启动相关逻辑
public void startHttpServer(){
EventLoopGroup bossGroup = new NioEventLoopGroup(); //boss工作组
EventLoopGroup workerGroup = new NioEventLoopGroup(); //worker工作组
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //设置channel类型为NioServerSocketChannel
.childOption(ChannelOption.SO_RCVBUF, 2048 * 1024)// 设置channel读缓冲区为2M
.childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)// 设置channel写缓冲区为1M
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception { //装配handler流水线
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpResponseEncoder()); //编码器
pipeline.addLast(new HttpRequestDecoder()); //解码器
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//设置块的最大字节数
pipeline.addLast(new HttpServerHandler(serviceFactory)); //自定义的处理器
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //绑定端口,sync()同步阻塞等待连接成功
log.info("http服务器启动,端口为"+port);
nettyUtil.setChannel(channelFuture.channel());
//sync()同步阻塞等待channel关闭
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
}finally{
//关闭资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
http自定义handler相关逻辑
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private ServiceFactory serviceFactory;
public HttpServerHandler(ServiceFactory serviceFactory){
this.serviceFactory = serviceFactory;
}
//监听channel的消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
String method = request.method().name();
String uri = request.uri();
String body = fullHttpRequest.content().toString(CharsetUtil.UTF_8);
log.info("method = {}, uri = {}, body = {}", method, uri, body);
//具体业务逻辑...
}
//异常捕获
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
NettyUtil.httpResponse(ctx.channel(), Error.error("错误请求"));
}
}
发送请求的工具类
/**
* udp单播发送
* @param host
* @param port
* @param pushMsg 待发送的信息
*/
public void singleCast(String host, int port, String pushMsg){
InetSocketAddress remoteAddress = new InetSocketAddress(host, port); //远程地址
ByteBuf byteBuf1 = new UnpooledByteBufAllocator(false).buffer();
byteBuf1.writeCharSequence(pushMsg, CharsetUtil.UTF_8);
DatagramPacket packet = new DatagramPacket(byteBuf1, remoteAddress);
this.channel.writeAndFlush(packet);
}
/**
* http响应
* @param channel
* @param sendMsg 响应的信息
*/
public void httpResponse(Channel channel, String sendMsg){
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.wrappedBuffer(sendMsg.getBytes(CharsetUtil.UTF_8)));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
channel.writeAndFlush(response);
}
/**
* 参数转map
* @param fullReq
* @return
*/
public Map<String, String> parse(FullHttpRequest fullReq) {
try{
HttpMethod method = fullReq.method();
Map<String, String> parmMap = new HashMap<>();
if (HttpMethod.GET == method) {
// 是GET请求
QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
decoder.parameters().entrySet().forEach( entry -> {
// entry.getValue()是一个List, 只取第一个元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
// 是POST请求
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(fullReq);
decoder.offer(fullReq);
List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
for (InterfaceHttpData parm : parmList) {
Attribute data = (Attribute) parm;
parmMap.put(data.getName(), data.getValue());
}
} else {
// 不支持其它方法
log.error("暂不支持该方法");
}
return parmMap;
}catch (IOException e) {
return null;
}
}
/**
* 文件下载
* @param ctx
* @param fileName
* @param type
*/
public void responseExportFile(ChannelHandlerContext ctx, String fileName, String type) {
try {
StringBuilder path = new StringBuilder();
if( type.equals("1") ){ //apk
path.append(".").append(File.separator).append("upload").append(File.separator).append("apk").append(File.separator).append(fileName);
}
if( type.equals("2") ){ //图片
path.append(".").append(File.separator).append("upload").append(File.separator).append("img").append(File.separator).append(fileName);
}
File file = new File(path.toString()).getCanonicalFile();
if(!file.exists()){
log.error("下载文件不存在, path = {}", path);
this.httpResponse(ctx.channel(), JSON.toJSONString(Result.failure("下载文件不存在")));
return;
}
//随机读取文件
final RandomAccessFile raf = new RandomAccessFile(file, "r");
long fileLength = raf.length();
//定义response对象
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//设置请求头部
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream; charset=UTF-8");
response.headers().add(HttpHeaderNames.CONTENT_DISPOSITION,
"attachment; filename=\"" + URLEncoder.encode(file.getName(), "UTF-8") + "\";");
ctx.write(response);
//设置事件通知对象
ChannelFuture sendFileFuture = ctx
.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
//文件传输完成执行监听器
@Override
public void operationComplete(ChannelProgressiveFuture future)
throws Exception {
log.info("文件 {} 下载成功.", fileName);
}
//文件传输进度监听器
@Override
public void operationProgressed(ChannelProgressiveFuture future,
long progress, long total) throws Exception {
}
});
//刷新缓冲区数据,文件结束标志符
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}