RxJava 探索(二)— 使用指南

RxJava GitHub 主页

RxJava Wiki 主页

RxJava 是 ReactiveX 在 JVM 上的一个实现,ReactiveX 使用 Observable 序列组合异步和基于事件的程序。

更多关于 ReactiveX 的资料,可以看这里:

ReactiveX 探索(一)— Rx 介绍

ReactiveX 探索(二) — Rx 五大部件

三、使用指南

下面的示例从一个字符串列表创建一个 Observable,然后使用一个方法订阅这个 Observable

Java
public static void hello(String... names) {
    Observable.from(names).subscribe(new Action1<String>() {

        @Override
        public void call(String s) {
            System.out.println("Hello " + s + "!");
        }

    });
}
hello("Ben", "George");
Hello Ben!
Hello George!

3.1 如何使用 RxJava

要使用 RxJava,首先你需要创建 Observable(它们发射数据序列),使用 Observable 操作符变换那些 Observables,获取严格符合你要求的数据,然后观察并处理这些数据序列(通过实现观察者或订阅者,然后订阅变换后的 Observables)。

3.1.1 创建 Observables

要创建 Observable,你可以传递一个函数给 create() 来手动实现 Observable 的行为,还可以使用这些创建操作符将一个已有的数据结构转换为 Observable

3.1.2 使用已有的数据结构创建 Observable

你可以使用 just()from() 方法将对象,列表,对象数组转换为发射那些对象的 Observable

Observable<String> o = Observable.from("a", "b", "c");

def list = [5, 6, 7, 8]
Observable<Integer> o = Observable.from(list);

Observable<String> o = Observable.just("one object");

转换后的 Observable 每发射一项数据,会同步地调用任何订阅者的 onNext() 方法,最后会调用订阅者的 onCompleted() 方法。

3.1.3 使用 create() 创建一个 Observable

使用 create() 方法,你可以创建你自己的 Observable,可以实现异步 I/O,计算操作,甚至是无限的数据流。

同步的 Observable 示例
/**
 * This example shows a custom Observable that blocks 
 * when subscribed to (does not spawn an extra thread).
 */
def customObservableBlocking() {
    return Observable.create { aSubscriber ->
        50.times { i ->
            if (!aSubscriber.unsubscribed) {
                aSubscriber.onNext("value_${i}")
            }
        }
        // after sending all values we complete the sequence
        if (!aSubscriber.unsubscribed) {
            aSubscriber.onCompleted()
        }
    }
}

// To see output:
customObservableBlocking().subscribe { println(it) }
异步的 Observable 示例

下面的例子使用 Groovy 创建了一个发射 75 个字符串的 Observable

为了让它更清楚,例子使用了静态类型和匿名内部类 Func1:

/**
 * This example shows a custom Observable that does not block
 * when subscribed to as it spawns a separate thread.
 */
def customObservableNonBlocking() {
    return Observable.create({ subscriber ->
        Thread.start {
            for (i in 0..<75) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext("value_${i}")
            }
            // after sending all values we complete the sequence
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
    } as Observable.OnSubscribe)
}

// To see output:
customObservableNonBlocking().subscribe { println(it) }

这是一个用 Clojure 写的例子,使用 Future(而不是直接用线程),实现很简洁:

