Spring-kafka集成 - 配置Topic信息

配置Topic信息


一个前提:

在Application Context中需要配置有KafkaAdmin 管理Bean。

配置方式:

通过NewTopic类进行配置。

@Bean

public KafkaAdmin admin() {

    Map<String, Object> configs = new HashMap<>();

    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,

    StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));

    return new KafkaAdmin(configs);

}

@Bean

public NewTopic topic1() {

    return new NewTopic("foo", 10, (short) 2);

}

@Bean

public NewTopic topic2() {

    return new NewTopic("bar", 10, (short) 2);

}

注意点:

1.、 当broker不可用时,消息可以被记录,但是不影响应用上下文加载。后续可以通过admin的initialize方法重试,如果希望节点不可用时,不加载应用上下文,可以通过设置admin的fatalIfBrokerNotAvaliable 属性为true,这时上下文会初始失败;

2、 1.0.0以后版本,如果已经存在的topic的partitions分区小于NewTopic.numPartitions的值,此时admin会添加topic的分区数量;

3.、更多高级配置技巧,可以通过KafkaAdmin来直接进行操作;

如实例代码: 

@Autowired

private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfig());

...

    client.close();

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。