Spring Boot 整合 Netty 创建 TCP 服务,解析 HJ 212 协议并将数据发送到 RabbitMQ

目录架构

image.png

POM文件配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hmhb</groupId>
    <artifactId>hn</artifactId>
    <version>0.0.1</version>
    <name>hn</name>
    <description>hn</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <repositories>
        <repository>
            <id>jitpack.io</id>
            <url>https://jitpack.io</url>
        </repository>
    </repositories>


    <dependencies>

<!--        <dependency>-->
<!--            <groupId>mysql</groupId>-->
<!--            <artifactId>mysql-connector-java</artifactId>-->
<!--            <scope>runtime</scope>-->
<!--        </dependency>-->
<!--        &lt;!&ndash;mybatis-plus 的启动器&ndash;&gt;-->
<!--        <dependency>-->
<!--            <groupId>com.baomidou</groupId>-->
<!--            <artifactId>mybatis-plus-boot-starter</artifactId>-->
<!--            <version>3.4.1</version>-->
<!--        </dependency>-->

        <dependency> <!-- 引入log4j2依赖 -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>

        <!-- Needed for Async Logging with Log4j 2.
             Disruptor 4.x requires Java 11+ and is only supported by Log4j 2.23+, while this
             build targets Java 1.8 with the Log4j version managed by Spring Boot 2.7.0,
             so stay on the 3.4.x line. -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.4</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.3</version>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>

        <dependency>
            <groupId>com.github.xiaoyao9184.hj-t212-parser</groupId>
            <artifactId>hj-t212-parser</artifactId>
            <version>ac5d822c</version>
        </dependency>

        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>5.2.4.Final</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <exclusions><!-- 去掉springboot默认配置 -->
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

yml的配置

# Port of the embedded HTTP server.
server:
  port: 9112

spring:
  # RabbitMQ broker connection settings.
  # NOTE(review): credentials are hard-coded in this file - consider externalizing them.
  rabbitmq:
    host: localhost
    username: rabbitmq
    password: rabbitmq
    port: 5672
    virtual-host: /


# Custom property injected via @Value("${tcpConfig.port}"); passed to NettyServer.start as the TCP port.
tcpConfig:
  port: 9111

RealTimeData(实时数据类)

package com.hmhb.hn.hj212data.monitoringdata;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.xy.format.hbt212.model.standard.Pollution;
import lombok.Data;

import javax.json.bind.annotation.JsonbProperty;
import java.util.Map;

/**
 * Real-time data payload.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 *
 * @author weiwei
 * @date 2022/06/23
 */
@Data
public class RealTimeData {


    /**
     * Unique identifier of the device (MN).
     */
    private String mn;

    /**
     * Pollution factors; serialized under the property name {@code Pollution}.
     * NOTE(review): the map is presumably keyed by pollutant factor code - confirm against the parser.
     */
    @JsonProperty("Pollution")
    @JsonbProperty("Pollution")
    private Map<String, Pollution> pollution;

    /**
     * Data source.
     */
    private String source;

}

netty相关类

package com.hmhb.hn.netty.tcp;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 自定义Channel初始话
 *
 * @author weiwei
 * @date 2022/06/23
 */
@Component
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Resource
    MyServerHandler myServerHandler;

    @Override
    protected void initChannel(SocketChannel channel) {
        // 在管道中添加我们自己的接收数据实现方法
        channel.pipeline().addLast(myServerHandler);
    }

}
package com.hmhb.hn.netty.tcp;


import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import cn.hutool.log.StaticLog;
import com.hmhb.hn.hj212data.monitoringdata.RealTimeData;
import com.xy.format.hbt212.core.T212Mapper;
import com.xy.format.hbt212.model.standard.Data;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.CharsetUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Custom server-side inbound handler: registers client connections, parses incoming HJ 212
 * messages and publishes the extracted real-time data to RabbitMQ as JSON.
 *
 * <p>Apart from the injected {@link RabbitTemplate} the handler keeps no per-connection state, and
 * it is annotated {@link ChannelHandler.Sharable} so one instance can be added to many pipelines.
 *
 * @author weiwei
 * @date 2022/06/23
 */
