A simple example
- a script writes log lines to a file
- Flume ships the log to Kafka
- Spark Streaming consumes from Kafka, counts word frequencies, and prints the result
1. Installation
I am starting from a bare CentOS 8.0 image.
1.1 Install the JDK
Search for and install JDK 1.8
# search for jdk packages
yum search java|grep jdk
# install jdk 1.8
yum install java-1.8.0-openjdk.x86_64
Configure the environment variables
vim /etc/profile
Append the following at the end of the file
# the install directory is under /usr/lib/jvm/xxx; adjust the path below to match
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.232.b09-0.el8_0.x86_64
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=${JAVA_HOME}/bin:$PATH
After saving, source the file to apply the changes
source /etc/profile
Verify:
java -version
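If the JDK installed correctly, the output should look roughly like this (the exact build number depends on the package yum installed):
# output (details vary with the installed package):
# openjdk version "1.8.0_232"
# OpenJDK Runtime Environment (build 1.8.0_232-b09)
# OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)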
1.2 Install Maven
yum install maven
Verify:
mvn -version
1.3 Install Flume
Download and extract Flume
Download
cd /opt
curl -O http://mirror.bit.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
Extract
tar -zxvf apache-flume-1.9.0-bin.tar.gz
Rename
mv apache-flume-1.9.0-bin flume
Configure the environment variables
vim /etc/profile
Append the following at the end of the file
export FLUME_HOME=/opt/flume
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
After saving, source the file to apply the changes
source /etc/profile
Verify:
/opt/flume/bin/flume-ng version
# output:
Flume 1.9.0
...
1.4 Install Kafka
Download and extract Kafka
Download
cd /opt
curl -O http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz
Extract
tar -zxvf kafka_2.12-2.3.1.tgz
Rename
mv kafka_2.12-2.3.1 kafka
Test it
Open four terminals.
Terminal 1: start ZooKeeper
cd /opt/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Terminal 2: start the Kafka server
cd /opt/kafka
bin/kafka-server-start.sh config/server.properties
Terminal 3: create a topic and produce some messages
cd /opt/kafka
# create a topic named dblab
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
# output:
# Created topic dblab.
# list existing topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
# output:
# dblab
# produce messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
# type lines here; each line is sent as one message
Terminal 4: consume the messages produced above
cd /opt/kafka
# note: newer Kafka versions no longer support the commented --zookeeper form below
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning
# the messages produced in terminal 3 should appear here
1.5 Install Spark
The Kafka package installed above is kafka_2.12-2.3.1, where 2.12 is the Scala version it was built against (2.3.1 is the Kafka version itself).
Download and extract Spark
Download
cd /opt
curl -O http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
Extract
tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz
Rename
mv spark-2.4.4-bin-hadoop2.7 spark
Test it
cd /opt/spark
./bin/run-example SparkPi
#./bin/spark-shell --master local[2]
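If Spark is working, the SparkPi example prints a line like the one below among its log output (the exact digits vary from run to run):
# Pi is roughly 3.14...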
2. The example
2.1 A script that writes log lines periodically
cd /opt
mkdir demo
cd demo
vim append-log.sh
Script contents (appends a line to the file every 5 seconds)
while true
do
echo 'hello flume kafka spark_streaming redis flume kafka spark_streaming redis' >> /opt/demo/test.log
sleep 5
done
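A quick sanity check, sketched under the assumption that this is the only background job in the current shell: run the script briefly and confirm the file is growing.
# run the script in the background for a few seconds
bash /opt/demo/append-log.sh &
sleep 6
tail -n 2 /opt/demo/test.log
# stop the background job again
kill %1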
2.2 Flume configuration to ship the log to Kafka
vim /opt/flume/conf/flume_to_kafka.conf
Configuration contents:
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Describe/configure the source
a1.sources.r1.type=exec
a1.sources.r1.command = tail -F /opt/demo/test.log
a1.sources.r1.channels = c1
#Describe the sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=test
a1.sinks.k1.kafka.bootstrap.servers=127.0.0.1:9092
a1.sinks.k1.kafka.producer.acks=1
a1.sinks.k1.flumeBatchSize=5
#Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=1000000
#Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
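One thing to note: the sink writes to a topic named test, which was never created in section 1.4. With Kafka's default auto.create.topics.enable=true the topic is created on first write; to be explicit, it can be created up front and watched while the agent runs (a sketch, assuming ZooKeeper and Kafka from section 1.4 are still running):
cd /opt/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# in another terminal: watch what Flume pushes into the topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning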
2.3 Spark Streaming word-count code
Generate a Maven project
cd /opt/demo
mvn archetype:generate -DgroupId=com.fksr.demo -DartifactId=demo -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
Add dependencies and a build plugin to pom.xml
After the project is generated, open /opt/demo/demo/pom.xml and add dependencies for Spark core, Spark Streaming, and the Kafka integration.
Note: the Scala version reported by spark-shell inside this container is 2.11, not the 2.12 of the Kafka package; the prebuilt spark-2.4.4-bin-hadoop2.7 binaries are built against Scala 2.11, so the dependencies use the _2.11 artifacts.
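To confirm which Scala version the Spark binaries bundle, check the version banner (the exact version string may differ):
/opt/spark/bin/spark-submit --version
# look for a line like: Using Scala version 2.11.x
With that confirmed, the dependencies are: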
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.4</version>
<scope>compile</scope>
</dependency>
Add the packaging plugin
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
...
</plugins>
</build>
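Optionally (not required here, since spark-submit is passed --class explicitly in section 2.4), the assembly plugin can also record the main class in the jar manifest; the <configuration> block would then look roughly like this:
<configuration>
  <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
  </descriptorRefs>
  <archive>
    <manifest>
      <mainClass>com.fksr.demo.App</mainClass>
    </manifest>
  </archive>
</configuration>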
App.java
/opt/demo/demo/src/main/java/com/fksr/demo/App.java contains:
package com.fksr.demo;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
public class App
{
public static void main( String[] args ) throws InterruptedException
{
// Create a local StreamingContext with two working threads and a batch interval of 10 seconds
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
// list of topics to subscribe to
Collection<String> topics = new HashSet<>(Arrays.asList("test".split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("group.id", "test-group");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
JavaInputDStream<ConsumerRecord<String,String>> lines = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
// JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.value().split(" ")).iterator());
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
}
}
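Two optional consumer settings from the Spark Streaming + Kafka integration guide are worth knowing about. They are not part of the code above and not required for this demo, but can be added to the kafkaParams map (a sketch):
// where a consumer group with no committed offsets starts reading ("latest" or "earliest")
kafkaParams.put("auto.offset.reset", "latest");
// disable the Kafka client's automatic offset commits so the application controls them
kafkaParams.put("enable.auto.commit", false);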
2.4 Start everything
Start ZooKeeper
cd /opt/kafka
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &
Start the Kafka server
cd /opt/kafka
nohup bin/kafka-server-start.sh config/server.properties &
Start the Flume agent
cd /opt/flume
nohup bin/flume-ng agent --conf ./conf --conf-file ./conf/flume_to_kafka.conf --name a1 -Dflume.root.logger=INFO,console &
Start the log-output script (run it in a separate terminal, since it stays in the foreground, or background it with nohup ... & like the services above)
bash /opt/demo/append-log.sh
Run the Java program
There are two options:
- Run App.java directly from an IDE
VS Code's Remote Development extension makes it convenient to develop the project inside the Docker container.
Output:
(,2)
(hello,2)
(spark_streaming,4)
(kafka,4)
(flume,4)
(redis,4)
- Package the project with Maven and run it with spark-submit
Package it:
mvn package assembly:single
The jar ends up at /opt/demo/demo/target/demo-1.0-SNAPSHOT-jar-with-dependencies.jar
Run it:
cd /opt/spark
bin/spark-submit \
--class "com.fksr.demo.App" \
--master local[2] \
/opt/demo/demo/target/demo-1.0-SNAPSHOT-jar-with-dependencies.jar
Running it this way prints similar output (the counts differ only because they depend on how many log lines fall into each 10-second batch):
(,1)
(hello,1)
(spark_streaming,2)
(kafka,2)
(flume,2)
(redis,2)