操作符简介
操作符:将发出的数据进行处理并再发送
变化传播--通过操作符实现变化,并能向下传播
1.RxJava1操作符源码分析
1.Func1接口
2.Operator接口
1.1RxJava1实例
// RxJava 1 demo: emit "1" and "2", map each String to (parsed int + 2), and log what arrives.
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // Emit only while the downstream has not unsubscribed.
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext("1");
            subscriber.onNext("2");
            subscriber.onCompleted();
        }
    }
}).
        // Transformation step: String -> Integer.
        map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                return Integer.parseInt(s) + 2;
            }
        }).
        subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("kpioneer", "onCompleted:");
            }

            @Override
            public void onError(Throwable e) {
                // Report failures (for example a NumberFormatException thrown by the mapper)
                // instead of silently dropping them.
                Log.e("kpioneer", "onError", e);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("kpioneer", "onNext:" + integer + ",integer instanceOf" + integer.getClass());
            }
        });
运行
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:3,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:4,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onCompleted:
1.2RxJava1操作符源码
RxJava1中OnSubscribeMap类
/**
 * OnSubscribe implementation (quoted from RxJava 1) that applies a transformer function to every
 * item emitted by a source Observable before handing the result to the downstream subscriber.
 *
 * @param <T> item type emitted by the source
 * @param <R> item type produced by the transformer
 */
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
// Upstream Observable whose items are transformed.
final Observable<T> source;
// Function applied to each upstream item.
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
/**
 * Wraps the downstream subscriber in a MapSubscriber and subscribes that wrapper to the source.
 *
 * @param o the downstream subscriber that receives the transformed items
 */
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
// Register the wrapper with the downstream subscriber
// (presumably so that unsubscribing o also unsubscribes parent - confirm against Subscriber.add).
o.add(parent);
source.unsafeSubscribe(parent);
}
/**
 * Subscriber that sits between the source and the downstream subscriber: it maps each item
 * with the mapper and forwards the result to actual.
 */
static final class MapSubscriber<T, R> extends Subscriber<T> {
// Downstream subscriber that receives mapped items and terminal events.
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
// Set to true once an error has been forwarded (see onError).
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
/**
 * Maps the item and forwards the result; if the mapper throws, unsubscribes and reports the
 * failure through onError with the offending value attached.
 */
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
// Exceptions.throwIfFatal sees the exception first (presumably rethrowing fatal ones).
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
/**
 * Forwards the first error downstream; any later error goes to RxJavaHooks.onError instead.
 */
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
/**
 * Forwards completion downstream unless an error has already been forwarded.
 */
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
/**
 * Passes the upstream Producer straight through to the downstream subscriber.
 */
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
RxJava1中OnSubscribeLift类
/**
 * OnSubscribe implementation (quoted from RxJava 1) behind lift: it asks an Operator to turn the
 * downstream subscriber into an upstream-facing subscriber and then invokes the parent
 * OnSubscribe with it.
 *
 * @param <T> item type of the parent (upstream) OnSubscribe
 * @param <R> item type seen by the downstream subscriber
 */
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
// The original (upstream) OnSubscribe.
final OnSubscribe<T> parent;
// Operator that converts a downstream Subscriber into an upstream Subscriber.
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
/**
 * Builds the upstream subscriber via the operator, starts it, and runs the parent with it.
 * Errors thrown while running the parent go to the new subscriber; errors thrown while
 * creating it go to the downstream subscriber o.
 *
 * @param o the downstream subscriber
 */
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}
1.3变换的原理(核心操作符lift):
1.接收原OnSubscribe和当前的Operator
2.创建一个新的OnSubscribe并返回新的Observable
/**
 * Returns a new Observable whose OnSubscribe is an OnSubscribeLift combining this Observable's
 * onSubscribe with the given operator.
 *
 * @param operator the operator to apply
 * @param <R> item type of the resulting Observable
 * @return the new Observable
 */
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
3.用新的Subscriber包裹旧的Subscriber
// Wrap the downstream subscriber o in a MapSubscriber that applies the transformer.
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
// Register the wrapper with o (presumably to link their unsubscription - confirm).
o.add(parent);
// Subscribe the wrapper to the original source.
source.unsafeSubscribe(parent);
4.在新的Subscriber里做完变换再传给旧的Subscriber
/**
 * Maps the incoming item with the mapper and forwards the result to the wrapped subscriber.
 * If the mapper throws, unsubscribes and reports the failure through onError with the
 * offending value attached.
 */
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
// Exceptions.throwIfFatal sees the exception first (presumably rethrowing fatal ones).
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
// Transformation succeeded: pass the mapped value downstream.
actual.onNext(result);
}
/**
 * Creates the new Observable for an operator: wraps this Observable's onSubscribe and the
 * operator in an OnSubscribeLift and passes it to unsafeCreate.
 *
 * @param operator the operator to apply
 * @param <R> item type of the resulting Observable
 * @return the new Observable
 */
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
分析
核心实现使用了代理机制
2.RxJava2操作符源码分析
2.1.RxJava2实例
// RxJava 2 demo without backpressure: emit "1" and "2", map to (parsed int + 2), log the results.
Observable.
        create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                // Emit only while the downstream has not disposed.
                if (!e.isDisposed()) {
                    e.onNext("1");
                    e.onNext("2");
                    e.onComplete();
                }
            }
        }).
        map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s) + 2;
            }
        }).
        subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("kpioneer", "onSubscribe:");
            }

            @Override
            public void onNext(Integer value) {
                Log.d("kpioneer", "onNext:" + value);
            }

            @Override
            public void onError(Throwable e) {
                // Log the failure instead of silently ignoring it.
                Log.e("kpioneer", "onError", e);
            }

            @Override
            public void onComplete() {
                Log.d("kpioneer", "onComplete" );
            }
        });
