Springboot搭建kafka生产消费Demo

网上有很多实例,但没有一个能完整跑的通的,本人亲自搭建了一个

先贴出配置配置文件

image.png
spring:
  kafka:
    topic.default: test_kafka_wth
    packages: com.wonders.cz.sms
    producer:
      servers: 127.0.0.1:9092
      retries: 0 # retries = MAX 无限重试,直到你意识到出现了问题:
      batch.size: 16384 # producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数
      linger: 1 # 延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,
      buffer.memory: 33554432 # producer可以用来缓存数据的内存大小。
    consumer:
      zookeeper.connect: 127.0.0.1:2181
      servers: 127.0.0.1:9092
      enable.auto.commit: true
      session.timeout: 30000
      auto.commit.interval: 1000
      auto.offset.reset: latest
      group.id: test
      concurrency: 10

SmsApplication.java

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@ComponentScan(basePackages = {"com.cz.**"})
public class SmsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SmsApplication.class, args);
    }

}

MessageEntity.java

/**
 * 消息通知表
 */
@TableName("cz_message")
@JsonInclude(JsonInclude.Include.NON_NULL)
public class MessageEntity extends RequestListEntity implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * 消息通知id
     */
    @TableId(type = IdType.INPUT)
    @ApiModelProperty("消息通知id,主键")
    @NotBlank(groups = {VG.Get.class, VG.Delete.class, VG.Update.class})
    private String id;
    /**
     * 消息发送者组织机构id
     */
    @ApiModelProperty("消息发送者组织机构id")
    @NotBlank(groups = VG.Add.class)
    private String sendOrganizationId;
    /**
     * 消息发送者id
     */
    @ApiModelProperty("消息发送者id")
    @NotBlank(groups = VG.Add.class)
    private String sendUserId;
    /**
     * 消息摘要
     */
    @ApiModelProperty("消息摘要")
    @NotBlank(groups = VG.Add.class)
    private String messageDigest;
    /**
     * 消息内容
     */
    @ApiModelProperty("消息内容")
    @NotBlank(groups = VG.Add.class)
    private String messageContent;
    /**
     * 消息状态 0-发送中,1-已读 2-未读
     */
    @ApiModelProperty("消息状态")
    @NotNull(groups = VG.Add.class)
    private Integer status;
    /**
     * 消息接收者组织机构id
     */
    @ApiModelProperty("消息接收者组织机构id")
    @NotBlank(groups = VG.Add.class)
    private String receiveOrganizationId;
    /**
     * 创建时间
     */
    @ApiModelProperty("创建时间")
    private Date createTime;
    /**
     * 修改时间
     */
    @ApiModelProperty("修改时间")
    private Date modifyTime;
    /**
     * 消息发送者组织机构名
     */
    @TableField(exist = false)
    private String organizationName;

    /**
     * 消息接收者组织机构名
     */
    @TableField(exist = false)
    private String receiveOrganizationName;
    /**
     * 消息发送者名
     */
    @TableField(exist = false)
    private String SendUserName;

    public void setId(String id) {
        this.id = id;
    }

    public String getId() {
        return id;
    }

    public void setSendOrganizationId(String sendOrganizationId) {
        this.sendOrganizationId = sendOrganizationId;
    }

    public String getSendOrganizationId() {
        return sendOrganizationId;
    }

    public void setSendUserId(String sendUserId) {
        this.sendUserId = sendUserId;
    }

    public String getSendUserId() {
        return sendUserId;
    }

    public void setMessageDigest(String messageDigest) {
        this.messageDigest = messageDigest;
    }

    public String getMessageDigest() {
        return messageDigest;
    }

    public void setMessageContent(String messageContent) {
        this.messageContent = messageContent;
    }

    public String getMessageContent() {
        return messageContent;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public Integer getStatus() {
        return status;
    }

    public void setReceiveOrganizationId(String receiveOrganizationId) {
        this.receiveOrganizationId = receiveOrganizationId;
    }

    public String getReceiveOrganizationId() {
        return receiveOrganizationId;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setModifyTime(Date modifyTime) {
        this.modifyTime = modifyTime;
    }

    public Date getModifyTime() {
        return modifyTime;
    }

    public String getSendUserName() {
        return SendUserName;
    }

    public void setSendUserName(String sendUserName) {
        SendUserName = sendUserName;
    }

    public String getOrganizationName() {
        return organizationName;
    }

    public void setOrganizationName(String organizationName) {
        this.organizationName = organizationName;
    }

    public String getReceiveOrganizationName() {
        return receiveOrganizationName;
    }

    public void setReceiveOrganizationName(String receiveOrganizationName) {
        this.receiveOrganizationName = receiveOrganizationName;
    }
}

ErrorCode.java

public class ErrorCode {
    public final static int SUCCESS = 200;
    public final static int EXCEPTION = 500;
}

Response.java

@Getter
@Setter
public class Response {
    private int code;
    private String message;

    public Response(int code, String message) {
        this.code = code;
        this.message = message;
    }
}

KafkaConsumerConfig.java


@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.servers}")
    private String servers;
    @Value("${spring.kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${spring.kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${spring.kafka.consumer.group.id}")
    private String groupId;
    @Value("${spring.kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.concurrency}")
    private int concurrency;


    @Bean
    //个性化定义消费者
    public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
        //指定使用DefaultKafkaConsumerFactory
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    /**
     * 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式
     * @return
     */
    @Bean
    public DefaultKafkaConsumerFactory consumerFactory(){
        return new DefaultKafkaConsumerFactory(consumerConfigs());
    }


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}

KafkaProducerConfig.java


