Kafka + Spring Boot: Learn It in One Pass

Linux setup: the Kafka cluster consists of 3 nodes, virtual machines running CentOS 6, with IPs 192.168.1.128, 192.168.1.129 and 192.168.1.130, and hostnames master, worker1 and worker2 respectively.
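
The hostnames are assumed to resolve to those IPs on every node; a minimal /etc/hosts sketch (identical on all three machines) would be:

# /etc/hosts on each node (use your own DNS instead if available)
192.168.1.128   master
192.168.1.129   worker1
192.168.1.130   worker2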

1. Cluster

# 192.168.1.128
[root@master local]# cd /home/gilbert/app/rar/
[root@master rar]# tar zxvf kafka_2.10-0.10.2.0.tgz
[root@master rar]# mv kafka_2.10-0.10.2.0 /home/gilbert/app/kafka

Configuration file path: kafka/config/server.properties

Edit the configuration file (a sketch of the key per-broker settings follows below).
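
The exact values are not reproduced here; a minimal sketch of the per-broker settings, assuming the install and log paths above (broker.id and the listener host must be unique on each node), might be:

# server.properties on master (192.168.1.128); use broker.id=1/2 and the
# worker1/worker2 hostname on the other two nodes (paths are assumptions)
broker.id=0
listeners=PLAINTEXT://master:9092
log.dirs=/home/gilbert/app/kafka/kafka-logs
zookeeper.connect=master:2181,worker1:2181,worker2:2181
# needed so that the topic deletion step later actually removes the topic
delete.topic.enable=true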



Start Kafka

[root@master kafka]# ./bin/kafka-server-start.sh config/server.properties &
[2018-06-25 02:31:21,931] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = null
    advertised.port = null
    authorizer.class.name =
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.id = 0
    broker.id.generation.enable = true
    broker.rack = null
    compression.type = producer
    connections.max.idle.ms = 600000
    controlled.shutdown.enable = true
    controlled.shutdown.max.retries = 3
    controlled.shutdown.retry.backoff.ms = 5000
    controller.socket.timeout.ms = 30000
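
The same start command needs to be run on the other two brokers as well, roughly:

# 192.168.1.129 and 192.168.1.130 (the directory name on those nodes is assumed)
[root@worker1 kafka_2.10-0.10.2.0]# ./bin/kafka-server-start.sh config/server.properties &
[root@worker2 kafka_2.10-0.10.2.0]# ./bin/kafka-server-start.sh config/server.properties &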

Create a topic

# create a topic named gilbert
[root@master kafka]# ./bin/kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 3 --partitions 3 --topic gilbert
Created topic "gilbert".

Inspect the topic

[root@master kafka]# ./bin/kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic gilbert
Topic: gilbert  PartitionCount: 3  ReplicationFactor: 3  Configs:
    Topic: gilbert  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: gilbert  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
    Topic: gilbert  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

[root@master kafka]# ./bin/kafka-topics.sh --list --zookeeper master:2181,worker1:2181,worker2:2181
gilbert
test

Create a producer

./bin/kafka-console-producer.sh --broker-list master:9092 --topic gilbert

Create consumers: run the console consumer on each of the three servers

# 192.168.1.128 server
[root@master kafka]# ./bin/kafka-console-consumer.sh --zookeeper master:2181,worker1:2181,worker2:2181 --topic gilbert --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

# 192.168.1.129 server
[root@worker1 kafka_2.10-0.10.2.0]# ./bin/kafka-console-consumer.sh --zookeeper master:2181,worker1:2181,worker2:2181 --topic gilbert --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

# 192.168.1.130 server
[root@worker2 kafka_2.10-0.10.2.0]# ./bin/kafka-console-consumer.sh --zookeeper master:2181,worker1:2181,worker2:2181 --topic gilbert --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
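
As the warning suggests, the deprecated --zookeeper option can be swapped for --bootstrap-server to use the new consumer; a roughly equivalent call would be:

./bin/kafka-console-consumer.sh --bootstrap-server master:9092,worker1:9092,worker2:9092 --topic gilbert --from-beginning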

On the 192.168.1.128 server, type hello kafka into the producer console to test.

The consumers on all three servers receive the message as expected.

Delete a topic

[root@master kafka]# ./bin/kafka-topics.sh --delete --zookeeper master:2181,worker1:2181,worker2:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true

Integrating Kafka with Spring Boot

1. Producer: kafka-producer

a) pom file

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

b) yml configuration file; this example uses the 3-node Kafka cluster

spring:
  kafka:
    bootstrap-servers: master:9092,worker1:9092,worker2:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

c) Message entity class

import lombok.Data;

import java.util.Date;

@Data
public class Message {
    private Long id;         // message id
    private String msg;      // message body
    private Date sendTime;   // send timestamp
}

d) Producer class

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    // build a Message and send it as a JSON string
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
        // "topic-ideal" is the topic name
        kafkaTemplate.send("topic-ideal", gson.toJson(message));
    }
}
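
The send does not have to be fire-and-forget: with spring-kafka 1.x/2.x, KafkaTemplate.send() returns a ListenableFuture<SendResult<K, V>>, so a callback can be attached inside send() to log the outcome. A hedged sketch, not part of the original example:

// optional: log success/failure of the send instead of ignoring the result
// (reuses the kafkaTemplate, gson, message and log fields from the class above)
kafkaTemplate.send("topic-ideal", gson.toJson(message))
        .addCallback(
                result -> log.info("sent to partition {}", result.getRecordMetadata().partition()),
                ex -> log.error("send failed", ex));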

e) Test class: just run the kafkaProducer() method

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerApplicationTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void kafkaProducer() {
        this.kafkaProducer.send();
    }

    @Test
    public void contextLoads() {
    }
}

2. Consumer: kafka-consumer

a) pom file

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

b) yml configuration file

server:
  port: 9999
spring:
  kafka:
    bootstrap-servers: master:9092,worker1:9092,worker2:9092
    consumer:
      group-id: ideal-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 20000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

c) Consumer class

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaConsumer {

    // listen on the topic-ideal topic and log every record
    @KafkaListener(topics = {"topic-ideal"})
    public void consumer(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record = " + record);
            log.info("------------------ message = " + message);
        }
    }
}
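
Because the producer publishes the Message object as a Gson JSON string, the listener can also turn the payload back into a Message. A small sketch that could sit inside the if-block above, assuming the same Message class and a Gson field like the producer's:

// assumes a field: private Gson gson = new GsonBuilder().create(); as in the producer
Message parsed = gson.fromJson(record.value().toString(), Message.class);
log.info("------------------ parsed id = {}, msg = {}", parsed.getId(), parsed.getMsg());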

Start the kafka-consumer application, then run the kafkaProducer() method of the KafkaProducerApplicationTests test class in the kafka-producer project; the consumer log shows the message being received as expected.


Source: https://blog.csdn.net/jucks2611/article/details/80817476