// RxJava 2 demo with backpressure: same pipeline as the Observable demo, built on Flowable
// with the DROP strategy.
Flowable.
        create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                // Emit only while the downstream has not cancelled.
                if (!e.isCancelled()) {
                    e.onNext("1");
                    e.onNext("2");
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.DROP).
        map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s) + 2;
            }
        }).
        subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Request everything up front; the request is issued before the log line.
                s.request(Long.MAX_VALUE);
                Log.d("kpioneer", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("kpioneer", "onNext:" + integer);
            }

            @Override
            public void onError(Throwable t) {
                // Log the failure instead of silently ignoring it.
                Log.e("kpioneer", "onError", t);
            }

            @Override
            public void onComplete() {
                Log.d("kpioneer", "onComplete");
            }
        });
运行
06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe:
06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
06-11 10:20:56.768 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
2.2.RxJava2操作符源码
Function接口
ObservableMap :无背压
/**
 * Observable operator (quoted from RxJava 2, no backpressure) that applies a function to each
 * upstream item and emits the result.
 *
 * @param <T> upstream item type
 * @param <U> downstream item type
 */
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
// Mapping function applied to every upstream item.
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
/**
 * Subscribes a MapObserver, which wraps the downstream observer t, to the upstream source.
 */
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
/**
 * Observer that maps each upstream item and forwards the result to the wrapped observer.
 */
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
/**
 * Ignores items once done; when sourceMode is not NONE forwards null downstream
 * (presumably a fusion signal - confirm in BasicFuseableObserver); otherwise maps the item,
 * rejecting a null mapping result, and forwards it. Mapper failures are passed to fail(ex).
 */
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
// Delegates the fusion decision to transitiveBoundaryFusion.
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
/**
 * Polls the queue qs; returns the mapped value, or null when the queue yields null.
 */
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
1.ObservableMap继承了AbstractObservableWithUpstream抽象类
2.利用了AbstractObservableWithUpstream中subscribeActual方法
3.用原Observable去subscribe变换后的Observer
/**
 * Observable wrapper (quoted from RxJava 2) that lets an ObservableOperator convert the
 * downstream Observer into the Observer that is subscribed to the upstream source.
 *
 * @param <R> downstream item type
 * @param <T> upstream item type
 */
public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {
/** The actual operator. */
final ObservableOperator<? extends R, ? super T> operator;
public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
super(source);
this.operator = operator;
}
/**
 * Applies the operator to the downstream observer s and subscribes the resulting observer to
 * the source. A NullPointerException is rethrown as is; any other failure is reported to
 * RxJavaPlugins.onError and rethrown wrapped as the cause of a NullPointerException.
 */
