Flume+Kafka+Spark Streaming整合的简单例子

一个简单的例子

  1. 输出日志
  2. flume上报日志到Kafka
  3. Spark Streaming消费Kafka, 统计词频并打印

1. 先从安装开始

我这里使用了一个空的Centos8.0镜像

1.1 安装JDK

搜索并安装1.8

# 搜索jdk
yum search java|grep jdk
# 安装jdk1.8
yum install java-1.8.0-openjdk.x86_64

配置环境变量

vim /etc/profile

文件末尾添加

# 目录在/usr/lib/jvm/xxx下
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.232.b09-0.el8_0.x86_64
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=${JAVA_HOME}/bin:$PATH

添加完成后使用source生效配置

source /etc/profile

检查下

java -version

1.2 安装maven

yum install maven

检查下

mvn -version

1.3 安装Flume

下载并解压Flume
下载

cd /opt
curl -O http://mirror.bit.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

解压

tar -zxvf apache-flume-1.9.0-bin.tar.gz

改名

mv apache-flume-1.9.0-bin flume

配置环境变量

vim /etc/profile

文件末尾添加

export FLUME_HOME=/opt/flume                   
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin

添加完成后使用source生效配置

source /etc/profile

检查下

/opt/flume/bin/flume-ng version

# 输出
Flume 1.9.0
...

1.4 安装Kafka

下载并解压Kafka
下载

cd /opt
curl -O http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz

解压

tar -zxvf kafka_2.12-2.3.1.tgz

改名

mv kafka_2.12-2.3.1 kafka

测试下
起四个终端
终端1: 启动zookeeper

cd /opt/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

终端2: 启动kafka-server

cd /opt/kafka
bin/kafka-server-start.sh config/server.properties

终端3: 创建topic并生产数据

cd /opt/kafka

# 创建一个dblib的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
# 输出:
# Created topic dblab.

# 查看创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 输出:
# dblab

# 生产数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
# 可以输入生产数据

终端4: 消费上面的生产数据

cd /opt/kafka
# 注意:新的版本不能使用下面注释的命令
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning
# 可以看到生产消息

1.5 安装Spark

前面安装的kafka版本是kafka_2.12-2.3.1, 表示Scala版本是2.12
下载并解压Spark
下载

cd /opt
curl -O http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

解压

tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz

改名

mv spark-2.4.4-bin-hadoop2.7 spark

测试下

cd /opt/spark
./bin/run-example SparkPi
#./bin/spark-shell --master local[2]

2. 正式开始

2.1 定时输出日志的脚本

cd /opt
mkdir demo
cd demo
vim append-log.sh

脚本内容(每5秒输出内容到文件)

while true
do
    echo 'hello flume kafka spark_streaming redis  flume kafka spark_streaming redis' >> /opt/demo/test.log
    sleep 5
done

2.2 flume上报日志到kafka的配置文件

vim /opt/flume/conf/flume_to_kafka.conf

配置内容

a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Describe/configure the source 
a1.sources.r1.type=exec
a1.sources.r1.command = tail -F /opt/demo/test.log
a1.sources.r1.channels = c1
#Describe the sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink  
a1.sinks.k1.kafka.topic=test  
a1.sinks.k1.kafka.bootstrap.servers=127.0.0.1:9092  
a1.sinks.k1.kafka.producer.acks=1  
a1.sinks.k1.flumeBatchSize=5  
#Use a channel which buffers events in memory  
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=1000000
#Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2.3 Spark Streaming统计词频代码

生成maven项目

cd /opt/demo
mvn archetype:generate -DgroupId=com.fksr.demo -DartifactId=demo -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

pom.xml添加依赖和插件
生成项目后打开/opt/demo/demo/pom.xml添加如下依赖:
这里我发现我容器中通过spark-shell进去看到的scala版本是2.11.....这里就不管原因了, 依赖也配成2.11的

   <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.4</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.4</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.4</version>
    <scope>compile</scope>
  </dependency>

添加打包插件

<build>  
    <plugins>
      <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-assembly-plugin</artifactId>  
            <version>3.2.0</version>  
            <configuration>  
                <descriptorRefs>  
                    <descriptorRef>jar-with-dependencies</descriptorRef>  
                </descriptorRefs>  
            </configuration>  
        </plugin> 
        ...
    </plugins>  
</build>

App.java内容
/opt/demo/demo/src/main/java/com/fksr/demo/App.java内容如下:

package com.fksr.demo;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class App 
{
    public static void main( String[] args ) throws InterruptedException
    {
        // Create a local StreamingContext with two working thread and batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // topic列表
        Collection<String> topics = new HashSet<>(Arrays.asList("test".split(",")));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "test-group");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        JavaInputDStream<ConsumerRecord<String,String>> lines = KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );
        // JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.value().split(" ")).iterator());
        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}

2.4 启动各种服务

启动zookeeper

cd /opt/kafka
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka服务

cd /opt/kafka
nohup bin/kafka-server-start.sh config/server.properties &

启动flume agent

cd /opt/flume
nohup bin/flume-ng agent --conf ./conf --conf-file ./conf/flume_to_kafka.conf --name a1 -Dflume.root.logger=INFO,console &

启动输出日志脚本

bash /opt/demo/append-log.sh

运行Java
两种方式:

  1. IDE中直接运行该文件
    通过VSCode的Remote Development可以开发Docker中项目
    输出内容:
(,2)
(hello,2)
(spark_streaming,4)
(kafka,4)
(flume,4)
(redis,4)
  1. 使用mvn package打包后通过spark-submit执行
    打包
mvn package assembly:single

jar的目录:/opt/demo/demo/target/demo-1.0-SNAPSHOT-jar-with-dependencies.jar
运行

cd /opt/spark
bin/spark-submit \
  --class "com.fksr.demo.App" \
  --master local[2] \
  /opt/demo/demo/target/demo-1.0-SNAPSHOT-jar-with-dependencies.jar

也会输出内容:

(,1)
(hello,1)
(spark_streaming,2)
(kafka,2)
(flume,2)
(redis,2)

参考
Flume_Kafka_SparkStreaming实现词频统计
日志采集工具Flume的安装与使用方法
Kafka的安装和简单实例测试
Spark Streaming Programming Guide
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容