KAFKA环境配置以及运行

KAFKA理解

首先下载zookeeper(cdh5.7.0版本),解压,配置环境变量。

在conf目录下,复制zoo_sample.cfg  重新命名zoo.cfg

更改dataDir目录的值,因为tmp文件夹在关机之后会被删除。

开启zookeeper:

在bin目录下

./zkServer.sh start


接下来下载KAFKA(0.9.0.0版本),解压,配置环境变量

接下来修改配置文件server.properties

log.dirs值要修改:因为tmp文件夹在关机之后会被删除。

host_name值要修改为hadoop000(自己主机的名字)

zookeeper.connect修改为hadoop000:2181

记得先改 sudo vi /etc/hosts 的ip地址信息!!!

开启KAFKA:kafka-server-start.sh $KAFKA_HOME/config/server.properties

创建topic :kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic

replication-factor是副本数量,partitions 是分区数量   hello_topic是自定义topic名。

查看所有topic:kafka-topics.sh --list --zookeeper hadoop000:2181

发送消息 broker:kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic


消费消息 :kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic



接下来是单节点多broker部署和使用:

复制三个server.properties...

server-1.properties要修改的地方:

    log.dirs=/home/hadoop/app/tmp/kafka-logs-1

    listeners=PLAINTEXT://:9093

    broker.id=1

server-2.properties要修改的地方:

    log.dirs=/home/hadoop/app/tmp/kafka-logs-2

    listeners=PLAINTEXT://:9094

    broker.id=2

server-3.properties要修改的地方:

    log.dirs=/home/hadoop/app/tmp/kafka-logs-3

    listeners=PLAINTEXT://:9095

    broker.id=3

    开启三个KAFKA节点:

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

创建topic:

kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

发送消息

broker:kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic

接受消息:

kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my-replicated-topic


接下来是IDEA上的KAFKA的API调用:

创建常量类:

//kafka常用配置文件

public class KafkaProperties {

public static final StringZK ="192.168.8.51";

    public static final StringTOPIC ="hello_topic";

    public static final StringBROKER_LIST ="192.168.8.51:9092";

}


KAFKAProducer类(集成Thread类)主要代码:

public KafkaProducer(String topic){

this.topic = topic;

    Properties properties =new Properties();

    properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);

    properties.put("serializer.class","kafka.serializer.StringEncoder");

    properties.put("request.required.acks","1");

    producer =new Producer(new ProducerConfig(properties));

}


public void run() {

int messageNo =1;

    while(true){

String message ="message_" + messageNo;

        producer.send(new KeyedMessage(topic,message));

        System.out.println("Sent: " + message);

        messageNo++;

        try{

Thread.sleep(2000);

        }catch (Exception e){

e.printStackTrace();

        }

}

}

虚拟中先要开启zookeeper和KAFKA,然后跑KafkaProducer的run方法


结果

楼主第一次执行其实是有报错的,原因是kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

刚开始以为是网络问题,但是发现在windows上能ping通虚拟机,然后

百度了之后发现在server.properties中加入一行advertised.listeners=PLAINTEXT://192.168.8.51:9092之后就行了。


FLUME 整合KAFKA过程:

先开启zookeeper和KAFKA:

./zkServer.sh start

kafka-server-start.sh $KAFKA_HOME/config/server.properties。


修改FLUME_HOME/conf 下的avro-memory-logger.conf更名为avro-memory-kafka.conf。修改的内容如下:

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink

avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092

avro-memory-kafka.sinks.kafka-sink.topic = hello_topic

avro-memory-kafka.sinks.kafka-sink.batchSize = 5

avro-memory-kafka.sinks.kafka-sink.requiredAcks =1


开启第二个FLUME:

flume-ng agent \

--name avro-memory-kafka \

--conf $FLUME_HOME/conf \

--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \

-Dflume.root.logger=INFO,console

再开启第一个FLUME:

flume-ng agent \

--name exec-memory-avro \

--conf $FLUME_HOME/conf \

--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \

-Dflume.root.logger=INFO,console


开启消费者看是否能接收消息:


kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,367评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,959评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,750评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,226评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,252评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,975评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,592评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,497评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,027评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,147评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,274评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,953评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,623评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,143评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,260评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,607评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,271评论 2 358