聊聊storm的tickTuple

本文主要研究一下storm的tickTuple

实例

TickWordCountBolt

public class TickWordCountBolt extends BaseBasicBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(TickWordCountBolt.class);

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if(TupleUtils.isTick(input)){
            //execute tick logic
            LOGGER.info("execute tick tuple, emit and clear counts");
            counts.entrySet().stream()
                    .forEach(entry -> collector.emit(new Values(entry.getKey(), entry.getValue())));
            counts.clear();
        }else{
            String word = input.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
  • 使用tick的话,在execute方法里头要自己判断tuple类型,然后执行相应处理
  • 这里实例是重写getComponentConfiguration方法,直接new了一个conf,设置了Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS参数

tickTopology

    @Test
    public void testTickTuple() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        //并发度10
        builder.setSpout("spout", new TestWordSpout(), 10);
        builder.setBolt("count", new TickWordCountBolt(), 5)
//              .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3)
                .fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1)
                .shuffleGrouping("count");
        SubmitHelper.submitRemote("tickDemo",builder);
    }
  • 除了重写getComponentConfiguration方法配置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS参数外,还可以在TopologyBuilder.setBolt之后调用addConfiguration方法在配置,这个配置会覆盖getComponentConfiguration方法的配置
  • 另外除了在bolt上配置,还可以在StormSubmitter.submitTopology时,对传入的conf配置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS参数,不过这个配置是全局的,作用于整个topology的所有bolt;当出现既有全局配置,又有bolt自己的配置时,作用范围小的优先。

源码解析

TupleUtils.isTick

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

    public static boolean isTick(Tuple tuple) {
        return tuple != null
               && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
               && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
  • isTick是根据tuple的sourceComponent以及sourceStreamId来判断

TopologyBuilder.setBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

    /**
     * Define a new bolt in this topology with the specified amount of parallelism.
     *
     * @param id               the id of this component. This id is referenced by other components that want to consume this bolt's
     *                         outputs.
     * @param bolt             the bolt
     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
     *                         somewhere around the cluster.
     * @return use the returned object to declare the inputs to this component
     *
     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
     */
    public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
        validateUnusedId(id);
        initCommon(id, bolt, parallelism_hint);
        _bolts.put(id, bolt);
        return new BoltGetter(id);
    }

    private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
        ComponentCommon common = new ComponentCommon();
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if (parallelism != null) {
            int dop = parallelism.intValue();
            if (dop < 1) {
                throw new IllegalArgumentException("Parallelism must be positive.");
            }
            common.set_parallelism_hint(dop);
        }
        Map<String, Object> conf = component.getComponentConfiguration();
        if (conf != null) {
            common.set_json_conf(JSONValue.toJSONString(conf));
        }
        commons.put(id, common);
    }
  • setBolt的时候调用了initCommon,这里调用了bolt的getComponentConfiguration,将其配置写入到commons

BoltGetter.addConfiguration

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

    protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
        //......
    }
  • addConfiguration方法继承自BaseConfigurationDeclarer

BaseConfigurationDeclarer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java

public abstract class BaseConfigurationDeclarer<T extends ComponentConfigurationDeclarer> implements ComponentConfigurationDeclarer<T> {
    @Override
    public T addConfiguration(String config, Object value) {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(config, value);
        return addConfigurations(configMap);
    }
    //......
}
  • 这里新建一个map,然后调用子类的addConfigurations,这里子类为ConfigGetter

ConfigGetter.addConfigurations

    protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
        String id;

        public ConfigGetter(String id) {
            this.id = id;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T addConfigurations(Map<String, Object> conf) {
            if (conf != null) {
                if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
                    throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
                }
                if (!conf.isEmpty()) {
                    String currConf = commons.get(id).get_json_conf();
                    commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
                }
            }
            return (T) this;
        }
        //......
    }

    private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
        Map<String, Object> res = new HashMap<>(into);
        res.putAll(newMap);
        return JSONValue.toJSONString(res);
    }
  • 可以看到这里从common获取配置,然后将自己的配置合并到component自身的配置中,也就是说addConfiguration的配置项会覆盖bolt在getComponentConfiguration方法中的配置

Executor.normalizedComponentConf

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    private Map<String, Object> normalizedComponentConf(
        Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
        List<String> keysToRemove = retrieveAllConfigKeys();
        keysToRemove.remove(Config.TOPOLOGY_DEBUG);
        keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
        keysToRemove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
        keysToRemove.remove(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
        keysToRemove.remove(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS);
        keysToRemove.remove(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
        keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER);
        keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);

        Map<String, Object> componentConf;
        String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
        if (specJsonConf != null) {
            try {
                componentConf = (Map<String, Object>) JSONValue.parseWithException(specJsonConf);
            } catch (ParseException e) {
                throw new RuntimeException(e);
            }
            for (Object p : keysToRemove) {
                componentConf.remove(p);
            }
        } else {
            componentConf = new HashMap<>();
        }

        Map<String, Object> ret = new HashMap<>();
        ret.putAll(topoConf);
        ret.putAll(componentConf);

        return ret;
    }
  • Executor在构造器里头会调用normalizedComponentConf合并一下配置
  • 对于componentConf移除掉topology的部分配置项,然后对返回值,先putAll(topoConf)再putAll(componentConf),可以看到如果都有配置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的话,componentConf的会覆盖掉topoConf的配置。

