项目环境搭建:{vmware(虚拟机网络配置),CentOS,MySql,Docker,kafka}

环境:CentOS 7.6 64bit,装在了VM虚拟机中。

00、配置网络环境(因为使用了vm虚拟机。非VM环境就无需)

#第一步:修改网络配置(使用NAT模式),打开以下网络配置文件,路径和修改内容如下:
vi /etc/sysconfig/network-scripts/ifcfg-ens33
--修改
ONBOOT=yes 
BOOTPROTO=static //静态网络IP   dhcp 动态获取网络IP
--添加
IPADDR=192.168.58.xxx
NETMASK=255.255.255.0
GATEWAY=192.168.58.xxx
DNS1=114.114.114.114
--删除
UUID
--进行网络测试
ip addr
systemctl restart network.service
ping www.baidu.com

#第二步:关闭网络防火墙:
systemctl stop firewalld (本次服务内关闭防火墙)
systemctl disable firewalld(禁用防火墙服务,服务器重启后防火墙禁用)

#第三步:关闭软件安装限制:
vi /etc/selinux/config
--修改配置
SELINUX=disabled

01、MySQL安装

、下载并安装mysql:

wget http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
yum -y install mysql57-community-release-el7-10.noarch.rpm
yum -y install mysql-community-server
#遇到错误:
失败的软件包是:mysql-community-client-5.7.41-1.el7.x86_64
GPG  密钥配置为:file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql
#依次执行以下命令
rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022
yum install -y mysql-community-server

、启动并查看状态MySQL:

systemctl start  mysqld.service
systemctl status mysqld.service

、查看MySQL的默认密码:

