消息队列-KAFKA-单机版安装指南

前提

测试机器上需要提前实现安装好JVM,本人使用的是windows环境下利用vmare启动的Ubuntu 14.04操作系统的虚拟机,已经事先安装好了JVM 1.8。

安装实战

kafka安装包下载(注意:这里选择的安装包是2.11系列的1.1.0版本的)

wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

备注:其他版本下载

安装启动服务

首先,我们需要下载并且安装zk和kafka,并且将这两个服务启动:

# 解压缩文件
tar zxvf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0/
# 启动zk
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 检查zk是否启动成功
netstat -tunpl|grep 2181
tcp6       0      0 :::2181                 :::*                    LISTEN      2877/java    
# 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties 
# 检查kafka是否启动成功
netstat -tunpl|grep 9092
tcp6       0      0 :::9092                 :::*                    LISTEN      3164/java       

命令方式验证

1. 运行 producer

打开一个窗口,输入如下的指令,启动生产者:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

2. 运行 consumer

新启动一个新的窗口,输入如下的指令,启动消费者:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

3. 测试消息传递

在producer中写信息, 从consumer中可以看到结果, 如下:

生产者端:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>i like you
[2018-12-07 01:55:56,427] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-12-07 01:55:56,532] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>hello
>world
>hello kafka
>

消费者端:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
i like you
hello
world
hello kafka

但是这是一种比较老使用zk启动消费端的方式,后序版本会废弃,官方推荐新的使用方式如下:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
i like you
hello
world
hello kafka
hello

上面的消费方式为从头开始消费的,因此之前生产者发送过的历史消息,后启动的消费者仍然可以接收到。

这里我们再次测试一下只接收新的消息。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

生产者端输入:

i like you
hello
world
hello kafka
hello
-------
louxj424 #新输入的内容

消费者端就只查看到一条消息:

louxj424

Java客户端测试代码

我们更多的时候需要在工程项目中使用kafka,下面给出一个简单的测试Demo,可以感受一下kafka的基本使用。

引入依赖的客户端jar包

首先在pom文件中引入依赖的jar包文件,注意这里一定需要安装与kafka服务端对应版本号的pom文件,必须严格对应,否则会出现各种莫名其妙的问题。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.0</version>
</dependency>

编写生产者客户端测试Demo

package com.netease.scaffold.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class Producer {

    KafkaProducer<String, String> KafkaProducer;

    public Producer() {
        Properties map = new Properties();
        map.put("bootstrap.servers", "192.168.91.128:9092");
        map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer = new KafkaProducer<>(map);
    }

    int i = 0;
    String msg = "bb hh ";

    public void produce() {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("eagle", msg + i);
        i++;
        try {
            KafkaProducer.send(record, (recordMetadata, e) -> System.out.println("send success"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        while (true) {
            producer.produce();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

编写消费者客户端测试Demo

package com.netease.scaffold.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class Consumer {
    KafkaConsumer<String, String> kafkaConsumer;

    public Consumer() {
        Properties map = new Properties();
        //map.put("bootstrap.servers", "59.111.60.130:9092,59.111.60.126:9092,59.111.60.127:9092");
        map.put("bootstrap.servers", "192.168.91.128:9092");
        map.put("group.id", "local-test-1");
        map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumer = new KafkaConsumer<>(map);
        kafkaConsumer.subscribe(Collections.singleton("eagle"));
    }

    public void consumer() {
        System.out.println("wait for consume...");
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key = " + record.key());
                    System.out.println("value = " + record.value());
                    System.out.println("partition = " + record.partition());
                    System.out.println("topic = " + record.topic());
                    System.out.println("offset = " + record.offset());
                    System.out.println("timestamp = " + record.timestamp());
                    System.out.println();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.consumer();
    }
}

功能测试

异常测试结果

踩坑经历:首次运行报错:

org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-0: 30040 ms has passe

使用java去连接kafka报连接超时错误。

解决方案:

修改kafka的config/server.properties文件中的如下内容:

advertised.listeners=PLAINTEXT://IP地址:9092

使用ifconfig或者ip a指令获取安装机器的ip地址,加入获取到的测试机器的IP地址为192.168.91.128,就将上述位置的配置参数修改为如下的内容:

advertised.listeners=PLAINTEXT://192.168.91.128:9092

修改完成后保存退出,并重新启动zk和kafka。

正常测试结果

再次运行程序。先启动消费者,消费者会先阻塞,之后再启动生产者。

  1. 消费者输出
key = null
value = bb hh 8
partition = 0
topic = eagle
offset = 8
timestamp = 1544183273035

key = null
value = bb hh 9
partition = 0
topic = eagle
offset = 9
timestamp = 1544183274035

key = null
value = bb hh 10
partition = 0
topic = eagle
offset = 10
timestamp = 1544183275035

key = null
value = bb hh 11
partition = 0
topic = eagle
offset = 11
timestamp = 1544183276035
  1. 生产者输出
send success
send success
send success
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,317评论 1 15
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,721评论 13 425
  • Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O(1)的方...
    Alukar阅读 3,079评论 0 43
  • 文|西雅图不眠 2013年大学毕业后,我选择了留学英国读研,我很幸运的有一个能支持我留学的一个家庭,父母辛辛苦苦的...
    西雅图不眠阅读 733评论 1 3