Executor.setupTicks

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    protected void setupTicks(boolean isSpout) {
        final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
        if (tickTimeSecs != null) {
            boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
            if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
                || (!enableMessageTimeout && isSpout)) {
                LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
            } else {
                StormTimer timerTask = workerData.getUserTimer();
                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
                                            () -> {
                                                TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
                                                                                Constants.SYSTEM_COMPONENT_ID,
                                                                                (int) Constants.SYSTEM_TASK_ID,
                                                                                Constants.SYSTEM_TICK_STREAM_ID);
                                                AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                                                try {
                                                    receiveQueue.publish(tickTuple);
                                                    receiveQueue.flush(); // avoid buffering
                                                } catch (InterruptedException e) {
                                                    LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
                                                    Thread.currentThread().interrupt();
                                                    return;
                                                }
                                            }
                );
            }
        }
    }
  • 这里的topoConf是topoConf与componentConf合并之后的配置,对满足条件的component设置timerTask
  • 可以看到这里new的TupleImpl的srcComponent设置为Constants.SYSTEM_COMPONENT_ID(__system),taskId设置为Constants.SYSTEM_TASK_ID(-1),streamId设置为Constants.SYSTEM_TICK_STREAM_ID(__tick)
  • timerTask在调度的时候调用JCQueue(receiveQueue).publish(tickTuple)

JCQueue.publish

    private final DirectInserter directInserter = new DirectInserter(this);

    /**
     * Blocking call. Retries till it can successfully publish the obj. Can be interrupted via Thread.interrupt().
     */
    public void publish(Object obj) throws InterruptedException {
        Inserter inserter = getInserter();
        inserter.publish(obj);
    }

    private Inserter getInserter() {
        Inserter inserter;
        if (producerBatchSz > 1) {
            inserter = thdLocalBatcher.get();
            if (inserter == null) {
                BatchInserter b = new BatchInserter(this, producerBatchSz);
                inserter = b;
                thdLocalBatcher.set(b);
            }
        } else {
            inserter = directInserter;
        }
        return inserter;
    }

    private static class DirectInserter implements Inserter {
        private JCQueue q;

        public DirectInserter(JCQueue q) {
            this.q = q;
        }

        /**
         * Blocking call, that can be interrupted via Thread.interrupt
         */
        @Override
        public void publish(Object obj) throws InterruptedException {
            boolean inserted = q.tryPublishInternal(obj);
            int idleCount = 0;
            while (!inserted) {
                q.metrics.notifyInsertFailure();
                if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
                }

                idleCount = q.backPressureWaitStrategy.idle(idleCount);
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                inserted = q.tryPublishInternal(obj);
            }

        }
        //......
    }    

    // Non Blocking. returns true/false indicating success/failure. Fails if full.
    private boolean tryPublishInternal(Object obj) {
        if (recvQueue.offer(obj)) {
            metrics.notifyArrivals(1);
            return true;
        }
        return false;
    }
  • JCQueue.publish的时候调用inserter.publish,这里inserter可能是BatchInserter或DirectInserter,这里看一下DirectInserter的publish方法
  • DirectInserter的publish方法调用了JCQueue.tryPublishInternal,而该方法调用的是recvQueue.offer(obj),放入到recvQueue队列

JCQueue.consume

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java

    /**
     * Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
     * elements consumed from Q
     */
    public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
        try {
            return consumeImpl(consumer, exitCond);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
     *
     * @param consumer
     * @param exitCond
     */
    private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
        int drainCount = 0;
        while (exitCond.keepRunning()) {
            Object tuple = recvQueue.poll();
            if (tuple == null) {
                break;
            }
            consumer.accept(tuple);
            ++drainCount;
        }

        int overflowDrainCount = 0;
        int limit = overflowQ.size();
        while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
            Object tuple = overflowQ.poll();
            ++overflowDrainCount;
            consumer.accept(tuple);
        }
        int total = drainCount + overflowDrainCount;
        if (total > 0) {
            consumer.flush();
        }
        return total;
    }
  • 聊聊storm worker的executor与task这篇文章我们有看到executor的asyncLoop主要是调用Executor.call().call()方法,对于BoltExecutor.call则是调用JCQueue.consume方法,该方法调用的是recvQueue.poll()
  • 可以看到tickTuple与bolt的业务tuple是共用一个队列的

小结

  • 关于tick的参数配置,有topology层面,有BoltDeclarer层面,也有bolt的getComponentConfiguration层面,三种方式,BoltDeclarer优先级最高,然后是bolt的getComponentConfiguration,最后是全局的topology层面的配置
  • 对于tickTuple,采用的是StormTimer进行调度,调度的时候,往bolt的JCQueue的publish方法,具体是是调用recvQueue.offer(obj);而executor的asycLoop调用Executor.call().call()方法,对于BoltExecutor.call则是调用JCQueue.consume方法,该方法调用的是recvQueue.poll()
  • 因此可以看到timer只负责往队列发送tickTuple,至于触发的时间精度,不一定百分百精确,具体要看recvQueue队列的长度以及executor的消费能力

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容

  • 黄昏擦洗着镜片 天空揉红了眼睛 三月的框架分崩离析 建筑师手持诗集愁眉苦脸 创作,是不可能的。 黑夜摩擦着手掌 大...
    子健阅读 182评论 2 1
  • 风摇树梢黑瓦老,舟漂云飘鸟鸣草,雨浇村外晓雾白,曲桥漾过又曲桥。
    吴牙阅读 705评论 9 19
  • 罗振宇说,付费和免费是完全不一样的逻辑,我们不是娱乐读者,更不是炫耀自己有多少知识,而是像读者提供“知识服务”。 ...
    琸珺洪kathy阅读 248评论 0 0
  • 昨天早上,助教老师发来信息,要到训练群练习每日功课1——爱的表达,每日功课2——正念篇,我赶忙去参加练习,看见孟妮...
    秋墨_8866阅读 306评论 3 1