grep "password" /var/log/mysqld.log
#如下结果,最后类似的随机字符串“3Xn1mKp.TnQ(”就是默认密码
2023-03-16T00:29:16.010934Z 1 [Note] A temporary password is generated for root@localhost: 3Xn1mKp.TnQ(

、登录进MySQL

mysql -uroot -p

、修改默认密码(设置密码需要有大小写符号组合---安全性),把下面的my passrod替换成自己的密码

ALTER USER 'root'@'localhost' IDENTIFIED BY 'my password';

#如下设置会报错,因为密码太简单;这个与验证密码策略validate_password_policy的值有关。默认是1,
#所以刚开始设置的密码必须符合长度,且必须含有数字,小写或大写字母,特殊字符。如果不想设置8位,或者想设置简单点,可以选择Policy0.
ALTER USER 'root'@'localhost' IDENTIFIED BY '123456';
Your password does not satisfy the current policy requirements

#修改validate_password_policy参数的值(等级为0)
mysql> set global validate_password_policy=0;
ALTER USER 'root'@'localhost' IDENTIFIED BY 'Aa123456';

、开启远程访问 (把下面的my passrod替换成自己的密码)

grant all privileges on *.* to 'root'@'%' identified by 'my password' with grant option;
flush privileges;
exit

、在云服务上增加MySQL的端口:

02、Docker环境

首先我们需要安装GCC相关的环境:

yum -y install gcc

yum -y install gcc-c++

安装Docker需要的依赖软件包:

yum install -y yum-utils device-mapper-persistent-data lvm2

设置国内的镜像(提高速度)

yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

更新yum软件包索引:

yum makecache fast

安装DOCKER CE(注意:Docker分为CE版和EE版,一般我们用CE版就够用了)

yum -y install docker-ce

启动Docker:

systemctl start docker

下载回来的Docker版本::

docker version

来一发HelloWorld:

docker run hello-world

03、Docker compose环境

Compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务.
运行以下命令以下载 Docker Compose 的当前稳定版本:

sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

将可执行权限应用于二进制文件:

sudo chmod +x /usr/local/bin/docker-compose

创建软链:

sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

测试是否安装成功:

docker-compose --version

04、kafka

kafka >>> compose 文件

新建搭建kafka环境的docker-compose.yml文件,内容如下:
文件内TODO 中的ip需要改成自己的,并且如果你用的是云服务器,那需要把端口给打开。

version: '3'
services:
  zookepper:
    image: wurstmeister/zookeeper                    # 原镜像`wurstmeister/zookeeper`
    container_name: zookeeper                        # 容器名为'zookeeper'
    volumes:                                         # 数据卷挂载路径设置,将本机目录映射到容器目录
      - "/etc/localtime:/etc/localtime"
    ports:                                           # 映射端口
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka                                # 原镜像`wurstmeister/kafka`
    container_name: kafka                                    # 容器名为'kafka'
    volumes:                                                 # 数据卷挂载路径设置,将本机目录映射到容器目录
      - "/etc/localtime:/etc/localtime"
    environment:                                                       # 设置环境变量,相当于docker run命令中的-e
      KAFKA_BROKER_ID: 0                                               # 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ip:9092 # TODO 将kafka的地址端口注册给zookeeper
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092                        # 配置kafka的监听端口
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181                # zookeeper地址
      KAFKA_CREATE_TOPICS: "hello_world"
    ports:                              # 映射端口
      - "9092:9092"
    depends_on:                         # 解决容器依赖启动先后问题
      - zookepper

  kafka-manager:
    image: sheepkiller/kafka-manager                         # 原镜像`sheepkiller/kafka-manager`
    container_name: kafka-manager                            # 容器名为'kafka-manager'
    environment:                        # 设置环境变量,相当于docker run命令中的-e
      ZK_HOSTS: zookeeper:2181  #  zookeeper地址
      APPLICATION_SECRET: xxxxx
      KAFKA_MANAGER_AUTH_ENABLED: "true"  # 开启kafka-manager权限校验
      KAFKA_MANAGER_USERNAME: admin       # 登陆账户
      KAFKA_MANAGER_PASSWORD: 123456      # 登陆密码
    ports:                              # 映射端口
      - "9000:9000"
    depends_on:                         # 解决容器依赖启动先后问题
      - kafka

kafka >>> 启动kafka

在存放docker-compose.yml的目录下执行启动命令:

#格式为docker-compose up [options] [SERVICE...],
#该命令可以自动完成包括构建镜像,(重新)创建服务,启动服务,并关联服务相关容器的一系列操作。
#-d表示后台执行
docker-compose up -d

可以查看下docker镜像运行的情况:

docker ps 

进入kafka 的容器:

docker exec -it kafka sh

创建一个topic(这里我的topicName就叫austin,你们可以改成自己的)

$KAFKA_HOME/bin/kafka-topics.sh --create --topic austin --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1 

查看刚创建的topic信息:

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic austin

启动一个消费者:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic austin

新增一个窗口,启动一个生产者:

docker exec -it kafka sh
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=austin --broker-list kafka:9092

kafka >>> Java程序验证

引入Kafka依赖(SpringBoot有默认的版本,不需要写version)

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

yml配置文件

# 以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接
spring.kafka.bootstrap-servers=xx.xx.xx.xx:9092
#如果该值大于零时,表示启用重试失败的发送次数
spring.kafka.producer.retries=3
#每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
#这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
spring.kafka.producer.batch-size=16384
#生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432
spring.kafka.producer.buffer-memory=33554432
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1
#key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#用于标识此使用者所属的使用者组的唯一字符串。
spring.kafka.consumer.group-id=default‐group
#如果为true,则消费者的偏移量将在后台定期提交,默认值为true
spring.kafka.consumer.enable-auto-commit=false
#当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
#可选的值为latest, earliest, none
spring.kafka.consumer.auto-offset-reset=earliest
#值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#侦听器的AckMode,参见https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets
#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
spring.kafka.listener.ack-mode=manual_immediate

定义一个实体类:

@Data
@Accessors(chain = true)
public class UserLog {
    private String username;
    private String userid;
    private String state;
}

定义生产者(austin是topicName):

@Component
public class UserLogProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送数据   
     * @param userid
     */
    public void sendLog(String userid){
        UserLog userLog = new UserLog();
        userLog.setUsername("jhp").setUserid(userid).setState("0");
        System.err.println("发送用户日志数据:"+userLog);
        kafkaTemplate.send("austin", JSON.toJSONString(userLog));
    }
}

