线程切换:https://juejin.im/post/5a6751af6fb9a01cb2571794
public class Main {
public static void main(String[] args)
{
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) {
observableEmitter.onNext("12");
}
}).flatMap(new Function<String, Observable<Integer>>() {
@Override
public Observable<Integer> apply(String s) {
final Integer value=Integer.parseInt(s);
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) {
observableEmitter.onNext(value);
}
});
}
}).subscribe(new Observer<Integer>() {
@Override
void onNext(Integer value) {
System.out.println(value + 10);
}
});
}
}
public class ObservableCreate<T> extends Observable<T>{
ObservableOnSubscribe observableOnSubscribe;
public ObservableCreate(ObservableOnSubscribe<T> observableOnSubscribe)
{
this.observableOnSubscribe=observableOnSubscribe;
}
@Override
protected void subscribeActual (Observer observer) {
observableOnSubscribe.subscribe(new CreateEmitter(observer));
}
}
public class CreateEmitter implements ObservableEmitter{
Observer observer;
public CreateEmitter(Observer observer)
{
this.observer=observer;
}
@Override
public void onNext(Object value) {
observer.onNext(value);
}
}
public interface ObservableEmitter<T> {
void onNext(T value);
}
public abstract class Observer<T> {
abstract void onNext(T value);
}
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> observableEmitter);
}
public class ObservableFlatMap<T,R> extends Observable{
Observable<T> observable;
Function<T,? extends Observable<? extends R>> mapper;
public ObservableFlatMap(Observable<T> observable,Function<T,? extends Observable<? extends R>> mapper)
{
this.mapper=mapper;
this.observable=observable;
}
@Override
protected void subscribeActual(Observer observer) {
observable.subscribe(new MergeObserver(mapper,observer));
}
}
public class MergeObserver<T,R> extends Observer<T>{
Function<T,? extends Observable<? extends R>> mapper;
Observer<T> observer;
public MergeObserver(Function<T,? extends Observable<? extends R>> mapper,Observer<T> observer)
{
this.mapper=mapper;
this.observer=observer;
}
@Override
void onNext(T value) {
Observable observable=mapper.apply(value);
observable.subscribe(observer);
}
}
public interface Function<T,R> {
R apply(T t);
}
public abstract class Observable<T> {
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<T>(source);
}
public final <R> Observable<R> flatMap(Function<T,? extends Observable<? extends R>> mapper) {
return new ObservableFlatMap(this,mapper);
}
public final <U> void subscribe(Observer<U> observer) {
subscribeActual(observer);
}
protected abstract <U> void subscribeActual(Observer<U> observer);
}
输出: