简介
消息中间件
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
broker
topic
partition
producer
consumer
consumer group
安装
- 下载解压
- 启动zookeeper
- 修改config/server.proprties的:
log.dirs=E:\data\kafka-logs
zookeeper.connect=10.129.83.213:2181
listeners=PLAINTEXT://10.143.47.32:9092
- 启动命令
kafka_2.12-0.10.2.1 需要jdk1.8
kafka_2.11-0.10.0.1 需要jdk1.7
Kafka Shell基本命令(包括topic的增删改查)
#win
.\bin\windows\kafka-server-start.bat .\config\server.properties
#linux
bin/kafka-server-start.sh config/server.properties
#创建一个名为“test”的Topic,只有一个分区和一个备份
bin/kafka-topics.sh --create --zookeeper 10.129.83.213:2181 --replication-factor 1 --partitions 1 --topic test
#查看topic
bin/kafka-topics.sh --list --zookeeper 10.129.83.213:2181
#查看consumer-groups
bin/kafka-consumer-groups.sh --list --bootstrap-server 10.143.47.32:9092
bin/kafka-consumer-groups.sh --bootstrap-server 10.143.47.32:9092 --describe --group myGroup
#查看消费了多少数据
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group myGroup --topic test --zookeeper 10.129.83.213:2181
#查看test详细信息
bin/kafka-topics.sh --describe --topic test --zookeeper 10.129.83.213:2181
#发送消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
#发送消息
bin/kafka-console-producer.sh --broker-list 10.143.47.32:9092 --topic test
#消息内容
This is a message
This is another message
Hello World
#消费消息
#消费者线程数必须是小等于topic的partition分区数
bin/kafka-console-consumer.sh --zookeeper 10.129.83.213:2181 --topic test --from-beginning
#消费消息
bin/kafka-console-consumer.sh --bootstrap-server 10.143.47.32:9092 --topic test --from-beginning
每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
基于时间:log.retention.hours=168
基于大小:log.retention.bytes=1073741824
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
指定了 patition,则直接使用;
未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
patition 和 key 都未指定,使用轮询选出一个 patition。
- kafka_2.10-0.10.2.0 需要jdk1.7(springboot 1.5.3集成)
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>1.5.3.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<version>1.5.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<version>1.5.3.RELEASE</version>
</dependency>
application.properties
spring.application.name=springboot-kafka-test
#kafka
spring.kafka.bootstrap-servers=10.143.47.32:9092
spring.kafka.consumer.group-id=myGroup
#charset
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.template.default-topic=test
spring.kafka.listener.concurrency=1
spring.kafka.producer.batch-size=1000
#log4j2
logging.config=classpath:log4j2.xml
配置
@Configuration
@EnableKafka
public class KafkaConfig {
}
生产者
@Component
public class MsgProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String value){
System.out.println("send start-----------");
kafkaTemplate.send("test", value+"1");
kafkaTemplate.send("test", value+"2");
System.out.println("send end-----------");
}
}
消费者
@Component
public class MsgConsumer {
static Logger subscribelogger = LoggerFactory.getLogger("subscribelogger");
@KafkaListener(topics="test")
public void processMsg1(String s){
subscribelogger.info("{}|{}","myGroup",s);
}
/*@KafkaListener(topics="test")
public void processMsg(ConsumerRecord<?, ?> record){
subscribelogger.info("{}|{}","myGroup1",record.value());
}*/
}