Integrating Kafka with Flink's DataStream API

In real-time processing, the data source we use in practice is almost always Kafka, so let's look at how to integrate Kafka with Flink.

Flink ships with a dedicated Kafka connector for reading from and writing to Kafka topics. When consuming from Kafka, Flink does not guarantee exactly-once semantics purely by tracking the Kafka consumer group's offsets; instead, it tracks the offsets internally and stores them as part of its checkpoints to achieve exactly-once. In addition, Flink can run the Kafka source with a parallelism matching the topic's partitions, so that each partition is handled by its own subtask.
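
To make the offset handling and parallelism concrete, here is a minimal sketch (the object name KafkaStartPositionSketch is only an illustrative placeholder; the broker address, group id, and topic reuse the values from the examples in step 2 below). It shows how to choose where the consumer starts reading when there is no checkpoint to restore from, and how to match the source parallelism to the number of partitions:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object KafkaStartPositionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // offsets become part of the checkpointed state

    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "con1")

    val consumer = new FlinkKafkaConsumer011[String]("test", new SimpleStringSchema(), prop)
    // Start position, used only when the job is NOT restored from a checkpoint/savepoint.
    consumer.setStartFromGroupOffsets()   // default: resume from the committed group offsets
    // consumer.setStartFromEarliest()    // or read the whole topic from the beginning
    // consumer.setStartFromLatest()      // or read only newly arriving records

    // one consuming subtask per partition of the 3-partition "test" topic
    env.addSource(consumer).setParallelism(3).print()
    env.execute("KafkaStartPositionSketch")
  }
}

When the job is restored from a checkpoint or savepoint, these start-position settings are ignored and consumption resumes from the offsets stored in Flink's own state.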

Official documentation for Flink's Kafka integration:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kafka.html

Step 1: Import the dependency jars

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

Step 2: Use Kafka as a Flink source

In practice, Kafka is almost always used as the source of Flink jobs.

Create a Kafka topic

Install and start the Kafka cluster, then run the following commands on node01 to create a Kafka topic named test:

cd /kkb/install/kafka_2.11-1.1.0

bin/kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181

Code implementation:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object FlinkKafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions for the Scala DataStream API
    import org.apache.flink.api.scala._

    // checkpoint configuration
    env.enableCheckpointing(100)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "con1")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaSource: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
    // commit the offsets back to Kafka whenever a checkpoint completes
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    // set the state backend
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka/checkpoints", true))

    val result: DataStream[String] = env.addSource(kafkaSource)
    result.print()
    env.execute()
  }
}

Produce data into Kafka

Run the following commands on node01 to produce data into Kafka from the console producer:

cd /kkb/install/kafka_2.11-1.1.0

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

Step 3: Use Kafka as a Flink sink

We can also use Kafka as a Flink sink, that is, write the data that Flink has finished processing back into Kafka.

Code implementation:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

object FlinkKafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions for the Scala DataStream API
    import org.apache.flink.api.scala._

    // checkpoint configuration
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // set the state backend
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints", true))

    val text = env.socketTextStream("node01", 9000)

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "kafka_group1")
    // Option 1: set the transaction timeout used by FlinkKafkaProducer011
    prop.setProperty("transaction.timeout.ms", 6000015 + "")
    // Option 2: raise the maximum transaction timeout on the Kafka brokers instead
    // FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());

    // use the producer variant that supports exactly-once semantics
    val myProducer = new FlinkKafkaProducer011[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

    text.addSink(myProducer)
    env.execute("StreamingFromCollectionScala")
  }
}
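
A brief note on the two options mentioned in the comments above: with Semantic.EXACTLY_ONCE the producer writes inside Kafka transactions, and a broker rejects any producer whose transaction.timeout.ms exceeds the broker-side transaction.max.timeout.ms (15 minutes by default), while FlinkKafkaProducer011 defaults transaction.timeout.ms to one hour. So either keep the producer timeout within the broker limit (option 1), or raise the broker limit (option 2); the 6000015 ms value used above only works if the broker limit has been raised accordingly. A minimal sketch of option 1 (the 900000 ms value is an assumption matching the broker default, not taken from the original code):

import java.util.Properties

val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092")
// Option 1: keep the producer's transaction timeout within the broker's
// transaction.max.timeout.ms (15 minutes by default on the broker side).
prop.setProperty("transaction.timeout.ms", "900000")
// Option 2 would instead raise transaction.max.timeout.ms in every broker's
// server.properties, which also makes larger producer timeouts legal.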

Start a socket service and send data

Run the following command on node01 to send data to the socket service:

nc -lk 9000

Start a Kafka consumer

Run the following commands on node01 to start a Kafka console consumer and consume the data:

cd /kkb/install/kafka_2.11-1.1.0

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092 --topic test
