Reference:
http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.5.0/FlumeUserGuide.html
-
Logger-->Flume
1/ Configure the Flume configuration file streaming.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
2/ log4j configuration (log4j.properties) for the log-producing program
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
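The LoggerGenerator class that is run later in these notes is not listed here; below is only a minimal sketch of what such a producer could look like, assuming log4j 1.x with the configuration above and the flume-ng-log4jappender jar on the classpath (the message text and class body are made up for illustration):
import org.apache.log4j.Logger

// Minimal sketch of a log producer (hypothetical implementation):
// emits one INFO line per second; the flume appender configured above
// forwards each line to the avro source listening on port 41414.
object LoggerGenerator {
  private val logger = Logger.getLogger(LoggerGenerator.getClass.getName)

  def main(args: Array[String]): Unit = {
    var index = 0
    while (true) {
      Thread.sleep(1000)
      logger.info("current value is: " + index)
      index += 1
    }
  }
}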
3/ Start flume-ng
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
4/ The incoming log events can be seen in real time in the flume-ng console window
-
Logger-->Flume-->Kafka
1/ Start Kafka and create the topic
./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic flume-kafka-streaming-topic
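To confirm the topic was created, the topic list can be checked (assuming the same ZooKeeper address as above):
./kafka-topics.sh --list --zookeeper hadoop:2181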
2/ Configure the Flume configuration file streaming2.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flume-kafka-streaming-topic
agent1.sinks.kafka-sink.brokerList = hadoop:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
3/ Start the log-producing program; the generated logs appear in real time in the kafka-console-consumer window
kafka-console-consumer.sh --zookeeper hadoop:2181 --topic flume-kafka-streaming-topic
-
Logger-->Flume-->Kafka-->Spark Streaming
1/ Scala code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumeKafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      // Edit Configuration : hadoop:2181 test flume-kafka-streaming-topic 1
      System.err.println("Usage: FlumeKafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("FlumeKafkaReceiverWordCount").setMaster("local[2]")
    //val sparkConf = new SparkConf()  // use this variant when submitting to a cluster
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // receiver-based Kafka stream: one (topic -> numThreads) entry per topic
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // messages are (key, value) pairs; count the words in each value
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
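Compiling this requires the receiver-based Kafka integration dependency; a hedged sbt sketch, assuming Spark 1.6.x (on Spark 2.x the artifact is spark-streaming-kafka-0-8 instead; align versions with the cluster):
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"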
2/ Run the program above; the word counts are printed to the console window in real time
3/ Note:
For local testing, run LoggerGenerator in IDEA, then use Flume, Kafka, and Spark Streaming to process the logs.
In a production environment:
1. Package the jar and run the LoggerGenerator class
2. The Flume and Kafka steps are the same as in local testing
3. The Spark Streaming code also needs to be packaged into a jar and submitted to the cluster with spark-submit (see the example command after this list)
4. Choose the run mode according to the actual environment: local/yarn/standalone/mesos
5. In production the overall streaming pipeline is the same; the difference lies in the complexity of the business logic
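A hedged example of step 3, assuming Spark 1.6.x on YARN, that the application was packaged as sparktrain-1.0.jar (hypothetical name and path, class assumed to have no package prefix), and that the hard-coded setMaster("local[2]") was replaced by the commented-out SparkConf variant; the four arguments are the same as in the local run:
# jar name/path and class name below are hypothetical; adjust to the actual build output
spark-submit \
--class FlumeKafkaReceiverWordCount \
--master yarn \
--name FlumeKafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
/path/to/sparktrain-1.0.jar \
hadoop:2181 test flume-kafka-streaming-topic 1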