首先我们要谈一下redis,他是一款常见且常用的缓存数据库。他有几个显著的特性让开发与DBA所喜爱。
1. Redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用。即使宕掉,数据也不会丢失。
2. Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构的存储。这样就给开发同学提供了更多的选择。
3. Redis支持数据的备份,即master-slave模式的数据备份,数据稳定存在
jedis是redis的一个性能良好的客户端,里面包含了所有的redis的特性,用起来相当爽,本文着重讲述jedis中的高级特性。本文实例是在2.9.0版本上测试通过。自己测试时可以引入下面的内容,下面开始。
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
一. Publish/Subscribe(发布订阅)
观察者模式的具体体现。常用来解耦合,有很多版本都实现了此类的功能,比如说Rxjava,EventBus,Zk等。jedis也有自己的实现,首先说一下原理:首先客户端与服务端建立连接,客户端注册感兴趣的事件,同时定义一个当感兴趣的事件过来时,需要如何处理的回调(继承JedisPubSub类,覆写里面的方法,例如onMessage,是消息接受的方法)。客户端或设置与服务端的socket链接超时时间为0(不超时),然后阻塞在RedisInputStream中的read方法,等待消息的到来(也就是说RedisInputStream中有数据)。当有的客户端发布了感兴趣的信息,那么redis 的 server端就会查询哪些client,关注了这个消息,然后,将结果放入该客户端的RedisInputStream中,客户端就接收了感兴趣的消息,然后处理自己的逻辑。下面是测试代码实例:
static class MyListener extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println(channel + "============" + message);
super.onMessage(channel, message);
}
}
@Test
public void testSubscribe() {
jedis.subscribe(new MyListener(), "channel1");
}
@Test
public void testPublish() {
jedis.publish("channel1", "message1");
}
现在跟着代码看一下,关键性的步骤都增加了注释
public void proceed(Client client, String... channels) {
this.client = client;
client.subscribe(channels); //订阅
client.flush(); //数据flush,发送给server端
process(client); //do while 获取数据
}
private void process(Client client) {
do {
List<Object> reply = client.getRawObjectMultiBulkReply(); //阻塞获取server端传递的数据,根据数据,触发相应的回调
final Object firstObj = reply.get(0); //得到的列表中的第一个元素是服务端推送过来信息的事件类型,用来触发客户端定义的回调方法
if (!(firstObj instanceof byte[])) {
throw new JedisException("Unknown message type: " + firstObj);
}
final byte[] resp = (byte[]) firstObj;
if (Arrays.equals(SUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get(2)).intValue();
final byte[] bchannel = (byte[]) reply.get(1);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
onSubscribe(strchannel, subscribedChannels); //订阅上的回调
} else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get(2)).intValue();
final byte[] bchannel = (byte[]) reply.get(1);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
onUnsubscribe(strchannel, subscribedChannels); //取消后的回调
} else if (Arrays.equals(MESSAGE.raw, resp)) {
final byte[] bchannel = (byte[]) reply.get(1);
final byte[] bmesg = (byte[]) reply.get(2);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
onMessage(strchannel, strmesg); //接收到信息的回调
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
final byte[] bpattern = (byte[]) reply.get(1);
final byte[] bchannel = (byte[]) reply.get(2);
final byte[] bmesg = (byte[]) reply.get(3);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
onPMessage(strpattern, strchannel, strmesg);
} else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get(2)).intValue();
final byte[] bpattern = (byte[]) reply.get(1);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
onPSubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get(2)).intValue();
final byte[] bpattern = (byte[]) reply.get(1);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
onPUnsubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PONG.raw, resp)) {
final byte[] bpattern = (byte[]) reply.get(1);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
onPong(strpattern);
} else {
throw new JedisException("Unknown message type: " + firstObj);
}
} while (isSubscribed());
/* Invalidate instance since this thread is no longer listening */
this.client = null;
/*
* Reset pipeline count because subscribe() calls would have increased it but nothing
* decremented it.
*/
client.resetPipelinedCount();
}
private static Object process(final RedisInputStream is) {
final byte b = is.readByte(); //阻塞获取server端传递的数据,这里有必要点一下,因为这是核心,传统的IO流再获取数据的时候会阻塞,直到有数据过来。
if (b == PLUS_BYTE) {
return processStatusCodeReply(is);
} else if (b == DOLLAR_BYTE) {
return processBulkReply(is);
} else if (b == ASTERISK_BYTE) {
return processMultiBulkReply(is);
} else if (b == COLON_BYTE) {
return processInteger(is);
} else if (b == MINUS_BYTE) {
processError(is);
return null;
} else {
throw new JedisConnectionException("Unknown reply: " + (char) b);
}
}
二. pipeline(管道)
传统的交互方式是request->response request->response request->response,而pipeline是 request->request->request-> response->response->response,将多次请求的报文放到一个请求体中,服务端也是一样,将多次的响应信息合并成一次返回,所以 ,能不快吗?!这也就是pipeline的基本原理。下面是测试代码实例:
@Test
public void test36() {
Pipeline pipelined = jedis.pipelined();
pipelined.set("test1", "22");
pipelined.set("test2", "33");
pipelined.sync(); //无返回值
pipelined.get("test1");
pipelined.get("test2");
List<Object> objects = pipelined.syncAndReturnAll(); //有返回值
for (Object o : objects) {
System.out.print(o + " ");
}
}
result: 22 33
跟着代码看一下:
public Response<String> set(byte[] key, byte[] value) {
getClient(key).set(key, value); //执行set命令
return getResponse(BuilderFactory.STRING); //注意设置响应信息的顺序
}
protected Connection sendCommand(final Command cmd, final byte[]... args) {
try {
connect(); //与server端形成链接,实例化输入输出流
Protocol.sendCommand(outputStream, cmd, args); //往输出流中按照协议写入key与value,但不发送,调用sync()或syncAndReturnAll()时发送
pipelinedCommands++;
return this;
} catch (JedisConnectionException ex) {
/*
* When client send request which formed by invalid protocol, Redis send back error message
* before close connection. We try to read it to provide reason of failure.
*/
try {
String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
if (errorMessage != null && errorMessage.length() > 0) {
ex = new JedisConnectionException(errorMessage, ex.getCause());
}
} catch (Exception e) {
/*
* Catch any IOException or JedisConnectionException occurred from InputStream#read and just
* ignore. This approach is safe because reading error message is optional and connection
* will eventually be closed.
*/
}
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}
protected <T> Response<T> getResponse(Builder<T> builder) {
Response<T> lr = new Response<T>(builder); //初始化一个Response
pipelinedResponses.add(lr); //放入列表中,保证顺序
return lr;
}
当调用syncAndReturnAll时
public List<Object> syncAndReturnAll() {
if (getPipelinedResponseLength() > 0) { //判断Response列表中的数量
List<Object> unformatted = client.getAll(); //获取所有的结果,阻塞获取
List<Object> formatted = new ArrayList<Object>();
for (Object o : unformatted) {
try {
formatted.add(generateResponse(o).get()); //解析每一个得到的信息,因为原来都是byte[],需要解析成String
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;
} else {
return java.util.Collections.<Object> emptyList();
}
}
public List<Object> getAll(int except) {
List<Object> all = new ArrayList<Object>();
flush();
while (pipelinedCommands > except) {
try {
all.add(readProtocolWithCheckingBroken()); //根据pipelinedCommands值判断all里面需要多少跳数据
} catch (JedisDataException e) {
all.add(e);
}
pipelinedCommands--;
}
return all;
}
readProtocolWithCheckingBroken还是阻塞获取
private static Object process(final RedisInputStream is) {
final byte b = is.readByte(); //阻塞获取值,判断第一个字节的类型,从而进入不同的处理
if (b == PLUS_BYTE) {
return processStatusCodeReply(is);
} else if (b == DOLLAR_BYTE) {
return processBulkReply(is);
} else if (b == ASTERISK_BYTE) {
return processMultiBulkReply(is);
} else if (b == COLON_BYTE) {
return processInteger(is);
} else if (b == MINUS_BYTE) {
processError(is);
return null;
} else {
throw new JedisConnectionException("Unknown reply: " + (char) b);
}
}
三. Transaction(事务)
1. 事务为提交之前出现异常,那么事务内的操作都不执行。这个很好理解,操作同pipeline,事务提交的时候才会flush数据同服务器交互。
2. 结合watch。事务开始前观察某个值,在事务期间如果其他的线程改了所观察的值,事务不执行!
@Test
public void testTransaction() {
//要在事务执行之前进行监视
jedis.watch("key100", "key101");
Transaction multi = jedis.multi();
multi.set("key100", "key100");
multi.set("key101", "key101");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
multi.exec();
String key6 = jedis.get("key100");
System.out.println(key6);
String key7 = jedis.get("key101");
System.out.println(key7);
}
@Test
public void test() {
jedis.set("key100", "value3");
String key10 = jedis.get("key100");
System.out.println(key10);
}
先执行testTransaction,再执行test,test的结果
value3
10s后testTransaction的结果
value3
null
- redis的事务不支持回滚。因为这种复杂的功能和redis追求的简单高效的设计主旨不符合,并且他认为,redis事务的执行时错误通常都是编程错误造成的,这种错误通常只会出现在开发环境中,而很少会在实际的生产环境中出现,所以他认为没有必要为redis开发事务回滚功能。
总结: jedis这个类库比较轻便,可以帮助我们已更加原汁原味的方式操作redis。首先客户端与服务端的交互,第一个核心点就是协议的约定。服务端通过协议解析客户端传递过来的数据包,客户端根据协议解析服务端的数据。第二个就是网络编程,比较轻便也通俗易懂,方便我们入手网络,这也是我想写的第二个原因。有太多的时候我们接触更多的上层业务,忽略了基础构建,仿佛学习武功,只学习招式,不学习内功,最终顶多算是个杂耍者,永远成不了高手。