本文主要是讲述flink单机版的搭建及测试
前期准备jave要配置好,版本最好1,8以上
首先在官网下载需要的版本,网址在下面:
这次测试使用的是flink1.6.2,Hadoop2.7.2,scala2.11
https://flink.apache.org/downloads.html#all-stable-releases
下载完毕将安装包上传到linux,并安装
tar -zxvf flink-1.6.2-bin-hadoop27-scala_2.11.tgz -C /usr/local/
为了方便这里给flink目录改个名
mv flink-1.6.2 flink
进入flink目录下输入下面命令
vim conf/flink-conf.yaml
修改第33行的jobmanager.rpc.address 改成自己的主机名
同时将masters文件内容修改成主节点主机名
# > masters
# echo "spark01" >>masters
单机版现在已经搭建成功
启动在flink目录下输入下面命令
# bin/start-cluster.sh
输入后可以看到如下页面
在用jps 看一下启动的进程,红色方框里就是jobmanager进程
此时到web页面查看,在网页输入
192.168.147.133:8081回车可以看到如下界面
这样一个单机版就算成功了,下面我们来测试一下
先在远程连接工具上开启一个命令页1输入下面命令 制造数据
nc -lk 9000
再开一个命令页2运行官方案例wordcount
./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
在命令页1输入任意单词,再开一个命令页3来实时查看统计结果结果保存在log下的文件里
tail -f log/flink-root-taskexecutor-spark01.out
测试完毕结束命令页1,2,3的命令,此时在web页面也可以看到任务的执行
接下来我们要在idea里有scala API来实现这个wordcount并将结果输出到控制台
现在pom.xml里添加依赖包:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.2</version>
</dependency>
然后在idea里执行下面这段scala代码
package flinktest
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
//设置连接的主机名和端口号
var hostname: String = "192.168.147.133"
var port: Int = 6666
// 获取执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 获取连接数据
val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
// 数据处理,每5秒计算打印一次
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
// 用单个线程打印结果,而不是并行打印结果
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
/** 记录的数据类型 */
case class WordWithCount(word: String, count: Long)
}
在spark01命令页输入nc -lk 6666(代码里设置的端口)
结束进程
bin/stop-cluster.sh
相互学习,共勉!路过的大佬留个足迹