初体验netty+jboss搭建的websocket

前段时间,做项目的时候,让我搭建一个websocket服务,实现微信小程序聊天。但之前没接触过,然后网上各种找资料,看别人的代码。最终,根据别人提供的代码总算可以运行了。
这里,要感谢白马湖小龙王,后面的代码都是他提供的。转载自:
https://blog.csdn.net/weixin_39168678/article/details/79453585

接下来,我就把主要的代码上传上去。如果有需要的自己复制粘贴。

1.首先,我因为用的公司的核格平台(核格平台是一个可视化的快速开发平台,有兴趣的可以去这个论坛下载,http://bbs.hearker.com/,有各种版本的平台),利用平台里的一个jboss服务器启动时候调用的注解类,如下:

package com.sunsheen.websocket.start;

import javax.servlet.ServletContext;

import com.sunsheen.jfids.system.servlet.Listener;
import com.sunsheen.jfids.system.servlet.SystemStartupListener;

/**
 * 服务启动的时候就执行监听
 * @author heWanLi
 * 2018-01-03
 *
 */


@Listener
public class TaskStartupListener implements SystemStartupListener {

@Override
public void init(ServletContext param) {
    new Task().startTask();
}

}

Task类,具体实现开启websocket的逻辑。这样,就能满足在一个jboss服务器下搭建websocket,而不用单独再去弄个服务器。

package com.sunsheen.websocket.start;
import com.sunsheen.websocket.netty.ImServer;
import com.sunsheen.websocket.netty.NettyUtil;


/**
 * 利用启动的时候启动websocket
 * @author HeWanLi
 *
 */
public class Task {

    public void timingTask(){
    
    }

    /**
     * 定时执行任务
     */
    public void startTask(){
        /**
         * 开启一个线程,让websocket服务也启动起来
         */
    
        /*检测端口是否被占用*/
        int port = new ImServer().getPort();
        if(NettyUtil.isPortAvailable(port)){
            System.out.println("启动了websocket");
            Runnable run=new Runnable() {
                @SuppressWarnings("static-access")
                public void run(){
                    try {
                        new ImServer(port).run();
                        System.out.println("启动了websocket");
                    } catch (Exception e) {
                        // TODO 自动生成的 catch 块
                        e.printStackTrace();
                    }
                }
            };
            new Thread(run).start();
        }
    }

}

接下来是netty实现的主要代码:
2.有一个处理消息是否为空的类:

public class BlankUtil {
    public static boolean isBlank(String messges){
        if (null == messges || "".equals(messges)) {
            return true;
        }
        return false;
    }
}

3.一个netty的工具类:

public class NettyUtil {

/**
 * @param channel
 * @return
 */
public static String parseChannelRemoteAddr(Channel channel) {
    // TODO 自动生成的方法存根
    return channel.remoteAddress().toString().substring(1);
}

/**
 * 将字符串转成Map
 * @param text  {"room_id":"1","uid":"1","sender":"2","type":"1","message":"1"}  这样的格式
 * @return
 */
public static Map<String, Object> changeStringToMap(String text){
    Gson gson = new Gson();
    Map<String, Object> paramMap = gson.fromJson(text, Map.class);
    return paramMap;
}

private static void bindPort(String host, int port) throws Exception {
    Socket s = new Socket();
    s.bind(new InetSocketAddress(host, port));
    s.close();
}
public static boolean isPortAvailable(int port) {
    Socket s = new Socket();
    try {
        bindPort("0.0.0.0", port);
        bindPort(InetAddress.getLocalHost().getHostAddress(), port);
        return true;
    } catch (Exception e) {
        return false;
    }
}
}

4.用户的一个实体类:

public class UserInfo {
  private String userId;  // UID
  private String addr;    // 地址
  private Channel channel;// 通道
  private String sender;  // 消息接收人

public String getAddr() {
    return addr;
}

public void setAddr(String addr) {
    this.addr = addr;
}

public Channel getChannel() {
    return channel;
}

public void setChannel(Channel channel) {
    this.channel = channel;
}

public String getUserId() {
    return userId;
}

public void setUserId(String userId) {
    this.userId = userId;
}

/**
 * @return sender
 */
public String getSender() {
    return sender;
}

/**
 * @param sender 要设置的 sender
 */
public void setSender(String sender) {
    this.sender = sender;
}

}

5.用户处理类,用来添加用户,删除聊天用户,以及处理用户消息:

public class UserInfoManager {
private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);

private static ConcurrentHashMap<Channel, UserInfo> userInfos = new ConcurrentHashMap<>();

/**
 * 登录注册 channel
 *
 *  
 */
public static void addChannel(Channel channel,String uid) {
    String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
    UserInfo userInfo = new UserInfo();
    userInfo.setUserId(uid);
    userInfo.setAddr(remoteAddr);
    userInfo.setChannel(channel);
    userInfos.put(channel, userInfo);
}

/**
 * 普通消息
 *
 * @param message
 */
public static void broadcastMess(String uid,String message,String sender) {
    if (!BlankUtil.isBlank(message)) {
        try {
            rwLock.readLock().lock();
            Set<Channel> keySet = userInfos.keySet();
            for (Channel ch : keySet) {
                UserInfo userInfo = userInfos.get(ch);
                if (!userInfo.getUserId().equals(uid) ) continue;
                String backmessage=sender+","+message;
                ch.writeAndFlush(new TextWebSocketFrame(backmessage));
              /*  responseToClient(ch,message);*/
            }
        } finally {
            rwLock.readLock().unlock();
        }
    }
 }

/**
 * @param channel
 */
public static void removeChannel(Channel channel) {
    // TODO 自动生成的方法存根
    userInfos.remove(channel);
}

/**
 * @param channel
 * @return
 */
public static UserInfo getUserInfo(Channel channel) {
    // TODO 自动生成的方法存根
    return userInfos.get(channel);
}

/**
 * 添加房间号信息,注册信息(这个可以不要,主要是为了满足我的业务,而重载的一个方法)
 * @param channel
 * @param changeStringToMap
 */
public static void addChannel(Channel channel,
        Map<String, Object> paramMap) {
    removeChannel(channel);
    /*if(!rooms.containsKey(paramMap.get("room_id"))){
        // 并将聊天室添加到数据库 判断没有的情况下,才添加到数据库
        //new WebsocketService().save(paramMap);
    }*/
    // 并将聊天室添加到数据库 判断没有的情况下,才添加到数据库
    //new WebsocketService().save(paramMap);
    String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
    UserInfo userInfo = new UserInfo();
    userInfo.setUserId(paramMap.get("uid").toString());
    userInfo.setAddr(remoteAddr);
    userInfo.setSender(paramMap.get("sender").toString());
    userInfo.setChannel(channel);
    userInfos.put(channel, userInfo);
    // rooms.put(paramMap.get("room_id").toString(), userInfos);    // 添加一个聊天室
    System.out.print("connect user:"+paramMap.get("uid").toString());
    // 将消息添加到数据库
   //new WebsocketService().saveMessage(paramMap);
}
}

