用过Netty的人明显表现对它的偏爱,有没有?!
为什么要用netty再实现一遍?
上一篇已经实现了串口通信。当然,简单实现还是远远不够的。即使是串口通信,也需要对收发数据进行编解码处理吧?也需要保证数据的完整性吧?也需要协议吧?也需要把业务逻辑的部分单独处理吧?
下面上代码:
1.编解码类;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
* 对收到的数据进行解码
* @author 程就人生
* @Date
*/
public class ByteArrayDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 标记一下当前的readIndex的位置
in.markReaderIndex();
int dataLength = in.readableBytes();
byte[] array = new byte[dataLength];
in.readBytes(array, 0, dataLength);
if(array.length > 0){
out.add(array);
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
/**
* 对发出的数据进行编码
* @author 程就人生
* @Date
*/
@Slf4j
public class ByteArrayEncoder extends MessageToByteEncoder<byte[]>{
@Override
protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
log.info(".....经过ByteArrayEncoder编码.....");
//消息体,包含我们要发送的数据
out.writeBytes(msg);
}
}
2.数据接收处理类;
import org.springframework.stereotype.Service;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import io.netty.channel.ChannelHandler;
/**
* 串口接收数据处理器
* @author 程就人生
* @Date
*/
@Service("rxtxHandler")
@ChannelHandler.Sharable
public class RxtxHandler extends SimpleChannelInboundHandler<byte[]>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
//文本方式编解码,String
//System.out.println("接收到:"+msg);
// 十六进制发送编解码
int dataLength = msg.length;
ByteBuf buf = Unpooled.buffer(dataLength);
buf.writeBytes(msg);
System.out.println("接收到:");
while(buf.isReadable()){
System.out.print(" " + buf.readByte());
}
System.out.println("");
// 释放资源
ReferenceCountUtil.release(msg);
}
}
3.串口参数配置类;
import io.netty.channel.rxtx.RxtxChannelConfig;
import io.netty.channel.rxtx.RxtxChannelConfig.Databits;
import io.netty.channel.rxtx.RxtxChannelConfig.Paritybit;
import io.netty.channel.rxtx.RxtxChannelConfig.Stopbits;
import lombok.Data;
/**
* 串口参数配置类
* @author
* @date 2022年4月26日
*
*/
@Data
public final class SerialPortParam {
/**
* 串口名称,以COM开头(COM0、COM1、COM2等等)
*/
private String serialPortName;
/**
* 波特率, 默认:115200
*/
private int baudRate = 115200;
/**
* 数据位 默认8位
* 可以设置的值:SerialPort.DATABITS_5、SerialPort.DATABITS_6、SerialPort.DATABITS_7、SerialPort.DATABITS_8
*/
private Databits dataBits = RxtxChannelConfig.Databits.DATABITS_8;
/**
* 停止位
* 可以设置的值:SerialPort.STOPBITS_1、SerialPort.STOPBITS_2、SerialPort.STOPBITS_1_5
*/
private Stopbits stopBits = RxtxChannelConfig.Stopbits.STOPBITS_1;
/**
* 校验位
* 可以设置的值:SerialPort.PARITY_NONE、SerialPort.PARITY_ODD、SerialPort.PARITY_EVEN、SerialPort.PARITY_MARK、SerialPort.PARITY_SPACE
*/
private Paritybit parity = RxtxChannelConfig.Paritybit.NONE;
}
4.netty整合串口的启动类;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.rxtx.RxtxChannel;
import io.netty.channel.rxtx.RxtxDeviceAddress;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* 串口接收数据的服务端
* @author 程就人生
* @Date
*/
@Slf4j
@Data
@Component
public class RxtxServer {
private RxtxChannel channel;
private SerialPortParam serialPortParam;
@Autowired
private RxtxHandler handler;
public void createRxtx() throws Exception {
// 串口使用阻塞io
EventLoopGroup group = new OioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channelFactory(() -> {
RxtxChannel rxtxChannel = new RxtxChannel();
rxtxChannel.config()
.setBaudrate(serialPortParam.getBaudRate()) // 波特率
.setDatabits(serialPortParam.getDataBits()) // 数据位
.setParitybit(serialPortParam.getParity()) // 校验位
.setStopbits(serialPortParam.getStopBits()); // 停止位
return rxtxChannel ;
})
.handler(new ChannelInitializer<RxtxChannel>() {
@Override
protected void initChannel(RxtxChannel rxtxChannel) {
rxtxChannel.pipeline().addLast(
// new LineBasedFrameDecoder(60000),
// 文本形式发送编解码
// new StringEncoder(StandardCharsets.UTF_8),
// new StringDecoder(StandardCharsets.UTF_8),
// 十六进制形式发送编解码
new ByteArrayDecoder(),
new ByteArrayEncoder(),
handler
);
}
});
ChannelFuture f = bootstrap.connect(new RxtxDeviceAddress(serialPortParam.getSerialPortName())).sync();
f.addListener(connectedListener);
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
// 连接监听
GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) -> {
final EventLoop eventLoop = f.channel().eventLoop();
if (!f.isSuccess()) {
log.info("连接失败");
}else{
channel = (RxtxChannel) f.channel();
log.info("连接成功");
sendData();
}
};
/**
* 发送数据
*/
public void sendData(){
// 十六机制形式发送
ByteBuf buf = Unpooled.buffer(2);
buf.writeByte(3);
buf.writeByte(2);
channel.writeAndFlush(buf.array());
// 文本形式发送
//channel.writeAndFlush("2");
}
public void start(){
CompletableFuture.runAsync(()->{
try {
// 阻塞的函数
createRxtx();
} catch (Exception e) {
e.printStackTrace();
}
}, Executors.newSingleThreadExecutor());//不传默认使用ForkJoinPool,都是守护线程
}
}
串口连接成功后,发送一次数据。
5.把串口启动类放在入口程序里,实际业务中根据实际情况调整;
@SpringBootApplication
public class SpringBootSqliteApplication {
public static void main(String[] args) {
//获取application的上下文
ApplicationContext applicationContext = SpringApplication.run(SpringBootSqliteApplication.class, args);
// 串口连接服务类
RxtxServer rxtxServer = applicationContext.getBean(RxtxServer.class);
SerialPortParam serialPort = new SerialPortParam();
// 连接串口com1
serialPort.setSerialPortName("COM1");
rxtxServer.setSerialPortParam(serialPort);
rxtxServer.start();
try {
// 连接串口需要一点时间,这里稍微等待一下
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 发送数据
rxtxServer.sendData();
}
}
这里又发送了一次数据,所以一共发送了两次数据。
6.运行项目;
在运行项目前,把串口调试工具设置一下,发送数据采用Hex,接收数据也是Hex;如果是String类型,则可以去掉这项勾选;并且设置数据5s中发送一次;
在控制台,可以看到接收到的数据;
在串口调试工具这里,又接收到了两组数据。
最后总结
以上便是netty关于串口采用字节数组的形式通讯的编码。整个编码流程和TCP协议通讯的流程一样。但是,Netty关于串口的编程,相关类相关方法已经过时了。为什么过时了,还有没有什么更好的架包来替代,这是在生产中需要持续关注的。参考资料:
串口调试工具下载:
http://www.zlmcu.com/document/com_debug_tools.html
http://www.zlmcu.com/download.htm
虚拟串口工具下载:
https://www.iotplat.top/#/iot/soft/604ed56de1457928489bf826
相关文档:
SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 服务端
SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 客户端
Netty 之 IdleStateHandler 心跳检测(部分源码分析),实现超时断开连接
Netty 之 ByteBuf 几种分配方案 及 内存溢出相关Bug
Netty整合JBoss Marshalling编解码
Netty整合Protobuf编解码,并解决半包问题