@Override
public void subscribeActual(Observer<? super R> s) {
Observer<? super T> observer;
try {
observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Disposable already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
source.subscribe(observer);
}
}
FlowableMap: 有背压
public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Function<? super T, ? extends U> mapper;
public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
super(source);
this.mapper = mapper;
}
@Override
protected void subscribeActual(Subscriber<? super U> s) {
if (s instanceof ConditionalSubscriber) {
source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
} else {
source.subscribe(new MapSubscriber<T, U>(s, mapper));
}
}
static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
final Function<? super T, ? extends U> mapper;
MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
static final class MapConditionalSubscriber<T, U> extends BasicFuseableConditionalSubscriber<T, U> {
final Function<? super T, ? extends U> mapper;
MapConditionalSubscriber(ConditionalSubscriber<? super U> actual, Function<? super T, ? extends U> function) {
super(actual);
this.mapper = function;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public boolean tryOnNext(T t) {
if (done) {
return false;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return true;
}
return actual.tryOnNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
1.FlowableMap继承了AbstractFlowableWithUpstream
2.利用了AbstractFlowableWithUpstream中的subscribeActual方法
3.用原Flowable去subscribe变换后的Subscriber
FlowableLift
/**
 * Flowable wrapper (quoted from RxJava 2) that lets a FlowableOperator convert the downstream
 * Subscriber into the Subscriber that is subscribed to the upstream source.
 *
 * @param <R> downstream item type
 * @param <T> upstream item type
 */
public final class FlowableLift<R, T> extends AbstractFlowableWithUpstream<T, R> {
/** The actual operator. */
final FlowableOperator<? extends R, ? super T> operator;
public FlowableLift(Flowable<T> source, FlowableOperator<? extends R, ? super T> operator) {
super(source);
this.operator = operator;
}
/**
 * Applies the operator to the downstream subscriber s and subscribes the result to the source.
 * A null result or any NullPointerException is thrown as is; any other failure is reported to
 * RxJavaPlugins.onError and rethrown wrapped as the cause of a NullPointerException.
 */
@Override
public void subscribeActual(Subscriber<? super R> s) {
try {
Subscriber<? super T> st = operator.apply(s);
if (st == null) {
throw new NullPointerException("Operator " + operator + " returned a null Subscriber");
}
source.subscribe(st);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
}
2.3Operator接口
1.实现此接口
2.在subscribeActual中做变换
3.用于扩展自定义操作符
分析
RxJava2 有背压和无背压两种实现的核心均使用了代理机制
3.RxJava1 操作符功能仿写实现
Operator接口实现
- Operator接口是操作符的抽象接口
- 各操作符实现Operator接口,用于处理具体的变换
lift操作符
- 变换的基本原理
- 各操作符均实现Operator接口,并调用lift操作符
map操作符
- 最基本的操作符
- 顾名思义,用于做映射
/**
 * Hand-written imitation of the RxJava 1 Observable: holds an OnCall and runs it against a
 * Receiver each time {@link #call(Receiver)} is invoked.
 *
 * @param <T> type of item delivered to the Receiver
 */
public class Caller<T> {

    /** Work executed for every Receiver passed to {@link #call(Receiver)}. */
    final OnCall<T> onCall;

    public Caller(OnCall<T> onCall) {
        this.onCall = onCall;
    }

    /** Factory equivalent of the constructor. */
    public static <T> Caller<T> create(OnCall<T> onCall) {
        return new Caller<T>(onCall);
    }

    /**
     * Runs the stored OnCall with the given receiver.
     *
     * @param receiver the receiver to deliver events to
     * @return the same receiver, typed as Calling
     */
    public Calling call(Receiver<T> receiver) {
        onCall.call(receiver);
        return receiver;
    }

    /**
     * Builds a new Caller whose OnCall is an OnCallLift pairing this Caller's OnCall with the
     * given operator.
     */
    public final <R> Caller<R> lift(final Operator<R, T> operator) {
        OnCall<R> lifted = new OnCallLift<T, R>(onCall, operator);
        return create(lifted);
    }

    /** Maps every item with func; implemented as a lift of a MapOperator. */
    public final <R> Caller<R> map(Func1<T, R> func) {
        Operator<R, T> mapOperator = new MapOperator<T, R>(func);
        return lift(mapOperator);
    }

    /** Action invoked with the Receiver that should receive the items. */
    public interface OnCall<T> extends Action1<Receiver<T>> {
    }

    /** Function turning a downstream Receiver of R into an upstream Receiver of T. */
    public interface Operator<R, T> extends Func1<Receiver<R>, Receiver<T>> {
    }
}
/**
 * One-argument function.
 *
 * @param <T> argument type
 * @param <R> result type
 */
public interface Func1<T, R> {

    /**
     * Applies this function to the given argument.
     *
     * @param t the argument
     * @return the result
     */
    R call(T t);
}
/**
 * Operator behind map: wraps a downstream Receiver of R in a MapReceiver that accepts T.
 *
 * @param <T> upstream item type
 * @param <R> downstream item type
 */
public class MapOperator<T, R> implements Caller.Operator<R, T> {

    /** Function applied to each upstream item by the MapReceiver. */
    private final Func1<T, R> mapper;

    public MapOperator(Func1<T, R> mapper) {
        this.mapper = mapper;
    }

    /**
     * @param rReceiver the downstream receiver
     * @return a new MapReceiver wrapping rReceiver together with the mapper
     */
    @Override
    public Receiver<T> call(Receiver<R> rReceiver) {
        Receiver<T> wrapped = new MapReceiver<T, R>(rReceiver, mapper);
        return wrapped;
    }
}
/**
 * Receiver that applies a mapper to every incoming item and forwards the result to the wrapped
 * downstream Receiver.
 *
 * <p>A RuntimeException thrown by the mapper is delivered to the downstream through
 * {@link #onError(Throwable)} instead of escaping to the emitter. After a terminal event
 * (error or completion) has been forwarded, all later events are dropped.
 *
 * @param <T> upstream item type
 * @param <R> downstream item type
 */
public class MapReceiver<T, R> extends Receiver<T> {

    private final Receiver<R> actual;
    private final Func1<T, R> mapper;

    /** True once a terminal event has been forwarded downstream. */
    private boolean done;

    public MapReceiver(Receiver<R> actual, Func1<T, R> mapper) {
        this.actual = actual;
        this.mapper = mapper;
    }

    @Override
    public void onCompleted() {
        if (done) {
            return;
        }
        done = true;
        this.actual.onCompleted();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            return;
        }
        done = true;
        this.actual.onError(t);
    }

    @Override
    public void onReceive(T t) {
        if (done) {
            return;
        }
        R mapped;
        try {
            mapped = this.mapper.call(t);
        } catch (RuntimeException ex) {
            // Func1.call declares no checked exceptions, so RuntimeException covers every
            // recoverable mapper failure; Errors are deliberately left to propagate.
            onError(ex);
            return;
        }
        this.actual.onReceive(mapped);
    }
}
/**
 * OnCall used by lift: converts the downstream Receiver with the operator and then invokes the
 * parent OnCall with the converted Receiver.
 *
 * @param <T> item type of the parent OnCall
 * @param <R> item type of the downstream Receiver
 */
public class OnCallLift<T, R> implements Caller.OnCall<R> {

    /** The original (upstream) OnCall. */
    private final Caller.OnCall<T> parent;

    /** Converts a Receiver of R into a Receiver of T. */
    private final Caller.Operator<R, T> operator;

    public OnCallLift(Caller.OnCall<T> parent, Caller.Operator<R, T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Receiver<R> rReceiver) {
        // Operator first, then the parent, exactly as in the two-step form.
        parent.call(operator.call(rReceiver));
    }
}
调用
/**
 * Demo Activity for the hand-written RxJava 1 imitation: on click it emits "1" and "2",
 * maps each to (parsed int + 2), and logs what the Receiver gets.
 */
public class Lesson2_2Activity extends AppCompatActivity {

    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }

    /** Click handler for R.id.testDo: builds and runs the Caller -> map -> Receiver chain. */
    @OnClick(R.id.testDo)
    public void onViewClicked() {
        Caller.
                create(new Caller.OnCall<String>() {
                    @Override
                    public void call(Receiver<String> stringReceiver) {
                        if (!stringReceiver.isUnCalled()) {
                            stringReceiver.onReceive("1");
                            stringReceiver.onReceive("2");
                            stringReceiver.onCompleted();
                        }
                    }
                }).
                map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s) + 2;
                    }
                }).
                call(new Receiver<Integer>() {
                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable t) {
                        // Log the failure instead of silently ignoring it.
                        Log.e("kpioneer", "onError", t);
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }
                });
    }
}
Log输出
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onCompleted
3.RxJava2(无背压) 操作符功能仿写实现
CallerWithUpstream(类似于AbstractObservableWithUpstream)
- 一个抽象类
- 有callActual方法
- 实现操作符需实现此方法
map操作符
- 最基本的操作符
- 顾名思义,用于做映射
CallerOperator接口
- 在callActual中做变换
- 可用于扩展操作符
相关代码:
/**
 * Hand-written imitation of the RxJava 2 Observable (no backpressure). Subclasses implement
 * {@link #callActual(Callee)}; operators are expressed as Caller subclasses wrapping an upstream.
 *
 * @param <T> type of item delivered to the Callee
 */
