import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, RichCoFlatMapFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.util.Collector
/**
 * Entry point of a Flink job that gates a Kafka word stream with per-key switches read from a socket.
 *
 *  - Kafka: every record is split on commas and each token becomes a `(token, 1)` pair.
 *  - Socket: every line is expected to look like `key,flag` (e.g. `hello,true`).
 *
 * Both streams are keyed on their first field and connected; [[MyCoFlatmapFunction]] then
 * decides per key whether the Kafka pairs are emitted, and the result is printed.
 */
object flink_connect {

  /**
   * Builds and runs the job.
   *
   * @param args optional positional overrides; each one defaults to the value used before:
   *             `args(0)` Kafka bootstrap servers (`xiao100:9092`),
   *             `args(1)` Kafka topic (`test`),
   *             `args(2)` socket host (`192.168.252.100`),
   *             `args(3)` socket port (`9999`; must be an integer).
   */
  def main(args: Array[String]): Unit = {
    val bootstrapServers = args.lift(0).getOrElse("xiao100:9092")
    val topic            = args.lift(1).getOrElse("test")
    val socketHost       = args.lift(2).getOrElse("192.168.252.100")
    // A non-numeric override fails fast at startup with NumberFormatException.
    val socketPort       = args.lift(3).map(_.trim.toInt).getOrElse(9999)

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("group.id", "flink_consumer")

    val consumer = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema, properties)
    val kafka: DataStream[String] = env.addSource(consumer)
    val socket: DataStream[String] = env.socketTextStream(socketHost, socketPort)

    // (token, 1) for every comma-separated token of every Kafka record.
    val k_wo: DataStream[(String, Int)] = kafka.flatMap(_.split(",")).map((_, 1))

    // (key, flag) from `key,flag` socket lines. A line with fewer than two fields is dropped
    // instead of throwing ArrayIndexOutOfBoundsException, which would fail the whole job.
    // Fields after the second one are ignored.
    val s_wo: DataStream[(String, String)] = socket.flatMap { line =>
      line.split(",") match {
        case Array(key, flag, _*) => List((key, flag))
        case _                    => Nil
      }
    }

    // Key both inputs on their first field so the co-function's keyed state is shared per key.
    val all: ConnectedStreams[(String, Int), (String, String)] = k_wo.connect(s_wo)
    val key: ConnectedStreams[(String, Int), (String, String)] = all.keyBy(x => x._1, y => y._1)

    val result: DataStream[String] = key.flatMap(new MyCoFlatmapFunction())
    result.print()
    env.execute("flink_connect")
  }
}
/**
 * Per-key gate between two connected, keyed streams.
 *
 * Input 2 carries `(key, flag)` control records and stores the flag in keyed state.
 * Input 1 carries `(word, count)` pairs. Each pair is emitted as `word============>count` only
 * while the flag stored for the same key is `true`. A key that has never received a valid
 * flag is treated as `false`.
 *
 * Uses keyed `ValueState`, so it must be applied to a keyed `ConnectedStreams`.
 */
class MyCoFlatmapFunction extends RichCoFlatMapFunction[(String, Int), (String, String), String] {

  // Last valid flag received for the current key; holds null until the first one arrives.
  // Typed as java.lang.Boolean (not scala.Boolean) so the unset state is observable as null
  // instead of being silently unboxed to false.
  var value: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    value = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("isout", classOf[java.lang.Boolean]))
  }

  /** Emits the word/count pair only when the current key's flag is `true`. */
  override def flatMap1(in1: (String, Int), collector: Collector[String]): Unit = {
    val isOut = Option(value.value()).exists(_.booleanValue)
    if (isOut) {
      collector.collect(in1._1 + "============>" + in1._2)
    }
  }

  /**
   * Prints the control record and updates the current key's flag.
   *
   * The flag is matched case-insensitively after trimming whitespace. A value other than
   * `true`/`false` leaves the previous flag unchanged instead of throwing, which would
   * fail the job.
   */
  override def flatMap2(in2: (String, String), collector: Collector[String]): Unit = {
    println(in2._1 + "--->" + in2._2)
    parseFlag(in2._2).foreach(flag => value.update(flag))
  }

  /** Parses `true`/`false` (case-insensitive, surrounding whitespace ignored); None otherwise. */
  private def parseFlag(raw: String): Option[java.lang.Boolean] =
    Option(raw).map(_.trim) match {
      case Some(s) if s.equalsIgnoreCase("true")  => Some(java.lang.Boolean.TRUE)
      case Some(s) if s.equalsIgnoreCase("false") => Some(java.lang.Boolean.FALSE)
      case _                                      => None
    }
}
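// 感觉这种场景还是挺有用的。经过在本地和 YARN 上测试，没有问题。
// (This pattern is quite useful; it was tested both locally and on YARN without problems.)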