大数据--Flink--流处理(二)

一、flink单节点安装部署

下载

1)下载安装包

[root@localhost ~]# wget http://us.mirrors.quenda.co/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz

2)解压

[root@localhost ~]# tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /usr/local

3)启动

# 切换目录
[root@localhost ~]#  cd /usr/local/flink-1.9.1/
# 启动
[root@localhost flink-1.9.1]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost.localdomain.
Starting taskexecutor daemon on host localhost.localdomain.

5)停止

[root@localhost flink-1.9.1]# ./bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 72657) on host localhost.localdomain.
Stopping standalonesession daemon (pid: 72220) on host localhost.localdomain.

6)访问ui界面
http://192.168.169.128:8081/


访问ui界面

二、flink集群安装部署

集群规划:

主机名 IP地址 角色
Master 192.168.56.101 jobmanager
slave1 192.168.56.102 TaskManager
slave2 192.168.56.103 TaskManager
  1. 安装同单机。
  2. 配置
    修改主机名:(所有主机)
192.168.56.101 master
192.168.56.102 slave1
192.168.56.103 slave2

Master:

修改配置文件
[root@master ~]# vi /usr/local/flink-1.9.1/conf/flink-conf.yaml 
# 33行
jobmanager.rpc.address: master

[root@master ~]# vi /usr/local/flink-1.9.1/conf/masters 
# 修改master文件
master:8081

2)修改slaves

[root@master ~]# vi /usr/local/flink-1.9.1/conf/slaves 
slave1
slave2

  1. 分发flink到其他机器
[root@localhost ~]# scp -r /usr/local/flink-1.9.1/ root@192.168.56.102:/usr/local/
The authenticity of host '192.168.56.101 (192.168.56.102)' can't be established.
ECDSA key fingerprint is SHA256:LlxhKBx6zi06K0chAZjVk+ybYoHCn6yi45RGMn6zGPY.
ECDSA key fingerprint is MD5:64:09:e3:c3:5f:3b:b6:f5:01:73:a8:83:6f:e6:bf:d6.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '192.168.56.102' (ECDSA) to the list of known hosts.
root@192.168.56.102's password: 
LICENSE   

[root@localhost ~]# scp -r /usr/local/flink-1.9.1/ root@192.168.56.103:/usr/local/
The authenticity of host '192.168.56.102 (192.168.56.103)' can't be established.
ECDSA key fingerprint is SHA256:PsNlQXJhfQm/DIC0DsYoXpwInVowEwBeUKmVeuJ5RXg.
ECDSA key fingerprint is MD5:ba:8b:33:39:d6:44:79:b0:e2:99:bc:fc:89:57:44:83.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '192.168.56.103' (ECDSA) to the list of known hosts.
root@192.168.56.10e's password: 
LICENSE 

  1. 启动集群
[root@master ~]# cd /usr/local/flink-1.9.1/
[root@master flink-1.9.1]# ./bin/start-cluster.sh
Starting cluster.
[INFO] 1 instance(s) of standalonesession are already running on master.
Starting standalonesession daemon on host master.
root@slave1's password: 
Starting taskexecutor daemon on host slave1.
root@slave2's password: 
Starting taskexecutor daemon on host slave2.

  1. 关闭集群
[root@master ~]# cd /usr/local/flink-1.9.1/
[root@master flink-1.9.1]# ./bin/stop-cluster.sh

  1. 访问ui界面
    http://192.168.56.101:8081/


    节点查看

    工作管理

    切换风格

    工作管理

三、运行自带示例

文档地址:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/batch/examples.html#running-an-example
运行示例:

./bin/flink run ./examples/batch/WordCount.jar
  1. 批处理示例
    源码:https://github.com/apache/flink/tree/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount
[root@master flink-1.9.1]# ./bin/flink run examples/batch/WordCount.jar
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
Program execution finished
Job with JobID 47566b035440a9b1789e4cc41652f3f2 has finished.
Job Runtime: 5289 ms
Accumulator Results: 
- a20407c3976a9447effcaeb4c8f99b4a (java.util.ArrayList) [170 elements]

完成任务

完成任务
  1. 流处理示例
    源码:https://github.com/apache/flink/tree/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples
//  安装netcat
[root@master flink-1.9.1]# yum install -y nc
# 启动nc服务器
[root@master ~]# nc -l 9000