public abstract class Caller<T> {

    /** Does the real work of connecting the given callee; implemented by each source/operator. */
    protected abstract void callActual(Callee<T> callee);

    /** Creates a Caller backed by the given CallerOnCall. */
    public static <T> Caller<T> create(CallerOnCall<T> callerOnCall) {
        return new CallerCreate<>(callerOnCall);
    }

    /** Entry point for consumers; simply delegates to {@link #callActual(Callee)}. */
    public void call(Callee<T> callee) {
        this.callActual(callee);
    }

    /** Returns a CallerMap that wraps this Caller and the given function. */
    public <R> Caller<R> map(Function<T, R> function) {
        Caller<R> mapped = new CallerMap<T, R>(this, function);
        return mapped;
    }

    /** Returns a CallerLift that wraps this Caller and the given operator. */
    public <R> Caller<R> lift(CallerOperator<R, T> operator) {
        Caller<R> lifted = new CallerLift<R, T>(this, operator);
        return lifted;
    }
}
/**
 * Operator for the Caller imitation: given a Callee of T, produces a Callee of R.
 * In this file, lift passes a CallerOperator&lt;R, T&gt;, so the argument is the downstream callee
 * and the result is the callee handed to the upstream source.
 */
public interface CallerOperator<T,R> {
// Converts the given callee into the callee to use in its place.
Callee<R> call(Callee<T> callee);
}
/**
 * Created by Xionghu on 2018/6/11.
 * Desc: exposes the source (upstream) Caller.
 */
