用flink算UV(UserVisit)数~即独立访客数

思路:读入数据时,用flatMap算子过滤出PV(即PageVisit)的一条条的数据,在process算子中,用set对用户的id作去重,从而set的size即UV(UserVisit)数。

原始数据:


图片.png

实体类:


图片.png
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;

public class UV {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(3);

        DataStreamSource<String> source = env.readTextFile("input/UserBehavior.csv");

        SingleOutputStreamOperator<UserBehavior> stream = source.flatMap(new FlatMapFunction<String, UserBehavior>() {
            @Override
            public void flatMap(String line, Collector<UserBehavior> out) throws Exception {
                String[] arr = line.split(",");
                UserBehavior userBehavior = new UserBehavior(
                        Long.valueOf(arr[0]),
                        Long.valueOf(arr[1]),
                        Integer.valueOf(arr[2]),
                        arr[3],
                        Long.valueOf(arr[4]));
                if ("pv".equals(userBehavior.getBehavior()))
                    out.collect(userBehavior);
            }
        });

        stream.keyBy(UserBehavior::getBehavior)
                .process(new KeyedProcessFunction<String, UserBehavior, Long>() {
                    HashSet<Long> set = new HashSet<>();

                    @Override
                    public void processElement(UserBehavior value, Context ctx, Collector<Long> out) throws Exception {
                        set.add(value.getUserId());
                        out.collect((long) set.size());
                    }
                }).print();

        System.out.println("~~~~");
        env.execute();
    }
}
图片.png

增加去重逻辑:

图片.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容