从零开始搭建Kafka+SpringBoot分布式消息系统

前言

由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境。(ps:默认您的centos系统可联网,本教程就不教配置ip什么的了)(ps2:没有wget的先装一下:yum install wget)(ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local目录下)(ps4:kafka可能有内置zookeeper,感觉可以越过zookeeper教程,但是这里也配置出来了。我没试过)

**文章首发公众号:Java架构师联盟,每日更新技术好文**


一、配置jdk

因为oracle 公司不允许直接通过wget 下载官网上的jdk包。所以你直接wget以下地址下载下来的是一个只有5k的网页文件而已,并不是需要的jdk包。(垄断地位就是任性)。(请通过java -version判断是否自带jdk,我的没带)

1、官网下载

下面是jdk8的官方下载地址:

https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html

2、上传解压

这里通过xftp上传到服务器指定位置:/usr/local

对压缩文件进行解压:

tar -zxvf jdk-8u221-linux-x64.tar.gz

对解压后的文件夹进行改名:

mv jdk1.8.0_221 jdk1.8

3、配置环境变量

vim /etc/profile

#java environment

export JAVA_HOME=/usr/local/jdk1.8

export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar

export PATH=$PATH:${JAVA_HOME}/bin

操作之后的界面如下:

运行命令使环境生效

source /etc/profile

二、搭建zookeeper集群

1、下载zookeeper

创建zookeeper目录,在该目录下进行下载:

mkdir /usr/local/zookeeper

这一步如果出现连接被拒绝时可多试几次,我就是第二次请求才成功的。

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

等待下载完成之后解压:

tar -zxvf zookeeper-3.4.6.tar.gz

重命名为zookeeper1

mv zookeeper-3.4.6 zookeeper1

cp -r zookeeper1 zookeeper2

cp -r zookeeper1 zookeeper3

2、创建data、logs文件夹

在zookeeper1目录下创建

在data目录下新建myid文件。内容为1

3、修改zoo.cfg文件

cd /usr/local/zookeeper/zookeeper1/conf/

cp zoo_sample.cfg zoo.cfg

进行过上面两步之后,有zoo.cfg文件了,现在修改内容为:

dataDir=/usr/local/zookeeper/zookeeper1/data

dataLogDir=/usr/local/zookeeper/zookeeper1/logs

server.1=192.168.233.11:2888:3888

server.2=192.168.233.11:2889:3889

server.3=192.168.233.11:2890:3890

4、搭建zookeeper2

首先,复制改名。

cd /usr/local/zookeeper/

cp -r zookeeper1 zookeeper2

然后修改具体的某些配置:

vim zookeeper2/conf/zoo.cfg

将下图三个地方1改成2

vim zookeeper2/data/myid

同时将myid中的值改成2

5、搭建zookeeper3

同上,复制改名

cp -r zookeeper1 zookeeper3

vim zookeeper3/conf/zoo.cfg

修改为3

vim zookeeper3/data/myid

修改为3

6、测试zookeeper集群

cd /usr/local/zookeeper/zookeeper1/bin/

由于启动所需代码比较多,这里简单写了一个启动脚本:

vim start

start的内容如下

cd /usr/local/zookeeper/zookeeper1/bin/

./zkServer.sh start ../conf/zoo.cfg

cd /usr/local/zookeeper/zookeeper2/bin/

./zkServer.sh start ../conf/zoo.cfg

cd /usr/local/zookeeper/zookeeper3/bin/

./zkServer.sh start ../conf/zoo.cfg

下面是连接脚本:

vim login

login内容如下:

./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183

脚本编写完成,接下来启动:

sh start

sh login

启动集群成功,如下图:

这里zookeeper就告一段落了,由于zookeeper占用着输入窗口,这里可以在xshell右键标签,新建ssh渠道。然后就可以在新窗口继续操作kafka了!

三、搭建kafka集群

1、下载kafka

首先创建kafka目录:

mkdir /usr/local/kafka

然后在该目录下载

cd /usr/local/kafka/

wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

下载成功之后解压:

tar -zxvf kafka_2.11-1.1.0.tgz

2、修改集群配置

首先进入conf目录下:

cd /usr/local/kafka/kafka_2.11-1.1.0/config

修改server.properties修改内容:

broker.id=0

log.dirs=/tmp/kafka-logs

listeners=PLAINTEXT://192.168.233.11:9092

复制两份server.properties

cp server.properties server2.properties

cp server.properties server3.properties

修改server2.properties

vim server2.properties

修改主要内容为:

broker.id=1

log.dirs=/tmp/kafka-logs1

listeners=PLAINTEXT://192.168.233.11:9093

如上,修改server3.properties修改内容为:

broker.id=2

log.dirs=/tmp/kafka-logs2

listeners=PLAINTEXT://192.168.233.11:9094

3、启动kafka

这里还是在bin目录编写一个脚本:

cd ../bin/

vim start

脚本内容为:

./kafka-server-start.sh ../config/server.properties &

./kafka-server-start.sh ../config/server2.properties &

./kafka-server-start.sh ../config/server3.properties &

通过jps命令可以查看到,共启动了3个kafka。

4、创建Topic

cd /usr/local/kafka/kafka_2.11-1.1.0

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

kafka打印了几条日志

在启动的zookeeper中可以通过命令查询到这条topic!

ls /brokers/topics

查看kafka状态

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

可以看到此时有三个节点 1 , 2 , 0

Leader 是1 ,因为分区只有一个 所以在0上面,Replicas:主从备份是 1,2,0,ISR(in-sync):现在存活的信息也是 1,2,0

5、启动生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

由于不能按删除,不能按左右键去调整,所以语句有些乱啊。em…

6、启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

可以看出,启动消费者之后就会自动消费。