public interface CallerSource<T> {
// Returns the source Caller.
Caller<T> source();
}
/**
 * Base class for Caller operators that wrap an upstream Caller.
 *
 * @param <T> upstream item type
 * @param <R> downstream item type
 */
public abstract class CallerWithUpstream<T, R> extends Caller<R> implements CallerSource<T> {

    /** The upstream Caller this operator wraps. */
    protected final Caller<T> source;

    public CallerWithUpstream(Caller<T> source) {
        this.source = source;
    }

    /** @return the upstream Caller given to the constructor */
    @Override
    public Caller<T> source() {
        return this.source;
    }
}
/**
 * lift operator for the Caller imitation: converts the downstream Callee with a CallerOperator
 * and connects the converted Callee to the upstream source.
 *
 * @param <R> downstream item type
 * @param <T> upstream item type
 */
public class CallerLift<R, T> extends CallerWithUpstream<T, R> {

    /** Converts the downstream Callee of R into an upstream Callee of T. */
    private final CallerOperator<R, T> mOperator;

    public CallerLift(Caller<T> source, CallerOperator<R, T> mOperator) {
        super(source);
        this.mOperator = mOperator;
    }

    @Override
    protected void callActual(Callee<R> callee) {
        // Operator first, then the upstream call, as in the two-step form.
        source.call(mOperator.call(callee));
    }
}
/**
 * One-argument function used by the RxJava 2 imitation operators.
 *
 * @param <T> argument type
 * @param <R> result type
 */
public interface Function<T, R> {

    /**
     * Applies this function to the given argument.
     *
     * @param t the argument
     * @return the result
     */
    R call(T t);
}
/**
 * map operator for the Caller imitation (no backpressure): connects a MapCallee, which applies
 * the function to each item, between the upstream source and the downstream Callee.
 *
 * @param <T> upstream item type
 * @param <R> downstream item type
 */
