JedisCluster 操作管道(基于jedis 2.9+的版本)

前言

现在很多的博客论坛,很多都是以前写的代码。殊不知,这代码不是一层不变的。特别是涉及到源码的改变。这就导致很多网上的文章几乎都是 copy 来 copy 去的。这里也只是建议大家的有看源码的习惯。不然,照抄网上的博客有时候真的不能解决问题。还得动动脑子。本人也是踩坑过来的 。好了。回到重点,

为什么  JedisCluster 不支持直接操作管道(Pipeline)?  (如果面试这么问。你怎么回答?百思不得其姐(解)  欢迎留言^_^)

首先我们看下 JedisCluster  源码。

public class JedisCluster extends BinaryJedisCluster implements JedisCommands,

    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {

/*****************一堆方法*****************/

  }

看到这个 JedisCluster 类 继承 BinaryJedisCluster。 好了我们在下一步看   BinaryJedisCluster 类里面到底是什么?

public class BinaryJedisCluster implements BasicCommands, BinaryJedisClusterCommands,

    MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {

  public static final short HASHSLOTS = 16384;

  protected static final int DEFAULT_TIMEOUT = 2000;

  protected static final int DEFAULT_MAX_REDIRECTIONS = 5;

  protected int maxAttempts;

  protected JedisClusterConnectionHandler connectionHandler;

/****************************一堆方法*****************************/

}

可以看到在 BinaryJedisCluster  继承一些接口。所以我我们先看下这个类下除了构造方法还剩下什么东东?

埃!!! JedisClusterConnectionHandler connectionHandler; 这个类里面会不会有我们想要的东西呢?进去看下

package redis.clients.jedis;

import java.io.Closeable;

import java.util.Map;

import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.exceptions.JedisConnectionException;

public abstract class JedisClusterConnectionHandler implements Closeable {

    protected final JedisClusterInfoCache cache;

  public JedisClusterConnectionHandler(Set<HostAndPort> nodes,

                                      final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {

    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);

    initializeSlotsCache(nodes, poolConfig, password);

  }

  abstract Jedis getConnection();

  abstract Jedis getConnectionFromSlot(int slot);

  public Jedis getConnectionFromNode(HostAndPort node) {

    return cache.setupNodeIfNotExist(node).getResource();

  }


  public Map<String, JedisPool> getNodes() {

    return cache.getNodes();

  }

  private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {

    for (HostAndPort hostAndPort : startNodes) {

      Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());

      if (password != null) {

        jedis.auth(password);

      }

      try {

        cache.discoverClusterNodesAndSlots(jedis);

        break;

      } catch (JedisConnectionException e) {

        // try next nodes

      } finally {

        if (jedis != null) {

          jedis.close();

        }

      }

    }

  }

  public void renewSlotCache() {

    cache.renewClusterSlots(null);

  }

  public void renewSlotCache(Jedis jedis) {

    cache.renewClusterSlots(jedis);

  }

  @Override

  public void close() {

    cache.reset();

  }

}


这是个抽象类。里面有2个抽象方法。在2.9以前版本 

  abstract Jedis getConnection();

  abstract Jedis getConnectionFromSlot(int slot);

这2方法可有所实现。(没去看2.9以前的版本源码)

网上很多以前博客的都是使用  getConnectionFromSlot(int slot); 来获取某个 jedis  来操作  pipeline。

所以在此我们还能怎么办呢?

这会儿。我们看到

JedisClusterInfoCache cache;       这里面会不会有有我们想要的?

public class JedisClusterInfoCache {

  private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();

  private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

  private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

  private final Lock r = rwl.readLock();

  private final Lock w = rwl.writeLock();

  private volatile boolean rediscovering;

  private final GenericObjectPoolConfig poolConfig;

  private int connectionTimeout;

  private int soTimeout;

  private String password;

  private static final int MASTER_NODE_INDEX = 2;

  public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {

    this(poolConfig, timeout, timeout, null);

  }

  public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,

      final int connectionTimeout, final int soTimeout, final String password) {

    this.poolConfig = poolConfig;

    this.connectionTimeout = connectionTimeout;

    this.soTimeout = soTimeout;

    this.password = password;

  }

/******方法******以下方法是我重点标注的*************/

