学习大纲
一、kafka介绍
Kafka最初是由Linkedln公司采用Scala语言开发的一个多分区、多副本并且基于ZooKeeper协调的分布式消息系统,现在已经捐献给了Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流处理等多种特性而被广泛应用。
Apache Kafka是一个分布式的发布-订阅消息系统,能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中,Kafka都有广泛的应用。Kafka将消息持久化到磁盘中,并对消息创建了备份保证了数据的安全。Kafka在保证了较高的处理速度的同时,又能保证数据处理的低延迟和数据的零丢失。
1、特性
(1)高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个主题可以分多个分区,消费组对分区进行消费操作;
(2)可扩展性:kafka集群支持热扩展,
(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
(4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
(5)高并发:支持数千个客户端同时读写;
2、使用场景
(1)日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
(2)消息系统:解耦和生产者和消费者、缓存消息等;
(3)用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
(4)运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反
馈,比如报警和报告﹔
(5)流式处理:比如spark streaming和storm;
3、技术优势
- 可伸缩性:Kafka的两个重要特性造就了它的可伸缩性。
1、Kafka集群在运行期间可以轻松地扩展或收缩(可以添加或删除代理),而不会宕机。
2、可以扩展一个Kafka主题来包含更多的分区。由于一个分区无法扩展到多个代理,所以它的容量受到代理磁盘空间的限制。能够增加分区和代理的数量意味着单个主题可以存储的数据量是没有限制的。 - 容错性和可靠性:
Kafka的设计方式使某个代理的故障能够被集群中的其他代理检测到。由于每个主题都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此类故障中恢复并继续运行。 - 吞吐量
代理能够以超快的速度有效地存储和检索数据。
二、概念详解
1、Producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
2、Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
3、Topic
在Kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为topic。如果把Kafka看做为一个数据库,topic可以理解为数据库中的一张表,topic的名字即为表名。
4、Partition
topic中的数据分割为一个或多个partition。每个topic至少有一partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
5、Partition offset
每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。
6、Replicas of partition
副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的partition中消费数据,而是从为leader的partition中读取数据。副本之间是一主多从的关系。
7、Broker
Kafka集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
8、Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
9、Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从"in sync replicas”(ISR)列表中删除,重新创建一个Follower。
10、zookeeper
Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
11、AR(Assigned Replicas)
分区中所有的副本统称为AR。
12、ISR(In-Sync Replicas)
所有与Leader部分保持一定程度的副(包括Leader副本在内)本组成ISR。
13、OSR(Out-of-Sync-Replicas)
与Leader副本同步滞后过多的副本。
14、HW(High Watermark)
高水位,标识了一个特定的offset,消费者只能拉取到这个offset之前的消息。
15、LEO(Log End Offset)
即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0,9]。
三、安装与配置
1、使用Kafka需求有以下前置条件
- Linux系统/windows系统(博主选的是windows)
- java环境
- zookeeper
zookeeper下载地址
注意着两个文件都要下载,并把apache-zookeeper-3.5.8-bin.tar包里面的lib放到apache-zookeeper-3.5.8.tar.gz里面,不然在运行zookeeper的时候如果闪退。如果闪退可以在配置文件上写上pause,这样方便查看错误
错误: 找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain
- Kafka环境安装
Kafka下载地址
将下载文件解压后运行如下命令,即可启动成功
.\bin\windows\kafka-server-start.bat .\config\server.properties
2、Kafka测试消息生产与消费
- 创建主题
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic haijia--partitions 2--replication-factor 1
--zookeeper:指定了Kafka所连接的Zookeeper服务地址
--create:创建主题的动作指令
--topic:指定了所要创建主题的名称
--partitions:指定了分区个数
--replication-factor:指定了副本因子
- 展示所有的主题
.bin/kafka-topics.sh --zookeeper localhost:2181 --list
- 查看主题详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic haijia
- 删除主题
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic haijia
需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。
若delete.topic.enable=true,直接彻底删除该Topic。
若delete.topic.enable=false ,如果当前Topic没有使用过即没有传输过信息:可以彻底删除。如果当前Topic有使用过即有过传输过信息,并没有真正删除Topic只是把这个Topic标记为删除(marked for deletion),重启Kafka Server后删除。
- 增加分区
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic haijia --partitions 6
修改分区数时,仅能增加分区个数。若是用其减少 partition个数,则会报错误信息
- 启动消费段接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic haijia
--bootstrap-server 指定了连接Kafka集群的地址
-topic 指定了消费端订阅的主题
- 生产端发送消息
bin/kafka-console-pfoducer.sh --broker-list localhost:9092--topic haijia
--broker-list 指定了连接的Kafka集群的地址
--topic指定了发送消息时的主题
四、Java第一个程序
1、快速启动环境
D:\Software\apacheZookeeper3.5.8\apache-zookeeper-3.5.8\bin>zkServer.cmd
D:\Software\kafka_2.13-2.6.0>.\bin\windows\kafka-server-start.bat .\config\server.properties
1、新建项目
也可以在pom.xml引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.haijia</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.0.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.0.RELEASE</version>
<configuration>
<mainClass>com.haijia.kafka.KafkaApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2、application.yml中引入kafka相关配置
spring:
kafka:
#如果是集群,用 ,分隔,如:112.126.74.249:9092,112.126.74.249:9093,172.101.203.33:9092
bootstrap-servers: 127.0.0.1:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 应答级别:
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
#properties:
# 自定义分区器
# partitioner-class: com.felix.kafka.producer.CustomizePartitioner
# 提交延时:当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka,
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# linger-ms: 0
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# spring.kafka.consumer.enable-auto-commit=true->是否自动提交offset
# spring.kafka.consumer.auto.commit.interval.ms=1000->提交offset延时(接收到消息后多久提交offset)
# spring.kafka.consumer.properties.session.timeout.ms=120000->消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
# spring.kafka.consumer.properties.request.timeout.ms=18000->消费请求超时时间
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
swagger:
enabled: true
3、producer
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
@Slf4j
public class KafkaProducer {
//自定义topic
public static final String TOPIC_TEST = "topic.test";
public static final String TOPIC_GROUP1 = "topic.group1";
public static final String TOPIC_GROUP2 = "topic.group2";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void send(Object obj) {
String obj2String = JSONObject.toJSONString(obj);
log.info("准备发送消息为:{}", obj2String);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理
log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//成功的处理
log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
}
});
}
}
4、Consumer
package com.haijia.kafka.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info(KafkaProducer.TOPIC_GROUP1+"KafkaConsumer 接收到消息");
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info(KafkaProducer.TOPIC_GROUP2+"KafkaConsumer 接收到消息");
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test1 消费了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
}
5、Controller
import com.haijia.kafka.kafka.KafkaProducer;
import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;
import java.util.Map;
@RequestMapping("/kafka")
@Controller
@Slf4j
@Api(value = "SwaggerValue", tags = "KafkaController", description = "测试接口相关")//,produces = Media
//Api【tags=“说明该类的作用,可以在UI界面上看到的注解”】
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@GetMapping(value = "/send")
@ResponseBody
@Transactional(rollbackFor = Exception.class)
@ApiOperation(value = "kafka发送消息", notes = "发送消息")
//ApiOperation【value=“说明方法的用途、作用”;notes=“方法的备注说明】
@ApiImplicitParam(name = "id", value = "用户ID", required = false, dataType = "Long", paramType = "query")
//ApiImplicitParam【name:参数名;value:参数的汉字说明、解释;dataType: 参数类型,默认String;required : 参数是否必传,true必传
// defaultValue:参数的默认值;paramType:参数放在哪个地方;header请求参数的获取:@RequestHeader,参数从请求头获取
// query请求参数的获取:@RequestParam,参数从地址栏问号后面的参数获取;path(用于restful接口)请求参数的获取:@PathVariable,参数从URL地址上获取
// body(不常用)参数从请求体中获取;form(不常用)参数从form表单中获取】
//@ApiIgnore
//ApiIgnore: 使用该注解忽略这个API,不会生成接口文档。可注解在类和方法上
//@ApiResponse(code = 400,message = "参数错误",response = "抛出异常的类")
//ApiResponse:响应介绍
//@ApiResponses(一组响应)
//@ApiModel(用在返回对象类上,描述一个Model的信息(这种一般用在post创建的时候,使用@RequestBody这样的场景,请求参数无法使用@ApiImplicitParam注解进行描述的时候)
//@ApiModelProperty[value = 字段说明,name = 重写属性名字;dataType = 重写属性类型;required = 是否必填,true必填
// example = 举例说明;hidden = 隐藏]
public void sendMsg() {
kafkaProducer.send("this is a kafka top message!");
}
/* @RequestMapping(value = "/save", method = RequestMethod.POST)
//@ApiImplicitParam(name = "user", value = "用户实体user", required = true, dataType = "User")
@ApiOperation(value = "创建用户", notes = "创建用户")
public Map<String, Object> saveUser(@ApiParam(required = true, name = "user", value = "用户实体user") @RequestBody @Valid Object user) {return null;}*/
}
6、Swagger
package com.haijia.kafka.common;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import springfox.documentation.spi.DocumentationType;
@Configuration
//注解开启 swagger2 功能,启动后访问 http://localhost:8080/swagger-ui.html
@EnableSwagger2
public class Swagger2Config {
//是否开启swagger,正式环境一般是需要关闭的
@Value("${swagger.enabled}")
private boolean enableSwagger;
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
//是否开启 (true 开启 false隐藏。生产环境建议隐藏)
.enable(enableSwagger)
.select()
//扫描的路径包,设置basePackage会将包下的所有被@Api标记类的所有方法作为api
.apis(RequestHandlerSelectors.basePackage("com.haijia.kafka.controller"))
//指定路径处理PathSelectors.any()代表所有的路径
.paths(PathSelectors.any())
.build();
}
/**
*
* - swagger.title=标题
* - swagger.description=描述
* - swagger.version=版本
* - swagger.license=许可证
* - swagger.licenseUrl=许可证URL
* - swagger.termsOfServiceUrl=服务条款URL
* - swagger.contact.name=维护人
* - swagger.contact.url=维护人URL
* - swagger.contact.email=维护人email
* - swagger.base-package=swagger扫描的基础包,默认:全扫描
* - swagger.base-path=需要处理的基础URL规则,默认:/**
* - swagger.exclude-path=需要排除的URL规则,默认:空
* - swagger.host=文档的host信息,默认:空
*/
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
//设置文档标题(API名称)
.title("SpringBoot中使用Swagger2构建RESTful接口")
//文档描述
.description("接口说明")
//服务条款URL
.termsOfServiceUrl("http://127.0.0.1:8080/")
//联系信息
.contact(new Contact("黄海佳","www.baidu.com","9218*413@qq.com"))
//版本号
.version("1.0.0")
.build();
}
}