public class CallerMap<T, R> extends CallerWithUpstream<T, R> {

    /** Function applied to each upstream item. */
    private final Function<T, R> function;

    public CallerMap(Caller<T> source, Function<T, R> function) {
        super(source);
        this.function = function;
    }

    @Override
    protected void callActual(Callee<R> callee) {
        source.call(new MapCallee<>(callee, function));
    }

    /**
     * Callee that maps each item and forwards the result to the wrapped Callee.
     *
     * <p>A RuntimeException thrown by the function is delivered downstream through
     * {@link #onError(Throwable)} instead of escaping to the emitter. After a terminal event
     * (error or completion) has been forwarded, all later events are dropped.
     */
    static class MapCallee<T, R> implements Callee<T> {

        private final Callee<R> mCallee;
        private final Function<T, R> mFunction;

        /** True once a terminal event has been forwarded downstream. */
        private boolean done;

        public MapCallee(Callee<R> mCallee, Function<T, R> mFunction) {
            this.mCallee = mCallee;
            this.mFunction = mFunction;
        }

        @Override
        public void onCall(Release release) {
            mCallee.onCall(release);
        }

        @Override
        public void onReceive(T t) {
            if (done) {
                return;
            }
            R mapped;
            try {
                mapped = mFunction.call(t);
            } catch (RuntimeException ex) {
                // Function.call declares no checked exceptions, so RuntimeException covers
                // every recoverable failure; Errors are deliberately left to propagate.
                onError(ex);
                return;
            }
            mCallee.onReceive(mapped);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            mCallee.onCompleted();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                return;
            }
            done = true;
            mCallee.onError(t);
        }
    }
}
/**
 * Created by Xionghu on 2018/6/11.
 * Desc: demo of the hand-written RxJava 2 imitation (no backpressure) operators, map and lift.
 */
public class Lesson2_3Activity extends AppCompatActivity {

    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }

    /** Click handler for R.id.testDo: runs one chain through map and one through lift. */
    @OnClick(R.id.testDo)
    public void onViewClicked() {
        // Chain 1: map operator.
        Caller.
                create(new CallerOnCall<String>() {
                    @Override
                    public void call(CallerEmitter<String> callerEmitter) {
                        callerEmitter.onReceive("1");
                        callerEmitter.onReceive("2");
                        callerEmitter.onCompleted();
                    }
                }).
                map(new Function<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s);
                    }
                }).
                call(new Callee<Integer>() {
                    @Override
                    public void onCall(Release release) {
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable t) {
                        // Log the failure, matching the second chain, instead of ignoring it.
                        Log.d("kpioneer", "onError");
                    }
                });

        // Chain 2: the same String -> Integer conversion written as a custom operator via lift.
        Caller.
                create(new CallerOnCall<String>() {
                    @Override
                    public void call(CallerEmitter<String> callerEmitter) {
                        callerEmitter.onReceive("3");
                        callerEmitter.onReceive("4");
                        callerEmitter.onCompleted();
                    }
                }).
                lift(new CallerOperator<Integer, String>() {
                    @Override
                    public Callee<String> call(final Callee<Integer> callee) {
                        // Upstream-facing Callee that parses each String and delegates
                        // every event to the downstream callee.
                        return new Callee<String>() {
                            @Override
                            public void onCall(Release release) {
                                callee.onCall(release);
                            }

                            @Override
                            public void onReceive(String s) {
                                callee.onReceive(Integer.parseInt(s));
                            }

                            @Override
                            public void onCompleted() {
                                callee.onCompleted();
                            }

                            @Override
                            public void onError(Throwable t) {
                                callee.onError(t);
                            }
                        };
                    }
                }).
                call(new Callee<Integer>() {
                    @Override
                    public void onCall(Release release) {
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("kpioneer", "onError");
                    }
                });
    }
}
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:1
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:2
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted
4.RxJava2(有背压) 操作符功能仿写实现
TelephonerOperator接口
- 在callActual中做变换
- 可用于扩展操作符
TelephonerWithUpstream(类似于AbstractFlowableWithUpstream)
- 一个抽象类
- 有callActual方法
- 实现操作符需实现此方法
相关源码
/**
 * Hand-written imitation of the RxJava 2 Flowable (with backpressure). Subclasses implement
 * {@link #callActual(Receiver)}; operators are Telephoner subclasses wrapping an upstream.
 *
 * @param <T> type of item delivered to the Receiver
 */
public abstract class Telephoner<T> {

    /** Does the real work of connecting the given receiver; implemented by each source/operator. */
    protected abstract void callActual(Receiver<T> receiver);

    /** Creates a Telephoner backed by the given TelephonerOnCall. */
    public static <T> Telephoner<T> create(TelephonerOnCall<T> telephonerOnCall) {
        return new TelephonerCreate<>(telephonerOnCall);
    }

    /** Entry point for consumers; simply delegates to {@link #callActual(Receiver)}. */
    public void call(Receiver<T> receiver) {
        this.callActual(receiver);
    }

    /** Returns a TelephonerMap that wraps this Telephoner and the given function. */
    public <R> Telephoner<R> map(Function<T, R> function) {
        Telephoner<R> mapped = new TelephonerMap<T, R>(this, function);
        return mapped;
    }

    /** Returns a TelephonerLift that wraps this Telephoner and the given operator. */
    public <R> Telephoner<R> lift(TelephonerOperator<R, T> telephonerOperator) {
        Telephoner<R> lifted = new TelephonerLift<R, T>(this, telephonerOperator);
        return lifted;
    }
}
/**
 * Created by Xionghu on 2018/6/12.
 * Desc: lift operator. Converts the downstream Receiver with a TelephonerOperator and connects
 * the converted Receiver to the upstream source.
 *
 * @param <R> downstream item type
 * @param <T> upstream item type
 */
public class TelephonerLift<R, T> extends TelephonerWithUpstream<T, R> {

    /** Converts the downstream Receiver of R into an upstream Receiver of T. */
    private final TelephonerOperator<R, T> operator;

    public TelephonerLift(Telephoner<T> source, TelephonerOperator<R, T> operator) {
        super(source);
        this.operator = operator;
    }

    @Override
    protected void callActual(Receiver<R> receiver) {
        // Operator first, then the upstream call, as in the two-step form.
        source.call(operator.call(receiver));
    }
}
import com.haocai.mylibrary.rxJava2.Function;
/**
 * Created by Xionghu on 2018/6/12.
 * Desc: map operator. Connects a MapReceiver, which applies the function to each item, between
 * the upstream source and the downstream Receiver.
 *
 * @param <T> upstream item type
 * @param <R> downstream item type
 */
public class TelephonerMap<T, R> extends TelephonerWithUpstream<T, R> {

    /** Function applied to each upstream item. */
    private final Function<T, R> trFunction;

    public TelephonerMap(Telephoner<T> source, Function<T, R> trFunction) {
        super(source);
        this.trFunction = trFunction;
    }

    @Override
    protected void callActual(Receiver<R> receiver) {
        source.call(new MapReceiver<>(receiver, trFunction));
    }

    /**
     * Receiver that maps each item and forwards the result to the wrapped Receiver.
     *
     * <p>A RuntimeException thrown by the function is delivered downstream through
     * {@link #onError(Throwable)} instead of escaping to the emitter. After a terminal event
     * (error or completion) has been forwarded, all later events are dropped.
     */
    static class MapReceiver<T, R> implements Receiver<T> {

        private final Receiver<R> rReceiver;
        private final Function<T, R> trFunction;

        /** True once a terminal event has been forwarded downstream. */
        private boolean done;

        public MapReceiver(Receiver<R> rReceiver, Function<T, R> trFunction) {
            this.rReceiver = rReceiver;
            this.trFunction = trFunction;
        }

        @Override
        public void onCall(Drop d) {
            rReceiver.onCall(d);
        }

        @Override
        public void onReceive(T t) {
            if (done) {
                return;
            }
            R tr;
            try {
                tr = trFunction.call(t);
            } catch (RuntimeException ex) {
                // Function.call declares no checked exceptions, so RuntimeException covers
                // every recoverable failure; Errors are deliberately left to propagate.
                onError(ex);
                return;
            }
            rReceiver.onReceive(tr);
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                return;
            }
            done = true;
            rReceiver.onError(t);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            rReceiver.onCompleted();
        }
    }
}
/**
 * Created by Xionghu on 2018/6/12.
 * Desc: operator interface. Given a Receiver of T, produces a Receiver of R; in this file, lift
 * passes a TelephonerOperator&lt;R, T&gt;, so the argument is the downstream receiver and the
 * result is the receiver handed to the upstream source.
 */