public JedisPool getNode(String nodeKey) {

    r.lock();

    try {

      return nodes.get(nodeKey);

    } finally {

      r.unlock();

    }

  }

  public JedisPool getSlotPool(int slot) {

    r.lock();

    try {

      return slots.get(slot);

    } finally {

      r.unlock();

    }

  }

  public Map<String, JedisPool> getNodes() {

    r.lock();

    try {

      return new HashMap<String, JedisPool>(nodes);

    } finally {

      r.unlock();

    }

  }

  public List<JedisPool> getShuffledNodesPool() {

    r.lock();

    try {

      List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values());

      Collections.shuffle(pools);

      return pools;

    } finally {

      r.unlock();

    }

  }

埃~看到了可以获取到某个jedis  

其实这个 JedisClusterInfoCache 类 是你在初始化jedisCluster时 将所有的节点放入缓存。

因此,这个类的方法能给我们返回相关的jedis实例

我们要这么做呢?

接下来是我的代码。通过java的反射机制直接获取。

public static void main(String[] args) throws NoSuchFieldException {

        JedisPoolConfig config = new JedisPoolConfig();

        Set<HostAndPort> nodeList = new HashSet<>();

        nodeList.add(new HostAndPort("192.168.41.65", 6379));

        nodeList.add(new HostAndPort("192.168.41.70", 6379));

        nodeList.add(new HostAndPort("192.168.41.42", 6379));

        nodeList.add(new HostAndPort("192.168.41.20", 6380));

        nodeList.add(new HostAndPort("192.168.41.30", 6380));

        nodeList.add(new HostAndPort("192.168.41.40", 6380));

        JedisCluster jedisCluster = new JedisCluster(nodeList, 3000, config);

        jedisCluster.set("James", "Bond");

        //通過 java.lang.reflect.Field 反射

        Jedis jedis = getJedisFieldBySlot(jedisCluster, 0, "James");


        //通過spring 工具類  ReflectionUtils 反射

        Jedis j = getJedisBySlot(jedisCluster, 0, "James");

// 接下来就是pipeline操作了

if(jedis != null) {

Pipeline pipeline = jedis.pipelined();

pipeline.syncAndReturnAll();

// jedis会自动将资源归还到连接池

jedis.close();

}else {

System.err.println("找不到 jedis");

}

    }

/**

* 集裙中根據 key对应的slot 获取槽位 或 key 返回對應的某個Jedis 實例

* @param jedisCluster

* @param slot

* @param key

* @return Jedis

*/

public static Jedis getJedisFieldBySlot(JedisCluster jedisCluster,int slot,String key) {

try {

if(key !=null) {

// 获取key对应的slot 获取槽号(0~16383)

slot = JedisClusterCRC16.getSlot(key);

}

Field field = BinaryJedisCluster.class.getDeclaredField("connectionHandler");

field.setAccessible(true);

JedisClusterConnectionHandler connectionHandler =  (JedisClusterConnectionHandler) field.get(jedisCluster);

Field jedisclusterinfocache =  JedisClusterConnectionHandler.class.getDeclaredField("cache");

jedisclusterinfocache.setAccessible(true);

JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);

JedisPool pool = cache.getSlotPool(slot);

Jedis jedis = pool.getResource();

return jedis;

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return null;

}

/**

* 集裙中根據 key对应的slot 获取槽位 或 key 返回對應的某個Jedis 實例

* @param jedisCluster

* @param slot

* @param key

* @return Jedis

*/

public static Jedis getJedisBySlot(JedisCluster jedisCluster,int slot,String key) {

try {

if(key !=null) {

slot = JedisClusterCRC16.getSlot(key);

}

//org.springframework.util.ReflectionUtils 工具類  BinaryJedisCluster 下的  JedisClusterConnectionHandler

Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class);

field.setAccessible(true);

JedisClusterConnectionHandler connectionHandler =  (JedisClusterConnectionHandler) field.get(jedisCluster);

Field jedisclusterinfocache = ReflectionUtils.findField(JedisClusterConnectionHandler.class, null, JedisClusterInfoCache.class);

jedisclusterinfocache.setAccessible(true);

JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);

JedisPool pool = cache.getSlotPool(slot);

Jedis jedis = pool.getResource();

return jedis;

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return null;

}

但是大家以为以上的代码就很完美了么?

确实。在去找缓存里的jedis时,可能某个节点挂了,然后刚好程序拿到这个实例,

这时候这里就会出现错误。因此我们 应该在原来的基础上,去刷新一遍集群。

代码由你们来给吧。我不会写了。哈哈哈·


最后 如果针对  JedisClusterInfoCache 源码分析的 请看   https://www.cnblogs.com/zhengzuozhanglina/p/11383035.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容