@Component
@ChannelHandler.Sharable
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /** Exchange the converted data is published to. */
    private static final String EXCHANGE = "hn_topic_exchange";

    /** Routing key used when publishing. */
    private static final String ROUTING_KEY = "hn.tt";

    // Named LOG (not StaticLog) so it no longer shadows the imported cn.hutool.log.StaticLog class.
    private static final Log LOG = LogFactory.get();

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Called once the client connection is established and the channel can transfer data.
     * Logs the peer address and registers the context in {@link NettyChannelService}.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        LOG.info("客户端连接NettyServer(TCP)连接开始!");
        LOG.info("连接日志信息:有一客户端链接到本服务端");
        // remoteAddress() is the peer (client); localAddress() would be this server's own endpoint.
        LOG.info("连接日志信息:该客户端IP:{}, Port: {}",
                channel.remoteAddress().getHostString(), channel.remoteAddress().getPort());
        LOG.info("连接完毕!");
        String channelId = ctx.channel().id().asLongText();
        NettyChannelService.saveChannel(channelId, ctx);
        LOG.info("连接请求进入: channel.id:{}, 地址:{}", channelId, ctx.channel().remoteAddress());
    }

    /**
     * Called when the connection is closed and the channel can no longer transfer data.
     * Removes the context registered in {@link #channelActive} so the registry does not grow
     * without bound.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyChannelService.removeChannel(ctx.channel().id().asLongText());
        LOG.info("客户端断开链接{}", ctx.channel().remoteAddress());
    }

    /**
     * Decodes the received buffer as UTF-8 text, hands it to the HJ 212 parser and publishes the
     * result. The buffer is always released afterwards because this handler does not pass the
     * message further down the pipeline.
     *
     * <p>NOTE(review): no frame decoder is installed by {@code MyChannelInitializer}, so
     * {@code msg} may presumably hold a partial message or several messages - confirm.
     *
     * @param ctx context of the channel the data arrived on
     * @param msg the inbound message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LOG.info("时间:{}, 接收到消息:{}", DateUtil.now(), msg);
        ByteBuf in = (ByteBuf) msg;
        try {
            // Decode with an explicit charset instead of the platform default.
            String h212 = in.toString(CharsetUtil.UTF_8);
            LOG.info("[client->server]:{}", h212);
            LOG.info("[client address]:{}", ctx.channel().remoteAddress());
            parseAndPublish(h212);
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound messages automatically.
            in.release();
        }
    }

    /**
     * Parses one HJ 212 message and publishes the converted {@link RealTimeData} to RabbitMQ.
     * Failures are logged with their stack trace and do not propagate to the pipeline.
     *
     * @param h212 raw HJ 212 message text
     */
    private void parseAndPublish(String h212) {
        try {
            // Configure the default parser and default verifier features.
            T212Mapper mapper = new T212Mapper()
                    .enableDefaultParserFeatures()
                    .enableDefaultVerifyFeatures();

            Data data = mapper.readData(h212);
            LOG.info("服务端收到hj212:{} 数据为-- {}", DateUtil.now(), JSONUtil.toJsonStr(data));

            // Nothing useful to forward without a populated message and its CP section.
            if (!BeanUtil.isNotEmpty(data) || data.getCp() == null) {
                LOG.warn("hj-212 数据为空或缺少CP数据区,跳过发送: {}", h212);
                return;
            }

            // Convert to the downstream data model.
            RealTimeData realTimeData = new RealTimeData();
            realTimeData.setMn(data.getMn());
            realTimeData.setPollution(data.getCp().getPollution());
            realTimeData.setSource("0");

            // Payload sent downstream.
            String hbgjData = JSONUtil.toJsonStr(realTimeData);
            rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, hbgjData);
            LOG.info("向rabbitmq的---交换机:hn_topic_exchange,路由key:hn.tt---发送数据成功!时间:{} ------hbgjData: 数据为-- {}", DateUtil.now(), hbgjData);
        } catch (Exception e) {
            LOG.error(e, "hj-212 解析成Data对象数据出错--------------------");
        }
    }

    /**
     * Logs the failure with its stack trace and closes the connection.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Logged at error level with the throwable so the failure is visible in production logs.
        LOG.error(cause, "异常信息: exceptionCaught方法出错,错误为:{}", cause.getMessage());
        ctx.close();
    }


}
package com.hmhb.hn.netty.tcp;

import io.netty.channel.ChannelHandlerContext;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Static registry of Netty channel contexts, keyed by a caller-supplied string.
 *
 * <p>Backed by a single {@link ConcurrentHashMap} that is created once and never replaced.
 *
 * @author weiwei
 * @date 2022/06/23
 */
public class NettyChannelService {

    private static final Map<String, ChannelHandlerContext> CHANNELS = new ConcurrentHashMap<>();

    /**
     * Returns the live backing map (not a copy); changes made through it affect the registry.
     *
     * @return the map of all registered contexts
     */
    public static Map<String, ChannelHandlerContext> getChannels() {
        return CHANNELS;
    }

    /**
     * Registers {@code ctx} under {@code key}, replacing any previous entry for that key.
     *
     * @param key registry key; must not be {@code null}
     * @param ctx context to store; must not be {@code null}
     * @throws NullPointerException if {@code key} or {@code ctx} is {@code null}
     */
    public static void saveChannel(String key, ChannelHandlerContext ctx) {
        CHANNELS.put(key, ctx);
    }