6.一个接收前端数据的处理类,包括页面信息等:

/**
* @author oj
 * 消息处理类
 */
public class SocketHandler extends ChannelInboundHandlerAdapter {
//ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);  
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private WebSocketServerHandshaker handshaker;
private final String wsUri = "/ws";
//websocket握手升级绑定页面 
 String wsFactoryUri = ""; 
/*
 * 握手建立
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    channels.add(incoming);
}

/*
 * 握手取消
 */
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  
    Channel incoming = ctx.channel();
    channels.remove(incoming);
}

/*
 * channelAction
 *
 * channel 通道 action 活跃的
 *
 * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。 
 *
 */
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println(ctx.channel());
  System.out.println(ctx.channel().localAddress().toString() + " 通道已激活!");
}
/*
 * channelInactive
 *
 * channel 通道 Inactive 不活跃的
 *
 * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。 
 *
 */
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  System.out.println(ctx.channel().localAddress().toString() + " 通道不活跃!");
}

/*
 * 功能:读取 h5页面发送过来的信息
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FullHttpRequest) {// 如果是HTTP请求,进行HTTP操作
        handleHttpRequest(ctx, (FullHttpRequest) msg);
    } else if (msg instanceof WebSocketFrame) {// 如果是Websocket请求,则进行websocket操作
        handleWebSocketFrame(ctx, (WebSocketFrame) msg);
    }
}
/*
 * 功能:读空闲时移除Channel
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent evnet = (IdleStateEvent) evt;
        // 判断Channel是否读空闲, 读空闲时移除Channel
        if (evnet.state().equals(IdleState.READER_IDLE)) {                
            UserInfoManager.removeChannel(ctx.channel());
        }
    }
    ctx.fireUserEventTriggered(evt);
}

/*
 * 功能:处理HTTP的代码
 */
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws UnsupportedEncodingException {
    // 如果HTTP解码失败,返回HHTP异常
    if (req instanceof HttpRequest) {
        HttpMethod method = req.getMethod();
        // 如果是websocket请求就握手升级
        if (wsUri.equalsIgnoreCase(req.getUri())) {
            System.out.println(" req instanceof HttpRequest");
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    wsFactoryUri, null, false);
            handshaker = wsFactory.newHandshaker(req);
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                handshaker.handshake(ctx.channel(), req);
            }
        }

    }
}

