Kafka 生产者生产消息
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Minimal Kafka producer example.
 *
 * <p>Connects to the broker at {@code 192.168.0.162:9092} and sends a single
 * string message of the form {@code hello:<current-millis>} (with a {@code null} key)
 * to the {@code test} topic, then flushes and closes the producer.
 */
public class KfkProducer {

    /**
     * Builds the producer configuration, sends the message(s) and releases the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.162:9092");
        props.put("acks", "all");
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Number of messages to send; kept as a loop so the example is easy to scale up.
        final int messageCount = 1;

        // The producer is typed to match the String serializers configured above, and
        // try-with-resources guarantees it is closed even if send() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < messageCount; i++) {
                producer.send(new ProducerRecord<>("test", null, "hello:" + System.currentTimeMillis()));
            }
            // Push out anything still buffered before the producer is closed.
            producer.flush();
        }
    }
}
编写 Flink 消费 Kafka 消息的类
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
/**
 * Flink streaming job that uses Kafka as a source and prints every record.
 *
 * <p>Checkpointing is enabled in {@code EXACTLY_ONCE} mode, and the consumer starts from
 * the offsets committed for its consumer group. The intent is that after the program
 * exits and is restarted, only data that has not yet been consumed is read.
 * NOTE(review): this depends on offsets having been committed for the configured
 * {@code group.id} (or on restoring from a retained checkpoint) - confirm against the
 * deployment.
 *
 * @author Jum
 */
public class StreamingKafkaSource {

    /**
     * Configures checkpointing, wires the Kafka consumer as the source and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration: trigger every 5000 ms, at least 500 ms between
        // checkpoints, 60000 ms timeout, never more than one checkpoint in flight.
        env.enableCheckpointing(5000);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(500);
        checkpointConfig.setCheckpointTimeout(60000);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // Keep externalized checkpoints when the job is cancelled.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        String topic = "test";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "192.168.0.162:9092");
        // A consumer group is needed for setStartFromGroupOffsets() to have committed
        // offsets to resume from; without it a restart cannot pick up where it left off.
        prop.setProperty("group.id", "streaming-kafka-source");

        FlinkKafkaConsumer011<String> myConsumer =
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);
        // Default start position: the offsets committed for the consumer group.
        myConsumer.setStartFromGroupOffsets();

        DataStreamSource<String> text = env.addSource(myConsumer);
        text.print();

        env.execute(StreamingKafkaSource.class.getSimpleName());
    }
}