    /**
     * Looks up the context registered under {@code key}.
     *
     * @param key registry key; may be {@code null}
     * @return the registered context, or {@code null} if there is none or {@code key} is {@code null}
     */
    public static ChannelHandlerContext getChannel(String key) {
        // ConcurrentHashMap rejects null keys, so answer "not found" consistently instead of throwing.
        return key == null ? null : CHANNELS.get(key);
    }

    /**
     * Removes the entry for {@code key}; does nothing if there is none or {@code key} is {@code null}.
     *
     * @param key registry key; may be {@code null}
     */
    public static void removeChannel(String key) {
        if (key != null) {
            CHANNELS.remove(key);
        }
    }

}
package com.hmhb.hn.netty.tcp;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import cn.hutool.log.StaticLog;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


/**
 * Netty TCP server.
 *
 * @author weiwei
 * @date 2022/06/23
 */
@Component
public class NettyServer {

    @Resource
    private MyChannelInitializer myChannelInitializer;

    // Named LOG (not StaticLog) so it no longer shadows the imported cn.hutool.log.StaticLog class.
    private static final Log LOG = LogFactory.get();

    /**
     * Starts the TCP server on the given port. The call blocks the calling thread until the
     * server channel is closed or the thread is interrupted.
     *
     * @param port TCP port to listen on
     */
    public void start(Integer port) {
        LOG.info("netty服务器(TCP)启动");
        bind(port);
    }

    /**
     * Binds the server socket, waits until it is closed, then shuts both event loop groups down.
     *
     * @param port TCP port to bind
     */
    private void bind(int port) {
        // Boss group accepts incoming connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Worker group performs the reads and writes of accepted connections.
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    // NIO (non-blocking) server channel.
                    .channel(NioServerSocketChannel.class)
                    // Log server-channel events at INFO level.
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // SO_BACKLOG maps to the backlog argument of listen(): the size of the
                    // queue of pending connections.
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // Custom initializer applied to every accepted channel.
                    .childHandler(myChannelInitializer);
            ChannelFuture f = b.bind(port).sync();
            LOG.info("服务端绑定端口成功!netty服务(TCP)启动");
            // Block here until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of this thread can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error(e, "netty服务器(TCP)线程被中断,服务即将关闭");
        } finally {
            // Release the threads of both groups.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}
package com.hmhb.hn.netty.tcp;


import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import cn.hutool.log.StaticLog;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Launches the TCP server when the Spring Boot application has started.
 *
 * <p>NOTE(review): {@code @Async} on {@link #run} only takes effect if async support is enabled
 * elsewhere in the application (not visible here); otherwise the call presumably blocks the
 * runner thread, since {@code NettyServer.start} waits for the server channel to close - confirm.
 *
 * @author weiwei
 * @date 2022/06/23
 */
@Component
@Slf4j
@Order(2)
public class StartNettyTcpServer implements ApplicationRunner {

    private static final Log LOGGER = LogFactory.get();

    /** TCP listen port, taken from the {@code tcpConfig.port} property. */
    @Value("${tcpConfig.port}")
    private Integer port;

    @Resource
    private NettyServer nettyServer;

    /**
     * Logs the start-up attempt and starts the Netty server on the configured port.
     *
     * @param args application arguments (unused)
     */
    @Async
    @Override
    public void run(ApplicationArguments args) {
        LOGGER.info("开始调用nettyServer.start方法");
        this.nettyServer.start(this.port);
    }


}

RabbitMQ 客户端的配置类

package com.hmhb.hn.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ topology: one durable topic exchange, one durable queue and the binding between them.
 */
@Configuration
public class RabbitmqConfig {

    private static final String EXCHANGE_NAME = "hn_topic_exchange";
    private static final String QUEUE_NAME = "NATIONAL_MONITOR_DEVICE_QUEUE";
    private static final String ROUTING_PATTERN = "hn.#";

    /**
     * Declares the durable topic exchange.
     *
     * @return {@link Exchange}
     */
    @Bean("hnExchange")
    public Exchange hnExchange() {
        ExchangeBuilder durableTopic = ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true);
        return durableTopic.build();
    }

    /**
     * Declares the durable queue.
     *
     * @return {@link Queue}
     */
    @Bean("hnQueqe")
    public Queue hnQueqe() {
        QueueBuilder durableQueue = QueueBuilder.durable(QUEUE_NAME);
        return durableQueue.build();
    }

    /**
     * Binds the queue to the exchange with the routing pattern {@code hn.#}.
     *
     * @param queue    the queue bean named {@code hnQueqe}
     * @param exchange the exchange bean named {@code hnExchange}
     * @return {@link Binding}
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("hnQueqe") Queue queue, @Qualifier("hnExchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_PATTERN)
                .noargs();
    }

}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容