Flume+Kafka+SparkStreaming 进行WordCounts实例

1. flume

flume的安装配置就不说了,网上一大堆。
我还是给一个网址吧,https://www.jianshu.com/p/82c77166b5a3
编写flume配置文件

cd /opt/apache-flume-1.8.0-bin
vim conf/flume_kafka_and_hdfs.conf

填写内容如下:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/syslogdatatest.txt
a1.sources.r1.channels = c1 c2

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/flume/flumeCheckpoint
a1.channels.c1.dataDirs = /home/flume/flumeData, /home/flume/flumeDataExt
a1.channels.c1.capacity = 2000000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 2000000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://cn01:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix = cmcc
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.idleTimeout = 0

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.topic = test1
a1.sinks.k2.brokerList = 192.168.10.101:9092,192.168.10.102:9092,192.168.10.103:9092
a1.sinks.k2.requiresAcks = 1
a1.sinks.k2.batchSize = 100
a1.sinks.k2.channel = c2

之后保存退出即可

2. kafka

同样kafka 的安装配置也给一个地址,https://www.jianshu.com/p/3cb394ef41c0
kafka不需要额外的写什么,只是一个消息中间件,只要启动了kafka并且创建了topic(本文是test1,和flume配置文件里面的要相同)就好了。

3. spark

关于spark集群的搭建给一个网址https://www.jianshu.com/p/f9a9147176a7,都比较简单。
编写scala脚本

cd /opt/spark-2.2.1-bin-hadoop2.7
mkdir test #
cd test
mkdir -p src/main/scala 
vim src/main/scala/DirectKafkaWordCount.scala

填写如下代码到DirectKafkaWordCount.scala脚本里。

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka.KafkaUtils._
import org.apache.spark.SparkConf
object DirectKafkaWordCount {
    def main(args: Array[String]) {
        if(args.length < 2) {
            System.err.println(s"""
                |Usage: DirectKafkaWordCount <brokers> <topics>
                |  <brokers> is a list of one or more Kafka brokers
                |  <topics> is a list of one or more kafka topics to consume from
                |
                """.stripMargin)
            System.exit(1)
        }
        //StreamingExamples.setStreamingLogLevels()
        val Array(brokers, topics) = args
        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
        val lines = messages.map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

保存退出即可,在编写一个spark相关依赖的脚本。

vim build.sbt

填写如下内容即可。

name := "Simple Project With DirectKafkaWordCount"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.2.1"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.1"

同样的保存退出。
最后我们使用命令来编译一下。

sbt package

当然需要先安装sbt命令。网上一大堆。
他会下载一些依赖,我们等着就行了。看到最后的输出信息有success就表示编译成功了。
我们可以看到test目录下多了两个子目录,其中在target/scala-2.11目录下有一个jar包。这正是我们需要的。

4. 启动运行提交作业

先启动flume:

cd /opt/apache-flume-1.8.0-bin
bin/flume-ng agent --conf conf/ --conf-file conf/flume_kafka_and_hdfs_test.conf --name a1 -Dflume.root.logger=INFO,console

然后另外打开一个终端用来运行spark job。命令如下。

cd /opt/spark-2.2.1-bin-hadoop2.7
spark-submit --jars /home/spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar test/target/scala-2.11/simple-project-with-directkafkawordcount_2.11-1.0.jar cn01:9092,cn02:9092,cn03:9092 test1

其中--jars 后面跟的是依赖项, 我们需要先到这里找到对应自己spark版本的下载并上传到服务就可以了。
或者用--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1代替--jars参数。他会在线下载。
OK! 你就会看到程序正常运行了。
最后一步就是我们需要往/home/syslogdatatest.txt文件中写一点内容了,用来做wordCounts。
在另开一个终端。

vim /home/syslogdatatest.txt
#写一些东西
hello flume
hello kafka
hello spark
apache spark
apache kafka
apache flume

保存退出即可。
不出意外的话就立即能在刚才提交spark job的终端上看到对应的词频统计结果了。
我们可以在UI界面上看到更多的信息。


END

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容