# 提交flink的批处理examples程序
[root@master flink-1.9.1]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname 192.168.56.101 --port 9000
Starting execution of program

# 这是flink提供的examples下的流处理例子程序,接收socket数据传入,统计单词个数。
# 在nc端写入单词:
[root@master ~]# nc -l 9000
hello world
how are you
are you ok

查看结果:

[root@master flink-1.9.1]# cat log/flink-root-taskexecutor-0-slave2.out 
hello : 1
world : 1
how : 1
you : 1
are : 1
are : 1
ok : 1
you : 1

三、WordCount简单实现
需求:实时的wordcount
往端口中发送数据,实时的计算数据

  1. 创建一个Maven项目


    创建一个Maven项目

2.Maven依赖

<flink.version>1.9.1</flink.version>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.10</version>
    <scope>provided</scope>
</dependency>
<!--flink-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-wikiedits_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!--log4j-->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.29</version>
</dependency>

生成可执行jar插件(这里没有用插件生成,使用时可以不加):
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.2.1</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>com.xtsz.SocketWordCount</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
  1. CustomWordCount .java类
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 需求:实时的wordcount
 * 往端口中发送数据,实时的计算数据
 */
public class CustomWordCount {

    public static void main(String[] args) throws Exception {
        // 1.定义socket的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("没有指定port参数,使用默认值9000");
            port = 9000;
        }
        //2.创建执行环境对象
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //3.得到套接字对象(指定:主机、端口、分隔符),连接socket获取输入的数据
        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.56.101", port, "\n");

        //4.解析数据,统计数据-单词计数 hello lz hello world
        DataStream<Tuple2<String, Integer>> dataStream = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>()  {
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        })
                // 按照key进行分组,把每行的单词转为<word,count>类型的数据
                .keyBy(0)  // 相同的单词进行分组
                // 设置窗口的时间长度 5秒一次窗口 1秒计算一次
                .timeWindow(Time.seconds(5), Time.seconds(1))   // 指定计算数据的窗口大小和滑动窗口大小
                .sum(1);

        // 5.打印可以设置并发度
        dataStream.print().setParallelism(1);

        //6.执行程序 因为flink是懒加载的,所以必须调用execute方法
        env.execute("streaming word count");
    }
}

  1. 服务器安装netcat
//  安装netcat
[root@master flink-1.9.1]# yum install -y nc

// 使用nc,其中9000是CustomWordCount类中定义的端口号
[root@master flink-1.9.1]# nc -lk -p 9000

开启IP为192.168.56.101的虚拟机,并开启该虚拟机的终端,在终端输入如下命令,该命令可以打开一个端口号为9000的监听,输入命令后光标会停留在如下图的地方。


端口监听
  1. 运行CustomWordCount类的main方法


    运行CustomWordCount类
运行CustomWordCount类
  1. 此时在服务器的nc下输入单词后,CustomWordCount的main方法会时时监控到该单词并进行计算处理。
    在虚拟机终端开的光标停留出,输入
hello hello world world world world

然后回车。在IDEA的控制台会显示如下单词和词频的信息,表示成功。


输入内容
查看结果

四、打包上传实现

  1. 打包


    打包

    打包

    打包

    打包

    打包
生成jar包

生成jar包
生成结果
  1. 上传到服务器运行


    上传到服务器

    上传成功
  2. 开启服务监听

// 使用nc,其中9000是CustomWordCount类中定义的端口号
[root@master flink-1.9.1]# nc -lk -p 9000
开启服务监听
  1. 提交作业


    提交作业

    查看作业

    作业运行中
  2. 输入数据

[root@master flink-1.9.1]#  nc -lk -p 9000
hello
world
hello
world
hello
hello
hello

  1. 查看结果


    查看输出

    注:因为是分布式集群,这里查看的是master节点数据,输出有可能在其它节点,这里在slave1上。

[root@slave1 flink-1.9.1]# tail -f  log/*-taskexecutor-*.out
(world,1)
(hello,1)
(hello,1)
(hello,2)
(hello,2)
(hello,3)
(hello,2)
(hello,2)
(hello,1)
(hello,1)

五、常见问题

  1. 解决"-bash: netstat: command not found"问题
[root@master flink-1.9.1]# yum install net-tools

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容