第一部分:名词解释
1:什么是kafka
Kafka是一个消息队列的中间件,类似的有符合jms标准的MQ等。
2,kafka运用的场景有哪些
A)如果项目与项目之间需要交换数据/消息。则可以考虑使用kafka。B)集中日志收集处理。
3,kafka是否可以集群
Kafka可以集群,而且,kafka官方有给出集群方案,是使用zookeeper中间件作为集群解决方案。4,kafka的相关主要部件
Topic是主题,抽象类比队列Partition 分区。一个topic可以包含多个partition,这么设计的主要目的是因为kafka的消息存储与文件中,为了防止文件大小超过os的限制,这么设计,其次也可以调高消费的性能。
Produce 生产者
Consumer 消费者在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的.
kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.Zookeeper
Kafka的集群解决方案的软件。
第二部分:kafka安装启动
从官方网站上下载kafka中间件,下载的时候需要的jkd版本。下载后文件如下图:
1,kafka集群配置
Zookeeper的集群配置:Zookeeper集群的工作是超过半数才能对外提供服务,所以采用2n+1的服务器,n为可以挂掉的服务器数量。
例如,在192.168.1.2 和 192.168.1.3的机器上配置集群,
1,复制kafka文件到两台机器上。
2,修改配置文件打开config/zookeeper.properties文件修改为一下
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
server.1=192.168.1.2:2888:3888
server.2=192.168.1.3:2888:3888
最主要修改 dataDir,clientPort,server
DataDir,存放log和数据的目录。
ClientPort,客户端连接的端口
Server.myid = a:b:cMyid 是datadir文件下myid的内容,内容为数字。如果没有此文件创建该文件,windows下用可以也在命令窗口使用echo 1>myid 创建。
3,启动zookeeper,找到bin/window目录,启动zookeeper的使用 zookeeper-server-start.bat ../../config/zookeeper.properties
Kafka配置1,修改../config/server.properties文件
主要修改#broker.id=0 每台服务器的broker.id都不能相同
#hostname
host.name=192.168.7.100
#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#设置zookeeper的连接端口zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:12181
2,启动kafka
命令为 kafka-server-start.bat ../../config/server.properties
第三部分:kafka的使用与java api
Java jar包:
生产者步骤
A)得到kafka生产者配置文件
B)根据配置文件生成生产者对象KafkaProducer
C)根据发送的内容生成消息对象ProducerRecord
D)KafkaProducer.SEND(ProducerRecord);KafkaProducer.flush();
2,消费者步骤
A) 生成消费者链接对象
B) 生成消费者监听流
C) 循环监听流得到数据处理
3,以上代码涉及的配置文件如下:
Produce
bootstrap.servers=localhost:9092
acks=all
retries=0
request.timeout.ms 10000
max.block.ms = 5000
key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
Consumer
zookeeper.connect=localhost:2181
group.id=a
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
enable.auto.commit=true
auto.commit.interval.ms=1000