flink架构师2-进阶开发

一 、flink 进阶开发目标 0~ 1:52

  1. 掌握常见的DataStream常见的source
  2. 掌握常见的DataStream的transformation操作
  3. 掌握常见的DataStream的sink操作

1、Flink之数据源(DataStream)0:20~0:45

1 source简介
source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)
来为你的程序添加一个source。
flink提供了大量的已经实现好的source方法,也可以自定义source:

  1. 通过实现sourceFunction接口来自定义无并行度的source
  2. 通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的
    source
    大多数情况下,我们使用自带的source即可。

自定义单线程数据源

public class MyNoParalleSource implements SourceFunction<Long>

自定义多线程数据源

public class MyParalleSource implements ParallelSourceFunction<Long>

2 常见Transformation操作 0:45~1:02

2.1 map和filter

2.2 flatMap,keyBy和sum

/**
* 滑动窗口实现单词计数
* 数据源:socket
* 需求:每隔1秒计算最近2秒单词出现的次数
*
* 练习算子:
* flatMap
* keyBy:
* dataStream.keyBy("someKey") // 指定对象中的 "someKey"字段作为分组key
* dataStream.keyBy(0) //指定Tuple中的第一个元素作为分组key
* sum
*/
public class WindowWordCountJava {
public static void main(String[] args) throws Exception {
int port;
try{
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("no port set,user default port 9988");
port=9988;
}
//步骤一:获取flink运行环境(stream)
StreamExecutionEnvironment env=
StreamExecutionEnvironment.getExecutionEnvironment();
String hostname="10.126.88.226";
String delimiter="\n";
//步骤二:获取数据源
DataStreamSource<String> textStream = env.socketTextStream(hostname,
port, delimiter);
//步骤三:执行transformation操作
SingleOutputStreamOperator<WordCount> wordCountStream =
textStream.flatMap(new FlatMapFunction<String, WordCount>() {
public void flatMap(String line, Collector<WordCount> out) throws
Exception {
String[] fields = line.split("\t");
for (String word : fields) {
out.collect(new WordCount(word, 1L));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(1))//每隔1秒计算最近2秒
.sum("count");
wordCountStream.print().setParallelism(1);//打印并设置并行度
//步骤四:运行程序
env.execute("socket word count");
}

3 常见Sink操作 1:02~1:06

4 DataSet算子操作 1:07~1:40

.####4.1 source
基于文件
readTextFile(path)
基于集合
fromCollection(Collection)

4.4.2 transform 1:09~1:30

算子概览
Map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
FlatMap:输入一个元素,可以返回零个,一个或者多个元素
MapPartition:类似map,一次处理一个分区的数据【如果在进行map处理的时候需要获取第三方资源
链接,建议使用MapPartition】
Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
Reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值
Aggregate:sum、max、min等
Distinct:返回一个数据集中去重之后的元素,data.distinct()
Join:内连接
OuterJoin:外链接
Cross:获取两个数据集的笛卡尔积
Union:返回两个数据集的总和,数据类型需要一致
First-n:获取集合中的前N个元素
Sort Partition:在本地对数据集的所有分区进行排序,通过sortPartition()的链接调用来完成对多个段的排序

4.4.4 Flink之广播变量 1:30 ~1:37

广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks
广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的
一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)

用法

1:初始化数据
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
2:广播数据
withBroadcastSet(toBroadcast, "broadcastSetName");
3:获取数据
Collection<Integer> broadcastSet =
getRuntimeContext().getBroadcastVariable("broadcastSetName");

注意:
1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束
2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new
MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> map(Tuple2<String, Integer> value)
throws Exception {
HashMap<String, Integer> res = new HashMap<>();
res.put(value.f0, value.f1);
return res;
}
});
//源数据
DataSource<String> data = env.fromElements("zs", "ls", "ww");
//注意:在这里需要使用到RichMapFunction获取广播变量
DataSet<String> result = data.map(new RichMapFunction<String, String>()
{
List<HashMap<String, Integer>> broadCastMap = new
ArrayList<HashMap<String, Integer>>();
HashMap<String, Integer> allMap = new HashMap<String, Integer>();
/**

  • 这个方法只会执行一次
  • 可以在这里实现一些初始化的功能
  • 所以,就可以在open方法中获取广播变量数据
    */
    @Override
    public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    //3:获取广播数据
    this.broadCastMap =
    getRuntimeContext().getBroadcastVariable("broadCastMapName");
    for (HashMap map : broadCastMap) {
    allMap.putAll(map);
    }
    }
    @Override
    public String map(String value) throws Exception {
    Integer age = allMap.get(value);
    return value + "," + age;
    }
    }).withBroadcastSet(toBroadcast, "broadCastMapName");//2:执行广播数据的操

    result.print();
    }
    }

4.4.5 Flink之Counter(计数器)1:37~ 1:40

Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化,可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

Counter是一个具体的累加器(Accumulator)实现
IntCounter, LongCounter 和 DoubleCounter

用法
1:创建累加器
private IntCounter numLines = new IntCounter();

二 、flink 高级开发(state 和window)目标 1:52~

2.1Keyed state 1:52~

Keyed state托管状态有六种类型:

  1. ValueState
  2. ListState
  3. MapState
  4. ReducingState
  5. AggregatingState
  6. FoldingState

2.1.1 ValueState 开发 1:52~2:13

2.1.2 ListState 开发 2:13~

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,928评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,748评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,282评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,065评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,101评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,855评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,521评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,414评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,931评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,053评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,191评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,873评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,529评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,074评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,188评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,491评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,173评论 2 357