RxJava用法

核心思想:

RxJava学习和使用最重要的是掌握它的核心思想,它在 github 主页的介绍是:

"a library for composing asynchronous and event-based programs using observable sequences for the Java VM"

我理解是:一个运行在 Java 虚拟机上的可序列观察组合、异步和基于事件的库。所以能推导 Rx 定义为:序列观察组合异步事件。
我们能得到几个关键词:

  • 序列:对顺序敏感(事件发送)
  • 观察:响应式编程
  • 组合:可处理多个事件
  • 异步:可调度线程
  • 事件:可以理解为起点发送的事件,也可以理解为每个中间环节的操作

根据使用经验来总结其核心思想:在起点注册观察后分发一个事件,流式操作中间步骤,终点消费事件解除观察。

因为它的操作符很多,遍历学习操作符性价比很小,我们根据其核心思想,以输出的方式来学习 RxJava。

普通应用:

业务要求1:有一个图片url地址,下载图片并且将图片显示在 ImageView。

思路:事件为下载并且展示图片到 ImageView,事件起点为分发(放置)一个 url 给下游中间步骤1(传送带),下游中间步骤1拿到的为String而步骤二设置图片显示需要的数据为 Bitmap,所以此步骤需要 Map 操作符来将url转换为 Bitmap并且应该 为异步操作,然后将 Bitmap分发给下游中间步骤 2,步骤 2 将 Bitmap更新到 ImageView,消费掉事件,事件结束。

流程图为:


image-20200527164516864.png

上代码:

    // 打印logcat日志的标签
    String TAG = DownloadActivity.class.getSimpleName();

  // 网络图片的链接地址
   String PATH = "http://pic1.win4000.com/wallpaper/c/53cdd1f7c1f21.jpg";

  // 弹出加载框
  ProgressDialog progressDialog;

Observable.just(PATH)  // 内部会分发  PATH Stirng // TODO 第二步 
  // TODO 第二步      
                .map(new Function<String, Bitmap>() {
            @Override
            public Bitmap apply(String s) throws Exception {
                URL url = new URL(PATH);
                HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                httpURLConnection.setConnectTimeout(5000);
                int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
                if (responseCode == HttpURLConnection.HTTP_OK) {
                    InputStream inputStream = httpURLConnection.getInputStream();
                    Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                    return bitmap;
                }
                return null;
            }
        })
        // 日志记录
        .map(new Function<Bitmap, Bitmap>() {
            @Override
            public Bitmap apply(Bitmap bitmap) throws Exception {
                Log.d(TAG, "apply: 是这个时候下载了图片啊:" + System.currentTimeMillis());
                return bitmap;
            }
        })
        .subscribeOn(Schedulers.io())     // 给上面代码分配异步线程
        .observeOn(AndroidSchedulers.mainThread()) // 切换主线程
        // 订阅 起点 和 终点 订阅起来
        .subscribe(
                // 终点
                new Observer<Bitmap>() {
                    // 订阅开始
                    @Override
                    public void onSubscribe(Disposable d) {
                        // 预备 开始 要分发 创建并展示 loading
                        // TODO 第一步
                        progressDialog = new ProgressDialog(DownloadActivity.this);
                        progressDialog.setTitle("download run");
                        progressDialog.show();
                    }

                    // TODO 第四步
                    // 消费事件
                    @Override
                    public void onNext(Bitmap bitmap) {
                        image.setImageBitmap(bitmap);
                    }

                    // 错误事件
                    @Override
                    public void onError(Throwable e) {
                    }
                    // TODO 第五步
                    // 完成事件
                    @Override
                    public void onComplete() {
                        if (progressDialog != null)
                            progressDialog.dismiss();
                    }
        });

实现完上边需求,产品小姐姐说需要一个加水印的功能,没问题,我们就在上边代码基础上来做改造。

思路:在上述步骤1,拿到 Bitmap 后然后我们将步骤2 改为为 Bitmap 添加水印,然后步骤3 来完成图片的展示和事件的消费。

