目录架构

image.png
POM文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.hmhb</groupId>
<artifactId>hn</artifactId>
<version>0.0.1</version>
<name>hn</name>
<description>hn</description>
<properties>
<java.version>1.8</java.version>
</properties>
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
<dependencies>
<!-- <dependency>-->
<!-- <groupId>mysql</groupId>-->
<!-- <artifactId>mysql-connector-java</artifactId>-->
<!-- <scope>runtime</scope>-->
<!-- </dependency>-->
<!-- <!–mybatis-plus 的启动器–>-->
<!-- <dependency>-->
<!-- <groupId>com.baomidou</groupId>-->
<!-- <artifactId>mybatis-plus-boot-starter</artifactId>-->
<!-- <version>3.4.1</version>-->
<!-- </dependency>-->
<dependency> <!-- 引入log4j2依赖 -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- Needed for async logging with Log4j 2.
     Pinned to the stable 3.x line: 4.0.0.RC1 is a pre-release of Disruptor 4, which requires
     Java 11 (this build targets Java 1.8) and removes Disruptor 3 API that the Log4j 2.17.x
     version managed by spring-boot-starter-parent 2.7.0 relies on. -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.3</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.github.xiaoyao9184.hj-t212-parser</groupId>
<artifactId>hj-t212-parser</artifactId>
<version>ac5d822c</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>5.2.4.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<exclusions><!-- 去掉springboot默认配置 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
yml的配置
# HTTP port of the embedded web server.
server:
  port: 9112
spring:
  # Connection settings for the RabbitMQ broker used by RabbitTemplate.
  rabbitmq:
    host: localhost
    username: rabbitmq
    password: rabbitmq
    port: 5672
    virtual-host: /
# Port the Netty TCP server listens on; injected via @Value("${tcpConfig.port}").
tcpConfig:
  port: 9111