在生产者又造了一条。

消费者自动捕获成功!

四、集成springboot

先贴一张kafka兼容性目录:

不满足的话启动springboot的时候会抛异常的!!!ps:该走的岔路我都走了o(╥﹏╥)o(我的kafka-clients是1.1.0,spring-kafka是2.2.2,中间那列暂时不用管)

回归正题,搞了两个小时,终于搞好了,想哭…遇到的问题基本就是jar版本不匹配。上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!!

1、pom文件

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.1.1.RELEASE</version>

        <relativePath/> <!-- lookup parent from repository -->

    </parent>

    <groupId>com.gzky</groupId>

    <artifactId>study</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <name>study</name>

    <description>Demo project for Spring Boot</description>

    <properties>

        <java.version>1.8</java.version>

    </properties>

    <dependencies>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-test</artifactId>

            <scope>test</scope>

            <exclusions>

                <exclusion>

                    <groupId>org.junit.vintage</groupId>

                    <artifactId>junit-vintage-engine</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-redis</artifactId>

            <version>1.3.8.RELEASE</version>

        </dependency>

        <dependency>

            <groupId>redis.clients</groupId>

            <artifactId>jedis</artifactId>

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->

        <dependency>

            <groupId>org.springframework.kafka</groupId>

            <artifactId>spring-kafka</artifactId>

            <version>2.2.0.RELEASE</version>

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka-clients</artifactId>

        </dependency>

    </dependencies>

    <build>

        <plugins>

            <plugin>

                <groupId>org.springframework.boot</groupId>

                <artifactId>spring-boot-maven-plugin</artifactId>

            </plugin>

        </plugins>

    </build>

</project>

pom文件中,重点是下面这两个版本。

<parent>

      <groupId>org.springframework.boot</groupId>

      <artifactId>spring-boot-starter-parent</artifactId>

      <version>2.1.1.RELEASE</version>

      <relativePath/> <!-- lookup parent from repository -->

  </parent>

<dependency>

      <groupId>org.springframework.kafka</groupId>

      <artifactId>spring-kafka</artifactId>

      <version>2.2.0.RELEASE</version>

</dependency>

2、application.yml

spring:

  redis:

    cluster:

      #设置key的生存时间,当key过期时,它会被自动删除;

      expire-seconds: 120

      #设置命令的执行时间,如果超过这个时间,则报错;

      command-timeout: 5000

      #设置redis集群的节点信息,其中namenode为域名解析,通过解析域名来获取相应的地址;

      nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006

  kafka:

    # 指定kafka 代理地址,可以多个

    bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094

    producer:

      retries: 0

      # 每次批量发送消息的数量

      batch-size: 16384

      buffer-memory: 33554432

      # 指定消息key和消息体的编解码方式

      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:

      # 指定默认消费者group id

      group-id: test-group

      auto-offset-reset: earliest

      enable-auto-commit: true

      auto-commit-interval: 100

      # 指定消息key和消息体的编解码方式

      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      value-serializer: org.apache.kafka.common.serialization.StringSerializer

server:

  port: 8085

  servlet:

    #context-path: /redis

    context-path: /kafka

没有配置Redis的可以把Redis部分删掉,也就是下图:想学习配置Redis集群的可以参考:《Redis集群redis-cluster的搭建及集成springboot》

3、生产者

package com.gzky.study.utils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Component;

/**

* kafka生产者工具类

*

* @author biws

* @date 2019/12/17

**/

@Component

public class KfkaProducer {

    private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);

    @Autowired

    private KafkaTemplate<String, String> kafkaTemplate;

    /**

    * 生产数据

    * @param str 具体数据

    */

    public void send(String str) {

        logger.info("生产数据:" + str);

        kafkaTemplate.send("testTopic", str);

    }

}

4、消费者

package com.gzky.study.utils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

/**

* kafka消费者监听消息

*

* @author biws

* @date 2019/12/17

**/

@Component

public class KafkaConsumerListener {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);

    @KafkaListener(topics = "testTopic")

    public void onMessage(String str){

        //insert(str);//这里为插入数据库代码

        logger.info("监听到:" + str);

        System.out.println("监听到:" + str);

    }

}

5、对外接口

package com.gzky.study.controller;

import com.gzky.study.utils.KfkaProducer;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.*;

/**

* kafka对外接口

*

* @author biws

* @date 2019/12/17

**/

@RestController

public class KafkaController {

    @Autowired

    KfkaProducer kfkaProducer;

    /**

    * 生产消息

    * @param str

    * @return

    */

    @RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET)

    @ResponseBody

    public boolean sendTopic(@RequestParam String str){

        kfkaProducer.send(str);

        return true;

    }

}

6、postman测试

这里首先应该在服务器启动监听器(kafka根目录),下面命令必须是具体的服务器ip,不能是localhost,是我踩过的坑:

推荐此处重启一下集群关闭kafka命令:

cd /usr/local/kafka/kafka_2.11-1.1.0/bin

./kafka-server-stop.sh ../config/server.properties &

./kafka-server-stop.sh ../config/server2.properties &

./kafka-server-stop.sh ../config/server3.properties &

此处应该jps看一下,等待所有的kafka都关闭(关不掉的kill掉),再重新启动kafka:

./kafka-server-start.sh ../config/server.properties &

./kafka-server-start.sh ../config/server2.properties &

./kafka-server-start.sh ../config/server3.properties &

等待kafka启动成功后,启动消费者监听端口:

cd /usr/local/kafka/kafka_2.11-1.1.0

bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic

曾经我乱输的测试信息全部被监听过来了!

启动springboot服务

然后用postman生产消息:

然后享受成果,服务器端监听成功。

项目中也监听成功!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容