Server Configuration
In production, Kafka is usually deployed as a cluster for high availability and horizontal scaling. The hardware used for this deployment is:
- 3 LXC machines,
- at least 8 GB of memory each,
- Debian 8/9 as the operating system,
- JDK 8 installed.
The servers' IP addresses are listed below:
Server No. | Internal IP | External IP |
---|---|---|
1 | 10.187.3.37 | 10.187.15.37 |
2 | 10.187.3.38 | 10.187.15.38 |
3 | 10.187.3.39 | 10.187.15.39 |
Kafka Setup
Download the Kafka release (note: this guide uses the Scala 2.11 build of Kafka 1.1.0):
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
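Optionally, verify the download before extracting it. This is a sketch: it assumes the archive directory also hosts a .sha512 checksum file next to the tarball (older releases may only ship .md5/.sha1 files), and the digest is compared by eye.
# fetch the published checksum (assumption: a .sha512 file exists alongside the tarball)
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz.sha512
# print the digest of the local file and compare it with the downloaded one
sha512sum kafka_2.11-1.1.0.tgz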
Note:
- If the network is slow or the servers cannot reach the public internet, download the package on another network and upload it to the servers where the cluster will be built.
Install and Start the Services
First, install ZooKeeper and Kafka (both are included in the tarball downloaded above) and start the two services:
# Extract the archive
tar zxvf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0/
# Edit config/zookeeper.properties on every machine and add the following:
tickTime=2000
initLimit=10
syncLimit=5
# The addresses below are the servers' internal (machine-room) addresses
server.1=10.187.3.37:2889:3889
server.2=10.187.3.38:2889:3889
server.3=10.187.3.39:2889:3889
# Create the myid file and write this machine's own id (1, 2 or 3, matching server.N above)
mkdir /tmp/zookeeper/
touch /tmp/zookeeper/myid
echo 1 > /tmp/zookeeper/myid
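For reference, a minimal sketch of what config/zookeeper.properties might look like on each machine after these edits. dataDir and clientPort are assumed to keep the values shipped in Kafka's bundled zookeeper.properties (clients connect on 2181, the quorum uses 2889/3889); adjust them if your layout differs.
# config/zookeeper.properties (sketch; dataDir/clientPort taken from the bundled defaults)
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
server.1=10.187.3.37:2889:3889
server.2=10.187.3.38:2889:3889
server.3=10.187.3.39:2889:3889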
# The following uses machine 1 as an example
# Edit config/server.properties on machine 1 and change the following:
broker.id=1
# Listen on all interfaces
listeners=PLAINTEXT://0.0.0.0:9092
# Advertise machine 1's external IP to clients
advertised.listeners=PLAINTEXT://10.187.15.37:9092
# Default number of partitions for new topics: 2
num.partitions=2
# ZooKeeper connection string; the internal IPs are used for faster access
zookeeper.connect=10.187.3.37:2181,10.187.3.38:2181,10.187.3.39:2181
# Replicate the internal offsets and transaction-state topics across all 3 brokers
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.dirs=/home/cloud/kafka-logs
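On machines 2 and 3 at least broker.id and the advertised address have to differ; a sketch of those per-machine overrides, using the external IPs from the table above:
# machine 2, config/server.properties
broker.id=2
advertised.listeners=PLAINTEXT://10.187.15.38:9092
# machine 3, config/server.properties
broker.id=3
advertised.listeners=PLAINTEXT://10.187.15.39:9092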
# Change the ZooKeeper JVM heap settings: set both -Xmx and -Xms to 4G
vim bin/zookeeper-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
# Change the Kafka JVM heap settings: set both -Xmx and -Xms to 4G
vim bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
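Once both services are started (next step), the new heap size can be confirmed from the running JVM arguments; a quick sketch:
# both the ZooKeeper and the Kafka process should show -Xmx4G -Xms4G on their command lines
ps -ef | grep java | grep Xmx4G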
# Start ZooKeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Check that ZooKeeper started
netstat -tunpl|grep 2181
tcp6 0 0 :::2181 :::* LISTEN 2877/java
# Start Kafka
bin/kafka-server-start.sh -daemon config/server.properties
# Check that Kafka started
netstat -tunpl|grep 9092
tcp6 0 0 :::9092 :::* LISTEN 3164/java
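Beyond checking the listening ports, the quorum role and the topic metadata can be inspected from any node. The commands below are a sketch; the topic name eagle matches the client examples later in this document, and --replication-factor 3 requires all three brokers to be up.
# ask the local ZooKeeper for its role: one node should report Mode: leader, the other two Mode: follower
echo stat | nc 127.0.0.1 2181 | grep Mode
# create the topic used by the examples below and check its partition assignment
bin/kafka-topics.sh --create --zookeeper 10.187.3.37:2181 --replication-factor 3 --partitions 2 --topic eagle
bin/kafka-topics.sh --describe --zookeeper 10.187.3.37:2181 --topic eagle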
Verification
Verification 1: check the service logs
Check the ZooKeeper and Kafka logs for anything abnormal:
cloud@pubt1-lxc7-kafka2:~/kafka_2.11-1.1.0$ tailf logs/zookeeper.out
[2019-02-27 13:21:41,389] INFO Server environment:os.version=3.16.0-4-amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,389] INFO Server environment:user.name=cloud (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,389] INFO Server environment:user.home=/home/cloud (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,389] INFO Server environment:user.dir=/home/cloud/kafka_2.11-1.1.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,391] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /tmp/zookeeper/version-2 snapdir /tmp/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,392] INFO FOLLOWING - LEADER ELECTION TOOK - 28 (org.apache.zookeeper.server.quorum.Learner)
[2019-02-27 13:21:41,394] INFO Resolved hostname: 10.187.3.39 to address: /10.187.3.39 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2019-02-27 13:21:41,403] INFO Getting a diff from the leader 0x10000004e (org.apache.zookeeper.server.quorum.Learner)
[2019-02-27 13:21:52,618] WARN Got zxid 0x200000001 expected 0x1 (org.apache.zookeeper.server.quorum.Learner)
[2019-02-27 13:21:52,619] INFO Creating new log file: log.200000001 (org.apache.zookeeper.server.persistence.FileTxnLog)
...
cloud@pubt1-lxc7-kafka2:~/kafka_2.11-1.1.0$ tailf logs/kafkaServer.out
[2019-02-27 13:21:54,892] INFO [ThrottledRequestReaper-Request]: Stopped (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2019-02-27 13:21:54,892] INFO [ThrottledRequestReaper-Request]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2019-02-27 13:21:54,900] INFO [KafkaServer id=2] shut down completed (kafka.server.KafkaServer)
[2019-02-27 13:21:54,901] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2019-02-27 13:21:54,903] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)
[2019-02-27 13:36:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-27 13:46:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-27 13:56:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-27 14:06:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-27 14:16:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
...
Log output similar to the above means the services are running normally.
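Before moving on to the Java clients, an end-to-end smoke test can be done with the console tools that ship with Kafka. This is a sketch; it assumes the eagle topic created earlier and uses two separate shells.
# shell 1: every line typed here becomes one message
bin/kafka-console-producer.sh --broker-list 10.187.3.37:9092 --topic eagle
# shell 2: the lines typed in shell 1 should be printed back here
bin/kafka-console-consumer.sh --bootstrap-server 10.187.3.37:9092 --topic eagle --from-beginning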
Verification 2: verify with local client code.
Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.0</version>
</dependency>
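The kafka_2.11 artifact is the full broker jar; for a pure producer/consumer client such as the examples below, the lighter kafka-clients artifact of the same version would also be sufficient. A sketch of that alternative dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>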
Producer:
package com.netease.cloud.scaffold.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerThread extends Thread {
    private KafkaProducer<String, String> kafkaProducer;
    private static int i = 0;
    private String msg = "bb hh ";
    private String key = "test key";

    public KafkaProducerThread() {
        Properties map = new Properties();
        map.put("bootstrap.servers", "10.187.15.37:9092,10.187.15.38:9092,10.187.15.39:9092");
        map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducer = new KafkaProducer<>(map);
    }

    public void produce() {
        // Produce a message without a key:
        // ProducerRecord<String, String> record = new ProducerRecord<String, String>("eagle", msg + i);
        // Produce a message with a key
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("eagle", key, msg + i);
        i++;
        try {
            // Print "send success" once the message has been written, otherwise print the error
            kafkaProducer.send(record, (recordMetadata, e) -> {
                if (e == null) {
                    System.out.println("send success");
                } else {
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        KafkaProducerThread producer = new KafkaProducerThread();
        while (true) {
            producer.produce();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
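The example loops forever and never releases the producer; in real code the client would typically be flushed and closed on shutdown. A sketch of one way to do that, reusing the kafkaProducer field above (registered, for example, at the end of the constructor):
// flush buffered records and close the client when the JVM exits
Runtime.getRuntime().addShutdownHook(new Thread(() -> kafkaProducer.close()));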
Consumer:
package com.netease.cloud.scaffold.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerThread extends Thread {
    KafkaConsumer<String, String> kafkaConsumer;

    public KafkaConsumerThread() {
        Properties map = new Properties();
        map.put("bootstrap.servers", "10.187.15.37:9092,10.187.15.38:9092,10.187.15.39:9092");
        // Consumer group id
        map.put("group.id", "local-test-1");
        map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumer = new KafkaConsumer<>(map);
        // Subscribe to the topic of interest
        kafkaConsumer.subscribe(Collections.singleton("eagle"));
    }

    public void consume() {
        System.out.println("wait for consume...");
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key = " + record.key());
                    System.out.println("value = " + record.value());
                    System.out.println("partition = " + record.partition());
                    System.out.println("topic = " + record.topic());
                    System.out.println("offset = " + record.offset());
                    System.out.println("timestamp = " + record.timestamp());
                    System.out.println();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerThread kafkaConsumerThread = new KafkaConsumerThread();
        kafkaConsumerThread.consume();
    }
}
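Note that with a brand-new group.id and the default auto.offset.reset=latest, the consumer only sees messages produced after it joins the group. To replay earlier messages, either switch the group id or change the reset policy; a sketch of the extra setting, added next to the other map.put calls in the constructor above:
// start from the earliest available offset when the group has no committed offset yet
map.put("auto.offset.reset", "earliest");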
Output like the following from the consumer indicates everything works:
key = test key
value = bb hh 10
partition = 0
topic = eagle
offset = 47
timestamp = 1551247549700
key = test key
value = bb hh 11
partition = 0
topic = eagle
offset = 48
timestamp = 1551247550700