public interface TelephonerOperator<T, R> {
// Converts the given receiver into the receiver to use in its place.
Receiver<R> call(Receiver<T> callee);
}
/**
 * Created by Xionghu on 2018/6/11.
 * Desc: exposes the source (upstream) Telephoner.
 */
public interface TelephonerSource<T> {
// Returns the source Telephoner.
Telephoner<T> source();
}
/**
 * Base class for Telephoner operators that wrap an upstream Telephoner.
 *
 * <p>Implements {@code TelephonerSource<T>} with its type argument (rather than the raw type)
 * so that {@link #source()} returns a typed {@code Telephoner<T>}.
 *
 * @param <T> upstream item type
 * @param <R> downstream item type
 */
public abstract class TelephonerWithUpstream<T, R> extends Telephoner<R> implements TelephonerSource<T> {

    /** The upstream Telephoner this operator wraps. */
    protected final Telephoner<T> source;

    public TelephonerWithUpstream(Telephoner<T> source) {
        this.source = source;
    }

    /** @return the upstream Telephoner given to the constructor */
    @Override
    public Telephoner<T> source() {
        return source;
    }
}
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import com.haocai.mylibrary.rxJava2.Function;
import com.haocai.mylibrary.rxJava2.backpressure.Drop;
import com.haocai.mylibrary.rxJava2.backpressure.Receiver;
import com.haocai.mylibrary.rxJava2.backpressure.Telephoner;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerEmitter;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOnCall;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOperator;
import com.haocai.rxjavademo.R;
import butterknife.ButterKnife;
import butterknife.OnClick;
/**
 * Created by Xionghu on 2018/6/11.
 * Desc: demo of the hand-written RxJava 2 imitation WITH backpressure (Telephoner) operators,
 * map and lift.
 */
public class Lesson2_4Activity extends AppCompatActivity {
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
// Click handler for R.id.testDo: runs one chain through map and one through lift.
@OnClick(R.id.testDo)
public void onViewClicked() {
// Chain 1: map operator.
Telephoner.
create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("1");
telephonerEmitter.onReceive("2");
telephonerEmitter.onCompleted();
}
}).
map(new Function<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
}).
call(new Receiver<Integer>() {
@Override
public void onCall(Drop d) {
// Request Long.MAX_VALUE items before logging.
d.request(Long.MAX_VALUE);
Log.d("kpioneer", "onCall");
}
@Override
public void onReceive(Integer integer) {
Log.d("kpioneer", "onReceive:" + integer);
}
@Override
public void onError(Throwable t) {
Log.d("kpioneer", "onError");
}
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
});
// Chain 2: the same String -> Integer conversion written as a custom operator via lift.
Telephoner.
create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("3");
telephonerEmitter.onReceive("4");
telephonerEmitter.onCompleted();
}
}).
lift(new TelephonerOperator<Integer, String>() {
@Override
public Receiver<String> call(final Receiver<Integer> receiver) {
// Upstream-facing Receiver that parses each String and delegates every event
// to the downstream receiver.
return new Receiver<String>() {
@Override
public void onCall(Drop d) {
receiver.onCall(d);
}
@Override
public void onReceive(String s) {
receiver.onReceive(Integer.parseInt(s));
}
@Override
public void onError(Throwable t) {
receiver.onError(t);
}
@Override
public void onCompleted() {
receiver.onCompleted();
}
};
}
}).
call(new Receiver<Integer>() {
@Override
public void onCall(Drop d) {
// Request Long.MAX_VALUE items before logging.
d.request(Long.MAX_VALUE);
Log.d("kpioneer", "onCall");
}
@Override
public void onReceive(Integer integer) {
Log.d("kpioneer", "onReceive:" + integer);
}
@Override
public void onError(Throwable t) {
Log.d("kpioneer", "onError");
}
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
});
}
}
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:1
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:2
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted