Apache Flink 学习笔记(一)

最近在项目中需要用到Flink,关于Flink的基本介绍就不啰嗦了,官方文档传送门

由于是第一次接触,我花了一些时间整理了几个小demo(java)当作笔记。对Flink很多地方的理解有些片面甚至错误的,路过的朋友权当参考,不能保证说得都对。

之前接触过Spark的都知道,数据处理是在RDD中进行的(无论是批处理还是流处理)。Flink则不同,批处理用DataSet,流处理用DataStream,而且批处理和流处理的api也是不一样的。

先来看一下第一个demo 经典的 word count

我笔记中的例子都是基于 JDK1.8 ,Flink 1.6 编写的

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * 从本地文件读取字符串,按空格分割单词,统计每个分词出现的次数并输出
 */
public class Demo1 {
    public static void main(String[] args) {
        //获取执行环境 ExecutionEnvironment (批处理用这个对象)
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        //加载数据源到 DataSet
        DataSet<String> text = env.readTextFile("test.txt");
        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        //s 即从文本中读取到一行字符串,按空格分割后得到数组tokens
                        String[] tokens = s.toLowerCase().split("\\s+");
                        for (String token : tokens) {
                            if (token.length() > 0) {
                                //初始化每一个单词,保存为元祖对象
                                collector.collect(new Tuple2<String, Integer>(token, 1));
                            }
                        }
                    }
                })
                        .groupBy(0) //0表示Tuple2<String, Integer> 中的第一个元素,即分割后的单词
                        .aggregate(Aggregations.SUM, 1); //同理,1表示Tuple2<String, Integer> 中的第二个元素,即出现次数

        try {
            //从DataSet 中获得集合,并遍历
            List<Tuple2<String,Integer>> list = counts.collect();
            for (Tuple2<String,Integer> tuple2:list){
                System.out.println(tuple2.f0 + ":" + tuple2.f1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

其中,groupBy(0) 表示按照DataSet中保存的元祖的第一个字段分组,aggregate 是聚合函数,Aggregations.SUM 指定了求和,1 表示对元祖的第二个字段进行求和计算。

//test.txt 
hello world
flink demo
this is a flink demo file
//控制台输出
demo:2
is:1
this:1
a:1
file:1
world:1
hello:1
flink:2

可以看到,Flink程序已经成功工作了。但是有一个问题,DataSet中的对象使用元祖Tuple来保存的,如果字段比较多,肯定不如pojo 更加方便,所以第二个demo 我用pojo来改造一下。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * 用pojo 改造 demo1
 */
public class Demo2 {
    public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.readTextFile("test.txt");
        //用 WordWithCount 保存单词和次数信息
        DataSet<WordWithCount> counts =
                text.flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
                        String[] tokens = s.toLowerCase().split("\\s+");
                        for (String token : tokens) {
                            if (token.length() > 0) {
                                collector.collect(new WordWithCount(token, 1));
                            }
                        }
                    }
                })
                        .groupBy("word")//直接指定字段名称
                        .reduce(new ReduceFunction<WordWithCount>() {
                            @Override
                            public WordWithCount reduce(WordWithCount wc, WordWithCount t1) throws Exception {
                                  return new WordWithCount(wc.word, wc.count + t1.count);
                            }
                        });
        try {
            List<WordWithCount> list = counts.collect();
            for (WordWithCount wc: list) {
                System.out.println(wc.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // pojo
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

运行结果和demo1完全一致。但是你可能会注意到,demo1中的aggregate聚合函数被替换成了reduce,这是因为aggregate函数只接受int来表示filed。同时,.groupBy(0) 也相应改成用.groupBy("word")直接指定字段。

请注意,如果你的pojo demo 运行失败,你可能需要做以下检查工作:
1、pojo 有没有声明为public,如果是内部类必须是static
2、有没有为pojo创建一个无参的构造函数
3、有没有声明pojo的字段为public,或者生成publicgetset方法
4、必须使用Flink 支持的数据类型

如果你有提供publicget,set 方法,比如:

public String getWord() {
    return word;
}

public void setWord(String word) {
    this.word = word;
}

那么,.groupBy("word") 还可以用.groupBy(WordWithCount::getWord)替换

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容