topology中往akfka队列发和在bolt间流转是不一样的(2019-01-16)

在bolt间流转

```java

builder.setSpout(Order_Spout + "2", new KafkaSpout<String, String>(ksAfterConf),

      SytConfig.getInt("topology-order.spout.parallelism-hint", 6));

builder.setBolt(Prepare_Bolt, new PrepareBolt(),

      SytConfig.getInt("topology-order.bolt.prepare.parallelism-hint", 9))

      .shuffleGrouping(Order_Spout).shuffleGrouping(Order_Spout + "2");

builder.setBolt("aliUserInfoBolt", new AliUserInfoBolt(),SytConfig.getInt("topology-order.bolt.prepare.parallelism-hint", 9)).shuffleGrouping(Prepare_Bolt);

```

Topology中往队列上发

builder.setBolt("aliUserInfoBolt", new AliUserInfoBolt(),

      SytConfig.getInt("topology-order.bolt.prepare.parallelism-hint", 9)).shuffleGrouping(Prepare_Bolt, "ali-order-withdraw");

KafkaBolt<String, String> outToWebsocketBolt = new KafkaBolt<String, String>()

      .withProducerProperties(properties)

      .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("type", "code"))    //此处type和code与上个bolt中的emit对应

      .withTopicSelector(

            new DefaultTopicSelector(SytConfig.getString("topology-order.kafka.websocket.topic", "topic-thirdpart-order-ali-auth")));

//往队列上发的数据已在上一个bolt--aliUserInfoBolt中处理好

builder.setBolt("thirdpart1", outToWebsocketBolt,

      SytConfig.getInt("topology-order.bolt.finance-order-topo.parallelism-hint", 6))

      .shuffleGrouping("aliUserInfoBolt", "ali-websocket-auth");

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。