kafka stream 内容过滤 demo

写个demo 练练手 , 结论: 思维必须成流式 ,不要以数据库的方式去看待流式聚合 , 流式的聚合,在time window 中 也会产生很多事件 . 最后一点 ,kafka 提供着数据库存储能力的ktable。
也就是说 ,你可以 发请求给instance ,获取ktable 的聚合数据 。
而不是 自己写个服务作为消费者然后去实现ktable的聚合 ,这点有点别扭
建议大家看看 https://github.com/confluentinc/kafka-streams-examples.git

网状网络拓扑结构 (2).png

10s 超过5次 评论 代码

        SpecificAvroSerde<Content> contentSpecificAvroSerde = new SpecificAvroSerde<>();
        SpecificAvroSerde<ContentbyUserId> userContentSpecificAvroSerde = new SpecificAvroSerde<>();
        contentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
        userContentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
        Properties properties = config("contentId1", "localhost:9092", "/tmp/filter1");
        // key 为 contentId  value content
        final StreamsBuilder builder = new StreamsBuilder();
        // key userId , value content
        KStream<String, Content> kStream = builder.stream("content2", Consumed.with(Serdes.String(), contentSpecificAvroSerde))
                .selectKey((k, v) -> v.getUserId());
        KGroupedStream<String, Content> stringContentKGroupedStream = kStream.groupByKey();
        KStream<Windowed<String>, ContentbyUserId> k1ResultStrem = k1Result
                .toStream()
                .filter((k, v) -> {
            return null != v && v.getCount() > 5;
        });
        k1ResultStrem.print(Printed.toSysOut());

10s 内 输入事件间隔小于1s的事件数 >5

  // key 为 contentId  value content
        final StreamsBuilder builder = new StreamsBuilder();
        // key userId , value content
        KStream<String, Content> kStream = builder.stream("content2", Consumed.with(Serdes.String(), contentSpecificAvroSerde))
                .selectKey((k, v) -> v.getUserId());
        KStream<Windowed<String>,Long> kStreamResult2 = stringContentKGroupedStream.windowedBy(SessionWindows.with(1 * 1000))
                .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("result2")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                .filter((e, v) -> {
                            // session 窗口时间 大于 10 s 且 数量大于5
                            if (v != null && e.window().end() - e.window().start() > 10 * 1000 && v.longValue() > 5) {
                                return true;
                            }
                            return false;

                        }
                );

        kStreamResult2.print(Printed.toSysOut());

基础配置

    public static Properties config(String appliactionId, String bootstrapServers, String stateDir) {
        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name.  The name must be unique in the Kafka cluster
        // against which the application is run.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appliactionId);
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        streamsConfiguration.put("schema.registry.url", "http://localhost:8081");


        // Provide the details of our embedded http service that we'll use to connect to this streams
        // instance and discover locations of stores.
//        streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + applicationServerPort);
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        // Set to earliest so we don't miss any data that arrived in the topics before the process
        // started
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Set the commit interval to 500ms so that any changes are flushed frequently and the top five
        // charts are updated with low latency.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        // Allow the user to fine-tune the `metadata.max.age.ms` via Java system properties from the CLI.
        // Lowering this parameter from its default of 5 minutes to a few seconds is helpful in
        // situations where the input topic was not pre-created before running the application because
        // the application will discover a newly created topic faster.  In production, you would
        // typically not change this parameter from its default.
        String metadataMaxAgeMs = System.getProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG);
        if (metadataMaxAgeMs != null) {
            try {
                int value = Integer.parseInt(metadataMaxAgeMs);
                streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, value);
                System.out.println("Set consumer configuration " + ConsumerConfig.METADATA_MAX_AGE_CONFIG +
                        " to " + value);
            } catch (NumberFormatException ignored) {
            }
        }
        return streamsConfiguration;
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,657评论 18 139
  • Kafka设计解析(七)- Kafka Stream 原创文章,转载请务必将下面这段话置于文章开头处。本文转发自技...
    小小少年Boy阅读 5,250评论 0 32
  • Kafka官网:http://kafka.apache.org/入门1.1 介绍Kafka™ 是一个分布式流处理系...
    it_zzy阅读 3,894评论 3 53
  • 前言:前段时间接触过一个流式计算的任务,使用了阿里巴巴集团的JStorm,发现这个领域值得探索,就发现了这篇文章—...
    程序熊大阅读 6,292评论 5 31
  • 大学四年,如果没有定好目标前行,很容易随大流,然后慢慢退步,该吃吃,该喝喝,说玩耍就玩耍,你所以为的自由不...
    Control_280f阅读 165评论 0 0