修改后的流程图为:


image-20200527172449607.png

上代码:

// 打印logcat日志的标签
    String TAG = DownloadActivity.class.getSimpleName();

  // 网络图片的链接地址
   String PATH = "http://pic1.win4000.com/wallpaper/c/53cdd1f7c1f21.jpg";

  // 弹出加载框
  ProgressDialog progressDialog;

Observable.just(PATH)  // 内部会分发  PATH Stirng // TODO 第二步 
  // TODO 第二步      
                .map(new Function<String, Bitmap>() {
            @Override
            public Bitmap apply(String s) throws Exception {
                URL url = new URL(PATH);
                HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                httpURLConnection.setConnectTimeout(5000);
                int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
                if (responseCode == HttpURLConnection.HTTP_OK) {
                    InputStream inputStream = httpURLConnection.getInputStream();
                    Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                    return bitmap;
                }
                return null;
            }
        })
            // 此处为新增水印操作代码
        .map(new Function<Bitmap, Bitmap>() {
                  @Override
                  public Bitmap apply(Bitmap bitmap) throws Exception {
                      Paint paint = new Paint();
                      paint.setTextSize(88);
                      paint.setColor(Color.RED);
                      return drawTextToBitmap(bitmap, "产品要得水印",paint, 88 , 88);
                  }
              })
  
        // 日志记录
        .map(new Function<Bitmap, Bitmap>() {
            @Override
            public Bitmap apply(Bitmap bitmap) throws Exception {
                Log.d(TAG, "apply: 是这个时候下载了图片啊:" + System.currentTimeMillis());
                return bitmap;
            }
        })
        .subscribeOn(Schedulers.io())     // 给上面代码分配异步线程
        .observeOn(AndroidSchedulers.mainThread()) // 切换主线程
        // 订阅 起点 和 终点 订阅起来
        .subscribe(
                // 终点
                new Observer<Bitmap>() {
                    // 订阅开始
                    @Override
                    public void onSubscribe(Disposable d) {
                        // 预备 开始 要分发 创建并展示 loading
                        // TODO 第一步
                        progressDialog = new ProgressDialog(DownloadActivity.this);
                        progressDialog.setTitle("download run");
                        progressDialog.show();
                    }

                    // TODO 第四步
                    // 消费事件
                    @Override
                    public void onNext(Bitmap bitmap) {
                        image.setImageBitmap(bitmap);
                    }

                    // 错误事件
                    @Override
                    public void onError(Throwable e) {
                    }
                    // TODO 第五步
                    // 完成事件
                    @Override
                    public void onComplete() {
                        if (progressDialog != null)
                            progressDialog.dismiss();
                    }
        });


// 图片上绘制文字 加水印
    private final Bitmap drawTextToBitmap(Bitmap bitmap, String text, Paint paint, int paddingLeft, int paddingTop) {
        Bitmap.Config bitmapConfig = bitmap.getConfig();

        paint.setDither(true); // 获取跟清晰的图像采样
        paint.setFilterBitmap(true);// 过滤一些
        if (bitmapConfig == null) {
            bitmapConfig = Bitmap.Config.ARGB_8888;
        }
        bitmap = bitmap.copy(bitmapConfig, true);
        Canvas canvas = new Canvas(bitmap);

        canvas.drawText(text, paddingLeft, paddingTop, paint);
        return bitmap;
    }

防抖:

防抖的意思就是说规定时间内接收到多次事件只触发一次。放置恶意轰炸,或者重复触发问题。

业务需求:如下图,两秒内项目按钮点击只完成一次获取所有项目分类,并且根据项目 id 获取所有项目下的所有列表数据。

image-20200527174315867.png

一般思路:利用 RxBinding 处理防抖动,然后步骤1,网络获取所有项目数据,数据类型为[ProjectBean,ProjectBean,ProjectBean,ProjectBean,ProjectBean...],步骤 2,根据步骤 1 获取的ProjectBeanList 遍历,分别根据每个 ProjectBean id 去获取项目列表数据。步骤 3,分别获取每个项目的列表数据 [ProjectItem,ProjectItem,ProjectItem,ProjectItem,ProjectItem..]去更新 UI消费掉该事件。

