Java实现Kafka基本的生产者和消费者Demo

第一、环境准备

        1、JDK

        2、Zookeeper

        3、Kafka

第二、实现效果

Postman或Chrome中,调用如下接口

生产者Producer  Console显示

miniooc send message:

{

  "uuid": "d3660a0d-ca78-44fa-9d78-b41f5101bc25",

  "date": "2019-07-10 09:09:16"

}

消费者Consumer  Console显示

miniooc receive message:

{

  "uuid": "d3660a0d-ca78-44fa-9d78-b41f5101bc25",

  "date": "2019-07-10 09:09:16"

}

第三、启动Zookeeper和Kafka

启动Zookeeper

启动Kafka

第四、生产者源码

1、目录结构

2、POM

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-web</artifactId>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka</artifactId>

        <groupId>com.google.code.gson</groupId>

        <artifactId>gson</artifactId>

        <version>2.8.5</version>

       <groupId>org.projectlombok</groupId>

        <artifactId>lombok</artifactId>

        <version>1.16.22</version>

        <scope>provided</scope>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-test</artifactId>

        <scope>test</scope>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka-test</artifactId>

        <scope>test</scope>

</dependencies>

3、配置文件application.properties

        server.port=9526

        spring.application.name=kafka-producer

        kafka.bootstrap.servers=127.0.0.1:9092

        kafka.topic.order=topic-order

        kafka.group.id=group-order

4、实体类MessageBean

package com.example.producer.entity;

import lombok.Data;

import java.io.Serializable;

import java.util.Date;

/**

* 消息实体类

*/

@Data

public class MessageBeanimplements Serializable {

    /** uuid */

    private Stringuuid;

    /** 时间  */

    private Datedate;

    public MessageBean() {

}

public MessageBean(String uuid, Date date) {

this.uuid = uuid;

        this.date = date;

    }

public StringgetUuid() {

return uuid;

    }

public void setUuid(String uuid) {

this.uuid = uuid;

    }

public DategetDate() {

return date;

    }

public void setDate(Date date) {

this.date = date;

    }

@Override

    public StringtoString() {

return "MessageBean{" +

"uuid='" +uuid +'\'' +

", date=" +date +

'}';

    }

}

5、Kafka生产者配置类和生产者类

package com.example.producer.produce;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;

import java.util.Map;

/**

* kafka配置类

*/

@EnableKafka

@Configuration

public class KafkaProducerConfig {

@Value("${kafka.bootstrap.servers}")

private StringBOOTSTRAP_SERVERS_CONFIG;

    @Bean

    public KafkaTemplatekafkaTemplate(){

return new KafkaTemplate<>(producerFactory());

    }

public ProducerFactoryproducerFactory(){

Map props =new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);

        props.put(ProducerConfig.RETRIES_CONFIG, 0);

        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);

        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);

    }

}


package com.example.producer.produce;

import com.example.producer.entity.MessageBean;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import lombok.extern.java.Log;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**

* Kafka消息生产类

*/

@Log

@Component

public class KafkaProducer {

Loggerlog = LoggerFactory.getLogger(KafkaProducer.class);

    @Resource

    private KafkaTemplatekafkaTemplate;

    @Value("${kafka.topic.order}")

     private StringtopicOrder;

    /**

* 发送消息

*

    * @param messageBean 消息实例

*/

    public void sendMessage(MessageBean messageBean){

        GsonBuilder builder =new GsonBuilder();

        builder.setPrettyPrinting();

        builder.setDateFormat("yyyy-MM-dd HH:mm:ss");

        Gson gson = builder.create();

        // 将消息实例序列化为json格式的字符串

        String message = gson.toJson(messageBean);

        // 发送消息

        kafkaTemplate.send(topicOrder,message);

        // 打印消息

        log.info("\nminiooc send message:\n" + message);

    }

}

6、Kafka Controller调用类

package com.example.producer.controller;

import com.example.producer.entity.MessageBean;

import com.example.producer.produce.KafkaProducer;

import lombok.extern.java.Log;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

/**

* 消息控制器

*/

@Log

@Controller

@RequestMapping("/message")