(defn customObservableNonBlocking []
  "This example shows a custom Observable that does not block 
   when subscribed to as it spawns a separate thread.
   
  returns Observable<String>"
  (Observable/create 
    (fn [subscriber]
      (let [f (future 
                (doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
                ; after sending all values we complete the sequence
                (-> subscriber .onCompleted))
        ))
      ))

// To see output
(.subscribe (customObservableNonBlocking) #(println %))

这个例子从维基百科网站抓取文章,每抓取一篇会调用一次 onNext()

(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
  "Fetch a list of Wikipedia articles asynchronously.
  
   return Observable<String> of HTML"
  (Observable/create 
    (fn [subscriber]
      (let [f (future
                (doseq [articleName wikipediaArticleNames]
                  (-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
                ; after sending response to onnext we complete the sequence
                (-> subscriber .onCompleted))
        ))))
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"]) 
  (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))

回到 Groovy,同样是从维基百科抓取文章,这儿使用闭包代替匿名内部类:

/*
 * Fetch a list of Wikipedia articles asynchronously.
 */
def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
    return Observable.create { subscriber ->
        Thread.start {
            for (articleName in wikipediaArticleNames) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
            }
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
        return subscriber
    }
}

fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
    .subscribe { println "--- Article ---\n${it.substring(0, 125)}" }

结果:

--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Tiger - Wikipedia, the free encyclopedia</title> ...
--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Elephant - Wikipedia, the free encyclopedia</tit ...

3.2 使用变换操作

对 Groovy 和 Clojure 语言不了解的大致看看就好。

RxJava 让你可以链式使用操作符用来转换和组合多个 Observables

下面是一个 Groovy 的例子,使用之前的定义,它会异步发射 75 个字符串,跳过最开始的 10 个 (skip(10)),然后获取接下来的 5 个 (take(5)),在订阅之前使用 map() 转换它们,然后打印结果字符串。

/**
 * Asynchronously calls 'customObservableNonBlocking' and defines
 * a chain of operators to apply to the callback sequence.
 */
def simpleComposition() {
    customObservableNonBlocking().skip(10).take(5)
        .map({ stringValue -> return stringValue + "_xform"})
        .subscribe({ println "onNext => " + it})
}

输出结果:

onNext => value_10_xform
onNext => value_11_xform
onNext => value_12_xform
onNext => value_13_xform
onNext => value_14_xform

这里有一个图例解释了转换过程:

下一个例子使用 Clojure,它使用了三个异步的 Observable,其中一个依赖另一个,使用 zip 组合这三个发射的数据项为一个单个数据项,最后使用 map() 转换这个结果:

(defn getVideoForUser [userId videoId]
  "Get video metadata for a given userId
   - video metadata
   - video bookmark position
   - user data
  return Observable<Map>"
    (let [user-observable (-> (getUser userId)
              (.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
          bookmark-observable (-> (getVideoBookmark userId videoId)
              (.map (fn [bookmark] {:viewed-position (:position bookmark)})))
          ; getVideoMetadata requires :language from user-observable so nest inside map function
          video-metadata-observable (-> user-observable 
              (.mapMany
                ; fetch metadata after a response from user-observable is received
                (fn [user-map] 
                  (getVideoMetadata videoId (:language user-map)))))]
          ; now combine 3 observables using zip
          (-> (Observable/zip bookmark-observable video-metadata-observable user-observable 
                (fn [bookmark-map metadata-map user-map]
                  {:bookmark-map bookmark-map 
                  :metadata-map metadata-map
                  :user-map user-map}))
            ; and transform into a single response object
            (.map (fn [data]
                  {:video-id videoId
                   :video-metadata (:metadata-map data)
                   :user-id userId
                   :language (:language (:user-map data))
                   :bookmark (:viewed-position (:bookmark-map data))
                  })))))

输出是这样的:

{:video-id 78965, 
 :video-metadata {:video-id 78965, :title House of Cards: Episode 1, 
                  :director David Fincher, :duration 3365}, 
 :user-id 12345, :language es-us, :bookmark 0}

这里有一个图例解释了这个过程:

下面的例子使用 Groovy,来自这里 Ben Christensen’s QCon presentation on the evolution of the Netflix API,它使用 merge 操作结合两个 Observables,使用 reduce 操作符从结果序列构建一个单独的结果数据项,然后在发射之前,使用 map() 变换那个结果。

public Observable getVideoSummary(APIVideo video) {
   def seed = [id:video.id, title:video.getTitle()];
   def bookmarkObservable = getBookmark(video);
   def artworkObservable = getArtworkImageUrl(video);
   return( Observable.merge(bookmarkObservable, artworkObservable)
      .reduce(seed, { aggregate, current -> aggregate << current })
      .map({ [(video.id.toString() : it] }))
}

这里也有一个图例解释 reduce 从多个 Observable 的结果构建一个单一结构的过程:

3.3 错误处理

以下是修订后的维基百科示例的一个版本,其中包括错误处理代码:

/*
 * Fetch a list of Wikipedia articles asynchronously, with error handling.
 */
def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
    return Observable.create({ subscriber ->
        Thread.start {
            try {
                for (articleName in wikipediaArticleNames) {
                    if (true == subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
                }
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } catch(Throwable t) {
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onError(t);
                }
            }
            return (subscriber);
        }
    });
}

下面的例子使用 Groovy,注意错误发生时现在是如何调用 onError(Throwable t) 的,下面的代码传递给 subscribe() 第二个方法用户处理 onError() 通知:

fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
    .subscribe(
        { println "--- Article ---\n" + it.substring(0, 125) }, 
        { println "--- Error ---\n" + it.getMessage() })

查看 错误处理操作符 这一页了解更多 RxJava 中的错误处理技术,包括使用 onErrorResumeNext()onErrorReturn()等方法,它们让你可以从错误中恢复。

下面是一个示例,说明如何使用此方法传递有关遇到的任何异常的自定义信息。想象一下,你有一个 Observable 或级联的 ObservablesmyObservable,你想拦截通常会传递给 SubscriberonError() 方法的任何异常,用你自己设计的自定义 Throwable 替换它们。你可以通过使用 onErrorResumeNext() 方法修改 myObservable 来执行此操作,并将一个 Observable 传递给该方法,该 Observable 使用你自定义的 Throwable 调用 onError()(使用 error() 方法将为你生成这样的 Observable):

myModifiedObservable = myObservable.onErrorResumeNext({ t ->
   Throwable myThrowable = myCustomizedThrowableCreator(t);
   return (Observable.error(myThrowable));
});
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容