1 什么是消息推送
很多手机APP会不定时的给用户推送消息,例如一些新闻APP会给用户推送用户可能感兴趣的新闻,或者APP有更新了,会给用户推送是否选择更新的消息等等,这就是所谓的“消息推送”。
对于APP或者桌面客户端这种C/S架构的软件,实现消息推送其实比较简单,只需要维护TCP连接就行了,因为TCP本身是全双工的,客户端和服务端都能发送消息。但Web环境就不太一样了,目前的Web软件大多数都是B/S架构(即浏览器/服务端),使用的消息传输协议也大多数是HTTP,HTTP1.0和HTTP1.1都无法实现服务端向客户端(浏览器)主动发送消息,所以实现的手段主要就是客户端定时或者不定时轮询(例如间隔时间动态变化),这种方式实现并不算复杂,最大的问题就是性能,轮询是需要消耗CPU资源的,如果很长一段时间内,服务端都没有消息要给客户端,那么这个CPU空轮询的占比就比较大了,而且轮询也会对服务端造成压力,因为如果服务端没有消息要给客户端,那么其实这样的“请求-响应”是没有意义的,算是服务端的额外压力。
可能有朋友难以理解上面所描述的情况,下面我画个图来描述这个问题:
可以从图中看到,客户端老是不断的孜孜不倦的跑去问服务端“有没有新的推送消息”,而且还不长记性,每次都问同样的问题(HTTP是无状态的协议),这事给谁谁都得烦,是吧,服务端老被客户端“骚扰”,所以有时候就会“罢工”不干了!(服务端压力过大,短暂不可用)那怎么解决这个问题呢?也就是说让服务端过得舒服一些?
2 解决方案
目前主要有两种主流的解决方案:
- 利用新版的HTTP2.0,重构原有的代码,因为HTTP2.0支持服务端主动发送数据给客户端,但这种方案实现其实是比较困难的,一是服务端实现起来并不简单,而是一般也不用来推送大量的数据,常见的使用场景是请求.html,然后服务端把HTML作为响应给前端,并且同时把CSS,JS文件都“推”给前端(在传统的HTTP中,要拿到这三个东西,至少需要三次HTTP请求)。本文不讨论该方案。
- 利用WebSocket协议,WebSocket是一种在浏览器环境下可以全双工通信的应用层协议(等会儿会给出简单介绍)。这种方案就是利用WebSocket的全双工通信的特性,使得B/S架构的软件也能像C/S架构的软件那样简单的实现消息推送。本文主要介绍的就是这种解决方案。
3 WebSocket协议
WebSocket和HTTP,FTP等一样,都属于应用层协议,诞生于2008年,与2011年称为国际标准(说实话,这段时间真的很短,说明WebSocket确实解决了一些问题)。下面是维基百科上的定义:
WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
其优点也不少(这里所提到的优点是相对于HTTP来说的):
- 支持全双工通信,这对于某些场景尤其重要,例如消息推送。
- 更好的二进制支持,HTTP本身是文本传输协议,对于二进制的数据需要特殊处理(不过2.0版本也对二进制做了补充),而WebSocket定义了二进制帧,所以传输二进制数据的时候不需要像HTTP那样特殊处理。
- 较少的控制开销。连接创建后,ws客户端、服务端进行数据交换时,协议控制的数据包头部较小。在不包含头部的情况下,服务端到客户端的包头只有2~10字节(取决于数据包长度),客户端到服务端的的话,需要加上额外的4字节的掩码。而HTTP协议每次通信都需要携带完整的头部。
- ...............
那WebSocket连接是如何建立的呢?答案是:握手协议。
在真正建立WebSocket之前,会先建立一个HTTP连接,然后服务端响应状态码101,表示切换协议,之后通信协议会升级成WebSocket,这样WebSocket连接才算是建立起来。下面是一个WebSocket握手的示例:
客户端请求:
GET / HTTP/1.1
#Upgrade就表示要把协议升级成WebSocket
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13
服务端响应:
#客户端收到该HTTP报文之后,会将通信协议升级成WebSocket,之后的数据传输就都使用WebSocket了
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/
这就是所谓的握手了(双方建立友好关系)。
4 利用Netty + WebSocket实现消息推送
在动手之前,先声明一点:本文不会介绍WebSocket的简单使用,因为本文的标题是“消息推送”,而不是“WebSocket入门”,Netty也是同理。
首先,我们先确定一些设计方案:
- 客户端和服务端使用WebSocket作为通信协议,当服务端有新的推送消息的时候,主动把消息“推”给客户端。
- Netty作为网络通信的基础框架。
- 服务端监听消息队列,当消息队列中有新的消息时,把消息发送给客户端。
- 可以还有另外一个专门往消息队列里放入消息的服务,至此整个系统就形成一个完整的消息推送系统了。
大致了解了方案之后,可以着手实现了,先来看看服务端的实现:
服务启动类:
public class WebSocketServer {
//RabbitMQ客户端连接工厂
private static ConnectionFactory connectionFactory = new ConnectionFactory();
//客户端连接
private static Connection connection;
//客户端Channel
private static com.rabbitmq.client.Channel channel;
//Jackson,序列化用的
private static ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) {
//初始化ServerBootstrap
ServerBootstrap server = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
server.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(16 * 16 * 1024));
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new WebSocketServerHandler());
}
});
//初始化RabbitMQ的配置
connectionFactory.setHost("xxx.xxx.xxx.xxx");
connectionFactory.setUsername("xxx");
connectionFactory.setPassword("xxx");
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RabbitMQConfig.PUSH_MSG_QUEUE, false, false, true, null);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
//服务端绑定8081端口
server.bind(8081).syncUninterruptibly().addListener(future -> {
if (future.isSuccess()) {
System.out.println("绑定成功");
startPushMessage();
startMQListener();
} else {
System.out.println("绑定失败");
}
});
}
//开启消息队列的监听,当有消息的时候,就把消息推送给客户端
private static void startMQListener() {
new Thread(() -> {
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
//这里就比较粗暴的获取ChannleGroup了,建议读者尝试的时候用更好的方法
ChannelGroup group = WebSocketServerHandler.group;
if (group != null) {
group.writeAndFlush(new TextWebSocketFrame(message));
}
}
};
System.out.println("开始监听");
try {
channel.basicConsume(RabbitMQConfig.PUSH_MSG_QUEUE, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
private static final AtomicLong id = new AtomicLong(0);
//随机生成一个消息
private static Notify generateNotify() {
Notify notify = new Notify();
notify.setId(id.getAndIncrement());
notify.setTitle(UUID.randomUUID().toString());
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 500; i++) {
builder.append(getRandomChar());
}
notify.setContent(builder.toString());
notify.setPushTime(new Date());
return notify;
}
private static char getRandomChar() {
String str = "";
int hightPos; //
int lowPos;
Random random = new Random();
hightPos = (176 + Math.abs(random.nextInt(39)));
lowPos = (161 + Math.abs(random.nextInt(93)));
byte[] b = new byte[2];
b[0] = (Integer.valueOf(hightPos)).byteValue();
b[1] = (Integer.valueOf(lowPos)).byteValue();
try {
str = new String(b, "GBK");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
System.out.println("错误");
}
return str.charAt(0);
}
//这里我不另外写专门的生产消息的服务了,直接定时的往消息队列里放入消息
private static void startPushMessage() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.scheduleAtFixedRate(() -> {
try {
Notify notify = generateNotify();
String message = objectMapper.writeValueAsString(notify);
channel.basicPublish("", RabbitMQConfig.PUSH_MSG_QUEUE, null, message.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}, 5, 5, TimeUnit.SECONDS);
}
}
WebSocketHandler类:
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
//WebSocket握手
private static WebSocketServerHandshaker handshaker;
//客户端的群组
public static ChannelGroup group;
//客户端在线人数
private static AtomicLong onlineCount = new AtomicLong(0);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
onlineCount.incrementAndGet();
System.out.println("有用户上线,当前在线人数是: " + onlineCount.get());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//第一次请求肯定是HTTP请求,所以先去处理HTTP请求,在该处理方法里做WebSocket握手的操作
if (msg instanceof FullHttpRequest) {
handlerHttpRequest(ctx, (FullHttpRequest)msg);
}
//能到这,肯定是连接成功了的
if (onlineCount.get() == 1) {
//创建群组
group = new DefaultChannelGroup(ctx.executor());
group.add(ctx.channel());
} else {
group.add(ctx.channel());
}
//之后的请求就都是WebSocket帧了,不过对于我们的系统来说,这倒不是特别主要的
//之所以还要处理,是为了处理客户端主动关闭连接的情况以及维持心跳
if (msg instanceof WebSocketFrame) {
handlerWebSocketFrame(ctx, (WebSocketFrame)msg);
}
}
private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
//如果request解析失败或者upgrade不是websocket,那么就直接发送BAD_REQUEST状态即可
if (!request.decoderResult().isSuccess()
|| !"websocket".equals(request.headers().get("upgrade"))) {
try {
sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return;
}
//如果一切正常,那么就开始进行WebSocket握手
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://localhost:8081/ws", null, false);
handshaker = factory.newHandshaker(request);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
//这里就是握手操作了
handshaker.handshake(ctx.channel(), request);
}
}
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request,
FullHttpResponse response) throws UnsupportedEncodingException {
if (response.status().code() != 200) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("发生错误哦".getBytes("utf-8"));
response.content().writeBytes(buf);
buf.release();
}
ChannelFuture future = ctx.channel().writeAndFlush(response);
if (request.headers().get("Keep-Alive") == null) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
//如果该帧是CloseWebSocketFrame类型的,也就是说客户端主动关闭连接
//那么就做相应的处理
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
group.remove(ctx.channel());
onlineCount.decrementAndGet();
System.out.println("有用户下线,当前在线人数是: " + onlineCount.get());
return;
}
//WebSocket的客户段会发送心跳数据包,返回PongWebSocketFrame就行了
if (frame instanceof PingWebSocketFrame) {
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
//本系统只支持本文数据
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("不支持该类型消息");
}
//向所有在线的用户发送消息
group.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
}
}
不知道各位注意到没有,我们的系统中不存在WebSocketFrame的编解码器,熟悉Netty的朋友应该知道,如果真的没有WebSocketFrame的编解码器的话,我们的系统是无法处理WebSocket传输的数据的。其实Netty在进行WebSocket握手的时候,就自动的帮我们添加了编解码器,如下是handshaker.handshake(ctx.channel(), request)的源码:
public ChannelFuture handshake(Channel channel, FullHttpRequest req) {
return handshake(channel, req, null, channel.newPromise());
}
public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug("{} WebSocket version {} server handshake", channel, version());
}
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
ChannelPipeline p = channel.pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
if (p.get(HttpContentCompressor.class) != null) {
p.remove(HttpContentCompressor.class);
}
ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
final String encoderName;
if (ctx == null) {
// this means the user use a HttpServerCodec
ctx = p.context(HttpServerCodec.class);
if (ctx == null) {
promise.setFailure(
new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
return promise;
}
//就是这里了,加入默认的编解码器
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
encoderName = ctx.name();
} else {
p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());
encoderName = p.context(HttpResponseEncoder.class).name();
p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
}
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
p.remove(encoderName);
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
服务端完事了,接下来看看客户端的代码,其实就是前端代码了(代码是我网上直接抄的):
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket测试</title>
</head>
<body>
<h1>WebSocket测试</h1>
<div id="context"></div>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
<script type="application/javascript">
var websocket = null;
// 判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
// 创建WebSocket 对象,连接服务器端点
websocket = new WebSocket("ws://localhost:8081/ws");
} else {
alert('您的浏览器不支持websocket');
}
// 连接发生错误的回调方法
websocket.onerror = function() {
appendMessage ("WebSocket连接失败");
}
// 连接成功建立的回调方法
websocket.onopen = function(event) {
appendMessage ("WebSocket连接成功");
}
// 接收到消息的回调方法
websocket.onmessage = function (event) {
console.log("收到消息")
jsonObject = JSON.parse(event.data)
console.log(jsonObject)
appendMessage(jsonObject.title);
}
websocket.onclose = function() {
appendMessage("关闭连接");
}
websocket.onbeforeupload = function() {
websocket.close();
}
function appendMessage(message) {
var context = $('#context').html() + '<br>' + message;
$('#context').html(context);
}
function closeWebSocket() {
websocket.close();
}
function sendMessage() {
var message = $('#message').val();
websocket.send(message);
}
</script>
</body>
</html>
接下来启动服务,然后直接打开该文件,应该就能看到效果了,如下所示:
你也可以多打开几个客户端试试,会发现消息会传递给每个客户端了,而且这期间不存在什么客户端主动请求的情况,即对于客户端来说,这些个消息就好像“天上掉下来”的一样,这就简单实现了消息推送系统,现在再来看看客户端和服务端的通信情况:
对比上面的那张图,是不是觉得更加“清爽”了?现在不再需要在客户端不断去轮询,去骚扰服务端了,当有新的推送消息的时候,服务端就主动的把消息“推”给客户端了,这样服务端的压力也减少了很多,客户端的CPU也不用一直做没有意义的事了。
5 Spring Boot + WebSocket实现消息推送
我还想介绍一种实现方案,这种方案相较于Netty的实现更加的简单,那就是利用Spring 对WebSocket的支持来实现。就不多说废话了,直接来看实现吧:
建立好Spring Boot项目之后,加入如下依赖(maven):
<dependency>
<!--RabbitMQ的支持-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<!--web mvc的支持-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<!--websocket的支持-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
之后来做两个配置,一是配置RabbitMQ,而是配置WebSocket:
//RabbitMQ配置
@Configuration
public class RabbitMQConfig {
//三个分别是队列,交换器以及路由键
public static final String PUSH_MSG_EXCHANGE = "push_msg_exchange";
public static final String PUSH_MSG_QUEUE = "push_msg_queue";
public static final String PUSH_MSG_ROUTE_KEY = "push_msg.direct";
@Bean
public DirectExchange pushMsgExchange() {
return new DirectExchange(PUSH_MSG_EXCHANGE, true, true);
}
@Bean
public Queue pushMsgQueue() {
return new Queue(PUSH_MSG_QUEUE, true, false, true);
}
//将队列和交换器绑定
@Bean
public Binding pushMsgBinding() {
return BindingBuilder.bind(pushMsgQueue()).to(pushMsgExchange()).with(PUSH_MSG_ROUTE_KEY);
}
}
//WebSocket配置
@Configuration
public class WebSocketConfig {
//只需要配置ServerEndpointExporter这个Bean就行了
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
接下来就是实现类了:
@ServerEndpoint("/ws")
@Service
public class WebSocketService {
private static Set<WebSocketService> webSocketServiceSet = new CopyOnWriteArraySet<>();
private Session session;
private static AtomicLong onlineCount = new AtomicLong(0);
@OnOpen
public void onOpen(Session session) {
this.session = session;
onlineCount.incrementAndGet();
webSocketServiceSet.add(this);
System.out.println("有用户上线,当前在线人数有:" + onlineCount.get());
}
@OnClose
public void onClose() {
webSocketServiceSet.remove(this);
onlineCount.decrementAndGet();
System.out.println("有用户下线,当前在线人数有: " + onlineCount.get());
}
@OnMessage
public void onMessage(Session session, String message) {
System.out.println("来自客户端的消息,客户端IP:PORT是 : ");
System.out.println(session.getRequestURI().getHost() + ":" + session.getRequestURI().getPort());
System.out.println("消息是: " + message);
}
@OnError
public void onError(Throwable throwable) {
System.out.println("服务端异常");
throwable.printStackTrace();
}
private void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
@RabbitListener(queues = RabbitMQConfig.PUSH_MSG_QUEUE)
public void ListenMessageFromMQ(String message) {
for (WebSocketService webSocketService : webSocketServiceSet) {
//消息推送了,向每个在线客户端发送消息
webSocketService.sendMessage(message);
}
}
}
对于每个客户端,都会有一个与之对应的WebSocketService实例对象以及Session,就好像Netty里的Channel一样(只是一个比喻,并不等同),所以用一个静态的Set来保存这些实例对象,当需要发送消息的时候,直接取出来,调用Session的发送消息的方法就行了。
好了,就是那么简单,几个注解@OnOpen,@OnMessage,@OnClose,@OnError,听名字应该就知道啥意思了吧,不多说了,相比于Netty的实现,简单的太多了。不过最后还差一点,消息从哪来呢?和之前一样,我这里开启定时任务,定时的往消息队列里塞消息就行了,如下所示:
@Component
public class MyTask {
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "*/5 * * * * ?")
public void sendMessageToMQ() {
//这里的消息我就隐编码了,实际上可以有多种方式来构造消息
String message = "Hello, Websocket!!!";
rabbitTemplate.convertAndSend(RabbitMQConfig.PUSH_MSG_EXCHANGE,
RabbitMQConfig.PUSH_MSG_ROUTE_KEY,
message);
}
}
最后,别忘了应用主类上加入@EnableScheduling,否则定时任务不会生效。这里的实现效果和之前的实现几乎一样,客户端也不需要修改什么,就不多说了。
6 小结
消息推送系统是一个用途广泛的系统,本文简单介绍了两种实现方法,分别是Netty+WebSocket和Spring Boot+WebSocket,后者其实是基于Servlet实现的,所以性能上和Netty还是有一些差异的。不过无论哪种实现吧,最核心的部分还是WebSocket协议,
有些代码写的不太合理,望谅解。
7 参考资料
《Netty 权威指南》WebSocket相关章节