RxJava2正式版发布,学习一下源码和操作符实现,顺便做一下笔记。
一、切入正题
引入使用
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.7'
简单使用
/**
* Observable 被观察者对象
*
*Observer 观察者
*
*subscribe 注册订阅
*/
Observable.just("hello rxjava")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("TAG","onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.e("TAG","onNext===="+s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("TAG","onError");
}
@Override
public void onComplete() {
Log.e("TAG","onComplete");
}
});
源码分析
just方法:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
可以看出Observable对象
public abstract class Observable<T> implements ObservableSource<T> {}
实现ObservableSource
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(@NonNull Observer<? super T> observer);
}
就一个方法,回来继续看just,new 的ObservableJust对象
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
而onAssembly还是返回原来的对象
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
just方法就是返回Observable对象
subscribe方法
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
调用 subscribeActual(observer);
protected abstract void subscribeActual(Observer<? super T> observer);
实际是
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
ScalarDisposable对象
public ScalarDisposable(Observer<? super T> observer, T value) {
this.observer = observer;
this.value = value;
}
调用ScalarDisposable的run();
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
二、简单实现以上代码
Observer
/**
* 观察者
*/
public interface Observer<T> {
void onSubscribe();
void onNext(@NonNull T item);
void onError(@NonNull Throwable e);
void onComplete();
}
ObservableSource
public interface ObservableSource<T> {
void subscribe(Observer<T> observer);
}
Observable
/**
* 被观察者
*/
public abstract class Observable<T> implements ObservableSource<T>{
public static <T> ObservableSource<T> just(T item) {
return onAssembly(new ObservableJust<T>(item));
}
private static <T> ObservableSource<T> onAssembly(ObservableJust<T> source) {
// 留出来了
return source;
}
@Override
public void subscribe(Observer<T> observer) {
subscribeActual(observer);
}
protected abstract void subscribeActual(Observer<T> observer);
}
ObservableJust
public class ObservableJust<T> extends Observable<T> {
private T item;
public ObservableJust(T item) {
this.item = item;
}
@Override
protected void subscribeActual(Observer<T> observer) {
// 代理对象, 方便代码扩展,
ScalarDisposable sd = new ScalarDisposable(observer,item);
observer.onSubscribe();
sd.run();
}
private class ScalarDisposable<T>{
private Observer observer;
private T item;
public ScalarDisposable(Observer<T> observer, T item) {
this.observer = observer;
this.item = item;
}
public void run(){
try {
observer.onNext(item);
observer.onComplete();
}catch (Exception e){
observer.onError(e);
}
}
}
}
三、map()操作符实现
在上面的基础上,实现效果
Observable.just("http://img.taopic.com/uploads/allimg/130331/240460- 13033106243430.jpg")
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String urlPath) throws Exception {
// 第五步
URL url = new URL(urlPath);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
InputStream inputStream = urlConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
})
.map(new Function<Bitmap, Bitmap>() {
@Override
public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
bitmap = createWatermark(bitmap, "RxJava2.0");
return bitmap;
}
})
.map(new Function<Bitmap, Bitmap>() {
@Override
public Bitmap apply(Bitmap bitmap) throws Exception {
return bitmap;
}
})
.subscribe(new Consumer<Bitmap>() {
@Override
public void onNext(final Bitmap bitmap) {
// 第七步
Log.e("TAG", "item = " + bitmap);
runOnUiThread(new Runnable() {
@Override
public void run() {
mImage.setImageBitmap(bitmap);
}
});
}
});
实现
public interface Function<T,R> {
R apply(T t) throws Exception;
}
Observable类添加
public <R> Observable<R> map(Function<T, R> function) {
return onAssembly(new ObservableMap<>(this,function));
}
ObservableMap
public class ObservableMap<T,R> extends Observable<R> {
final Observable<T> source;// 前面的 Observable
final Function<T, R> function;// 当前转换
public ObservableMap(Observable<T> source, Function<T, R> function) {
this.source = source;
this.function = function;
}
@Override
protected void subscribeActual(Observer<R> observer) {
// 第一步
// 对 observer 包裹了一层
source.subscribe(new MapObserver(observer,function));
}
private class MapObserver<T> implements Observer<T>{
final Observer<R> observer;
final Function<T, R> function;
public MapObserver(Observer<R> source, Function<T, R> function) {
this.observer = source;
this.function = function;
}
@Override
public void onSubscribe() {
observer.onSubscribe();
}
@Override
public void onNext(@NonNull T item) {
// item 是 String xxxUrl
// 要去转换 String -> Bitmap
// 4.第四步 function.apply
try {
R applyR = function.apply(item);
// 6. 第六步,调用 onNext
// 把 Bitmap 传出去
observer.onNext(applyR);
} catch (Exception e) {
e.printStackTrace();
observer.onError(e);
}
}
@Override
public void onError(@NonNull Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
}
方法
source.subscribe(new MapObserver(observer,function));
调用ObservableJust的
@Override
protected void subscribeActual(Observer<T> observer) {
// 代理对象,why? 方便代码扩展,
// 2.第二步
ScalarDisposable sd = new ScalarDisposable(observer,item);
observer.onSubscribe();
sd.run();
}
和
public void run(){
try {
// 3.第三步 observer -> MapObserver.onNext(String)
observer.onNext(item);
observer.onComplete();
}catch (Exception e){
observer.onError(e);
}
}
之后回到内部类MapObserver
@Override
public void onNext(@NonNull T item) {
// item 是 String xxxUrl
// 要去转换 String -> Bitmap
// 4.第四步 function.apply
try {
R applyR = function.apply(item);
// 6. 第六步,调用 onNext
// 把 Bitmap 传出去
observer.onNext(applyR);
} catch (Exception e) {
e.printStackTrace();
observer.onError(e);
}
}