1. Overview
Redis Sentinel is the high-availability solution officially recommended for Redis. Jedis supports it on the client side, mainly through the JedisSentinelPool class.
2. A simple example
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;
import java.util.HashSet;
import java.util.Set;
public class RedisSentinelClient {
    /**
     * Connects through Sentinel, writes one key to the current master and reads it back.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Addresses of the Sentinel processes (not the Redis servers), as "host:port" strings
        Set<String> sentinels = new HashSet<String>();
        sentinels.add(new HostAndPort("192.168.58.99", 26379).toString());
        sentinels.add(new HostAndPort("192.168.58.100", 26380).toString());
        sentinels.add(new HostAndPort("192.168.58.101", 26381).toString());
        // "master" must match the master name that the sentinels monitor
        JedisSentinelPool sentinelPool = new JedisSentinelPool("master", sentinels);
        System.out.println("Current master: " + sentinelPool.getCurrentHostMaster().toString());
        Jedis master = sentinelPool.getResource();
        master.set("username", "liangzhichao");
        // close() hands a pooled Jedis back to its pool
        master.close();
        Jedis master2 = sentinelPool.getResource();
        String value = master2.get("username");
        System.out.println("username: " + value);
        master2.close();
        sentinelPool.destroy();
    }
}
3. JedisSentinelPool fields
// Object pool configuration, based on Apache commons-pool2
protected GenericObjectPoolConfig poolConfig;
// Timeout in milliseconds; defaults to 2000
protected int timeout = Protocol.DEFAULT_TIMEOUT;
// Password handed to JedisFactory, i.e. used for the connections to the Redis master
protected String password;
// Index of the Redis database to select; defaults to 0
protected int database = Protocol.DEFAULT_DATABASE;
// Master listeners, which react when the master address changes
protected Set<MasterListener> masterListeners = new HashSet<MasterListener>();
protected Logger log = Logger.getLogger(getClass().getName());
// Factory that creates Jedis instances
private volatile JedisFactory factory;
// The current master; HostAndPort is a simple model class wrapping a host and a port
private volatile HostAndPort currentHostMaster;
4. JedisSentinelPool constructor
public JedisSentinelPool(String masterName, Set<String> sentinels,
        final GenericObjectPoolConfig poolConfig, int timeout, final String password,
        final int database) {
    this.poolConfig = poolConfig;
    this.timeout = timeout;
    this.password = password;
    this.database = database;
    HostAndPort master = initSentinels(sentinels, masterName);
    initPool(master);
}
The constructor first assigns the instance fields. The sentinels parameter holds the addresses of the Redis Sentinel instances the client talks to, and more than one is allowed. initSentinels then queries those sentinels to determine the monitored master node, and initPool builds the object pool against that master so that Jedis instances can later be borrowed from it.
5. initSentinels method
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
    HostAndPort master = null;
    boolean sentinelAvailable = false;
    log.info("Trying to find master from available Sentinels...");
    // There may be several sentinels; iterate over them
    for (String sentinel : sentinels) {
        // Convert the "host:port" sentinel address into a HostAndPort object
        final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
        log.fine("Connecting to Sentinel " + hap);
        Jedis jedis = null;
        try {
            // Connect to the sentinel
            jedis = new Jedis(hap.getHost(), hap.getPort());
            // Look up the master address by masterName; the reply is a list
            // with host = list[0] and port = list[1]
            List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
            // connected to sentinel...
            sentinelAvailable = true;
            if (masterAddr == null || masterAddr.size() != 2) {
                log.warning("Can not get master addr, master name: " + masterName
                        + ". Sentinel: " + hap + ".");
                continue;
            }
            master = toHostAndPort(masterAddr);
            log.fine("Found Redis master at " + master);
            // As soon as any sentinel reports the master, stop iterating
            break;
        } catch (JedisConnectionException e) {
            log.warning("Cannot connect to sentinel running @ " + hap
                    + ". Trying next one.");
        } finally {
            // Close the connection to the sentinel
            if (jedis != null) {
                jedis.close();
            }
        }
    }
    // If master is still null here, either every sentinel is down, or no
    // reachable sentinel monitors this master name
    if (master == null) {
        if (sentinelAvailable) {
            // can connect to sentinel, but master name seems to not
            // monitored
            throw new JedisException("Can connect to sentinel, but " + masterName
                    + " seems to be not monitored...");
        } else {
            throw new JedisConnectionException(
                    "All sentinels down, cannot determine where is " + masterName
                    + " master is running...");
        }
    }
    // Reaching this point means the master address was found
    log.info("Redis master running at " + master + ", starting Sentinel listeners...");
    // Start a listener for every sentinel
    for (String sentinel : sentinels) {
        final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
        MasterListener masterListener = new MasterListener(masterName, hap.getHost(),
                hap.getPort());
        masterListeners.add(masterListener);
        masterListener.start();
    }
    return master;
}
The masterName parameter of initSentinels is the name of the master we want to locate. The method first walks through the sentinels, connecting to them one at a time, and asks each one for the address of masterName through jedis.sentinelGetMasterAddrByName():
public List<String> sentinelGetMasterAddrByName(String masterName) {
    client.sentinel(Protocol.SENTINEL_GET_MASTER_ADDR_BY_NAME, masterName);
    final List<Object> reply = client.getObjectMultiBulkReply();
    return BuilderFactory.STRING_LIST.build(reply);
}
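To make the control flow of the discovery loop easier to follow, here is a minimal sketch that runs without Redis. SentinelQuery, MasterDiscoverySketch, the exception types and the master address 192.168.58.99:6379 are all hypothetical stand-ins chosen for illustration; none of them are part of Jedis.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a connection to one sentinel (not a Jedis type).
interface SentinelQuery {
    // Returns [host, port], or null when the master name is not monitored.
    // Throws IllegalStateException when the sentinel cannot be reached.
    List<String> masterAddrByName(String masterName);
}

class MasterDiscoverySketch {
    // Same shape as initSentinels: the first sentinel that returns a well-formed
    // [host, port] pair wins; otherwise report which kind of failure happened.
    static String findMaster(Map<String, SentinelQuery> sentinels, String masterName) {
        boolean sentinelAvailable = false;
        for (Map.Entry<String, SentinelQuery> entry : sentinels.entrySet()) {
            try {
                List<String> addr = entry.getValue().masterAddrByName(masterName);
                sentinelAvailable = true; // this sentinel answered
                if (addr == null || addr.size() != 2) {
                    continue; // reachable, but it does not know this master
                }
                return addr.get(0) + ":" + addr.get(1);
            } catch (IllegalStateException unreachable) {
                // sentinel is down, try the next one
            }
        }
        if (sentinelAvailable) {
            throw new IllegalArgumentException(masterName + " is not monitored");
        }
        throw new IllegalStateException("All sentinels down");
    }

    public static void main(String[] args) {
        Map<String, SentinelQuery> sentinels = new LinkedHashMap<>();
        // First sentinel is unreachable, second one knows the master.
        sentinels.put("192.168.58.99:26379", name -> {
            throw new IllegalStateException("down");
        });
        sentinels.put("192.168.58.100:26380", name ->
                "master".equals(name) ? Arrays.asList("192.168.58.99", "6379") : null);
        System.out.println(findMaster(sentinels, "master")); // 192.168.58.99:6379
    }
}
```

The sentinelAvailable flag is what lets the caller tell "every sentinel is down" apart from "the sentinels answer, but none of them monitors this name".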
Next, initSentinels starts one MasterListener per sentinel. MasterListener is itself a thread, and it subscribes to that sentinel's notifications about changes to the master address.
initPool method
private void initPool(HostAndPort master) {
    if (!master.equals(currentHostMaster)) {
        currentHostMaster = master;
        if (factory == null) {
            factory = new JedisFactory(master.getHost(), master.getPort(), timeout,
                    password, database);
            initPool(poolConfig, factory);
        } else {
            factory.setHostAndPort(currentHostMaster);
            // although we clear the pool, we still have to check the returned
            // object in getResource, this call only clears idle instances,
            // not borrowed instances
            internalPool.clear();
        }
        log.info("Created JedisPool to master at " + master);
    }
}
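Why is master compared with the instance field currentHostMaster? Because initPool is not only called from the constructor: a MasterListener calls it again whenever it learns that the master address has changed. Since there is one listener per sentinel, the same new address can arrive several times, and the comparison turns the repeated calls into no-ops. The two-argument initPool(poolConfig, factory) used above comes from the parent class Pool<T>: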
public void initPool(final GenericObjectPoolConfig poolConfig,
        PooledObjectFactory<T> factory) {
    if (this.internalPool != null) {
        try {
            closeInternalPool();
        } catch (Exception e) {
        }
    }
    this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
}
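The effect of comparing against currentHostMaster in the one-argument initPool can be tried out in isolation. The sketch below is only an illustration: MasterSwitchSketch, switchTo and the reset counter are hypothetical names, the Redis addresses are assumed, and the synchronized keyword is this sketch's own choice rather than something the quoted source does.

```java
import java.util.concurrent.atomic.AtomicInteger;

class MasterSwitchSketch {
    // Plays the role of currentHostMaster
    private volatile String currentMaster;
    // Counts how often the pool would have been (re)pointed at a master
    private final AtomicInteger poolResets = new AtomicInteger();

    // Returns true only when the address really changed.
    synchronized boolean switchTo(String master) {
        if (master.equals(currentMaster)) {
            return false; // duplicate announcement, nothing to do
        }
        currentMaster = master;
        // Stands in for factory.setHostAndPort(...) plus internalPool.clear()
        poolResets.incrementAndGet();
        return true;
    }

    int resets() {
        return poolResets.get();
    }

    public static void main(String[] args) {
        MasterSwitchSketch pool = new MasterSwitchSketch();
        pool.switchTo("192.168.58.99:6379"); // initial master
        // Three listeners (one per sentinel) announce the same failover
        for (int i = 0; i < 3; i++) {
            pool.switchTo("192.168.58.100:6379");
        }
        System.out.println("pool resets: " + pool.resets()); // pool resets: 2
    }
}
```

Without the equality check, every duplicate announcement would clear the idle connections again even though the master had not moved.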
6. MasterListener thread
public void run() {
    running.set(true);
    while (running.get()) {
        j = new Jedis(host, port);
        try {
            // Subscribe to the sentinel's notifications about master address changes
            j.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    log.fine("Sentinel " + host + ":" + port + " published: "
                            + message + ".");
                    // Payload: <master-name> <old-ip> <old-port> <new-ip> <new-port>
                    String[] switchMasterMsg = message.split(" ");
                    if (switchMasterMsg.length > 3) {
                        if (masterName.equals(switchMasterMsg[0])) {
                            initPool(toHostAndPort(Arrays.asList(
                                    switchMasterMsg[3], switchMasterMsg[4])));
                        } else {
                            log.fine("Ignoring message on +switch-master for master name "
                                    + switchMasterMsg[0]
                                    + ", our master name is " + masterName);
                        }
                    } else {
                        log.severe("Invalid message received on Sentinel " + host
                                + ":" + port + " on channel +switch-master: "
                                + message);
                    }
                }
            }, "+switch-master");
        } catch (JedisConnectionException e) {
            if (running.get()) {
                log.severe("Lost connection to Sentinel at " + host + ":" + port
                        + ". Sleeping 5000ms and retrying.");
                try {
                    Thread.sleep(subscribeRetryWaitTimeMillis);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            } else {
                log.fine("Unsubscribing from Sentinel at " + host + ":" + port);
            }
        }
    }
}
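As the code shows, MasterListener delegates to a Jedis connection to talk to the sentinel and subscribes to the +switch-master channel. When the master address changes, it calls initPool again, which points the factory at the new master and clears the idle connections in the pool.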
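The parsing step inside onMessage is easy to exercise on its own. The following sketch assumes the +switch-master payload has the five space-separated fields master name, old IP, old port, new IP and new port. SwitchMasterMessageSketch and newMasterFor are hypothetical names, and the strict five-field check is a deliberate variation on the quoted code, which only tests length > 3 before reading index 4.

```java
class SwitchMasterMessageSketch {
    // Returns the new master as "host:port", or null when the message is
    // about a different master name.
    static String newMasterFor(String masterName, String message) {
        String[] parts = message.split(" ");
        if (parts.length != 5) {
            throw new IllegalArgumentException("Malformed +switch-master payload: " + message);
        }
        if (!masterName.equals(parts[0])) {
            return null; // announcement for another master group, ignore it
        }
        // parts[1] and parts[2] are the old address, parts[3] and parts[4] the new one
        return parts[3] + ":" + parts[4];
    }

    public static void main(String[] args) {
        String payload = "master 192.168.58.99 6379 192.168.58.100 6379";
        System.out.println(newMasterFor("master", payload)); // 192.168.58.100:6379
        System.out.println(newMasterFor("other", payload));  // null
    }
}
```

Requiring exactly five fields means a truncated four-field message is rejected up front instead of failing later with an ArrayIndexOutOfBoundsException.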
7. Limitations of JedisSentinelPool
The JedisSentinelPool implementation in Jedis only fits a single master-slave group: one pool tracks exactly one master name.