消息队列-KAFKA-集群版安装指南

服务器配置说明

生产环境中的KAFKA多以集群模式部署,以实现高可用和横向扩展。下面介绍部署配置的硬件资源信息:

  • 3台LXC的机器,
  • 内存大小至少为8G,
  • 操作系统为debian 8/9,
  • 安装好JDK 8。

下面列举的是服务器的IP地址:

服务器编号 内网地址 外网地址
1 10.187.3.37 10.187.15.37
2 10.187.3.38 10.187.15.38
3 10.187.3.39 10.187.15.39

KAFKA搭建

kafka安装包下载(注意:这里选择的安装包是2.11系列的1.1.0版本的)

wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

备注:

  1. 如果网络访问较慢,或者无法联通公网,建议通过其他网络下载并且上传到需要搭建应用的服务器上。

安装启动服务

首先,我们需要下载并且安装zk和kafka,并且将这两个服务启动:

# 解压缩文件
tar zxvf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0/
# 修改机器的config/zookeeper.properties,增加如下内容:
tickTime=2000
initLimit=10
syncLimit=5

# 下面地址为各个服务器的机房网地址
server.1=10.187.3.37:2889:3889
server.2=10.187.3.38:2889:3889
server.3=10.187.3.39:2889:3889

# 新建文件myid
mkdir /tmp/zookeeper/
touch /tmp/zookeeper/myid
echo 1 >> /tmp/zookeeper/myid

# 下面以1号机器为例进行说明
# 修改1号机器的config/server.properties,修改如下内容:
broker.id=1

# 修改服务监听地址
listeners=PLAINTEXT://0.0.0.0:9092

# 添加1机器的外网IP
advertised.listeners=PLAINTEXT://10.180.156.49:9092

# 分区数修改为固定值2
num.partitions=2

# 添加zk地址,ip为内网地址,用于加速访问
zookeeper.connect=10.187.3.37:2181,10.187.3.38:2181,10.187.3.39:2181

# 偏移量全部修改为3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

 log.dirs=/home/cloud/kafka-logs

# 修改zookeeper的JVM启动参数:将-Xmx和-Xms统一修改为4G
vim bin/zookeeper-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"

# 修改kafka的JVM启动参数:将-Xmx和-Xms统一修改为4G
vim bin/zookeeper-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"

# 启动zk
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 检查zk是否启动成功
netstat -tunpl|grep 2181
tcp6       0      0 :::2181                 :::*                    LISTEN      2877/java    
# 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties 
# 检查kafka是否启动成功
netstat -tunpl|grep 9092
tcp6       0      0 :::9092                 :::*                    LISTEN      3164/java       

验证

验证1:服务日志观察

观察zk和kafka的日志是否有异常:

cloud@pubt1-lxc7-kafka2:~/kafka_2.11-1.1.0$ tailf logs/zookeeper.out
[2019-02-27 13:21:41,389] INFO Server environment:os.version=3.16.0-4-amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,389] INFO Server environment:user.name=cloud (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,389] INFO Server environment:user.home=/home/cloud (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,389] INFO Server environment:user.dir=/home/cloud/kafka_2.11-1.1.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,391] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /tmp/zookeeper/version-2 snapdir /tmp/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-02-27 13:21:41,392] INFO FOLLOWING - LEADER ELECTION TOOK - 28 (org.apache.zookeeper.server.quorum.Learner)
[2019-02-27 13:21:41,394] INFO Resolved hostname: 10.187.3.39 to address: /10.187.3.39 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2019-02-27 13:21:41,403] INFO Getting a diff from the leader 0x10000004e (org.apache.zookeeper.server.quorum.Learner)
[2019-02-27 13:21:52,618] WARN Got zxid 0x200000001 expected 0x1 (org.apache.zookeeper.server.quorum.Learner)
[2019-02-27 13:21:52,619] INFO Creating new log file: log.200000001 (org.apache.zookeeper.server.persistence.FileTxnLog)
...



cloud@pubt1-lxc7-kafka2:~/kafka_2.11-1.1.0$ tailf logs/kafkaServer.out
[2019-02-27 13:21:54,892] INFO [ThrottledRequestReaper-Request]: Stopped (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2019-02-27 13:21:54,892] INFO [ThrottledRequestReaper-Request]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2019-02-27 13:21:54,900] INFO [KafkaServer id=2] shut down completed (kafka.server.KafkaServer)
[2019-02-27 13:21:54,901] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2019-02-27 13:21:54,903] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)
[2019-02-27 13:36:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-27 13:46:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-27 13:56:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-27 14:06:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-27 14:16:40,253] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
...

出现类似上面的日志是正常的。

验证2:接下来通过本地代码验证一下:

pom依赖:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>1.1.0</version>
</dependency>

生产者:

package com.netease.cloud.scaffold.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerThread extends Thread{

    private KafkaProducer<String, String> KafkaProducer;

    private static int i = 0;

    private String msg = "bb hh ";

    private String key = "test key";

    public KafkaProducerThread() {
        Properties map = new Properties();
        map.put("bootstrap.servers", "10.187.15.37:9092,10.187.15.38:9092,10.187.15.39:9092");
        map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer = new KafkaProducer<>(map);
    }

    public void produce() {
        // 生产不带key的消息
//      ProducerRecord<String, String> record = new ProducerRecord<String, String>("eagle", msg + i);

        // 生产带有key的消息
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("eagle", key,msg + i);

        i++;
        try {
            // 消息写入成功回显"send sucess"
            KafkaProducer.send(record, (recordMetadata, e) -> System.out.println("send success"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        KafkaProducerThread producer = new KafkaProducerThread();
        while (true) {
            producer.produce();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者:

package com.netease.cloud.scaffold.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerThread extends Thread{
    KafkaConsumer<String, String> kafkaConsumer;

    public KafkaConsumerThread() {
        Properties map = new Properties();
        map.put("bootstrap.servers", "10.187.15.37:9092,10.187.15.38:9092,10.187.15.39:9092");
        // 设置消费者的组id
        map.put("group.id", "local-test-1");
        map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumer = new KafkaConsumer<>(map);
        // 注册关注的topic
        kafkaConsumer.subscribe(Collections.singleton("eagle"));
    }

    public void consume() {
        System.out.println("wait for consume...");
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key = " + record.key());
                    System.out.println("value = " + record.value());
                    System.out.println("partition = " + record.partition());
                    System.out.println("topic = " + record.topic());
                    System.out.println("offset = " + record.offset());
                    System.out.println("timestamp = " + record.timestamp());
                    System.out.println();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerThread kafkaConsumerThread = new KafkaConsumerThread();
        kafkaConsumerThread.consume();
    }
}

消费者产生如下的输出则是正常的:

key = test key
value = bb hh 10
partition = 0
topic = eagle
offset = 47
timestamp = 1551247549700

key = test key
value = bb hh 11
partition = 0
topic = eagle
offset = 48
timestamp = 1551247550700
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容