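Flink supports not only real-time stream processing but also batch processing; batch processing can be regarded as a special case of stream processing.
1. Built-in data sources of DataSet
File-based sources: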
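readTextFile(path) / TextInputFormat: reads the file line by line and returns each line as a String.
readTextFileWithValue(path) / TextValueInputFormat: reads the file line by line and returns each line as a StringValue. StringValue is Flink's mutable, serializable wrapper around String, which can improve performance to some extent.
readCsvFile(path) / CsvInputFormat: parses files whose fields are separated by commas (or another character); returns tuples or POJOs.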
readFileOfPrimitives(path, Class) / PrimitiveInputFormat
readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat: similar to readCsvFile, but returns values of a primitive type instead of tuples.
readSequenceFile(Key, Value, path) / SequenceFileInputFormat: reads a SequenceFile and returns Tuple2<Key, Value> records.
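A minimal sketch of readCsvFile, assuming a small throwaway file: the code writes two comma-separated lines to a temporary file and reads them back as Scala tuples. The object name, the file contents, and the (Int, String, Int) column layout are illustrative assumptions; the tuple type passed to readCsvFile is what tells Flink how to parse each field.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.flink.api.scala._

// Sketch: parse a comma-separated file into Scala tuples with readCsvFile.
object CsvSourceSketch {
  def run(env: ExecutionEnvironment): Seq[(Int, String, Int)] = {
    // Write a throwaway CSV file (id, name, age) so the example is self-contained
    val file = Files.createTempFile("users", ".csv")
    Files.write(file, "1,zhangsan,28\n2,lisi,30\n".getBytes(StandardCharsets.UTF_8))

    // The tuple type determines how each field is parsed
    val users: DataSet[(Int, String, Int)] =
      env.readCsvFile[(Int, String, Int)](file.toUri.toString)

    // collect() runs the job and returns the records to the client
    users.collect()
  }

  def main(args: Array[String]): Unit = {
    run(ExecutionEnvironment.getExecutionEnvironment).foreach(println)
  }
}
```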
Collection-based sources:
fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)
Generic sources:
readFile(inputFormat, path) / FileInputFormat
createInput(inputFormat) / InputFormat
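The collection-based sources are handy for tests and small experiments. A sketch combining fromElements, fromCollection, and generateSequence (the object name and sample values are made up); generateSequence(from, to) includes both end points, so summing 1..10 should give 55.

```scala
import org.apache.flink.api.scala._

// Sketch: the collection-based sources fromElements, fromCollection and generateSequence.
object CollectionSourceSketch {
  def run(env: ExecutionEnvironment): (Seq[String], Long) = {
    val fromElems: DataSet[String] = env.fromElements("hello world", "spark flink")
    val fromColl: DataSet[String] = env.fromCollection(Seq("hive", "hadoop"))

    // Each collect() runs a small job and brings the elements back to the client
    val lines = fromElems.collect() ++ fromColl.collect()

    // generateSequence(1, 10) yields 1, 2, ..., 10 (both ends inclusive)
    val total = env.generateSequence(1, 10).reduce(_ + _).collect().head

    (lines, total)
  }

  def main(args: Array[String]): Unit = {
    println(run(ExecutionEnvironment.getExecutionEnvironment))
  }
}
```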
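File sources
The introductory example reads from a file source. If a directory contains nested subdirectories, a configuration parameter tells Flink to enumerate the files in all of them recursively: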
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration

object BatchOperate {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\count.txt"
    val outPut = "D:\\data\\result2"

    // Enable recursive enumeration of files in nested directories
    val configuration: Configuration = new Configuration()
    configuration.setBoolean("recursive.file.enumeration", true)

    // Get the program entry point, ExecutionEnvironment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = env.readTextFile(inputPath).withParameters(configuration)

    // Import the implicit conversions (TypeInformation for Scala types)
    import org.apache.flink.api.scala._
    val value: AggregateDataSet[(String, Int)] =
      text.flatMap(x => x.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)

    // writeAsText only registers a sink, so env.execute() is required to run the job
    value.writeAsText("d:\\datas\\result.txt").setParallelism(1)
    env.execute("batch word count")
  }
}
Collection sources
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}

object DataSetSource {
  def main(args: Array[String]): Unit = {
    // Get the batch entry point, ExecutionEnvironment
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // Create a DataSet from a collection
    val myArray = Array("hello world", "spark flink")
    val collectionSet: DataSet[String] = environment.fromCollection(myArray)
    val result: AggregateDataSet[(String, Int)] =
      collectionSet.flatMap(x => x.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)

    // print() runs the job itself; calling environment.execute() afterwards would fail
    // with "No new data sinks have been defined since the last execution"
    result.setParallelism(1).print()
    // To write a file instead: result.writeAsText("c:\\HELLO.TXT"); environment.execute()
  }
}
2. DataSet operators
Operator reference on the official website:
https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/dataset_transformations.html
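DataSet transformation operators
Map: takes one element and returns one element; useful for cleaning and converting data along the way
FlatMap: takes one element and returns zero, one, or more elements
MapPartition: like Map, but processes a whole partition per call (if the logic needs a connection to an external resource, MapPartition is recommended so that the connection is opened once per partition instead of once per record)
Filter: evaluates each incoming element and keeps only those that satisfy the condition
Reduce: aggregates the data by combining the current element with the value returned by the previous reduce call, producing a new value
Aggregate: sum, max, min, etc.
Distinct: returns the deduplicated elements of a data set, e.g. data.distinct()
Join: inner join
OuterJoin: outer join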
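A minimal sketch that chains several of the operators above (FlatMap, Filter, Distinct, Map, the sum aggregation, and Reduce) on a tiny in-memory input. The input lines and the object name are made up for illustration, and collect() is used so that results come back to the client as Scala collections.

```scala
import org.apache.flink.api.scala._

// Sketch of the basic DataSet transformations on a tiny in-memory input.
object TransformationSketch {
  def run(env: ExecutionEnvironment): (Seq[String], Seq[(String, Int)], (String, Int), Int) = {
    val lines: DataSet[String] = env.fromElements("hello world", "hello flink", "")

    // FlatMap: one line in, zero or more words out; Filter drops the empty word from ""
    val words: DataSet[String] = lines.flatMap(_.split(" ")).filter(_.nonEmpty)

    // Distinct: remove duplicate words
    val distinctWords = words.distinct().collect()

    // Map + Aggregate: (word, 1) pairs summed per word
    val counts: DataSet[(String, Int)] = words.map((_, 1)).groupBy(0).sum(1)

    // Reduce: keep the pair with the highest count
    val top = counts.reduce((a, b) => if (a._2 >= b._2) a else b).collect().head

    // Map + Reduce: total number of letters across all words
    val letters = words.map(_.length).reduce(_ + _).collect().head

    (distinctWords, counts.collect(), top, letters)
  }

  def main(args: Array[String]): Unit = {
    println(run(ExecutionEnvironment.getExecutionEnvironment))
  }
}
```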
Example 1: use mapPartition to save data to a database
Step 1: add the MySQL connector dependency
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
Step 2: create the MySQL database and table
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`flink_db` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `flink_db`;

/*Table structure for table `user` */
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `name` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
Step 3: write the code
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object MapPartition2MySql {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceDataset: DataSet[String] = environment.fromElements("1 zhangsan", "2 lisi", "3 wangwu")

    sourceDataset.mapPartition(part => {
      // One connection and one prepared statement per partition, not per record
      Class.forName("com.mysql.jdbc.Driver")
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_db", "root", "123456")
      val statement: PreparedStatement = conn.prepareStatement("insert into user (id,name) values(?,?)")
      var inserted = 0
      try {
        part.foreach(x => {
          val fields = x.split(" ")
          statement.setInt(1, fields(0).toInt)
          statement.setString(2, fields(1))
          statement.execute()
          inserted += 1
        })
      } finally {
        // Always release the JDBC resources, even if an insert fails
        statement.close()
        conn.close()
      }
      // Emit the number of rows this partition inserted
      Iterator(inserted)
    }).print()
    // print() already runs the job, so no environment.execute() here
  }
}
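The point of mapPartition in this example is that the setup code (loading the driver, opening the connection) runs once per partition rather than once per record. A sketch that makes this visible without a database, where a constant `setups = 1` per call stands in for the hypothetical connection setup and the fourth record is made up: with parallelism 2, the four records should be handled by at most two setup calls.

```scala
import org.apache.flink.api.scala._

// Sketch: compare how often per-partition setup runs with how many records are processed.
// `setups` is a hypothetical stand-in for opening a JDBC connection.
object MapPartitionSketch {
  def run(env: ExecutionEnvironment): Seq[(Int, Int)] = {
    val records: DataSet[String] =
      env.fromElements("1 zhangsan", "2 lisi", "3 wangwu", "4 zhaoliu")

    records
      .rebalance() // spread the records over all parallel instances
      .mapPartition { part =>
        val setups = 1          // this code runs once per call, i.e. once per partition
        val handled = part.size // consumes every record of the partition
        Iterator((setups, handled))
      }
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    run(env).foreach { case (setups, handled) => println(s"setups=$setups records=$handled") }
  }
}
```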
Example 2: joins
Operators such as left outer join, right outer join, and full outer join join two DataSets on the condition we specify.
import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

object BatchDemoOuterJoinScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data1 = ListBuffer[Tuple2[Int, String]]()
    data1.append((1, "zs"))
    data1.append((2, "ls"))
    data1.append((3, "ww"))

    val data2 = ListBuffer[Tuple2[Int, String]]()
    data2.append((1, "beijing"))
    data2.append((2, "shanghai"))
    data2.append((4, "guangzhou"))

    val text1 = env.fromCollection(data1)
    val text2 = env.fromCollection(data2)

    // Left outer join: second is null when a left key has no match
    text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "null")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    println("===============================")

    // Right outer join: first is null when a right key has no match
    text1.rightOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "null", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    println("===============================")

    // Full outer join: either side may be null
    text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "null", second._2)
      } else if (second == null) {
        (first._1, first._2, "null")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
    // Each print() runs its own job, so env.execute() is not needed
  }
}
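The operator list also names the plain (inner) Join, which this example only shows in its outer variants. A minimal sketch of an inner join over the same (id, value) tuples; the object name is illustrative. Keys 3 and 4 have no partner, so unlike in the outer joins they simply drop out and no null handling is needed.

```scala
import org.apache.flink.api.scala._

// Sketch: inner join of the (id, name) and (id, city) tuples from the outer-join example.
object InnerJoinSketch {
  def run(env: ExecutionEnvironment): Seq[(Int, String, String)] = {
    val users: DataSet[(Int, String)] = env.fromElements((1, "zs"), (2, "ls"), (3, "ww"))
    val cities: DataSet[(Int, String)] =
      env.fromElements((1, "beijing"), (2, "shanghai"), (4, "guangzhou"))

    // Only keys present on both sides are joined, so neither argument is ever null
    users.join(cities).where(0).equalTo(0)
      .apply((user, city) => (user._1, user._2, city._2))
      .collect()
  }

  def main(args: Array[String]): Unit = {
    run(ExecutionEnvironment.getExecutionEnvironment).foreach(println)
  }
}
```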
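DataSet partition operators
Rebalance: rebalances (repartitions) the data set evenly to eliminate data skew
Hash-Partition: partitions the data set by the hash value of the specified key
partitionByHash()
Range-Partition: partitions the data set into ranges of the specified key
.partitionByRange()
Custom Partitioning: a user-defined partitioning rule; implement the Partitioner interface and call partitionCustom(partitioner, "someKey") or partitionCustom(partitioner, 0)
In Flink batch processing, partitioning is mainly done with rebalance, partitionByHash, partitionByRange, and partitionCustom.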
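import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkPartition {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(2)
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] = environment.fromElements("hello world", "spark flink", "hive sqoop")
    // A filter can leave partitions unevenly filled; rebalance() redistributes
    // the remaining records round-robin across all parallel instances
    val filterSet: DataSet[String] = sourceDataSet.filter(x => x.contains("hello"))
      .rebalance()
    // print() runs the job itself, so environment.execute() is not needed
    filterSet.print()
  }
}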
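The program above only exercises rebalance(). As a sketch of partitionByHash and partitionByRange, the snippet below tags each record with the index of the subtask that received it (via getRuntimeContext.getIndexOfThisSubtask); the class and object names and the sample pairs are illustrative. With either operator, all records that share a key should land in the same partition.

```scala
import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Tags every record with the index of the parallel subtask (partition) that received it.
class TagWithSubtask extends RichMapPartitionFunction[(String, Int), (Int, String)] {
  override def mapPartition(values: java.lang.Iterable[(String, Int)],
                            out: Collector[(Int, String)]): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    values.asScala.foreach(v => out.collect((subtask, v._1)))
  }
}

object PartitionSketch {
  // Returns (subtask index, key) pairs after hash or range partitioning on field 0.
  def run(env: ExecutionEnvironment, byRange: Boolean): Seq[(Int, String)] = {
    val pairs: DataSet[(String, Int)] = env.fromElements(
      ("hello", 1), ("flink", 1), ("hello", 2), ("spark", 1), ("flink", 3))
    val partitioned =
      if (byRange) pairs.partitionByRange(0) else pairs.partitionByHash(0)
    partitioned.mapPartition(new TagWithSubtask).collect()
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    run(env, byRange = false).foreach(println)
  }
}
```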
Partitioning data with a custom partitioner
Step 1: define the custom partitioner class in Scala
import org.apache.flink.api.common.functions.Partitioner

// Assumes at least two partitions (the job below sets parallelism 2)
class MyPartitioner2 extends Partitioner[String] {
  override def partition(word: String, num: Int): Int = {
    println("number of partitions: " + num)
    if (word.contains("hello")) {
      println("partition 0")
      0
    } else {
      println("partition 1")
      1
    }
  }
}
Step 2: implement the job
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkCustomerPartition {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Set the number of partitions; if unset, the default parallelism is used
    // (in local mode, the number of CPU cores)
    environment.setParallelism(2)
    import org.apache.flink.api.scala._
    // Create the DataSet
    val sourceDataSet: DataSet[String] = environment.fromElements("hello world", "spark flink", "hello world", "hive hadoop")
    // The key selector uses the whole line as the partitioning key
    val result: DataSet[String] = sourceDataSet.partitionCustom(new MyPartitioner2, x => x + "")
    val value: DataSet[String] = result.map(x => {
      println("key: " + x + ", thread: " + Thread.currentThread().getId)
      x
    })
    // print() runs the job itself, so environment.execute() is not needed
    value.print()
  }
}
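MyPartitioner2 always returns 0 or 1, so it relies on the parallelism being at least 2. A hypothetical variant that keeps the "hello" rule but also respects numPartitions might look like the following; the class name and the hash-based spreading of the other lines are assumptions, not part of the original example.

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

// Hypothetical variant of MyPartitioner2: lines containing "hello" go to partition 0,
// other lines are spread by hash over partitions 1..n-1, and n = 1 is handled safely.
class HelloFirstPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    if (numPartitions <= 1 || key.contains("hello")) 0
    else 1 + Math.floorMod(key.hashCode, numPartitions - 1)
}

object HelloFirstPartitionerDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)
    env.fromElements("hello world", "spark flink", "hello world", "hive hadoop")
      .partitionCustom(new HelloFirstPartitioner, (line: String) => line)
      .print()
  }
}
```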
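DataSet sink operators
1. writeAsText() / TextOutputFormat: writes elements line by line as strings, obtained by calling each element's toString() method.
2. writeAsFormattedText() / TextOutputFormat: writes elements line by line as strings, obtained by calling a user-defined format() method on each element.
3. writeAsCsv(...) / CsvOutputFormat: writes tuples to a comma-separated file. Row and field delimiters are configurable. Each field's value comes from the object's toString() method.
4. print() / printToErr() / print(String msg) / printToErr(String msg) (note: avoid these in production applications; use sampled printing or logging instead)
5. write() / FileOutputFormat
6. output() / OutputFormat: the generic output method, for data sinks that are not file-based (such as storing results in a database).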
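The file sinks above are lazy: they only register a sink, and nothing runs until env.execute(), whereas print() executes immediately. A sketch with writeAsCsv, assuming a temporary output location (the object and file names are illustrative) and a sink parallelism of 1, so that Flink writes a single file rather than a directory of part files.

```scala
import java.nio.file.Files

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

import scala.collection.JavaConverters._

// Sketch: write (word, count) tuples with writeAsCsv, run the job, then read the file back.
object CsvSinkSketch {
  def run(env: ExecutionEnvironment): List[String] = {
    // Temporary output location (an assumption for the sketch)
    val outFile = Files.createTempDirectory("flink-sink").resolve("counts.csv")

    env.fromElements(("hello", 2), ("flink", 1))
      .writeAsCsv(outFile.toUri.toString, "\n", ",", WriteMode.OVERWRITE)
      .setParallelism(1) // a single sink task writes one file instead of a directory

    // The sink is only registered above; execute() actually runs the job
    env.execute("csv sink sketch")

    Files.readAllLines(outFile).asScala.toList
  }

  def main(args: Array[String]): Unit = {
    run(ExecutionEnvironment.getExecutionEnvironment).foreach(println)
  }
}
```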
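3. Passing parameters in DataSet programs
DataSet code often needs parameters. They can be passed through a function's constructor, with the withParameters method, or globally through the ExecutionConfig.
1. Passing parameters through the constructor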
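import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkParameter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")
    // The parameter is handed to the function through its constructor
    val filterSet: DataSet[String] = sourceSet.filter(new MyFilterFunction("test"))
    // print() runs the job itself, so env.execute() is not needed
    filterSet.print()
  }
}

// The constructor argument is serialized together with the function and shipped to the workers
class MyFilterFunction(parameter: String) extends FilterFunction[String] {
  override def filter(t: String): Boolean = t.contains(parameter)
}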
2. Passing parameters with withParameters
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration

object FlinkParameter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")
    val configuration = new Configuration()
    configuration.setString("parameterKey", "test")
    // withParameters passes the Configuration to the function's open() method
    val filterSet: DataSet[String] = sourceSet.filter(new MyFilter).withParameters(configuration)
    // print() runs the job itself, so env.execute() is not needed
    filterSet.print()
  }
}

class MyFilter extends RichFilterFunction[String] {
  var value: String = ""

  override def open(parameters: Configuration): Unit = {
    value = parameters.getString("parameterKey", "defaultValue")
  }

  override def filter(t: String): Boolean = t.contains(value)
}
3. Passing global parameters
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration

object FlinkParameter {
  def main(args: Array[String]): Unit = {
    val configuration = new Configuration()
    configuration.setString("parameterKey", "test")
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Register the Configuration as global job parameters, visible to every rich function
    env.getConfig.setGlobalJobParameters(configuration)
    import org.apache.flink.api.scala._
    val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")
    val filterSet: DataSet[String] = sourceSet.filter(new MyFilter)
    // print() runs the job itself, so env.execute() is not needed
    filterSet.print()
  }
}

class MyFilter extends RichFilterFunction[String] {
  var value: String = ""

  override def open(parameters: Configuration): Unit = {
    // Read the global parameters from the runtime context rather than from open()'s argument
    val globalParams: ExecutionConfig.GlobalJobParameters =
      getRuntimeContext.getExecutionConfig.getGlobalJobParameters
    val globalConf: Configuration = globalParams.asInstanceOf[Configuration]
    value = globalConf.getString("parameterKey", "test")
  }

  override def filter(t: String): Boolean = t.contains(value)
}
4. Flink DataSet connectors
1. File system connector
To read data from a file system, Flink has built-in support for the following file systems:
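| File system | Scheme | Notes |
|---|---|---|
| HDFS | hdfs:// | HDFS file system |
| S3 | s3:// | Supported through the Hadoop file system |
| MapR | maprfs:// | The user must add the required jar |
| Alluxio | alluxio:// | Supported through the Hadoop file system |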
Note: Flink lets users use any file system that implements the org.apache.hadoop.fs.FileSystem interface, for example S3, Google Cloud Storage Connector for Hadoop, Alluxio, XtreemFS, FTP, and others.
Flink is compatible with the Apache Hadoop MapReduce interfaces, so code written for Hadoop MapReduce can be reused:
Use Hadoop Writable data types
Use any Hadoop InputFormat as a DataSource (Flink provides a built-in HadoopInputFormat)
Use any Hadoop OutputFormat as a DataSink (Flink provides a built-in HadoopOutputFormat)
Use a Hadoop Mapper as a FlatMapFunction
Use a Hadoop Reducer as a GroupReduceFunction
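As a sketch of using a Hadoop InputFormat as a DataSource, the snippet below wraps Hadoop's mapred TextInputFormat with HadoopInputs.readHadoopFile from flink-hadoop-compatibility. It assumes flink-hadoop-compatibility and the Hadoop client classes are on the classpath, and it reads a temporary local file in place of an hdfs:// path; the object name and file contents are illustrative.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Sketch: use Hadoop's (mapred) TextInputFormat as a Flink DataSource.
object HadoopInputSketch {
  def run(env: ExecutionEnvironment): Seq[String] = {
    // A local temporary file stands in for an hdfs:// path
    val file = Files.createTempFile("hadoop-input", ".txt")
    Files.write(file, "hello world\nspark flink\n".getBytes(StandardCharsets.UTF_8))

    // Records arrive as (byte offset, line) pairs, as in a Hadoop Mapper
    val input: DataSet[(LongWritable, Text)] = env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat, classOf[LongWritable], classOf[Text], file.toUri.toString))

    // Convert the Writable values to plain Strings before collecting them
    input.map(_._2.toString).collect()
  }

  def main(args: Array[String]): Unit = {
    run(ExecutionEnvironment.getExecutionEnvironment).foreach(println)
  }
}
```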
2. Reading data from HBase with Flink
Flink can also integrate directly with HBase, using HBase as a Flink source and sink.
Step 1: create the HBase table and insert data
create 'hbasesource','f1'
put 'hbasesource','0001','f1:name','zhangsan'
put 'hbasesource','0002','f1:age','18'
Step 2: add the integration dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
Step 3: write the Flink job that reads data from HBase
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.java.tuple
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkReadHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val hbaseData: DataSet[tuple.Tuple2[String, String]] =
      environment.createInput(new TableInputFormat[tuple.Tuple2[String, String]] {
        // Create the HBase connection, the table handle, and the scan
        override def configure(parameters: Configuration): Unit = {
          val conf = HBaseConfiguration.create()
          conf.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
          conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
          val conn: Connection = ConnectionFactory.createConnection(conf)
          table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
          scan = new Scan() {
            // setStartRow(Bytes.toBytes("1001"))
            // setStopRow(Bytes.toBytes("1004"))
            addFamily(Bytes.toBytes("f1"))
          }
        }

        override def getScanner: Scan = scan

        override def getTableName: String = "hbasesource"

        // Turn each HBase row into (rowkey, comma-separated cell values)
        override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
          val rowkey: String = Bytes.toString(result.getRow)
          val values = result.rawCells().map((cell: Cell) =>
            Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
          new tuple.Tuple2[String, String](rowkey, values.mkString(","))
        }
      })

    // print() runs the job itself, so environment.execute() is not needed
    hbaseData.print()
  }
}
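3. Reading data with Flink and writing it to HBase
Flink can also write data into HBase, in one of two ways:
Option 1: implement the OutputFormat interface
Option 2: extend RichSinkFunction and override its methods
The following code uses option 1.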
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkWriteHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] = environment.fromElements("01,zhangsan,28", "02,lisi,30")
    sourceDataSet.output(new HBaseOutputFormat)
    // output() only registers a sink, so execute() is required to run the job
    environment.execute()
  }
}

class HBaseOutputFormat extends OutputFormat[String] {
  val zkServer = "node01"
  val port = "2181"
  var conn: Connection = null
  var mutator: BufferedMutator = null

  override def configure(configuration: Configuration): Unit = {
  }

  // Called once per parallel task: open the connection and one buffered mutator
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
    // Buffer up to 1 MB of mutations; when the buffer is full it is flushed to HBase automatically
    val params: BufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("hbasesource"))
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
  }

  override def writeRecord(it: String): Unit = {
    val cf1 = "f1"
    val array: Array[String] = it.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    mutator.mutate(put)
  }

  // Flush whatever is still buffered, then release the resources
  override def close(): Unit = {
    if (null != mutator) {
      mutator.flush()
      mutator.close()
    }
    if (null != conn) {
      conn.close()
    }
  }
}