Redis支持发布订阅模式,先了解一下与发布订阅相关的命令。
发布订阅模式命令
SUBSCRIBE
命令用于订阅channel。当有消息发送到被订阅的channel时,所有订阅了channel的client都会收到消息。当执行SUBSCRIBE
命令后,client会进入发布订阅模式,在此模式中只能执行与发布订阅相关的命令。SUBSCRIBE
命令使用方式:
SUBSCRIBE first second
=> 1) "subscribe"
=> 2) "first"
=> 3) (integer) 1
=> 1) "subscribe"
=> 2) "second"
=> 3) (integer) 2
SUBSCRIBE
命令的返回值是Array,每成功订阅一个channel就会有3个元素。第一个元素是subscribe,代表返回值是针对SUBSCRIBE
命令,第二个元素是订阅成功的channel名称,第三个元素是已订阅的channel数量。
UNSUBSCRIBE
命令用于取消对channel的订阅。
PUBLISH
命令用于发送消息:
PUBLISH first "hello world" => "(integer) 1"
订阅了first channel的客户端会收到消息:
1) "message"
2) "first"
3) "hello world"
收到消息的格式也是Array,第一个元素message代表这是另外一个客户端推送的消息,第二个元素是从哪个channel收到消息,第三个元素是消息的内容。
PSUBSCRIBE
和PUNSUBSCRIBE
命令也是用来订阅/取消订阅channel的,区别是PSUBSCRIBE
和PUNSUBSCRIBE
命令可以通过匹配符批量订阅/取消订阅channel:
PSUBSCRIBE news.*
上面的PSUBSCRIBE
命令会订阅以news.开头的channel,例如news.art.figurative, news.music.jazz等。
Jedis中使用发布订阅模式
Subscriber:
public class Subscriber extends JedisPubSub{
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("subscribe channel : " + channel + ", total channel num : " + subscribedChannels);
}
@Override
public void onMessage(String channel, String message) {
System.out.println("receive message : " + message + " from channel : " + channel);
}
public static void main(String[] args) {
Jedis jedis = null;
try{
jedis = new Jedis("localhost", 6379);
jedis.subscribe(new Subscriber(), "first", "second");
}catch(Exception e){
e.printStackTrace();
}finally {
if(jedis != null){
jedis.close();
}
}
}
}
Publisher:
public class Publisher {
public static void main(String[] args) {
Jedis jedis = null;
try{
jedis = new Jedis("localhost", 6379);
jedis.publish("second", "hello world!");
}catch (Exception e){
e.printStackTrace();
}finally {
if(jedis != null){
jedis.close();
}
}
}
}
Subscriber端打印的日志:
subscribe channel : first, total channel num : 1
subscribe channel : second, total channel num : 2
receive message : hello world! from channel : second
Jedis的Pub/Sub实现
Publisher实现相对简单,只是依照RESP协议向Redis Server发送命令。
Subscriber实现:
public void subscribe(final JedisPubSub jedisPubSub, final String... channels) {
client.setTimeoutInfinite();
try {
jedisPubSub.proceed(client, channels);
} finally {
client.rollbackTimeout();
}
}
首先设置了client超时时间,setTimeoutInfinite方法使client读取数据时不会超时。finally块中还原了超时时间。jedisPubSub对象就是继承了JedisPubSub类创建的实例,实现的onSubscribe和onMessage等回调方法都在对象中。
public void proceed(Client client, String... channels) {
this.client = client;
client.subscribe(channels);
client.flush();
process(client);
}
subscribe方法会向Redis Server发送订阅命令并flush。在process方法中,会阻塞在getRawObjectMultiBulkReply等待消息:
private void process(Client client) {
do {
//阻塞等待返回结果
List<Object> reply = client.getRawObjectMultiBulkReply();
//获取消息列表中的第一个元素
final Object firstObj = reply.get(0);
final byte[] resp = (byte[]) firstObj;
//比较字节,看是什么命令的返回结果,根据类型回调不同方法
if (Arrays.equals(SUBSCRIBE.raw, resp)) {
onSubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.raw, resp)) {
onMessage(strchannel, strmesg);
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
onPMessage(strpattern, strchannel, strmesg);
} else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
onPSubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
onPUnsubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PONG.raw, resp)) {
onPong(strpattern);
}
} while (isSubscribed());
}
上面省略了一部分代码,只有取消订阅了才会退出循环,退出发布订阅模式。