RealTimeData(实时数据类)
package com.hmhb.hn.hj212data.monitoringdata;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.xy.format.hbt212.model.standard.Pollution;
import lombok.Data;
import javax.json.bind.annotation.JsonbProperty;
import java.util.Map;
/**
* Real-time monitoring data forwarded downstream: the device identifier, its
* pollution-factor readings and a marker for where the data came from.
* Getters, setters, equals/hashCode and toString are generated by Lombok's {@code @Data}.
*
* @author weiwei
* @date 2022/06/23
*/
@Data
public class RealTimeData {
/**
* Unique identifier (MN) of the reporting device.
*/
private String mn;
/**
* Pollution factors, keyed by a string identifier.
* Mapped to the JSON property name "Pollution" for both Jackson and JSON-B.
* NOTE(review): the handler in this project serializes with Hutool's JSONUtil —
* confirm whether these annotations are honored on that path.
*/
@JsonProperty("Pollution")
@JsonbProperty("Pollution")
private Map<String, Pollution> pollution;
/**
* Data source marker.
*/
private String source;
}
netty相关类
package com.hmhb.hn.netty.tcp;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 自定义Channel初始话
*
* @author weiwei
* @date 2022/06/23
*/
@Component
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Resource
MyServerHandler myServerHandler;
@Override
protected void initChannel(SocketChannel channel) {
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(myServerHandler);
}
}
package com.hmhb.hn.netty.tcp;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import cn.hutool.log.StaticLog;
import com.hmhb.hn.hj212data.monitoringdata.RealTimeData;
import com.xy.format.hbt212.core.T212Mapper;
import com.xy.format.hbt212.model.standard.Data;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.CharsetUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 自定义服务器处理handler
*
* @author weiwei
* @date 2022/06/23
*/
@Component
@ChannelHandler.Sharable
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Resource
private RabbitTemplate rabbitTemplate;
private static final Log StaticLog = LogFactory.get();
/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
StaticLog.info("客户端连接NettyServer(TCP)连接开始!");
StaticLog.info("连接日志信息:有一客户端链接到本服务端");
StaticLog.info("连接日志信息:该客户端IP:{}, Port: {}", channel.localAddress().getHostString(),channel.localAddress().getPort());
StaticLog.info("连接完毕!");
//通知客户端链接建立成功
String channelId = ctx.channel().id().asLongText();
NettyChannelService.saveChannel(channelId, ctx);
StaticLog.info("连接请求进入: channel.id:{}, 地址:{}",channelId,ctx.channel().remoteAddress());
}
/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
StaticLog.info("客户端断开链接{}",ctx.channel().localAddress().toString());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
StaticLog.info("时间:{}, 接收到消息:{}",DateUtil.now(),msg);
//通知客户端链消息发送成功
ByteBuf in = (ByteBuf)msg;
int readableBytes = in.readableBytes();
byte[] bytes =new byte[readableBytes];
in.readBytes(bytes);
String h212 = new String(bytes);
StaticLog.info("[client->server]:{}", h212);
StaticLog.info("[server ip]:{}", ctx.channel().remoteAddress());
System.out.print(in.toString(CharsetUtil.UTF_8));
//配置默认解析器和默认校验器
try {
T212Mapper mapper = new T212Mapper()
.enableDefaultParserFeatures()
.enableDefaultVerifyFeatures();
Data data = mapper.readData(h212);
StaticLog.info("服务端收到hj212:{} 数据为-- {}", DateUtil.now(),JSONUtil.toJsonStr(data));
RealTimeData realTimeData = new RealTimeData();
//转换成环保管家的数据
if(BeanUtil.isNotEmpty(data)){
String mn = data.getMn();
realTimeData.setMn(mn);
realTimeData.setPollution(data.getCp().getPollution());
realTimeData.setSource("0");
}
//向环保管家发送的数据
String hbgjData = JSONUtil.toJsonStr(realTimeData);
rabbitTemplate.convertAndSend("hn_topic_exchange","hn.tt", hbgjData);
StaticLog.info("向rabbitmq的---交换机:hn_topic_exchange,路由key:hn.tt---发送数据成功!时间:{} ------hbgjData: 数据为-- {}", DateUtil.now(),hbgjData);
}catch (Exception e){
StaticLog.error("hj-212 解析成Data对象数据出错--------------------");
StaticLog.error(e);
}
}
/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
StaticLog.debug("异常信息: exceptionCaught方法出错,错误为:{}",cause.getMessage());
}
}
package com.hmhb.hn.netty.tcp;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Static registry of channel handler contexts, keyed by a caller-supplied string id.
 * Backed by a {@link ConcurrentHashMap}, so individual operations may be called from
 * several threads.
 *
 * @author weiwei
 * @date 2022/06/23
 */
public class NettyChannelService {
    /** The registry; created once and never reassigned, so no null checks are needed. */
    private static final Map<String, ChannelHandlerContext> CHANNELS = new ConcurrentHashMap<>();

    /**
     * Returns the live registry map (not a copy); changes made through it affect the registry.
     *
     * @return the map of key to channel context
     */
    public static Map<String, ChannelHandlerContext> getChannels() {
        return CHANNELS;
    }

    /**
     * Stores a context under the given key, replacing any previous entry.
     *
     * @param key registry key; must not be null
     * @param ctx context to store; must not be null
     */
    public static void saveChannel(String key, ChannelHandlerContext ctx) {
        CHANNELS.put(key, ctx);
    }

    /**
     * Looks up a context by key.
     *
     * @param key registry key; may be null
     * @return the stored context, or {@code null} if the key is null or unknown
     */
    public static ChannelHandlerContext getChannel(String key) {
        // ConcurrentHashMap rejects null keys; treat a null key as "not found" consistently.
        return key == null ? null : CHANNELS.get(key);
    }

    /**
     * Removes the entry for the given key; does nothing if the key is null or unknown.
     *
     * @param key registry key; may be null
     */
    public static void removeChannel(String key) {
        if (key != null) {
            CHANNELS.remove(key);
        }
    }
}
package com.hmhb.hn.netty.tcp;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import cn.hutool.log.StaticLog;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Netty TCP server: binds the listening port and blocks until the server channel is closed.
 *
 * @author weiwei
 * @date 2022/06/23
 */