流程图如下:

image-20200527180256022.png

上代码:(此处为伪代码实现)

// 注意:(项目分类)查询的id,通过此id再去查询(项目列表数据)
        // 对那个控件防抖动?
        Button bt_anti_shake = findViewById(R.id.bt_anti_shake);
        RxView.clicks(bt_anti_shake)
                .throttleFirst(2000, TimeUnit.MILLISECONDS) // 2秒钟之内 响应你一次
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        api.getProject() // 查询主数据
                        .compose(RxUtils.rxud()) //线程切换封装
                        .subscribe(new Consumer<ProjectBean>() {
                            @Override
                            public void accept(ProjectBean projectBean) throws Exception {
                                for (ProjectBean.DataBean dataBean : projectBean.getData()) { // 10
                                    // 查询item数据
                                    api.getProjectItem(1, dataBean.getId())
                                    .compose(RxUtils.rxud())
                                    .subscribe(new Consumer<ProjectItem>() {
                                        @Override
                                        public void accept(ProjectItem projectItem) throws Exception {
                                            Log.d(TAG, "accept: " + projectItem); // 可以UI操作
                                        }
                                    });
                                }
                            }
                        });
                    }
                });

网络嵌套:

上边防抖代码可读性很差,why?因为上边代码有两次嵌套缩进,第一次为实现防抖动,回调缩进了一次,项目列表数据获取也在主数据获取的回调嵌套中。

解决迷之嵌套问题,我们需要了解 RxJava 的另一个操作符:

flatmap:可以将一个观察集合数据的观察者展开为多个观察一条数据的观察者。这也是他和 map 的最主要区别,当集合中只有一条数据时,它的作用将和 map 一样。回顾上边普通应用的需求实现都是从头到尾一个观察者,观察着一条数据向下游流动。

改造版代码:(同样为伪代码)