/*
 * 处理Websocket的代码
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    // 判断是否是关闭链路的指令
    //System.out.println("websocket get");
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        return;
    }
    // 判断是否是Ping消息
    if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    // 文本消息,不支持二进制消息
    if (frame instanceof TextWebSocketFrame) {
         // 返回应答消息
        String requestmsg = ((TextWebSocketFrame) frame).text();
        System.out.println("websocket消息======"+requestmsg);
        String[] array= requestmsg.split(",");
        // 将通道加入通道管理器
        UserInfoManager.addChannel(ctx.channel(),array[0]);
        UserInfo userInfo = UserInfoManager.getUserInfo(ctx.channel());
        if (array.length== 3) {
        // 将信息返回给h5
        String sendid=array[0];String friendid=array[1];String messageid=array[2];
        UserInfoManager.broadcastMess(friendid,messageid,sendid);
        }
    }
}
/**
 * 功能:服务端发生异常的操作
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
}
}

7.一个用于启动服务的类:

public class ImServer {
private io.netty.channel.Channel channel;
private  EventLoopGroup bossGroup = new NioEventLoopGroup();
private  EventLoopGroup workerGroup = new NioEventLoopGroup();
private static int port=8080;

/**
 * @return port
 */
public static int getPort() {
    return port;
}

public ImServer() {

}

//线程池设计的定时任务类
public ImServer(int port) {

}
public void run() throws Exception {
    try {
        //创建ServerBootstrap实例
        ServerBootstrap b = new ServerBootstrap();  
       //设置并绑定Reactor线程池
        b.group(bossGroup, workerGroup)
        //设置并绑定服务端Channel
         .channel(NioServerSocketChannel.class)  
         .childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec()); 
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // Http消息组装  
                    pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // WebSocket通信支持  
                    pipeline.addLast(new SocketHandler());//自定义处理类

                }
         })
         .option(ChannelOption.SO_BACKLOG, 128)  
         .childOption(ChannelOption.SO_KEEPALIVE, true);
//          System.out.println("WebsocketChatServer Start:" + port);
        try {
            ChannelFuture f = b.bind(port).sync();//// 服务器异步创建绑定

            channel = f.channel();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {

        }
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
        channel.closeFuture().syncUninterruptibly();
        System.out.println("WebsocketChatServer Stop:" + port);
    }
}
  public static void main(String[] args) throws Exception {
      new ImServer(port).run();
  }
}

需要的jar包分别是:gson-2.2.4.jar,netty-all-4.1.22.Final.jar

需要一个html页面chat.html,如下:

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>简单聊天室</title>
</head>
<script type="text/javascript" src="jquery.min.js"></script>
<body>
<div id="content" class="row-center">
<div id="chat-box" class="row-center">

</div>
<div id="input-box">
 <input class="chat-input" id="chat-input" placeholder="message"></input>
 <input id="myid" placeholder="myid">
 <button id="login-button" onclick="login()">登录</button>
 <input id="friendid" placeholder="friendid">
<button class="chat-button" id="send" onclick="send()">发送</button>
</div> 
</div>
</body>
</html>
<script type="text/javascript">
var ipaddress="127.0.0.1";
//新建socket对象
window.socket = new WebSocket("ws://"+ipaddress+":8080/ws");
//监听netty服务器消息并打印到页面上
socket.onmessage = function(event) {
    var datas=event.data.split(",");
    console.log("服务器消息===="+datas);
    $("#chat-box").text(datas);
    }
//将发送人接收人的id和要发生的消息发送出去
function send(){
    console.log($("#chat-input").val())
    var data=$("#myid").val()+","+$("#friendid").val()+","+$("#chat-input").val()
    socket.send(data)
}     
//登录事件
function login(){
    var data=$("#myid").val();socket.send(data);
}
</script>

最后部署到jboss服务器,启动服务器,然后去websocket测试网站去输入:
ws://localhost:8080/ws 点击连接,看看是否测试成功。如果显示如下,证明可以使用了:

image.png

以上就是全部代码了。但是这个代码还能有 更多优化,比如发图片,发视频,发语音,这些我都还没实现,还没有去尝试。
如果需要源代码的,可以到这个去下载:
https://github.com/wishyoukew/netty-websocket-demo

如果有不好的地方,望见谅,只是从别人那里搬运过来补充完整的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容