KafkaSpout
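1. SpoutConfig extends KafkaConfig, so the basic KafkaSpout properties can be set through SpoutConfig.
Setting spoutConfig.forceFromStart = false makes the spout resume from the offset it last committed (in ZooKeeper) instead of reading the topic from the beginning, so the same data is not consumed twice.
2. Config.TOPOLOGY_MAX_SPOUT_PENDING throttles Kafka consumption. It caps how many tuples each spout task may have pending (emitted but not yet acked or failed), and Storm stops calling the spout's nextTuple() while that cap is reached. It only takes effect for tuples emitted with a message ID, which KafkaSpout does.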
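The throttling in point 2 can be pictured with the plain-Java model below. It is a simplification, not Storm's code: PendingThrottleModel, tryEmit, ack and the offsets are hypothetical, and the only real API it mentions (in a comment) is Config.setMaxSpoutPending.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of a max-spout-pending cap (NOT Storm's implementation).
// In a real topology the cap is set with conf.setMaxSpoutPending(n),
// which stores n under Config.TOPOLOGY_MAX_SPOUT_PENDING.
class PendingThrottleModel {
    private final int maxPending;
    // offsets emitted but not yet acked/failed (hypothetical offsets)
    private final Set<Long> pending = new HashSet<>();
    private long nextOffset = 0;

    PendingThrottleModel(int maxPending) {
        this.maxPending = maxPending;
    }

    // Storm skips nextTuple() while the cap is reached; here that shows up as -1.
    long tryEmit() {
        if (pending.size() >= maxPending) {
            return -1;
        }
        long offset = nextOffset++;
        pending.add(offset);
        return offset;
    }

    // An ack (or fail) frees the slot held by that tuple.
    void ack(long offset) {
        pending.remove(offset);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With a cap of 2, the third tryEmit() returns -1 until one of the pending tuples is acked. A lower cap slows how fast the spout pulls from Kafka; a higher cap allows more tuples in flight at once.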
EsBolt
1) A NullPointerException was thrown while sending data to ES:
at org.codehaus.jackson.util.TextBuffer.findBuffer(TextBuffer.java:207)
refer
2) User-defined esIndex
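builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout1"); // upstream spout id assumed; a bolt cannot usefully subscribe to itself
builder.setBolt("testBolt", new EsBolt("{esIndex}/" + "test", conf), 2).shuffleGrouping("bolt1"); // EsBolt reads esIndex from bolt1's tuples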
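Bolt1 computes esIndex and emits it as a tuple field named esIndex (declared in its declareOutputFields); EsBolt replaces the {esIndex} placeholder in its target with that field's value. The index name can be built from the collection date:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
public class TimeBasedIndexNameBuilder {
    // Elasticsearch index names must be lowercase, so normalize the result
    public static String build(String indexPrefix, Date collectTime) {
        String day = new SimpleDateFormat("yyyy-MM-dd").format(collectTime);
        return (indexPrefix + "_" + day).toLowerCase(Locale.ROOT);
    }
}
String esIndex = TimeBasedIndexNameBuilder.build("agentX", new Date()); // inside Bolt1
With this, for data collected on 2016-10-26 EsBolt receives the target agentx_2016-10-26/test, and Elasticsearch creates the index agentx_2016-10-26 with type test.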
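The {esIndex}/test target works as a per-tuple template: the name in braces refers to a field of the incoming tuple, and that field's value becomes the index part of the target. The sketch below models only this substitution step in plain Java. ResourcePatternSketch and resolve are hypothetical names; the real resolution happens inside elasticsearch-hadoop, and throwing on a missing field is this sketch's own choice, not necessarily EsBolt's behavior.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only (hypothetical class, not elasticsearch-hadoop code):
// replaces each "{field}" in a write target with that field's value.
class ResourcePatternSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^}]+)\\}");

    static String resolve(String target, Map<String, Object> fields) {
        Matcher m = PLACEHOLDER.matcher(target);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            Object value = fields.get(name);
            if (value == null) {
                throw new IllegalArgumentException("no value for field " + name);
            }
            // quoteReplacement keeps '$' or '\' in values from being treated specially
            m.appendReplacement(out, Matcher.quoteReplacement(value.toString()));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

For example, a tuple whose esIndex field holds agentx_2016-10-26 resolves "{esIndex}/test" to agentx_2016-10-26/test.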
Bolt subscribing to multiple spouts
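A bolt needs to receive streams from both spout1 and spout2. Calling getSourceComponent() on the incoming Tuple tells you which spout the tuple came from, so the bolt can process it accordingly.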
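// spout1
builder.setSpout("spout1", new Spout1(spoutConfig), 2);
// spout2
builder.setSpout("spout2", new Spout2(), 2);
// allGrouping: every spout1 tuple goes to BOTH bolt1 tasks; shuffleGrouping: each spout2 tuple goes to one task
builder.setBolt("bolt1", new Bolt1(), 2).allGrouping("spout1").shuffleGrouping("spout2");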
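The two groupings in this wiring deliver tuples differently to bolt1's two tasks. The plain-Java model below shows which task indices receive one tuple under each grouping; GroupingSketch and its methods are hypothetical, and Storm's real shuffle grouping balances tuples across tasks rather than picking purely at random as this sketch does for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical model: which task indices of a consuming bolt receive one tuple.
class GroupingSketch {
    // allGrouping: every task gets its own copy of the tuple.
    static List<Integer> allGrouping(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            targets.add(task);
        }
        return targets;
    }

    // shuffleGrouping: exactly one task gets the tuple.
    // (Storm balances this choice across tasks; a random pick keeps the sketch short.)
    static List<Integer> shuffleGrouping(int numTasks, Random rnd) {
        List<Integer> targets = new ArrayList<>();
        targets.add(rnd.nextInt(numTasks));
        return targets;
    }
}
```

With bolt1 running 2 tasks, each spout1 tuple is therefore processed twice (once per task) while each spout2 tuple is processed once. If spout1's data should be handled only once, shuffleGrouping or fieldsGrouping may fit better.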
The relevant part of Bolt1:
@Override
public void execute(Tuple input) {
    // the tuple was emitted by spout1
    if ("spout1".equals(input.getSourceComponent())) {
        ...
    } else {
        // otherwise it came from spout2
        ...
    }
}
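The if/else above treats every tuple not from spout1 as coming from spout2. A switch on the component id makes each source explicit and fails fast if another component is wired into bolt1 later. The sketch runs without Storm: SourceTuple and SourceDispatchSketch are hypothetical, with SourceTuple standing in for what Tuple.getSourceComponent() would return; in a real bolt the switch would be on input.getSourceComponent().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Storm's Tuple: just the source component id and a payload.
final class SourceTuple {
    final String sourceComponent;
    final String payload;

    SourceTuple(String sourceComponent, String payload) {
        this.sourceComponent = sourceComponent;
        this.payload = payload;
    }
}

// Dispatch by source component, failing fast on ids the bolt does not expect.
class SourceDispatchSketch {
    final List<String> fromSpout1 = new ArrayList<>();
    final List<String> fromSpout2 = new ArrayList<>();

    void execute(SourceTuple input) {
        switch (input.sourceComponent) {
            case "spout1":
                fromSpout1.add(input.payload);
                break;
            case "spout2":
                fromSpout2.add(input.payload);
                break;
            default:
                throw new IllegalStateException("unexpected source: " + input.sourceComponent);
        }
    }
}
```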