@Component
public class NettyServer {
    /** Logger; named LOG so that it no longer obscures the imported StaticLog class. */
    private static final Log LOG = LogFactory.get();

    @Resource
    private MyChannelInitializer myChannelInitializer;

    /**
     * Starts the server on the given port. The call blocks the calling thread until the
     * server channel closes or the thread is interrupted.
     *
     * @param port TCP port to listen on; must not be null
     */
    public void start(Integer port) {
        LOG.info("netty服务器(TCP)启动");
        bind(port);
    }

    /**
     * Configures the bootstrap, binds the port and waits for the server channel to close.
     * Both event loop groups are shut down when the method exits.
     *
     * @param port TCP port to listen on
     */
    private void bind(int port) {
        // Boss group accepts incoming connections; worker group handles channel I/O.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // Log server-channel events at INFO level.
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Length of the queue of pending connections (the listen() backlog).
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // Pipeline setup for every accepted connection.
                    .childHandler(myChannelInitializer);
            ChannelFuture f = b.bind(port).sync();
            LOG.info("服务端绑定端口成功!netty服务(TCP)启动");
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error(e, "netty服务(TCP)线程被中断,服务停止");
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.hmhb.hn.netty.tcp;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import cn.hutool.log.StaticLog;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Application runner that launches the Netty TCP server once the Spring Boot
 * application has started, using the port configured under {@code tcpConfig.port}.
 *
 * @author weiwei
 * @date 2022/06/23
 */
@Component
@Slf4j
@Order(2)
public class StartNettyTcpServer implements ApplicationRunner {
    private static final Log LOGGER = LogFactory.get();

    /** Listening port, injected from the {@code tcpConfig.port} property. */
    @Value("${tcpConfig.port}")
    private Integer port;

    @Resource
    private NettyServer nettyServer;

    /**
     * Delegates to {@link NettyServer#start(Integer)} with the configured port.
     * NOTE(review): {@code @Async} only takes effect when async support is enabled
     * (e.g. {@code @EnableAsync}); that configuration is not visible here — confirm,
     * since {@code start} blocks until the server channel closes.
     *
     * @param args application arguments (unused)
     */
    @Async
    @Override
    public void run(ApplicationArguments args) {
        LOGGER.info("开始调用nettyServer.start方法");
        nettyServer.start(port);
    }
}
Rabbitmq客户端的配置类
package com.hmhb.hn.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ topology: one durable topic exchange, one durable queue, and the binding
 * that routes every key matching {@code hn.#} from the exchange to the queue.
 */
@Configuration
public class RabbitmqConfig {
    private static final String EXCHANGE_NAME = "hn_topic_exchange";
    private static final String QUEUE_NAME = "NATIONAL_MONITOR_DEVICE_QUEUE";
    private static final String ROUTING_PATTERN = "hn.#";

    /**
     * Declares the durable topic exchange.
     *
     * @return the {@link Exchange} registered as bean {@code hnExchange}
     */
    @Bean("hnExchange")
    public Exchange hnExchange() {
        ExchangeBuilder builder = ExchangeBuilder.topicExchange(EXCHANGE_NAME);
        return builder.durable(true).build();
    }

    /**
     * Declares the durable queue.
     *
     * @return the {@link Queue} registered as bean {@code hnQueqe}
     */
    @Bean("hnQueqe")
    public Queue hnQueqe() {
        QueueBuilder builder = QueueBuilder.durable(QUEUE_NAME);
        return builder.build();
    }

    /**
     * Binds the queue to the exchange with the {@code hn.#} routing pattern and no arguments.
     *
     * @param queue    the queue bean {@code hnQueqe}
     * @param exchange the exchange bean {@code hnExchange}
     * @return the resulting {@link Binding}
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("hnQueqe") Queue queue, @Qualifier("hnExchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_PATTERN)
                .noargs();
    }
}