A Simple Example of Integrating Flume, Kafka, and Spark Streaming

A simple end-to-end example:

  1. A script writes log lines to a file
  2. Flume ships the log to Kafka
  3. Spark Streaming consumes from Kafka, counts word frequencies, and prints the result

1. Installation

I am starting from a bare CentOS 8.0 image.

1.1 Install the JDK

Search for and install JDK 1.8

# search for jdk packages
yum search java | grep jdk
# install jdk 1.8
yum install java-1.8.0-openjdk.x86_64

Configure the environment variables

vim /etc/profile

Append to the end of the file

# the directory is under /usr/lib/jvm/xxx
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.232.b09-0.el8_0.x86_64
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=${JAVA_HOME}/bin:$PATH
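
The exact JDK directory name under /usr/lib/jvm varies with the installed build, so confirm what yum actually put there before pointing JAVA_HOME at it (generic commands, nothing specific to this image):

ls /usr/lib/jvm/
# or resolve the path of the java binary that was installed
readlink -f $(which java)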

Once added, apply the configuration with source

source /etc/profile

Check it

java -version

1.2 Install Maven

yum install maven

Check it

mvn -version

1.3 Install Flume

Download and unpack Flume.
Download

cd /opt
curl -O http://mirror.bit.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

Unpack

tar -zxvf apache-flume-1.9.0-bin.tar.gz

Rename

mv apache-flume-1.9.0-bin flume

Configure the environment variables

vim /etc/profile

Append to the end of the file

export FLUME_HOME=/opt/flume                   
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin

Once added, apply the configuration with source

source /etc/profile

Check it

/opt/flume/bin/flume-ng version

# output
Flume 1.9.0
...

1.4 Install Kafka

Download and unpack Kafka.
Download

cd /opt
curl -O http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz

Unpack

tar -zxvf kafka_2.12-2.3.1.tgz

Rename

mv kafka_2.12-2.3.1 kafka

Quick test
Open four terminals.
Terminal 1: start ZooKeeper

cd /opt/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Terminal 2: start the Kafka server

cd /opt/kafka
bin/kafka-server-start.sh config/server.properties

Terminal 3: create a topic and produce data

cd /opt/kafka

# create a topic named dblab
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
# output:
# Created topic dblab.

# list the topics that exist
bin/kafka-topics.sh --list --zookeeper localhost:2181
# output:
# dblab

# produce data
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
# type messages here to produce data

Terminal 4: consume the data produced above

cd /opt/kafka
# note: newer Kafka versions no longer accept the commented-out command below
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning
# the produced messages should show up here
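
When the quick test is done, stop the servers either with Ctrl-C in their terminals or with the stop scripts bundled in the Kafka distribution:

cd /opt/kafka
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh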

1.5 Install Spark

The Kafka package installed earlier is kafka_2.12-2.3.1; the 2.12 prefix is the Scala version it was built against.
Download and unpack Spark.
Download

cd /opt
curl -O http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

Unpack

tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz

Rename

mv spark-2.4.4-bin-hadoop2.7 spark

Quick test

cd /opt/spark
./bin/run-example SparkPi
#./bin/spark-shell --master local[2]

2. Putting It Together

2.1 A script that writes log lines periodically

cd /opt
mkdir demo
cd demo
vim append-log.sh

Script contents (append a line to the file every 5 seconds)

while true
do
    echo 'hello flume kafka spark_streaming redis  flume kafka spark_streaming redis' >> /opt/demo/test.log
    sleep 5
done

2.2 Flume configuration to ship the log to Kafka

vim /opt/flume/conf/flume_to_kafka.conf

Configuration

a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Describe/configure the source 
a1.sources.r1.type=exec
a1.sources.r1.command = tail -F /opt/demo/test.log
a1.sources.r1.channels = c1
#Describe the sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink  
a1.sinks.k1.kafka.topic=test  
a1.sinks.k1.kafka.bootstrap.servers=127.0.0.1:9092  
a1.sinks.k1.kafka.producer.acks=1  
a1.sinks.k1.flumeBatchSize=5  
#Use a channel which buffers events in memory  
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=1000000
#Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
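
Before wiring in Spark, it is worth verifying the Flume -> Kafka leg on its own. Assuming the broker is running and topic auto-creation is enabled (the Kafka default; otherwise create the test topic explicitly as in section 1.4), a console consumer should show the tailed lines once the agent and the log script from 2.1 are running:

cd /opt/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning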

2.3 Spark Streaming word-count code

Generate a Maven project

cd /opt/demo
mvn archetype:generate -DgroupId=com.fksr.demo -DartifactId=demo -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

pom.xml: add dependencies and the build plugin
After the project is generated, open /opt/demo/demo/pom.xml and add the dependencies below.
One thing I noticed: spark-shell inside my container reports Scala 2.11, not 2.12; I won't chase down why here, so the dependencies are also pinned to the 2.11 artifacts. spark-core and spark-streaming are marked provided because spark-submit supplies them at runtime, while the Kafka integration uses compile scope so that it ends up in the fat jar.
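
A quick way to confirm which Scala version a Spark distribution was built against (the --version banner prints it next to the Spark version; the exact output depends on the build):

/opt/spark/bin/spark-submit --version
# the banner includes a line like "Using Scala version 2.11.x" for the pre-built spark-2.4.4-bin-hadoop2.7 package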

   <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.4</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.4</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.4</version>
    <scope>compile</scope>
  </dependency>

Add the assembly plugin for packaging

<build>  
    <plugins>
      <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-assembly-plugin</artifactId>  
            <version>3.2.0</version>  
            <configuration>  
                <descriptorRefs>  
                    <descriptorRef>jar-with-dependencies</descriptorRef>  
                </descriptorRefs>  
            </configuration>  
        </plugin> 
        ...
    </plugins>  
</build>

App.java
The contents of /opt/demo/demo/src/main/java/com/fksr/demo/App.java:

package com.fksr.demo;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class App 
{
    public static void main( String[] args ) throws InterruptedException
    {
        // Create a local StreamingContext with two worker threads and a batch interval of 10 seconds
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // topics to subscribe to
        Collection<String> topics = new HashSet<>(Arrays.asList("test".split(",")));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "test-group");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        JavaInputDStream<ConsumerRecord<String,String>> lines = KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );
        // JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.value().split(" ")).iterator());
        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}

2.4 Start all the services

Start ZooKeeper

cd /opt/kafka
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &

Start the Kafka server

cd /opt/kafka
nohup bin/kafka-server-start.sh config/server.properties &

Start the Flume agent

cd /opt/flume
nohup bin/flume-ng agent --conf ./conf --conf-file ./conf/flume_to_kafka.conf --name a1 -Dflume.root.logger=INFO,console &
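
Because the agent runs under nohup, its console output lands in nohup.out in the directory it was started from (here /opt/flume); tailing it is a quick way to confirm the agent came up and connected to the Kafka broker:

tail -f /opt/flume/nohup.out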

Start the log-writing script

bash /opt/demo/append-log.sh

Run the Java application
There are two ways:

  1. Run the class directly from an IDE
    (VS Code's Remote Development extension lets you work on the project inside the Docker container.)
    Output:
(,2)
(hello,2)
(spark_streaming,4)
(kafka,4)
(flume,4)
(redis,4)
  2. Package with mvn package and run it through spark-submit
    Package
mvn package assembly:single

The jar ends up at /opt/demo/demo/target/demo-1.0-SNAPSHOT-jar-with-dependencies.jar
Run

cd /opt/spark
bin/spark-submit \
  --class "com.fksr.demo.App" \
  --master local[2] \
  /opt/demo/demo/target/demo-1.0-SNAPSHOT-jar-with-dependencies.jar

This also prints the counts (the exact numbers depend on how many log lines fall into each 10-second batch):

(,1)
(hello,1)
(spark_streaming,2)
(kafka,2)
(flume,2)
(redis,2)

References
Flume_Kafka_SparkStreaming实现词频统计
日志采集工具Flume的安装与使用方法
Kafka的安装和简单实例测试
Spark Streaming Programming Guide
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