@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.servers}")
    private String servers;
    @Value("${spring.kafka.producer.retries}")
    private int retries;
    @Value("${spring.kafka.producer.batch.size}")
    private int batchSize;
    @Value("${spring.kafka.producer.linger}")
    private int linger;
    @Value("${spring.kafka.producer.buffer.memory}")
    private int bufferMemory;


    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    public ProducerFactory<String, MessageEntity> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(),
                new StringSerializer(),
                new JsonSerializer<>());
    }

    @Bean
    public KafkaTemplate<String, MessageEntity> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

SimpleConsumer.java

@Slf4j
@Component
public class SimpleConsumer {
    protected final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    private final Gson gson = new Gson();
    @KafkaListener(topics = "${spring.kafka.topic.default}")
    public void receive(ConsumerRecord<?, ?> record) {
        logger.info("----------------------------------kafka的offset: " + record.offset());
        logger.info("----------------------------------kafka的value: " + record.value().toString());
        // JSONArray object = JSON.parseArray(record.value().toString());

    }
}

ProduceController.java


@Slf4j
@RestController
@RequestMapping("/kafka")
public class ProduceController {
    @Autowired
    private SimpleProducer simpleProducer;

    @Value("${spring.kafka.topic.default}")
    private String topic;

    private Gson gson = new Gson();

    @RequestMapping(value = "/hello", method = RequestMethod.GET, produces = {"application/json"})
    public Response sendKafka() {
        return new Response(ErrorCode.SUCCESS, "OK");
    }


    @RequestMapping(value = "/send", method = RequestMethod.POST, produces = {"application/json"})
    public Response sendKafka(@RequestBody MessageEntity message) {
        try {
            simpleProducer.send(topic, "key", message);
            log.info("发送kafka成功.");
            return new Response(ErrorCode.SUCCESS, "发送kafka成功");
        } catch (Exception e) {
            log.error("发送kafka失败", e);
            return new Response(ErrorCode.EXCEPTION, "发送kafka失败");
        }
    }

}

ProducerCallback.java


@Slf4j
public class ProducerCallback implements ListenableFutureCallback<SendResult<String, MessageEntity>> {

    private final long startTime;
    private final String key;
    private final MessageEntity message;

    private final Gson gson = new Gson();

    public ProducerCallback(long startTime, String key, MessageEntity message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }


    @Override
    public void onSuccess(@Nullable SendResult<String, MessageEntity> result) {
        if (result == null) {
            return;
        }
        long elapsedTime = System.currentTimeMillis() - startTime;

        RecordMetadata metadata = result.getRecordMetadata();
        if (metadata != null) {
            StringBuilder record = new StringBuilder();
            record.append("message(")
                    .append("key = ").append(key).append(",")
                    .append("message = ").append(gson.toJson(message)).append(")")
                    .append("sent to partition(").append(metadata.partition()).append(")")
                    .append("with offset(").append(metadata.offset()).append(")")
                    .append("in ").append(elapsedTime).append(" ms");
            log.info(record.toString());
        }
    }

    @Override
    public void onFailure(Throwable ex) {
        ex.printStackTrace();
    }
}

SimpleProducer.java


@Component
public class SimpleProducer {

    @Autowired
    @Qualifier("kafkaTemplate")
    private KafkaTemplate<String, MessageEntity> kafkaTemplate;

    public void send(String topic, MessageEntity message) {
        kafkaTemplate.send(topic, message);
    }

    public void send(String topic, String key, MessageEntity entity) {
        ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(
                topic,
                key,
                entity);

        long startTime = System.currentTimeMillis();

        ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
        future.addCallback(new ProducerCallback(startTime, key, entity));
    }

}

使用postman可以这样子测

image.png

控制台打印就是这样

2019-12-20 11:18:34.327  INFO 29780 --- [nio-8055-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
2019-12-20 11:18:34.327  INFO 29780 --- [nio-8055-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825
2019-12-20 11:18:34.372  INFO 29780 --- [nio-8055-exec-2] c.w.cz.sms.controller.ProduceController  : 发送kafka成功.
2019-12-20 11:18:34.402  INFO 29780 --- [ntainer#0-0-C-1] c.w.cz.sms.consumer.SimpleConsumer       : ----------------------------------kafka的offset: 30
2019-12-20 11:18:34.402  INFO 29780 --- [ntainer#0-0-C-1] c.w.cz.sms.consumer.SimpleConsumer       : ----------------------------------kafka的value: {"messageDigest":"a卡夫a卡数据测试4水水水水6666","messageContent":"数据体66哇哇哇哇6666"}
2019-12-20 11:18:34.406  INFO 29780 --- [ad | producer-1] c.w.cz.sms.producer.ProducerCallback     : message(key = key,message = {"messageDigest":"a卡夫a卡数据测试4水水水水6666","messageContent":"数据体66哇哇哇哇6666"})sent to partition(0)with offset(30)in 87 ms

技术交流或有疑问请留言,觉得好了点个赞!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容

  • 一. Java基础部分.................................................
    wy_sure阅读 3,807评论 0 11
  • JAVA面试题 1、作用域public,private,protected,以及不写时的区别答:区别如下:作用域 ...
    JA尐白阅读 1,148评论 1 0
  • 前言 在Android开发中,消息推送功能的使用非常常见。 推送消息截图 为了降低开发成本,使用第三方推送是现今较...
    BillyLu1994阅读 4,383评论 0 2
  • 序 又到了写年终总结的时候了。每当这个时候思绪总是翻江倒海,因为太久没有反思和总结的缘故,一年才总结一次,确实是有...
    go4it阅读 520评论 1 6
  • 守护生态活动之 “碧水丹山——朱子理学”中国画名家采风走进生态武夷山 2019年10月3日——8日,中国美...
    笑竹颜开阅读 1,131评论 0 1