Flink 结合布隆过滤器进行全局去重,并结合状态管理进行全局编号

/********************************** 数据去重 **************************************/

    // If no offset has been recorded yet, read from the beginning; if one has been recorded,
    // continue from it.
    properties.setProperty("auto.offset.reset", "earliest");
    // Do not let the Kafka client auto-commit offsets; let Flink commit them instead.
    properties.setProperty("enable.auto.commit", "false");

    // ParameterTool#getProperties() returns a new Properties copy on every call, so the two
    // overrides above must be merged in explicitly; otherwise they never reach the consumer.
    java.util.Properties consumerProperties = parameterTool.getProperties();
    consumerProperties.putAll(properties);

    // Pipeline: read raw lines from the "input-topic" Kafka topic, parse each line into a
    // KafkaEvent, keep only events whose tpp is at least 360, then key the stream by the
    // "si" and "ci" fields.
    KeyedStream<KafkaEvent, Tuple> keyed = env.addSource(new FlinkKafkaConsumer011<String>(
            parameterTool.getRequired("input-topic"),
            new SimpleStringSchema(),
            consumerProperties
    )).map(new MapFunction<String, KafkaEvent>() {

        @Override
        public KafkaEvent map(String line) throws Exception {
            // Parsing is delegated to KafkaEvent; any exception it throws propagates to Flink.
            return KafkaEvent.fromString(line);
        }
    }).filter(new FilterFunction<KafkaEvent>() {
        @Override
        public boolean filter(KafkaEvent value) throws Exception {
            return value.getTpp() >= 360;
        }
    }).keyBy("si", "ci");
    // De-duplication on the keyed stream: a Bloom filter stored in keyed state remembers the tno
    // values already seen for the current key, and only events whose tno is definitely new are
    // emitted, formatted as "<did>^|<uvs>".
    // NOTE(review): consider giving this stateful operator a stable .uid(...) — confirm against
    // the job's savepoint requirements.
    SingleOutputStreamOperator<String> mapstatedata = keyed.process(new ProcessFunction<KafkaEvent, String>() {

        /** Bloom filter of the tno values seen so far; null until the first element of a key. */
        private transient ValueState<BloomFilter> bloomState;

        /**
         * Registers the Bloom-filter state with the runtime.
         *
         * @param parameters the configuration passed in by Flink (unused here)
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // State descriptor for the Bloom filter.
            ValueStateDescriptor<BloomFilter> bloomStateDescriptor =
                    new ValueStateDescriptor<>("bloomState", BloomFilter.class);
            bloomState = getRuntimeContext().getState(bloomStateDescriptor);
        }

        /**
         * Emits the event only if its tno has not been recorded in the Bloom filter yet.
         *
         * @param value   the incoming event
         * @param context the process-function context (unused here)
         * @param out     receives "<did>^|<uvs>" for every event that passes the filter
         */
        @Override
        public void processElement(KafkaEvent value, Context context, Collector<String> out) throws Exception {
            String tno = value.getTno();

            // Read the state exactly once per element; every read may involve deserialization.
            BloomFilter bloomFilter = bloomState.value();
            if (bloomFilter == null) {
                // First element for this key: create a filter sized for 100000 expected insertions.
                bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 100000);
            }

            // mightContain() == false means the value is definitely absent. A true result may be
            // a false positive, in which case the event is treated as a duplicate and dropped.
            if (bloomFilter.mightContain(tno)) {
                return;
            }

            bloomFilter.put(tno);
            bloomState.update(bloomFilter);

            // Build the output only for events that are actually emitted.
            String result = new StringBuilder()
                    .append(value.getDid()).append("^|")
                    .append(value.getUvs()).toString();
            out.collect(result);
        }
    }).setParallelism(1);

/**
 * Global row_num with operator state management.
 *
 * <p>Prefixes every incoming record with a running row number (starting at 1) and emits it as
 * {@code (rowNumber, rowNumber + "|^|" + record)}. The counter is written to operator list state
 * on every checkpoint and read back on restore, so numbering continues after a failure.
 *
 * <p>The counter is an instance field, so the numbering is only global when the operator runs
 * with parallelism 1.
 */
public static class OperatorStateMap extends RichFlatMapFunction<String, Tuple2<Long, String>> implements CheckpointedFunction {

    /** Managed operator state that carries the counter across checkpoints. */
    private transient ListState<Long> listState;

    /** Last row number handed out by this instance; 0 means none has been assigned yet. */
    private long lastRowNumber = 0L;

    /**
     * Assigns the next row number to the record and emits it.
     *
     * @param value     the incoming record
     * @param collector receives {@code (rowNumber, rowNumber + "|^|" + value)}
     */
    @Override
    public void flatMap(String value, Collector<Tuple2<Long, String>> collector) throws Exception {
        lastRowNumber++;
        collector.collect(new Tuple2<Long, String>(lastRowNumber, lastRowNumber + "|^|" + value));
    }

    /**
     * Takes a checkpoint snapshot: replaces the state content with the current counter.
     *
     * @param context the snapshot context supplied by Flink (unused here)
     * @throws Exception if the state backend fails
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        listState.clear();
        listState.add(lastRowNumber);
    }

    /**
     * Initializes the state, including when recovering from a failure.
     *
     * @param context gives access to the operator state store and the restore flag
     * @throws Exception if the state backend fails
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<>("checkPointedList",
                TypeInformation.of(new TypeHint<Long>() {
                }));
        listState = context.getOperatorStateStore().getListState(listStateDescriptor);

        // On recovery, continue from the restored counter.
        if (context.isRestored()) {
            for (Long ele : listState.get()) {
                System.out.println("===============>"+ele);
                // Keep the largest restored value: if this instance receives several entries
                // (presumably possible after rescaling — confirm), the counter must not rewind.
                lastRowNumber = Math.max(lastRowNumber, ele);
            }
        }
    }
}

}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容