定义消费者:

@Component
@Slf4j
public class UserLogConsumer {
    @KafkaListener(topics = {"austin"},groupId = "austinGroup2")
    public void consumer(ConsumerRecord<?,?> consumerRecord){
        //判断是否为null
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        log.error(">>>austinGroup2>>>>>>> record =" + kafkaMessage);
        if(kafkaMessage.isPresent()){
            //得到Optional实例中的值
            Object message = kafkaMessage.get();
            System.err.println("消费消息:"+message);
        }

    }

    /**
     * 不指定消费组消费
     *
     * @param record
     * @param ack
     */
    @KafkaListener(topics = "austin",groupId = "austinGroup1")
    public void listenerOne(ConsumerRecord<?,?> record, Acknowledgment ack) {
        //判断是否为null
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        log.error(">>>austinGroup1>>>>>>> record =" + kafkaMessage);
        if(kafkaMessage.isPresent()){
            //得到Optional实例中的值
            Object message = kafkaMessage.get();
            System.err.println("消费消息:"+message);
        }
        //手动提交offset
        ack.acknowledge();
    }
}

定义接口:

@RestController
public class KafkaTestController {

    @Autowired
    private UserLogProducer userLogProducer;

    /**
     * test insert
     */
    @GetMapping("/kafka/insert")
    public String insert(String userId) {
        userLogProducer.sendLog(userId);
        return null;
    }

}

测试:http://localhost:8080/kafka/insert?userId=33

发送用户日志数据:UserLog(username=jhp, userid=33, state=0)
消费消息:{"state":"0","userid":"33","username":"jhp"}
消费消息:{"state":"0","userid":"33","username":"jhp"}
2023-03-20 02:00:59.269 ERROR 56940 --- [ntainer#0-0-C-1] c.e.kafkademo.config.UserLogConsumer     : >>>austinGroup2>>>>>>> record =Optional[{"state":"0","userid":"33","username":"jhp"}]
2023-03-20 02:00:59.270 ERROR 56940 --- [ntainer#1-0-C-1] c.e.kafkademo.config.UserLogConsumer     : >>>austinGroup1>>>>>>> record =Optional[{"state":"0","userid":"33","username":"jhp"}]

05、Redis

Redis >>> compose 文件

安装Redis的环境跟上次Kafka是一样的,为了方便我就继续用docker-compose的方式来进行
首先,我们新建一个文件夹redis,然后在该目录下创建出data文件夹、redis.conf文件和docker-compose.yaml文件

redis.conf文件的内容如下(后面的配置可在这更改,比如requirepass 我指定的密码为123456)

protected-mode no
port 6379
timeout 0
save 900 1 
save 300 10
save 60 10000
rdbcompression yes
dbfilename dump.rdb
dir /data
appendonly yes
appendfsync everysec
requirepass 123456

docker-compose.yaml的文件内容如下:

version: '3'
services:
  redis:
    image: redis:latest
    container_name: redis
    restart: always
    ports:
      - 6379:6379
    volumes:
      - ./redis.conf:/usr/local/etc/redis/redis.conf:rw
      - ./data:/data:rw
    command:
      /bin/bash -c "redis-server /usr/local/etc/redis/redis.conf "

配置的工作就完了,如果是云服务器,记得开redis端口6379

Redis >>> 启动Redis

启动Redis跟之前安装Kafka的时候就差不多

# docker-compose up解决错误ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?  
# If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
#如果发生以上错误提示,可能是:docker服务没启动,那就启动(命令如下:sudo systemctl start docker)

docker-compose up -d

docker ps

docker exec -it redis redis-cli

进入redis客户端了之后,我们想看验证下是否正常。(在正式输入命令之前,我们需要通过密码校验,在配置文件下配置的密码是123456)

auth 123456

然后随意看看命令是不是正常

set 1 1
get 1
keys *

Redis >>> Java程序验证

在SpringBoot环境下,使用Redis就非常简单了(再次体现出使用SpringBoot的好处)。我们只需要在pom文件下引入对应的依赖,并且在配置文件下配置host/port和password就搞掂了。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容