public class MessageController {

@Resource

    private KafkaProducerkafkaProducer;

    /**

* 生成消息

*

    * @return

    */

    @RequestMapping("/create")

@ResponseBody

    public Mapcreate(){

// 创建消息

        MessageBean messageBean =new MessageBean();

        String uuid = UUID.randomUUID().toString();

        messageBean.setUuid(uuid);

        messageBean.setDate(new Date());

        // 将消息发送到 kafka

        kafkaProducer.sendMessage(messageBean);

        Map model =new HashMap();

        // 返回成功信息

        model.put("resultCode",1);

        model.put("resultMsg","success");

        model.put("messageBean",messageBean);

        return model;

    }

}


第五、消费者源码

1、目录结构

2、POM

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-web</artifactId>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka</artifactId>

        <groupId>com.google.code.gson</groupId>

        <artifactId>gson</artifactId>

        <version>2.8.5</version>

       <groupId>org.projectlombok</groupId>

        <artifactId>lombok</artifactId>

        <version>1.16.22</version>

        <scope>provided</scope>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-test</artifactId>

        <scope>test</scope>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka-test</artifactId>

        <scope>test</scope>

</dependencies>

3、配置文件application.properties

        server.port=9527

        spring.application.name=kafka-consumer

        kafka.bootstrap.servers=127.0.0.1:9092

        kafka.topic.order=topic-order

        kafka.group.id=group-order

4、实体类MessageBean

package com.example.consumer.controller.entity;

import lombok.Data;

import java.io.Serializable;

import java.util.Date;

/**

* 消息实体类

*/

@Data

public class MessageBeanimplements Serializable {

    /** uuid */

    private Stringuuid;

    /** 时间  */

    private Datedate;

    public MessageBean() {

}

public MessageBean(String uuid, Date date) {

this.uuid = uuid;

        this.date = date;

    }

public StringgetUuid() {

return uuid;

    }

public void setUuid(String uuid) {

this.uuid = uuid;

    }

public DategetDate() {

return date;

    }

public void setDate(Date date) {

this.date = date;

    }

@Override

    public StringtoString() {

return "MessageBean{" +

"uuid='" +uuid +'\'' +

", date=" +date +

'}';

    }

}

5、Kafka消费者配置和消费监听类

package com.example.consumer.controller.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.config.KafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;

import java.util.Map;

/**

* kafka配置类

*/

@EnableKafka

@Configuration

public class KafkaConsumerConfig {

@Value("${kafka.bootstrap.servers}")

private StringBOOTSTRAP_SERVERS_CONFIG;

    @Value("${kafka.group.id}")

private StringGROUP_ID_CONFIG;

    @Bean

    public KafkaListenerContainerFactory>kafkaListenerContainerFactory(){

ConcurrentKafkaListenerContainerFactory factory =new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(cnsumerFactory());

        factory.setConcurrency(10);

        factory.getContainerProperties().setPollTimeout(3000);

        return factory;

    }

public ConsumerFactorycnsumerFactory(){

Map propsMap =new HashMap<>();

        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);

        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交

        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");

        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);

        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return new DefaultKafkaConsumerFactory<>(propsMap);

    }

}


package com.example.consumer.controller.consumer;

import com.example.consumer.controller.entity.MessageBean;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import com.google.gson.reflect.TypeToken;

import lombok.extern.java.Log;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

/**

* Kafka消息消费类

*/

@Log

@Component

public class KafkaConsumer {

Loggerlog = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics ="${kafka.topic.order}",containerFactory ="kafkaListenerContainerFactory")

public void consume(@Payload String message){

GsonBuilder builder =new GsonBuilder();

        builder.setPrettyPrinting();

        builder.setDateFormat("yyyy-MM-dd HH:mm:ss");

        Gson gson = builder.create();

        // 将接收到的消息反序列化消息实例

        MessageBean messageBean = gson.fromJson(message,new MessageBean().getClass());

        // 将消息实例序列化为json格式的字符串

        String json = gson.toJson(messageBean);

        // 打印消息

        log.info("\nminiooc receive message:\n" + json);

    }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335