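Cluster modes
As a cache database, Redis needs mechanisms that keep the cache servers stable.
Persistence is one such mechanism: it ensures that a restarted Redis server loses no data (or only a small amount), because persistence saves the in-memory data to disk and a restart reloads it from disk.
As Redis is used in more and more scenarios and the technology matures, the fault tolerance, scalability, and stability of the overall Redis service all need continuous improvement, so Redis can be deployed in several different cluster setups to meet different needs.
In summary, Redis has three cluster modes:
- Master-slave mode
- Sentinel mode
- Cluster mode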
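1. Master-slave mode
To avoid a single point of failure, the usual approach is to replicate the Redis data to several copies deployed on different servers, so that if one server fails the others can keep serving requests. For this purpose Redis provides replication: when the data in one database is updated, the update is automatically synchronized to the other databases.
Redis servers fall into two roles: the master database (Master) and the slave database (Slave).
The master handles both reads and writes; when a write changes the data, the change is automatically synchronized to the slaves.
A slave is normally read-only and receives the data synchronized from the master.
A master can have multiple slaves, but a slave can have only one master.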
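The roles described above can be illustrated with a toy in-memory model. This is only a sketch under simplifying assumptions: the class name `ReadWriteSplitSketch` is hypothetical, plain maps stand in for Redis servers, and the write is copied to the slaves synchronously, whereas real Redis replication is asynchronous.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one master and N read-only slaves; not a Redis client. */
class ReadWriteSplitSketch {
    private final Map<String, String> master = new HashMap<>();
    private final List<Map<String, String>> slaves = new ArrayList<>();
    private int nextSlave = 0;

    ReadWriteSplitSketch(int slaveCount) {
        for (int i = 0; i < slaveCount; i++) {
            slaves.add(new HashMap<>());
        }
    }

    /** Writes go to the master, which then pushes the change to every slave. */
    void set(String key, String value) {
        master.put(key, value);
        for (Map<String, String> slave : slaves) {
            slave.put(key, value); // simplification: real Redis propagates asynchronously
        }
    }

    /** Reads are spread over the slaves round-robin; with no slaves, read the master. */
    String get(String key) {
        if (slaves.isEmpty()) {
            return master.get(key);
        }
        Map<String, String> slave = slaves.get(nextSlave);
        nextSlave = (nextSlave + 1) % slaves.size();
        return slave.get(key);
    }

    public static void main(String[] args) {
        ReadWriteSplitSketch redis = new ReadWriteSplitSketch(2);
        redis.set("greeting", "hello");
        System.out.println(redis.get("greeting"));
    }
}
```

Because the master is the only node that accepts writes, losing it stops all writes, which is the single-point risk that the later modes address.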
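Pros
1. One master can have multiple slaves, and data synchronization is non-blocking.
2. Slaves serve reads, which offloads the master and gives read/write splitting.
3. Slaves can connect to and synchronize from other slaves, which reduces the synchronization load on the master.
Cons
1. No fault tolerance or automatic recovery; the master is a single point of risk.
2. The initial synchronization is a full copy, so the server needs enough spare memory (since Redis 2.8 a slave that reconnects after a short interruption can usually resynchronize partially instead).
3. Online scaling is hard to support.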
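2. Sentinel mode (Sentinel cluster)
Redis provides the Sentinel mechanism: when Redis is started in Sentinel mode, it automatically monitors the running state of the Master and its Slaves. The basic principle is heartbeats plus voting.
Put simply, Sentinel monitors the health of a Redis deployment. It has two core functions:
- 1. Monitor whether the master and slaves are running normally
- 2. Automatically promote a slave to master when the master fails
Sentinel mainly covers the following:
- Monitoring: Sentinel periodically checks whether the master and slave servers are working normally
- Notification: when a monitored Redis server misbehaves, Sentinel can notify an administrator or another application through its API
- Automatic failover: when a master stops working normally, Sentinel starts an automatic failover. It promotes one of the failed master's slaves to be the new master and reconfigures the other slaves to replicate from the new master; when a client tries to connect to the failed master, the cluster returns the address of the new master, so the new master takes over from the failed one
Redis Sentinel is a distributed system: you can run multiple Sentinel processes in one deployment.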
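The "heartbeat plus voting" idea can be sketched as follows. This is a simplified model rather than Sentinel's actual implementation: the class `SentinelQuorumSketch` and its methods are hypothetical, time is passed in explicitly, and leader election among Sentinels is left out. Each Sentinel treats the master as down on its own when no reply arrived within `downAfterMillis`, and the master counts as down for everyone once at least `quorum` Sentinels agree.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified heartbeat + voting model; names and structure are illustrative only. */
class SentinelQuorumSketch {
    private final int quorum;
    private final long downAfterMillis;
    // Last time each Sentinel received a valid reply from the master.
    private final Map<String, Long> lastReplyBySentinel = new HashMap<>();

    SentinelQuorumSketch(int quorum, long downAfterMillis) {
        this.quorum = quorum;
        this.downAfterMillis = downAfterMillis;
    }

    /** Heartbeat: the given Sentinel got a reply from the master at nowMillis. */
    void recordReply(String sentinelId, long nowMillis) {
        lastReplyBySentinel.put(sentinelId, nowMillis);
    }

    /** One Sentinel's local opinion: no reply for longer than downAfterMillis. */
    boolean subjectivelyDown(String sentinelId, long nowMillis) {
        Long last = lastReplyBySentinel.get(sentinelId);
        return last == null || nowMillis - last > downAfterMillis;
    }

    /** Voting: the master is considered down once enough Sentinels agree. */
    boolean objectivelyDown(long nowMillis) {
        int votes = 0;
        for (String sentinelId : lastReplyBySentinel.keySet()) {
            if (subjectivelyDown(sentinelId, nowMillis)) {
                votes++;
            }
        }
        return votes >= quorum;
    }

    public static void main(String[] args) {
        SentinelQuorumSketch sketch = new SentinelQuorumSketch(2, 30_000);
        sketch.recordReply("s1", 0);
        sketch.recordReply("s2", 0);
        sketch.recordReply("s3", 0);
        sketch.recordReply("s1", 40_000); // only s1 still hears from the master
        System.out.println(sketch.objectivelyDown(50_000));
    }
}
```

Requiring agreement from several Sentinels is what keeps a single Sentinel with a bad network link from triggering a failover by itself.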
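Pros
1. Master and slave can switch roles, which provides basic failover.
2. Sentinel mode keeps all the advantages of master-slave mode.
Cons
1. Online scaling is still hard to support.
2. Managing the cluster configuration is fairly complex.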
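3. Cluster mode
3.1 Redis Cluster
Redis Cluster is a server-side sharding technique that uses the CRC16 algorithm to shard data. It has been officially available since version 3.0 and uses a decentralized architecture: every node stores data plus the state of the whole cluster, and every node is connected to all other nodes.
Structural characteristics of Redis Cluster:
1. All physical nodes are mapped onto the slot range [0-16383] (not necessarily evenly), and the cluster maintains the relationship between nodes, slots (buckets), and values.
2. When a key-value pair is stored in the cluster, one of the 16384 slots is chosen according to CRC16(key) mod 16384.
3. All Redis nodes are interconnected (PING-PONG mechanism) and use a binary protocol internally to optimize transfer efficiency.
4. A node is judged to have failed only when more than half of the master nodes detect that it is down.
5. Clients connect to the Redis nodes directly, with no intermediate proxy layer. A client does not need to connect to every node in the cluster; connecting to any available node is enough.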
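The slot selection in point 2 can be sketched in a few lines of Java. Redis Cluster uses the CRC16 variant with polynomial 0x1021 and an initial value of 0 (often called CRC16/XMODEM); the class name `ClusterSlotSketch` is hypothetical, and hash tags are ignored here to keep the sketch short.

```java
import java.nio.charset.StandardCharsets;

/** Minimal sketch of Redis Cluster slot selection: CRC16(key) mod 16384. */
class ClusterSlotSketch {
    static final int SLOT_COUNT = 16384;

    /** Bitwise CRC16 with polynomial 0x1021 and initial value 0. */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF; // keep the register at 16 bits
            }
        }
        return crc;
    }

    /** Maps a key to one of the 16384 slots (hash tags are not handled here). */
    static int slot(String key) {
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"foo", "bar", "user:1000"}) {
            System.out.println(key + " -> slot " + slot(key));
        }
    }
}
```

The Redis Cluster specification lists 0x31C3 as the CRC16 of the string "123456789", which is a convenient check for any reimplementation. The result can also be compared with the server's own `CLUSTER KEYSLOT <key>` command.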
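Pros
1. Decentralized architecture; data is shared among the nodes and its distribution can be adjusted dynamically.
2. Nodes can be added and removed dynamically, so scaling is flexible.
3. The failure of some nodes does not affect the availability of the cluster as a whole.
Cons
1. The cluster implementation is fairly complex.
2. Batch commands (mget, mset, etc.) have limited support: all keys in one command must hash to the same slot.
3. Transactions have limited support.
Jedis client implementation: JedisCluster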
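One common way to live with the batch-command limitation is to group keys by slot on the client and issue one multi-key command per group. The sketch below assumes the hash tag rule from the Redis Cluster specification (if a key contains a non-empty `{...}` section, only the first such section is hashed); the class `SlotGroupingSketch` and its methods are hypothetical and do not talk to a server.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch: group keys by hash slot so each group can go into one multi-key command. */
class SlotGroupingSketch {
    /** Bitwise CRC16 with polynomial 0x1021 and initial value 0. */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Slot of a key, hashing only the first non-empty {...} section if there is one. */
    static int slot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }

    /** Groups keys by slot; each resulting list is safe to send as one MGET or MSET. */
    static Map<Integer, List<String>> groupBySlot(List<String> keys) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String key : keys) {
            groups.computeIfAbsent(slot(key), s -> new ArrayList<>()).add(key);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("{user1000}.following", "{user1000}.followers", "foo");
        System.out.println(groupBySlot(keys));
    }
}
```

Keys that must be read or written together can share a hash tag such as `{user1000}` so that they always land in the same slot.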
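3.2 Redis Sharding
Redis Sharding is a client-side sharding technique that uses consistent hashing to shard data. Before version 3.0, clusters were mostly built with this kind of sharding.
Characteristics of Redis Sharding:
- The Redis nodes are independent and have no relationship with each other
- If one Redis node goes down, the whole cluster becomes unavailable, so every node needs a master-slave backup
- The master-slave backup is usually set up with read/write splitting, and each master has at least two slaves. Only then can one of the slaves be elected as the new master when the master goes down; when the old master rejoins the cluster, it becomes a slave of the new master
- A Redis master-slave switch is transparent to the Jedis client, that is, the switch does not affect how Jedis is used
Cons:
Adding and removing nodes is not convenient
Jedis client implementation: ShardedJedis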
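Client-side sharding with consistent hashing can be sketched with a sorted map that acts as the hash ring. This is an illustration, not the ShardedJedis implementation: the class `ConsistentHashRingSketch` is hypothetical, and it uses the first 8 bytes of an MD5 digest as the hash function because that is available in the standard library.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Minimal consistent hash ring with virtual nodes; illustrative only. */
class ConsistentHashRingSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRingSketch(Collection<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        for (String node : nodes) {
            addNode(node);
        }
    }

    /** Places several virtual points for the node on the ring to even out the load. */
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i));
        }
    }

    /** A key belongs to the first virtual point at or after its hash, wrapping around. */
    String nodeFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Assumed hash: first 8 bytes of MD5, interpreted as a signed long. */
    static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConsistentHashRingSketch ring =
                new ConsistentHashRingSketch(List.of("redis-a", "redis-b", "redis-c"), 160);
        System.out.println("user:1000 -> " + ring.nodeFor("user:1000"));
    }
}
```

When a node is removed, only the keys that lived on that node move to another node; all other keys keep their placement. Even so, the moved keys are not migrated automatically, which is one reason scaling is inconvenient in this mode.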
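4. Sentinel + Sharding cluster mode
What if you want both the automatic monitoring and failover of Sentinel mode and the sharding of a Sharding cluster?
On the server side, start the nodes as a Sharding cluster and have the Redis Sentinel distributed system monitor multiple Master nodes.
On the client side, define a custom class that extends redis.clients.util.Pool to implement a Redis connection pool.
Taking Jedis as an example, a custom pool can be implemented as follows, modeled on redis.clients.jedis.JedisSentinelPool and redis.clients.jedis.ShardedJedisPool in the Jedis source (these package names correspond to Jedis 2.x):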
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.Hashing;
import redis.clients.util.Pool;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
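/**
* Jedis does not support Sharded and Sentinel at the same time.
*
* This pool changes the single master into multiple masters and replaces Jedis with ShardedJedis,
* so that a cluster with multiple masters is supported.
*/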
public class ShardedJedisSentinelPoolExt extends Pool<ShardedJedis> {
public static final int MAX_RETRY_SENTINEL = 10;
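// LoggerType.COMMON is not defined in this listing, so the logger is keyed by this class instead.
private static final Logger logger = LoggerFactory.getLogger(ShardedJedisSentinelPoolExt.class);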
protected GenericObjectPoolConfig poolConfig;
protected int timeout = Protocol.DEFAULT_TIMEOUT;
private int sentinelRetry = 0;
protected String password;
protected int database = Protocol.DEFAULT_DATABASE;
protected Set<MasterListener> masterListeners = new HashSet<>();
private volatile List<HostAndPort> currentHostMasters;
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels) {
this(masters, sentinels, new GenericObjectPoolConfig(),
Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels, String password) {
this(masters, sentinels, new GenericObjectPoolConfig(),
Protocol.DEFAULT_TIMEOUT, password);
}
public ShardedJedisSentinelPoolExt(final GenericObjectPoolConfig poolConfig, Set<String> masters, Set<String> sentinels) {
this(masters, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, int timeout,
final String password) {
this(masters, sentinels, poolConfig, timeout, password,
Protocol.DEFAULT_DATABASE);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final int timeout) {
this(masters, sentinels, poolConfig, timeout, null,
Protocol.DEFAULT_DATABASE);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final String password) {
this(masters, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT,
password);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, int timeout,
final String password, final int database) {
this.poolConfig = poolConfig;
this.timeout = timeout;
this.password = password;
this.database = database;
List<String> convertList = new ArrayList<>(masters);
List<HostAndPort> masterList = initSentinels(sentinels, convertList);
initPool(masterList);
}
@Override
public void destroy() {
for (MasterListener m : masterListeners) {
m.shutdown();
}
super.destroy();
}
public List<HostAndPort> getCurrentHostMaster() {
return currentHostMasters;
}
private void initPool(List<HostAndPort> masters) {
if (!equalsObj(currentHostMasters, masters)) {
StringBuilder sb = new StringBuilder();
for (HostAndPort master : masters) {
sb.append(master.toString());
sb.append(" ");
}
logger.info("Created ShardedJedisPool to master at [" + sb.toString() + "]");
List<JedisShardInfo> shardMasters = makeShardInfoList(masters);
initPool(poolConfig, new ShardedJedisFactory(shardMasters, Hashing.MURMUR_HASH, null));
currentHostMasters = masters;
}
}
private static boolean equalsObj(List<HostAndPort> currentShardMasters, List<HostAndPort> shardMasters) {
if (currentShardMasters != null && shardMasters != null && checkListSize(currentShardMasters,shardMasters)) {
for (int i = 0; i < currentShardMasters.size(); i++) {
if (!currentShardMasters.get(i).equals(shardMasters.get(i)))
return false;
}
return true;
}
return false;
}
private static boolean checkListSize(List<HostAndPort> currentShardMasters, List<HostAndPort> shardMasters){
return currentShardMasters.size() == shardMasters.size();
}
private List<JedisShardInfo> makeShardInfoList(List<HostAndPort> masters) {
List<JedisShardInfo> shardMasters = new ArrayList<>();
for (HostAndPort master : masters) {
JedisShardInfo jedisShardInfo = new JedisShardInfo(master.getHost(), master.getPort(), timeout);
jedisShardInfo.setPassword(password);
shardMasters.add(jedisShardInfo);
}
return shardMasters;
}
private List<HostAndPort> initSentinels(Set<String> sentinels, final List<String> masters) {
Map<String, HostAndPort> masterMap = new HashMap<>();
List<HostAndPort> shardMasters = new ArrayList<>();
logger.info("Trying to find all master from available Sentinels...");
for (String masterName : masters) {
HostAndPort master = null;
boolean fetched = false;
while (!fetched && sentinelRetry < MAX_RETRY_SENTINEL) {
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
logger.info("Connecting to Sentinel " + hap);
try( Jedis jedis = new Jedis(hap.getHost(), hap.getPort())) {
master = masterMap.get(masterName);
if (master == null) {
List<String> hostAndPort = jedis.sentinelGetMasterAddrByName(masterName);
if (hostAndPort != null && ! hostAndPort.isEmpty()) {
master = toHostAndPort(hostAndPort);
logger.info("Found Redis master at " + master);
shardMasters.add(master);
masterMap.put(masterName, master);
fetched = true;
jedis.disconnect();
break;
}
}
} catch (JedisConnectionException e) {
logger.error("Cannot connect to sentinel running @ " + hap + ". Trying next one.",e);
}
}
if (null == master) {
try {
logger.info("All sentinels down, cannot determine where the "
+ masterName + " master is running... sleeping 1000ms, will try again.");
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error(e.getMessage());
Thread.currentThread().interrupt();
}
fetched = false;
sentinelRetry++;
}
}
// Give up after MAX_RETRY_SENTINEL attempts.
if (!fetched && sentinelRetry >= MAX_RETRY_SENTINEL) {
logger.info("All sentinels down after " + MAX_RETRY_SENTINEL + " attempts, aborting.");
throw new JedisConnectionException("Cannot connect all sentinels, Abort.");
}
}
// Start the listeners only if the master of every shard was resolved.
if (! masters.isEmpty() && masters.size() == shardMasters.size()) {
logger.info("Starting Sentinel listeners...");
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
MasterListener masterListener = new MasterListener(masters, hap.getHost(), hap.getPort());
masterListeners.add(masterListener);
masterListener.start();
}
}
return shardMasters;
}
private static HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
String host = getMasterAddrByNameResult.get(0);
int port = Integer.parseInt(getMasterAddrByNameResult.get(1));
return new HostAndPort(host, port);
}
/**
* PoolableObjectFactory custom impl.
*/
protected static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
private List<JedisShardInfo> shards;
private Hashing algo;
private Pattern keyTagPattern;
public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
this.shards = shards;
this.algo = algo;
this.keyTagPattern = keyTagPattern;
}
@Override
public PooledObject<ShardedJedis> makeObject() throws Exception {
ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
return new DefaultPooledObject<>(jedis);
}
@Override
public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis) throws Exception {
final ShardedJedis shardedJedis = pooledShardedJedis.getObject();
for (Jedis jedis : shardedJedis.getAllShards()) {
try {
jedis.quit();
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
try {
jedis.disconnect();
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
}
@Override
public boolean validateObject(PooledObject<ShardedJedis> pooledShardedJedis) {
try {
ShardedJedis jedis = pooledShardedJedis.getObject();
for (Jedis shard : jedis.getAllShards()) {
if (!"PONG".equals(shard.ping())) {
return false;
}
}
return true;
} catch (Exception ex) {
logger.error(ex.getMessage(),ex);
return false;
}
}
@Override
public void activateObject(PooledObject<ShardedJedis> p) throws Exception {
// No-op: a ShardedJedis needs no extra setup when it is borrowed from the pool.
}
@Override
public void passivateObject(PooledObject<ShardedJedis> p) throws Exception {
// No-op: a ShardedJedis needs no cleanup when it is returned to the pool.
}
}
/**
* Adapter with empty callbacks, so subclasses override only the events they need.
*/
protected class JedisPubSubAdapter extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
// No-op by default.
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// No-op by default.
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
// No-op by default.
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// No-op by default.
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// No-op by default.
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// No-op by default.
}
}
protected class MasterListener extends Thread {
protected List<String> masters;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
protected Jedis jedis;
protected AtomicBoolean running = new AtomicBoolean(false);
protected MasterListener() {
}
public MasterListener(List<String> masters, String host, int port) {
this.masters = masters;
this.host = host;
this.port = port;
}
public MasterListener(List<String> masters, String host, int port,
long subscribeRetryWaitTimeMillis) {
this(masters, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}
@Override
public void run() {
running.set(true);
while (running.get()) {
jedis = new Jedis(host, port);
try {
jedis.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
logger.info("Sentinel " + host + ":" + port + " published: " + message + ".");
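// Payload: <master-name> <old-ip> <old-port> <new-ip> <new-port>; index 4 is read below, so 5 fields are required.
String[] switchMasterMsg = message.split(" ");
if (switchMasterMsg.length > 4) {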
int index = masters.indexOf(switchMasterMsg[0]);
if (index >= 0) {
HostAndPort newHostMaster = toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]));
List<HostAndPort> newHostMasters = new ArrayList<>();
for (int i = 0; i < masters.size(); i++) {
newHostMasters.add(null);
}
Collections.copy(newHostMasters, currentHostMasters);
newHostMasters.set(index, newHostMaster);
initPool(newHostMasters);
} else {
StringBuilder sb = new StringBuilder();
for (String masterName : masters) {
sb.append(masterName);
sb.append(",");
}
logger.info("Ignoring message on +switch-master for master name "
+ switchMasterMsg[0]
+ ", our monitor master name are ["
+ sb + "]");
}
} else {
logger.info("Invalid message received on Sentinel "
+ host
+ ":"
+ port
+ " on channel +switch-master: "
+ message);
}
}
}, "+switch-master");
} catch (JedisConnectionException e) {
if (running.get()) {
logger.info("Lost connection to Sentinel at " + host
+ ":" + port
+ ". Sleeping " + subscribeRetryWaitTimeMillis + "ms and retrying.");
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
logger.error(e1.getMessage(), e1);
Thread.currentThread().interrupt();
}
} else {
logger.info("Unsubscribing from Sentinel at " + host + ":"
+ port);
}
}
}
}
public void shutdown() {
try {
logger.info("Shutting down listener on " + host + ":" + port);
running.set(false);
// This isn't good, the Jedis object is not thread safe
jedis.disconnect();
} catch (Exception e) {
logger.error("Caught exception while shutting down: " , e);
}
}
}
}
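The listener above reacts to messages on the `+switch-master` channel, whose payload has five space-separated fields: master name, old IP, old port, new IP, and new port. The sketch below isolates that parsing step so it can be checked without a running Sentinel; the class `SwitchMasterMessage` is hypothetical and not part of Jedis.

```java
/** Parsed form of a +switch-master payload: <name> <old-ip> <old-port> <new-ip> <new-port>. */
class SwitchMasterMessage {
    final String masterName;
    final String oldHost;
    final int oldPort;
    final String newHost;
    final int newPort;

    private SwitchMasterMessage(String masterName, String oldHost, int oldPort,
                                String newHost, int newPort) {
        this.masterName = masterName;
        this.oldHost = oldHost;
        this.oldPort = oldPort;
        this.newHost = newHost;
        this.newPort = newPort;
    }

    /** Parses the payload and rejects anything that does not have exactly five fields. */
    static SwitchMasterMessage parse(String message) {
        String[] fields = message.trim().split("\\s+");
        if (fields.length != 5) {
            throw new IllegalArgumentException("Unexpected +switch-master payload: " + message);
        }
        return new SwitchMasterMessage(fields[0], fields[1], Integer.parseInt(fields[2]),
                fields[3], Integer.parseInt(fields[4]));
    }

    public static void main(String[] args) {
        SwitchMasterMessage m = parse("mymaster 127.0.0.1 6379 127.0.0.1 6380");
        System.out.println(m.masterName + " moved to " + m.newHost + ":" + m.newPort);
    }
}
```

Validating the field count before indexing avoids an ArrayIndexOutOfBoundsException on a malformed message, and a typed result makes it harder to mix up the old and new address.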