配置Topic信息
一个前提:
在Application Context中需要配置有KafkaAdmin 管理Bean。
配置方式:
通过NewTopic类进行配置。
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("foo", 10, (short) 2);
}
@Bean
public NewTopic topic2() {
return new NewTopic("bar", 10, (short) 2);
}
注意点:
1.、 当broker不可用时,消息可以被记录,但是不影响应用上下文加载。后续可以通过admin的initialize方法重试,如果希望节点不可用时,不加载应用上下文,可以通过设置admin的fatalIfBrokerNotAvaliable 属性为true,这时上下文会初始失败;
2、 1.0.0以后版本,如果已经存在的topic的partitions分区小于NewTopic.numPartitions的值,此时admin会添加topic的分区数量;
3.、更多高级配置技巧,可以通过KafkaAdmin来直接进行操作;
如实例代码:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfig());
...
client.close();