1. pom.xml 文件引用 jar 包时,若 jar 包版本冲突,程序可能无法启动;flink-clients 依赖是必须的,否则本地无法提交/运行作业。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>flinktest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Single source of truth for the Flink version. Previously this said 1.15.0
     while every dependency hardcoded 1.14.4; the property now matches the
     version actually in use and the dependencies reference it. -->
<flink.version>1.14.4</flink.version>
<scala.version>2.12.7</scala.version>
<redis.version>3.2.0</redis.version>
<hbase.version>1.3.3</hbase.version>
<mysql.version>5.1.44</mysql.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink-clients is required to submit/execute jobs (e.g. from the IDE). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- When a Maven project contains both Java and Scala sources, the
     maven-scala-plugin compiles the Scala code so both are packaged together. -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Builds a fat jar containing all dependencies. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<!-- Setting appendAssemblyId to false drops the "-jar-with-dependencies"
     suffix from the produced jar name, e.g. MySpark-1.0-SNAPSHOT.jar -->
<!--<appendAssemblyId>false</appendAssemblyId>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<!-- "single" replaces the deprecated "assembly" goal. -->
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2. 读取文本文件的 WordCount(批处理 DataSet API)
object FlinkDay1 {
  /** Batch (DataSet API) word count over a local text file. */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // Read the input file and split each line into individual words.
    val lines = env.readTextFile("C:\\Users\\shenyingkui\\Desktop\\datatest.txt")
    val words = lines.flatMap(_.split(" "))
    words.print()

    // Pair each word with 1, group by the word (tuple position 0),
    // and sum the counts (tuple position 1).
    val pairs = words.map((_, 1))
    val grouped = pairs.groupBy(0)
    val counts = grouped.sum(1)
    counts.print()
  }
}
3. 流处理:通过 socket 读取数据流的 WordCount(DataStream API)
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/** Streaming word count reading lines from a remote socket.
  * Feed it with e.g. `nc -lk 9998` on the source host.
  */
object StreamDemo {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val datastream = environment.socketTextStream("192.168.71.106",9998)
    datastream.print()
    // keyBy with a key selector: positional keyBy(0) is deprecated in the
    // DataStream API since Flink 1.11. _._1 keys by the word, same semantics.
    datastream.map((_,1)).keyBy(_._1).sum(1).print()
    // Streaming jobs are lazy: nothing runs until execute() is called.
    environment.execute("first stream")
  }
}
// Sliding count window: for each key, fire every 2 elements over the last 10.
// NOTE: countWindow(10, 2) is a SLIDING count window, not a tumbling one
// (a tumbling count window would be countWindow(10)).
datastream.map((_,1)).keyBy(_._1).countWindow(10,2).reduce(new ReduceFunction[(String, Int)] {
  override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
    // Keep the key, add up the running counts.
    (t1._1,t1._2+t._2)
  }
}).print()

// Word count via reduce. BUG FIX: the original used keyBy((_,1)), which keys
// every record by the tuple (record, 1) — each record becomes its own key and
// the grouping is broken. keyBy(_._1) correctly keys by the word.
datastream.flatMap(_.split(" ")).map((_,1)).keyBy(_._1).reduce((v:(String,Int) ,v1:(String,Int) )=>{
  (v._1,v._2+v1._2)
}).print()