从Java 8 stream 到 rxjava, 记录一次数据流的处理任务

在java8之前, 一直都是用guava中的Iterables和FluentIterables来处理数据流。 java8 的 lambda 和 方法引用 极大的简化了内部类的处理。
不过stream还是比较初级的,实际使用过程中只能处理简单的数据流任务。主要遇到的问题是,在调用消费方法(例如forEach, collect)之后。该流即为终结状态,无法再复用。 无法优雅地需要处理大量中间结果的复杂计算。

于是试用了一下rxjava, 底层是订阅发布模型,上层可以用来处理数据流。
实现引入框架,我们用到了extension中的数学方法

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.10</version>
        </dependency>
        <dependency>
            <groupId>com.github.akarnokd</groupId>
            <artifactId>rxjava2-extensions</artifactId>
            <version>0.18.8</version>
        </dependency>

下面这段代码对集合的某个Decimal字段求和

        BigDecimal sigma = flowables
            .map(CalculateContext::getTempValue1)
            .reduce(BigDecimal.ZERO, (sum, each) -> sum.add(each))
            .blockingGet();

筛除记录、排序、幷缓存结果(下次使用不会再运行之前的回调)

flowable = flowable.filter(each -> each.someCondition  == true)
            .sorted(Comparator.comparing(EachType::getTempValue).reversed())
            .cache();

合并两个流, 映射到某个字段,然后取最大值。这里用到了扩展包

Flowable<Type> flowable = Flowable
            .merge(flowable1, flowable2)
            .map(Type::getValue)
            .to(MathFlowable::max)
            .blockingSingle();

项目中没有用到flatMap。 这个也是极常用的操作。还有一些高级特性包括背压、线程控制 暂时没有涉及,以后有这样的场景机会的时候再尝试下 :)

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Java 8 数据流教程 原文:Java 8 Stream Tutorial 译者:飞龙 协议:CC BY-NC-...
    布客飞龙阅读 998评论 1 46
  • Java8 in action 没有共享的可变数据,将方法和函数即代码传递给其他方法的能力就是我们平常所说的函数式...
    铁牛很铁阅读 1,353评论 1 2
  • 作者寄语 很久之前就想写一个专题,专写Android开发框架,专题的名字叫 XXX 从入门到放弃 ,沉淀了这么久,...
    戴定康阅读 7,728评论 13 85
  • 晨起 黎明穿上了白衣 雾气未散 夜寒未消 没有鸡鸣的城市 人们还在沉睡 留恋被窝里的暖 以及 温暖里的梦 那是行走...
    一记耳光阅读 306评论 0 0
  • 妈妈是一个很勤俭节约的人,今天来看妈妈,在晚上我回去 老妈送我去坐车的时候,突然从口袋掏出200块钱非要给我,说是...
    庸人请自扰阅读 207评论 1 1

友情链接更多精彩内容