一、Environment
1.getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
2.createLocalEnvironment
返回本地执行环境,需要在调用时指定默认的并行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
3.createRemoteEnvironment
返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")
二、Source
创建一个kafka的工具类
object MyKafkaUtil {
val prop = new Properties()
prop.setProperty("bootstrap.servers","hadoop1:9092")
prop.setProperty("group.id","test")
def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= {
val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
myKafkaConsumer
}
}
消费kafka
object StartupApp {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer =MyKafkaUtil.getConsumer("GMALL_STARTUP")
val dstream: DataStream[String] = environment.addSource(kafkaConsumer)
dstream.print()
environment.execute()
}
}
Exactly-once two-phase commit
Flink通过checkpoint来保存数据是否处理完成的状态
由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。
执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。
如果宕机需要通过StateBackend进行恢复,只能恢复所有确认提交的操作。
三、Transform
1.KeyBy和Reduce
spark中的reduceByKey在Flink中被分成两个算子:KeyBy和Reduce
KeyBy:
DataStream → KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的,KeyedStream是有状态的。
Reduce:
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
2.Split 和 Select
Split:
类似于Flume中的选择器,在一个DataStream头部加上不同的戳拆分成多个DataStream。
Select:
在splitStream中获取一个或多个DataStream。
val splitStream: SplitStream[StartUpLog] = startUplogDstream.split { startUplog =>
var flags:List[String] = null
if ("appstore" == startUplog.ch) {
flags = List(startUplog.ch)
} else {
flags = List("other" )
}
flags
}
val appStoreStream: DataStream[StartUpLog] = splitStream.select("appstore")
val otherStream: DataStream[StartUpLog] = splitStream.select("other")
3.Connect和 CoMap
Connect:
连接两个数据流
CoMap:
val connStream: ConnectedStreams[StartUpLog, StartUpLog] = appStoreStream.connect(otherStream)
val allStream: DataStream[String] = connStream.map(
(log1: StartUpLog) => log1.ch,
(log2: StartUpLog) => log2.ch
)
4.Union
对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。
Connect与 Union 区别:
1.Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2.Connect只能操作两个流,Union可以操作多个.
Sink
1.Kafka
在kafka工具类中添加方法
def getProducer(topic:String): FlinkKafkaProducer011[String] ={
new FlinkKafkaProducer011[String](brokerList,topic,new SimpleStringSchema())
}
主函数中添加
val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")
sumDstream.map( chCount=>chCount._1+":"+chCount._2 ).addSink(myKafkaProducer)
2.Redis
在Redis工具类中添加方法
def getRedisSink(): RedisSink[(String,String)] ={
new RedisSink[(String,String)](conf,new MyRedisMapper)
}
class MyRedisMapper extends RedisMapper[(String,String)]{
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "channel_count")
// new RedisCommandDescription(RedisCommand.SET )
}
override def getValueFromData(t: (String, String)): String = t._2
override def getKeyFromData(t: (String, String)): String = t._1
}
3.Elasticsearch
def getElasticSearchSink(indexName:String): ElasticsearchSink[String] ={
val esFunc = new ElasticsearchSinkFunction[String] {
override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
println("试图保存:"+element)
val jsonObj: JSONObject = JSON.parseObject(element)
val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jsonObj)
indexer.add(indexRequest)
println("保存1条")
}
}
val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)
//刷新前缓冲的最大动作量
sinkBuilder.setBulkFlushMaxActions(10)
sinkBuilder.build()
}
4.JDBC 自定义sink
略略略