



public interface IFailureDetector
{
    /**
     * Failure Detector's knowledge of whether a node is up or
     * down.
     *
     * @param ep endpoint in question.
     * @return true if UP and false if DOWN.
     */
    public boolean isAlive(InetAddress ep);

    /**
     * This method is invoked by any entity wanting to interrogate the status of an endpoint.
     * In our case it would be the Gossiper. The Failure Detector will then calculate Phi and
     * deem an endpoint as suspicious or alive as explained in the Hayashibara paper.
     *
     * @param ep endpoint for which we interpret the inter-arrival times.
     */
    public void interpret(InetAddress ep);

    /**
     * This method is invoked by the receiver of the heartbeat. In our case it would be
     * the Gossiper. The Gossiper informs the Failure Detector on receipt of a heartbeat. The
     * FailureDetector will then sample the arrival time as explained in the paper.
     *
     * @param ep endpoint being reported.
     */
    public void report(InetAddress ep);

    /**
     * Remove the endpoint from the failure detector.
     */
    public void remove(InetAddress ep);

    /**
     * Force conviction of the endpoint in the failure detector.
     */
    public void forceConviction(InetAddress ep);

    /**
     * Register interest for Failure Detector events.
     *
     * @param listener implementation of an application provided IFailureDetectionEventListener
     */
    public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener);

    /**
     * Un-register interest for Failure Detector events.
     *
     * @param listener implementation of an application provided IFailureDetectionEventListener
     */
    public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener);
}
  • The IFailureDetector interface defines the methods isAlive, interpret, report, remove, forceConviction, registerFailureDetectionEventListener and unregisterFailureDetectionEventListener.
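The two listener methods at the end of the interface follow the observer pattern. Below is a minimal sketch of registration and conviction fan-out. It makes two simplifying assumptions: endpoints are Strings instead of InetAddress, and a hypothetical ConvictionListener stands in for IFailureDetectionEventListener. Like FailureDetector with its fdEvntListeners field, the sketch stores listeners in a CopyOnWriteArrayList, so a listener can unregister itself while a conviction is being delivered.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for IFailureDetectionEventListener (String endpoints for brevity).
interface ConvictionListener {
    void convict(String endpoint, double phi);
}

// Hypothetical sketch of the register/unregister/notify part of IFailureDetector.
class ListenerRegistrySketch {
    // Iteration runs over a snapshot, so a listener may unregister itself from inside
    // convict() without a ConcurrentModificationException.
    private final List<ConvictionListener> listeners = new CopyOnWriteArrayList<>();

    void register(ConvictionListener listener) {
        listeners.add(listener);
    }

    void unregister(ConvictionListener listener) {
        listeners.remove(listener);
    }

    // Deliver a conviction to every listener registered at the time of the call, in registration order.
    void fireConvict(String endpoint, double phi) {
        for (ConvictionListener listener : listeners) {
            listener.convict(endpoint, phi);
        }
    }

    int size() {
        return listeners.size();
    }
}
```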



/**
 * This FailureDetector is an implementation of the paper titled
 * "The Phi Accrual Failure Detector" by Hayashibara.
 * Check the paper and the <i>IFailureDetector</i> interface for details.
 */
public class FailureDetector implements IFailureDetector, FailureDetectorMBean
{
    private static final Logger logger = LoggerFactory.getLogger(FailureDetector.class);
    public static final String MBEAN_NAME = "org.apache.cassandra.net:type=FailureDetector";
    private static final int SAMPLE_SIZE = 1000;
    protected static final long INITIAL_VALUE_NANOS = TimeUnit.NANOSECONDS.convert(getInitialValue(), TimeUnit.MILLISECONDS);
    private static final int DEBUG_PERCENTAGE = 80; // if the phi is larger than this percentage of the max, log a debug message
    private static final long DEFAULT_MAX_PAUSE = 5000L * 1000000L; // 5 seconds
    private static final long MAX_LOCAL_PAUSE_IN_NANOS = getMaxLocalPause();
    private long lastInterpret = Clock.instance.nanoTime();
    private long lastPause = 0L;

    private static long getMaxLocalPause()
    {
        if (System.getProperty("cassandra.max_local_pause_in_ms") != null)
        {
            long pause = Long.parseLong(System.getProperty("cassandra.max_local_pause_in_ms"));
            logger.warn("Overriding max local pause time to {}ms", pause);
            return pause * 1000000L;
        }
        else
            return DEFAULT_MAX_PAUSE;
    }

    public static final IFailureDetector instance = new FailureDetector();

    // this is useless except to provide backwards compatibility in phi_convict_threshold,
    // because everyone seems pretty accustomed to the default of 8, and users who have
    // already tuned their phi_convict_threshold for their own environments won't need to
    // change.
    private final double PHI_FACTOR = 1.0 / Math.log(10.0); // 0.434...

    private final ConcurrentHashMap<InetAddress, ArrivalWindow> arrivalSamples = new ConcurrentHashMap<>();
    private final List<IFailureDetectionEventListener> fdEvntListeners = new CopyOnWriteArrayList<>();

    // ...

    public boolean isAlive(InetAddress ep)
    {
        if (ep.equals(FBUtilities.getBroadcastAddress()))
            return true;

        EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep);
        // we could assert not-null, but having isAlive fail screws a node over so badly that
        // it's worth being defensive here so minor bugs don't cause disproportionate
        // badness.  (See CASSANDRA-1463 for an example).
        if (epState == null)
            logger.error("Unknown endpoint: " + ep, new IllegalArgumentException(""));
        return epState != null && epState.isAlive();
    }

    public void interpret(InetAddress ep)
    {
        ArrivalWindow hbWnd = arrivalSamples.get(ep);
        if (hbWnd == null)
        {
            return; // no heartbeat has been reported for ep yet
        }
        long now = Clock.instance.nanoTime();
        long diff = now - lastInterpret;
        lastInterpret = now;
        if (diff > MAX_LOCAL_PAUSE_IN_NANOS)
        {
            logger.warn("Not marking nodes down due to local pause of {} > {}", diff, MAX_LOCAL_PAUSE_IN_NANOS);
            lastPause = now;
            return;
        }
        if (Clock.instance.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS)
        {
            logger.debug("Still not marking nodes down due to local pause");
            return;
        }
        double phi = hbWnd.phi(now);
        if (logger.isTraceEnabled())
            logger.trace("PHI for {} : {}", ep, phi);

        if (PHI_FACTOR * phi > getPhiConvictThreshold())
        {
            if (logger.isTraceEnabled())
                logger.trace("Node {} phi {} > {}; intervals: {} mean: {}", new Object[]{ep, PHI_FACTOR * phi, getPhiConvictThreshold(), hbWnd, hbWnd.mean()});
            for (IFailureDetectionEventListener listener : fdEvntListeners)
            {
                listener.convict(ep, phi);
            }
        }
        else if (logger.isDebugEnabled() && (PHI_FACTOR * phi * DEBUG_PERCENTAGE / 100.0 > getPhiConvictThreshold()))
        {
            logger.debug("PHI for {} : {}", ep, phi);
        }
        else if (logger.isTraceEnabled())
        {
            logger.trace("PHI for {} : {}", ep, phi);
            logger.trace("mean for {} : {}", ep, hbWnd.mean());
        }
    }

    // ...
}

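  • FailureDetector implements the IFailureDetector and FailureDetectorMBean interfaces.
  • PHI_FACTOR is defined as 1.0 / Math.log(10.0) (about 0.434), and phiConvictThreshold defaults to 8. The class also maintains arrivalSamples, a map from each InetAddress to its ArrivalWindow.
  • isAlive returns epState.isAlive(); the local node is always reported as alive. interpret calls ArrivalWindow.phi to compute phi for the current time now and multiplies it by PHI_FACTOR. If the product exceeds phiConvictThreshold, it calls convict on every registered IFailureDetectionEventListener, passing the unscaled phi. interpret does nothing if no heartbeat has been recorded for the endpoint. It also refuses to convict anyone shortly after this node itself has paused for longer than MAX_LOCAL_PAUSE_IN_NANOS (5 seconds by default), because during such a pause peers only appear to be late.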
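The decision part of interpret can be separated out and tested on its own. The class below is a hypothetical restatement of that logic. Its name, its constructor, and the fact that now, phi and the threshold are passed in as arguments are all assumptions. The real method reads those values from Clock, ArrivalWindow and getPhiConvictThreshold(). The sketch keeps two things from the listing: the local-pause guard and the PHI_FACTOR scaling.

```java
// Hypothetical restatement of the decision logic in FailureDetector.interpret.
class InterpretSketch {
    static final double PHI_FACTOR = 1.0 / Math.log(10.0); // ~0.434, as in FailureDetector

    private final long maxLocalPauseNanos;
    private long lastInterpret;
    private long lastPause = 0L;

    InterpretSketch(long maxLocalPauseNanos, long startNanos) {
        this.maxLocalPauseNanos = maxLocalPauseNanos;
        this.lastInterpret = startNanos;
    }

    /**
     * @param now       current time in nanoseconds
     * @param rawPhi    t / mean, the value ArrivalWindow.phi returns
     * @param threshold phi_convict_threshold (8 by default)
     * @return true if listeners should be told to convict the endpoint
     */
    boolean shouldConvict(long now, double rawPhi, double threshold) {
        long diff = now - lastInterpret;
        lastInterpret = now;
        if (diff > maxLocalPauseNanos) {
            // A long gap between two calls means this node itself stalled (GC, VM pause, ...),
            // so peers only look late: remember when it happened and convict nobody.
            lastPause = now;
            return false;
        }
        if (now - lastPause < maxLocalPauseNanos) {
            // Still inside the grace period that follows a local pause.
            return false;
        }
        // rawPhi is a natural-log quantity; PHI_FACTOR converts it to the paper's base-10 phi.
        return PHI_FACTOR * rawPhi > threshold;
    }
}
```

With the default threshold of 8, a raw phi of 20 converts to about 8.69 and leads to a conviction. A raw phi of 10 converts to about 4.34 and does not.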



public class EndpointState
{
    protected static final Logger logger = LoggerFactory.getLogger(EndpointState.class);

    public final static IVersionedSerializer<EndpointState> serializer = new EndpointStateSerializer();

    private volatile HeartBeatState hbState;
    private final AtomicReference<Map<ApplicationState, VersionedValue>> applicationState;

    /* fields below do not get serialized */
    private volatile long updateTimestamp;
    private volatile boolean isAlive;

    // ...

    public boolean isAlive()
    {
        return isAlive;
    }

    void markAlive()
    {
        isAlive = true;
    }

    void markDead()
    {
        isAlive = false;
    }

    // ...
}

  • EndpointState.isAlive returns the isAlive field. markDead sets that field to false, and markAlive sets it to true.
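EndpointState uses volatile fields for liveness and an AtomicReference for the application-state map. That combination suggests copy-on-write updates. The sketch below assumes exactly that. It uses a hypothetical EndpointStateSketch with String keys and values that copies the map, modifies the copy, and publishes the copy with compareAndSet, retrying if another thread got there first. Cassandra's own addApplicationState is not shown in this article and may differ in detail.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Cassandra's EndpointState: String keys/values for brevity.
class EndpointStateSketch {
    // Readers always get an immutable snapshot; writers publish a new map atomically.
    private final AtomicReference<Map<String, String>> applicationState =
            new AtomicReference<>(Collections.<String, String>emptyMap());

    // volatile: a markDead() on one thread is visible to every later isAlive() on any thread.
    private volatile boolean isAlive = true;

    void addApplicationState(String key, String value) {
        while (true) {
            Map<String, String> orig = applicationState.get();
            Map<String, String> copy = new HashMap<>(orig);
            copy.put(key, value);
            // Succeeds only if nobody replaced the map since we read it; otherwise retry.
            if (applicationState.compareAndSet(orig, Collections.unmodifiableMap(copy)))
                return;
        }
    }

    String getApplicationState(String key) {
        return applicationState.get().get(key);
    }

    boolean isAlive() { return isAlive; }

    void markAlive() { isAlive = true; }

    void markDead() { isAlive = false; }
}
```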



class ArrivalWindow
{
    private static final Logger logger = LoggerFactory.getLogger(ArrivalWindow.class);
    private long tLast = 0L;
    private final ArrayBackedBoundedStats arrivalIntervals;
    private double lastReportedPhi = Double.MIN_VALUE;

    // in the event of a long partition, never record an interval longer than the rpc timeout,
    // since if a host is regularly experiencing connectivity problems lasting this long we'd
    // rather mark it down quickly instead of adapting
    // this value defaults to the same initial value the FD is seeded with
    private final long MAX_INTERVAL_IN_NANO = getMaxInterval();

    ArrivalWindow(int size)
    {
        arrivalIntervals = new ArrayBackedBoundedStats(size);
    }

    private static long getMaxInterval()
    {
        String newvalue = System.getProperty("cassandra.fd_max_interval_ms");
        if (newvalue == null)
        {
            return FailureDetector.INITIAL_VALUE_NANOS;
        }
        else
        {
            logger.info("Overriding FD MAX_INTERVAL to {}ms", newvalue);
            return TimeUnit.NANOSECONDS.convert(Integer.parseInt(newvalue), TimeUnit.MILLISECONDS);
        }
    }

    synchronized void add(long value, InetAddress ep)
    {
        assert tLast >= 0;
        if (tLast > 0L)
        {
            long interArrivalTime = (value - tLast);
            if (interArrivalTime <= MAX_INTERVAL_IN_NANO)
            {
                arrivalIntervals.add(interArrivalTime);
                logger.trace("Reporting interval time of {} for {}", interArrivalTime, ep);
            }
            else
            {
                logger.trace("Ignoring interval time of {} for {}", interArrivalTime, ep);
            }
        }
        else
        {
            // We use a very large initial interval since the "right" average depends on the cluster size
            // and it's better to err high (false negatives, which will be corrected by waiting a bit longer)
            // than low (false positives, which cause "flapping").
            arrivalIntervals.add(FailureDetector.INITIAL_VALUE_NANOS);
        }
        tLast = value;
    }

    double mean()
    {
        return arrivalIntervals.mean();
    }

    // see CASSANDRA-2597 for an explanation of the math at work here.
    double phi(long tnow)
    {
        assert arrivalIntervals.mean() > 0 && tLast > 0; // should not be called before any samples arrive
        long t = tnow - tLast;
        lastReportedPhi = t / mean();
        return lastReportedPhi;
    }

    double getLastReportedPhi()
    {
        return lastReportedPhi;
    }

    public String toString()
    {
        return Arrays.toString(arrivalIntervals.getArrivalIntervals());
    }
}
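  • ArrivalWindow stores the inter-arrival intervals (arrivalIntervals) in an ArrayBackedBoundedStats.
  • Its add method is synchronized. When tLast > 0, it computes interArrivalTime = value - tLast and calls arrivalIntervals.add(interArrivalTime) only if interArrivalTime <= MAX_INTERVAL_IN_NANO. Longer intervals are ignored. When tLast is 0 (the first heartbeat from the endpoint), it calls arrivalIntervals.add(FailureDetector.INITIAL_VALUE_NANOS) instead. In every case tLast is then set to the new timestamp.
  • phi assumes that inter-arrival times are exponentially distributed. Under that model, t / mean() equals -ln P(X > t): the negative natural logarithm of the probability that the next heartbeat arrives later than t.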
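The sampling side can be sketched without Cassandra's classes. The code below is a hypothetical simplification. A fixed-size ring buffer stands in for ArrayBackedBoundedStats, whose source is not shown here, and timestamps are plain longs. The ArrivalSamplesSketch.report method is an illustration of how a per-endpoint map like arrivalSamples could be fed; it is not Cassandra's report implementation. The window keeps the three rules of add: seed the first sample with an initial value, ignore intervals above the maximum, and compute phi as t / mean.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplification of ArrivalWindow: a ring buffer replaces ArrayBackedBoundedStats.
class ArrivalWindowSketch {
    private final long[] intervals; // the last `size` accepted inter-arrival times
    private int count = 0;          // number of valid entries in intervals
    private int next = 0;           // index of the slot to (over)write next
    private long sum = 0L;          // running sum of the valid entries
    private long tLast = 0L;        // timestamp of the previous heartbeat, 0 = none yet
    private final long initialValue;
    private final long maxInterval;

    ArrivalWindowSketch(int size, long initialValue, long maxInterval) {
        this.intervals = new long[size];
        this.initialValue = initialValue;
        this.maxInterval = maxInterval;
    }

    private void record(long interval) {
        if (count == intervals.length)
            sum -= intervals[next];     // buffer full: evict the oldest sample
        else
            count++;
        intervals[next] = interval;
        sum += interval;
        next = (next + 1) % intervals.length;
    }

    synchronized void add(long value) {
        if (tLast > 0L) {
            long interArrivalTime = value - tLast;
            if (interArrivalTime <= maxInterval)
                record(interArrivalTime); // ordinary heartbeat
            // otherwise ignore it: treat it as a partition, not as a slower heartbeat rate
        } else {
            record(initialValue);         // first heartbeat: seed with a deliberately large value
        }
        tLast = value;
    }

    synchronized double mean() {
        return (double) sum / count;      // NaN before the first add(), so call phi() only after samples exist
    }

    // Same formula as ArrivalWindow.phi: time since the last heartbeat divided by the mean interval.
    synchronized double phi(long tnow) {
        return (tnow - tLast) / mean();
    }
}

// Hypothetical stand-in for FailureDetector.arrivalSamples plus a report() that feeds it.
class ArrivalSamplesSketch {
    final ConcurrentHashMap<String, ArrivalWindowSketch> arrivalSamples = new ConcurrentHashMap<>();
    private final int size;
    private final long initialValue;
    private final long maxInterval;

    ArrivalSamplesSketch(int size, long initialValue, long maxInterval) {
        this.size = size;
        this.initialValue = initialValue;
        this.maxInterval = maxInterval;
    }

    void report(String endpoint, long nowNanos) {
        // computeIfAbsent creates at most one window per endpoint, even with concurrent reports.
        arrivalSamples
            .computeIfAbsent(endpoint, ep -> new ArrivalWindowSketch(size, initialValue, maxInterval))
            .add(nowNanos);
    }
}
```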

Although the original paper suggests that the distribution is approximated by the Gaussian distribution, we found the exponential distribution to be a better approximation, because of the nature of the gossip channel and its impact on latency.
Regular message transmissions experiencing typical random jitter will follow a normal distribution, but since gossip messages from endpoint A to endpoint B are sent at random intervals, they likely make up a Poisson process, making the exponential distribution appropriate.
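Under the exponential model the link to the code takes three lines. Let μ be mean() and t be tnow - tLast. The probability that the heartbeat is still outstanding after t, its negative natural logarithm, and the paper's base-10 φ are related as follows:

```latex
\begin{aligned}
P_{\text{later}}(t) &= P(X > t) = e^{-t/\mu} \\
-\ln P_{\text{later}}(t) &= \frac{t}{\mu} && \text{(the value ArrivalWindow.phi returns)} \\
\varphi(t) = -\log_{10} P_{\text{later}}(t) &= \frac{1}{\ln 10}\cdot\frac{t}{\mu} && \text{(PHI\_FACTOR} \times \text{phi, as compared in interpret)} \\
\varphi(t) > 8 &\iff \frac{t}{\mu} > 8 \ln 10 \approx 18.42
\end{aligned}
```

So with the default phi_convict_threshold of 8, an endpoint is convicted after it has been silent for a little more than 18 mean inter-arrival intervals. At that point the model puts the probability of the heartbeat merely being late at 10^-8.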



public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
    // ...

    /**
     * This method is part of IFailureDetectionEventListener interface. This is invoked
     * by the Failure Detector when it convicts an end point.
     *
     * @param endpoint end point that is convicted.
     */
    public void convict(InetAddress endpoint, double phi)
    {
        EndpointState epState = endpointStateMap.get(endpoint);
        if (epState == null)
            return;

        if (!epState.isAlive())
            return;

        logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());

        if (isShutdown(endpoint))
        {
            markAsShutdown(endpoint);
        }
        else
        {
            markDead(endpoint, epState);
        }
    }

    /**
     * This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it
     *
     * @param endpoint endpoint that has shut itself down
     */
    protected void markAsShutdown(InetAddress endpoint)
    {
        EndpointState epState = endpointStateMap.get(endpoint);
        if (epState == null)
            return;
        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
        epState.addApplicationState(ApplicationState.RPC_READY, StorageService.instance.valueFactory.rpcReady(false));
        markDead(endpoint, epState);
        FailureDetector.instance.forceConviction(endpoint);
    }

    public void markDead(InetAddress addr, EndpointState localState)
    {
        if (logger.isTraceEnabled())
            logger.trace("marking as down {}", addr);
        localState.markDead();
        unreachableEndpoints.put(addr, System.nanoTime());
        logger.info("InetAddress {} is now DOWN", addr);
        for (IEndpointStateChangeSubscriber subscriber : subscribers)
            subscriber.onDead(addr, localState);
        if (logger.isTraceEnabled())
            logger.trace("Notified {}", subscribers);
    }

    // ...
}

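  • Gossiper implements IFailureDetectionEventListener. Its convict method looks up the endpoint's EndpointState and returns if the endpoint is unknown or already down. Otherwise it calls markAsShutdown if the endpoint has shut down, and markDead if it has not.
  • markAsShutdown records the shutdown in the STATUS and RPC_READY application states, calls markDead, and then calls FailureDetector.instance.forceConviction(endpoint).
  • markDead calls endpointState.markDead() directly, records the endpoint in unreachableEndpoints, and then invokes onDead on every IEndpointStateChangeSubscriber.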
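The reaction chain on the Gossiper side can be modeled in a few lines: convict, then markAsShutdown or markDead, then subscriber callbacks. This is a hypothetical toy model. Endpoint state is reduced to an alive flag and a shutdown flag, the subscribers are plain BiConsumers that mirror only the onDead callback of IEndpointStateChangeSubscriber, and the forceConviction call is only recorded in a list.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical toy model of Gossiper.convict / markAsShutdown / markDead.
class GossiperSketch {
    // Reduced endpoint state: liveness plus "has announced a graceful shutdown".
    static final class State {
        volatile boolean alive = true;
        volatile boolean shutdown = false;
    }

    final Map<String, State> endpointStateMap = new ConcurrentHashMap<>();
    final Map<String, Long> unreachableEndpoints = new ConcurrentHashMap<>();
    // Stand-ins for IEndpointStateChangeSubscriber.onDead
    final List<BiConsumer<String, State>> onDeadSubscribers = new CopyOnWriteArrayList<>();
    // Records where the real code would call FailureDetector.instance.forceConviction(endpoint)
    final List<String> forcedConvictions = new CopyOnWriteArrayList<>();

    // Called by the failure detector when phi crosses the threshold.
    void convict(String endpoint, double phi) {
        State st = endpointStateMap.get(endpoint);
        if (st == null || !st.alive)
            return;                        // unknown or already down: nothing to do
        if (st.shutdown)
            markAsShutdown(endpoint);
        else
            markDead(endpoint, st);
    }

    void markAsShutdown(String endpoint) {
        State st = endpointStateMap.get(endpoint);
        if (st == null)
            return;
        st.shutdown = true;                // real code: STATUS=shutdown, RPC_READY=false
        markDead(endpoint, st);
        forcedConvictions.add(endpoint);
    }

    void markDead(String endpoint, State st) {
        st.alive = false;
        unreachableEndpoints.put(endpoint, System.nanoTime());
        for (BiConsumer<String, State> subscriber : onDeadSubscribers)
            subscriber.accept(endpoint, st);
    }
}
```

Because convict returns early for endpoints that are already down, a second conviction of the same endpoint does nothing. That includes the conviction that forceConviction may trigger right after markAsShutdown.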



public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
    // ...

    public void start(int generationNumber)
    {
        start(generationNumber, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
    }

    /**
     * Start the gossiper with the generation number, preloading the map of application states before starting
     */
    public void start(int generationNbr, Map<ApplicationState, VersionedValue> preloadLocalStates)
    {
        /* initialize the heartbeat state for this localEndpoint */
        // ...
        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
        // ...

        //notify snitches that Gossiper is about to start
        // ...
        if (logger.isTraceEnabled())
            logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration());

        scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(),
                                                              /* initialDelay, delay, unit: elided */);
    }

    private class GossipTask implements Runnable
    {
        public void run()
        {
            try
            {
                //wait on messaging service to start listening
                // ...

                /* Update the local heartbeat counter. */
                // ...
                if (logger.isTraceEnabled())
                    logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
                final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
                // ...

                if (gDigests.size() > 0)
                {
                    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                                           /* ... */);
                    MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                                          /* ... */);
                    /* Gossip to some random live member */
                    boolean gossipedToSeed = doGossipToLiveMember(message);

                    /* Gossip to some unreachable member with some probability to check if he is back up */
                    maybeGossipToUnreachableMember(message);

                    /* Gossip to a seed if we did not do so above, or we have seen less nodes
                       than there are seeds.  This prevents partitions where each group of nodes
                       is only gossiping to a subset of the seeds.

                       The most straightforward check would be to check that all the seeds have been
                       verified either as live or unreachable.  To avoid that computation each round,
                       we reason that:

                       either all the live nodes are seeds, in which case non-seeds that come online
                       will introduce themselves to a member of the ring by definition,

                       or there is at least one non-seed node in the list, in which case eventually
                       someone will gossip to it, and then do a gossip to a random seed from the
                       gossipedToSeed check.

                       See CASSANDRA-150 for more exposition. */
                    if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
                        maybeGossipToSeed(message);

                    doStatusCheck();
                }
            }
            catch (Exception e)
            {
                logger.error("Gossip error", e);
            }
        }
    }

    private void doStatusCheck()
    {
        if (logger.isTraceEnabled())
            logger.trace("Performing status check ...");

        long now = System.currentTimeMillis();
        long nowNano = System.nanoTime();

        long pending = ((JMXEnabledThreadPoolExecutor) StageManager.getStage(Stage.GOSSIP)).metrics.pendingTasks.getValue();
        if (pending > 0 && lastProcessedMessageAt < now - 1000)
        {
            // if some new messages just arrived, give the executor some time to work on them
            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

            // still behind?  something's broke
            if (lastProcessedMessageAt < now - 1000)
            {
                logger.warn("Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)", pending);
                return;
            }
        }

        Set<InetAddress> eps = endpointStateMap.keySet();
        for (InetAddress endpoint : eps)
        {
            if (endpoint.equals(FBUtilities.getBroadcastAddress()))
                continue;

            FailureDetector.instance.interpret(endpoint);
            EndpointState epState = endpointStateMap.get(endpoint);
            if (epState != null)
            {
                // check if this is a fat client. fat clients are removed automatically from
                // gossip after FatClientTimeout.  Do not remove dead states here.
                if (isGossipOnlyMember(endpoint)
                    && !justRemovedEndpoints.containsKey(endpoint)
                    && TimeUnit.NANOSECONDS.toMillis(nowNano - epState.getUpdateTimestamp()) > fatClientTimeout)
                {
                    logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, fatClientTimeout);
                    removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay
                    evictFromMembership(endpoint); // can get rid of the state immediately
                }

                // check for dead state removal
                long expireTime = getExpireTimeForEndpoint(endpoint);
                if (!epState.isAlive() && (now > expireTime)
                    && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
                {
                    if (logger.isDebugEnabled())
                        logger.debug("time is expiring for endpoint : {} ({})", endpoint, expireTime);
                    evictFromMembership(endpoint);
                }
            }
        }

        if (!justRemovedEndpoints.isEmpty())
        {
            for (Entry<InetAddress, Long> entry : justRemovedEndpoints.entrySet())
            {
                if ((now - entry.getValue()) > QUARANTINE_DELAY)
                {
                    if (logger.isDebugEnabled())
                        logger.debug("{} elapsed, {} gossip quarantine over", QUARANTINE_DELAY, entry.getKey());
                    justRemovedEndpoints.remove(entry.getKey());
                }
            }
        }
    }

    // ...
}

  • Gossiper defines a start method that schedules GossipTask through executor.scheduleWithFixedDelay.
  • GossipTask.run calls doGossipToLiveMember and maybeGossipToUnreachableMember, and finally calls doStatusCheck.
  • doStatusCheck iterates over the InetAddresses in endpointStateMap and calls FailureDetector.instance.interpret(endpoint) for every endpoint except the local node.
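The scheduling arrangement is one periodic task that, among other things, asks the failure detector about every other endpoint. It can be sketched with a ScheduledExecutorService. All names below are hypothetical. The interpreter is passed in as a Consumer so that the sketch does not depend on FailureDetector, and the gossip messaging is left out.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of Gossiper.start + GossipTask.run + the interpret loop in doStatusCheck.
class GossipLoopSketch {
    final Set<String> endpoints = ConcurrentHashMap.newKeySet(); // stands in for endpointStateMap.keySet()
    private final String self;                                    // stands in for FBUtilities.getBroadcastAddress()
    private final Consumer<String> interpret;                     // stands in for FailureDetector.instance::interpret

    GossipLoopSketch(String self, Consumer<String> interpret) {
        this.self = self;
        this.interpret = interpret;
    }

    // One gossip round. The real GossipTask first bumps the heartbeat and sends GossipDigestSyn messages.
    void runOnce() {
        try {
            doStatusCheck();
        } catch (Exception e) {
            // Log and swallow, as GossipTask does: an exception escaping a task scheduled with
            // scheduleWithFixedDelay would suppress all of its later executions.
            System.err.println("Gossip error: " + e);
        }
    }

    void doStatusCheck() {
        for (String endpoint : endpoints) {
            if (endpoint.equals(self))
                continue;                  // the local node is never judged
            interpret.accept(endpoint);
        }
    }

    // Each round starts intervalMillis after the previous round has finished.
    ScheduledFuture<?> start(ScheduledExecutorService executor, long intervalMillis) {
        return executor.scheduleWithFixedDelay(this::runOnce, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

For example, `loop.start(Executors.newSingleThreadScheduledExecutor(), 1000)` would start a new round one second after the previous one completes. The 1000 ms value is only an illustration, not a figure taken from the listing. A fixed delay rather than a fixed rate means a slow round postpones the next one instead of letting rounds pile up.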


  • The IFailureDetector interface defines the methods isAlive, interpret, report, remove, forceConviction, registerFailureDetectionEventListener and unregisterFailureDetectionEventListener.
  • FailureDetector implements IFailureDetector and FailureDetectorMBean. Its isAlive method returns epState.isAlive(); EndpointState.isAlive simply returns the isAlive field, which markDead sets to false. Its interpret method calls ArrivalWindow.phi to compute phi for the current time and multiplies the result by PHI_FACTOR. If that exceeds phiConvictThreshold, it calls convict on every registered IFailureDetectionEventListener.
  • ArrivalWindow stores the inter-arrival intervals in an ArrayBackedBoundedStats. Its synchronized add method records interArrivalTime only when tLast > 0 and interArrivalTime <= MAX_INTERVAL_IN_NANO. On the first heartbeat (tLast == 0) it records FailureDetector.INITIAL_VALUE_NANOS instead. phi assumes exponentially distributed inter-arrival times, so t / mean() equals -ln P(X > t), the negative natural logarithm of the probability that the next heartbeat arrives later than t.
  • Gossiper implements IFailureDetectionEventListener. Its convict method looks up the EndpointState and calls markAsShutdown if the endpoint has shut down, otherwise markDead. markAsShutdown calls markDead and then FailureDetector.instance.forceConviction(endpoint). markDead calls endpointState.markDead() and then invokes onDead on every IEndpointStateChangeSubscriber.
  • Gossiper.start schedules a GossipTask through executor.scheduleWithFixedDelay. GossipTask.run calls doGossipToLiveMember and maybeGossipToUnreachableMember, and finally doStatusCheck. doStatusCheck iterates over the InetAddresses in endpointStateMap and calls FailureDetector.instance.interpret(endpoint) on each of them.