// 对那个控件防抖动?
        Button bt_anti_shake = findViewById(R.id.bt_anti_shake);

        RxView.clicks(bt_anti_shake)
                .throttleFirst(2000, TimeUnit.MILLISECONDS) // 2秒钟之内 响应你一次
                // 我只给下面 切换 异步
                .observeOn(Schedulers.io())
                .flatMap(new Function<Object, ObservableSource<ProjectBean>>() {
                    @Override
                    public ObservableSource<ProjectBean> apply(Object o) throws Exception {
                        return api.getProject(); // 主数据
                    }
                })
                .flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>() {
                    @Override
                    public ObservableSource<ProjectBean.DataBean> apply(ProjectBean projectBean) throws Exception {
                        return Observable.fromIterable(projectBean.getData()); // 注册多个观察者
                    }
                })
                .flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>() {
                    @Override
                    public ObservableSource<ProjectItem> apply(ProjectBean.DataBean dataBean) throws Exception {
                        return api.getProjectItem(1, dataBean.getId());
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) // 给下面切换 主线程
                .subscribe(new Consumer<ProjectItem>() {
                    @Override
                    public void accept(ProjectItem projectItem) throws Exception {
                        // 如果我要更新UI  会报错2  不会报错1
                        Log.d(TAG, "accept: " + projectItem);
                    }
                });

连续操作:

上边需求发现没?有一个共同规律,就是都是一个事件开始,然后事件结束。如果实现一个时间开始,一个时间结束,然后再次开启一个事件,然后再次结束第二个事件怎么办?是和上述代码并列,再次创建一个观察者?NO!看下边操作符。

doNoNext:可以用于,多个连续事件,执行过程中,事件都需要按顺序消费掉前边事件然后继续执行后边事件。

业务需求:用户注册完成后更新UI 跳转登录页面并且自动登录后跳转首页。

实现思路:用户点击 reigstBtn获取注册结果,拿到注册结果 UI 线程去跳转登录页面,然后工作线程去根据 reigstResponse 去请求登录数据,拿到登录信息去跳转首页,消费掉这个事件。注意:此时消费掉的事件并非最初事件。

业务流程比较简单,直接代码实现:(伪代码)

/**
         * 一行代码 实现需求
         * 需求:
         *   还有弹出加载
         *  * 1.请求服务器注册操作
         *  * 2.注册完成之后,更新注册UI
         *  * 3.马上去登录服务器操作
         *  * 4.登录完成之后,更新登录的UI
         */
        MyRetrofit.createRetrofit().create(IReqeustNetwor.class)
                .registerAction(new RegisterRequest()) // todo 1.请求服务器注册操作   // todo 2
                .subscribeOn(Schedulers.io()) // 给上面 异步
                .observeOn(AndroidSchedulers.mainThread()) // 给下面分配主线程
                .doOnNext(new Consumer<RegisterResponse>() { // todo 3
                    @Override
                    public void accept(RegisterResponse registerResponse) throws Exception {
                        // todo 2.注册完成之后,更新注册UI
                    }
                })
                // todo 3.马上去登录服务器操作
                .observeOn(Schedulers.io()) // 给下面分配了异步线程
                .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() { // todo 4
                    @Override
                    public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                        Observable<LoginResponse> loginResponseObservable = MyRetrofit.createRetrofit().create(IReqeustNetwor.class)
                                .loginAction(new LoginReqeust());
                        return loginResponseObservable;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) // 给下面 执行主线程
                .subscribe(new Observer<LoginResponse>() {
                    // 一定是主线程,为什么,因为 subscribe 马上调用onSubscribe
                    @Override
                    public void onSubscribe(Disposable d) {
                        // TODO 1
                        progressDialog = new ProgressDialog(RequestActivity.this);
                        progressDialog.show();
                        // UI 操作
                        disposable = d;
                    }

                    @Override
                    public void onNext(LoginResponse loginResponse) { // todo 5
                        // TODO 4.登录完成之后,更新登录的UI
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    // todo 6
                    @Override
                    public void onComplete() {
                        // over
                        if (progressDialog != null) {
                            progressDialog.dismiss();
                        }
                    }
                });

􏱥􏰎

并行操作:

􏲩􏲀􏰟􏰠􏲪􏲫􏰖􏲀􏲊􏲬􏲝􏲅􏰶再有一个例子,右下边需求:

image-20200605104151377.png
Map<String, String> map = CZController.paramHelp.getBasicParam();
        map.put("catalogId", catalogId);
        map.put("courseId", courseId);


        Observable<CZResult<String>> obLiveData =
                ApiWrapper.getService().getLiveRoomV2(map)
                        .subscribeOn(Schedulers.io());
        Observable<CZResult<String>> obUserInfo =
                ApiWrapper.getService().getLiveEmployeeInfo(map)
                        .subscribeOn(Schedulers.io());

        Observable.merge(obLiveData, obUserInfo)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new CZSubscriberD<CZResult<String>>() {
                    @Override
                    public void onCZNext(String json) {
                      
                    }
                });
}

递归:

还是一个需求来看,扫描某个路径下的所有文件,找出格式为 .doc 的文件并输出路径。

直接看代码:

/**
* 获取某文件夹下所有 doc 格式的文件路径
*/
private void getFiles(File file) {
    if(!file.isDirectory()) return;

        Observable.just(file)
                .observeOn(Schedulers.io())
                .map(new Function<File, List<File>>() {
                    @Override
                    public List<File> apply(File file) throws Exception {
                        return Arrays.asList(file.listFiles());
                    }
                })
                .flatMap(new Function<List<File>, ObservableSource<File>>() {
                    @Override
                    public ObservableSource<File> apply(List<File> files) throws Exception {
                        return Observable.fromIterable(files);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<File>() {
                    @Override
                    public void accept(File file) throws Exception {
                        //TODO 判断 file 类型,如果是文件夹则递归,如果是需要的格式则继续,否则结束
                        if (file.isDirectory()) {
                            getFiles(file);
                        } else if (file.getName().endsWith(".doc")) {
                            Log.d("Tag1", "path:" + file.getPath());
                        } else {
                            Log.d("Tag1", "其他类型文件");
                        }
                    }
                });
    }

哈哈哈,咋样,如果你真拿上边代码去应用,就会发现一个问题,卡的一笔。

为什么呢,flatmap 就会创建 n 个 Observable,然后外层递归更可怕 n 的 n 次方个Observable,所以递归中最好都不要有对象的创建,哈哈哈。

看一下下边代码:

/**
 * Created by droidwolf on 2017/11/14.
 * https://my.oschina.net/droidwolf
 * 转载请注明
 */
public class FileTreeWalker implements Iterable<File> {
    private ArrayDeque<File> mDirectories =null;
    private ArrayDeque<File> mFiles = null;
 
    public FileTreeWalker walk(File path) {
        if (mDirectories != null && !mDirectories.isEmpty()) {
            mDirectories.clear();
        }
        if (mFiles != null && !mFiles.isEmpty()) {
            mFiles.clear();
        }
        walkDir(path);
        return this;
    }
 
    private void walkDir(File path) {
        if (path == null || !path.exists() || !path.isDirectory()) {
            return;
        }
        final File[] files = path.listFiles();
        if (files == null || files.length == 0) {
            return;
        }
        if(mDirectories==null) mDirectories = new ArrayDeque<File>(256);
        if(mFiles==null) mFiles = new ArrayDeque<File>(512);
 
        for (File f : files) {
            if (f.isDirectory()) {
                mDirectories.push(f);
            } else {
                mFiles.addLast(f);
            }
        }
    }
 
    @Override
    public Iterator<File> iterator() {
        return mIterator;
    }
    private final Iterator<File> mIterator=new Iterator<File>() {
        @Override
        public boolean hasNext() {
            return (mFiles!=null &&!mFiles.isEmpty()) || (mDirectories!=null&& !mDirectories.isEmpty());
        }
        @Override
        public File next() {
            if (mFiles != null && !mFiles.isEmpty()) {
                final File f = mFiles.pollFirst();
 
                return f;
            } else if (mDirectories != null && !mDirectories.isEmpty()) {
                final File dir = mDirectories.pop();
 
                walkDir(dir);
                return dir;
            }
            return null;
        }
    };
}



// rxjava调用FileTreeWalker 
     Observable.fromIterable(new FileTreeWalker().walk(path))
                .subscribeOn(Schedulers.io())
                .filter(file -> {
                    //。。。
                    return false;
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(fileNext -> {
                    //。。。
                });
//当然也可以在java8中使用FileTreeWalker

StreamSupport.stream(Spliterators.spliteratorUnknownSize(new FileTreeWalker().walk(new File("C:\\")).iterator(),Spliterator.NONNULL),true )
.forEach(file->{
    if(file.isDirectory()) {
        System.out.println(file.getName());
    }else {
        System.out.println("\t"+file.getName());
    }
});

小结

带着本文第一节对“核心思想”的理解,来思考一下这几个业务实现。发现每个业务都重点关注事件的开始和结束也就是事件的传入数据类型和最终消费事件需要的数据类型,中间步骤的所有转换都有操作符去支持我们。

上边图片下载问题可以举个相似栗子:小明肚子饿了,想吃东西利用核心思想来实现一下:

image-20200527190542686.png

此事件完整实现需要先确定事件开始(大脑饿了信号类型),和事件结束(食物类型)中间具体需要步骤都可以有操作符来实现。

网络嵌套需求举个相似栗子:小明饿了,想吃 10 种东西:

image-20200527192110882.png

连续操作需求举一个相似栗子:小明饿了,吃完饭然后去上学。

image-20200527193408847.png

关注开始与结束

操作符汇总:

image-20200605185317819.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342