/**
 * Demo entry point: wires a one-value source through a mapping step into a
 * printing observer.
 */
public class Main {

    /**
     * Builds the chain {@code create -> map -> subscribe} and triggers it.
     * The source emits the single value 12, the mapper prefixes it with a
     * label, and the observer prints the resulting string to standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Source callback: pushes one number to whichever emitter it is handed.
        ObservableOnSubscribe numberSource = new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter observableEmitter) {
                observableEmitter.onNext(12);
            }
        };

        // Mapping step: turns the number into a labelled string.
        Function<Integer, String> numberToText = new Function<Integer, String>() {
            @Override
            public String apply(Integer i) {
                return "模拟数字转换字符串" + i;
            }
        };

        // Terminal observer: prints every string it receives.
        Observer<String> printer = new Observer<String>() {
            @Override
            public void onNext(String str) {
                System.out.println(str);
            }
        };

        // Nothing is emitted until subscribe() is called at the end of the chain.
        Observable.create(numberSource).map(numberToText).subscribe(printer);
    }
}
/**
 * Base class of the observable chain. Concrete subclasses implement
 * {@link #subscribeActual(Observer)}; clients go through {@link #subscribe(Observer)}.
 *
 * @param <T> type of the items passed to {@link #subscribeActual(Observer)}
 * @param <R> result type of the function accepted by {@link #map(Function)}
 */
public abstract class Observable<T, R> {

    /**
     * Creates an observable backed by the given callback by wrapping it in an
     * {@link ObservableCreate}.
     *
     * @param source callback that is handed an emitter when an observer subscribes
     * @return a new observable wrapping {@code source}
     * @throws NullPointerException if {@code source} is null
     */
    public static Observable create(ObservableOnSubscribe source) {
        // Fail at the call site instead of later, when the first observer subscribes.
        if (source == null) {
            throw new NullPointerException("source is null");
        }
        return new ObservableCreate(source);
    }

    /**
     * Subscribes the observer by delegating to {@link #subscribeActual(Observer)}.
     *
     * @param observer receiver of the emitted items
     * @throws NullPointerException if {@code observer} is null
     */
    // The parameter stays raw so existing raw-typed call chains keep compiling;
    // the unchecked conversion to Observer<T> happens only in the delegation below.
    @SuppressWarnings("unchecked")
    public final void subscribe(Observer observer) {
        // Without this check a null observer would only blow up on the first emission.
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        subscribeActual(observer);
    }

    /**
     * Performs the actual subscription logic of the concrete observable.
     *
     * @param observer receiver of the emitted items
     */
    protected abstract void subscribeActual(Observer<T> observer);

    /**
     * Returns an observable that applies {@code mapper} to the items of this
     * observable, implemented by wrapping this instance in an {@link ObservableMap}.
     *
     * @param mapper function applied to each item
     * @return a new observable wrapping this one and {@code mapper}
     * @throws NullPointerException if {@code mapper} is null
     */
    public final Observable map(Function<T, R> mapper) {
        // Fail at the call site instead of on the first mapped item.
        if (mapper == null) {
            throw new NullPointerException("mapper is null");
        }
        return new ObservableMap<T, R>(this, mapper);
    }
}
/**
 * {@link Observable} implementation backed by an {@link ObservableOnSubscribe}
 * callback. Each subscription hands the callback a fresh {@link CreateEmitter}
 * wrapping the subscribing observer.
 */
public class ObservableCreate extends Observable {

    /** Callback invoked on every subscription; never null. */
    private final ObservableOnSubscribe observableOnSubscribe;

    /**
     * @param observableOnSubscribe callback invoked with an emitter on each subscription
     * @throws NullPointerException if {@code observableOnSubscribe} is null
     */
    public ObservableCreate(ObservableOnSubscribe observableOnSubscribe) {
        // Reject null here; otherwise the failure would only show up at subscribe time.
        if (observableOnSubscribe == null) {
            throw new NullPointerException("observableOnSubscribe is null");
        }
        this.observableOnSubscribe = observableOnSubscribe;
    }

    /**
     * Wraps the observer in a {@link CreateEmitter} and passes it to the callback.
     *
     * @param observer receiver of whatever the callback emits
     */
    @Override
    protected void subscribeActual(Observer observer) {
        // Raw types are unavoidable here: ObservableOnSubscribe is not generic.
        observableOnSubscribe.subscribe(new CreateEmitter(observer));
    }
}
/**
 * Callback that produces items for an observable: it receives an emitter and
 * pushes values into it.
 */
public interface ObservableOnSubscribe {

    /**
     * Called with the emitter through which values should be pushed.
     *
     * @param observableEmitter sink for the values to emit
     */
    void subscribe(ObservableEmitter observableEmitter);
}
/**
 * Sink into which a source pushes values.
 *
 * @param <T> type of the emitted values
 */
public interface ObservableEmitter<T> {

    /**
     * Emits a single value.
     *
     * @param value the value to emit
     */
    void onNext(T value);
}
/**
 * Single-argument transformation from an input value to a result.
 *
 * @param <T> input type
 * @param <R> result type
 */
public interface Function<T, R> {

    /**
     * Applies this function to the given input.
     *
     * @param i the input value
     * @return the transformed value
     */
    R apply(T i);
}
/**
 * Observable that applies a {@link Function} to every item of an upstream
 * observable before handing the result to the subscribed observer.
 *
 * @param <T> type of the upstream items
 * @param <R> type of the mapped items delivered downstream
 */
public class ObservableMap<T, R> extends Observable {

    /** Transformation applied to each upstream item; never null. */
    private final Function<T, R> function;

    /** Upstream observable whose items are transformed; never null. */
    private final Observable observable;

    /**
     * @param observable upstream observable to subscribe to
     * @param function   transformation applied to each upstream item
     * @throws NullPointerException if either argument is null
     */
    // Parameters stay raw so existing callers keep compiling; the unchecked
    // assignment to the typed field is confined to this constructor.
    @SuppressWarnings("unchecked")
    public ObservableMap(Observable observable, Function function) {
        // Reject nulls up front; otherwise they only fail when subscribing or emitting.
        if (observable == null) {
            throw new NullPointerException("observable is null");
        }
        if (function == null) {
            throw new NullPointerException("function is null");
        }
        this.observable = observable;
        this.function = function;
    }

    /**
     * Subscribes to the upstream observable with a {@link MapObserver} that
     * wraps the given observer.
     *
     * @param observer receiver of the mapped items
     */
    @Override
    protected void subscribeActual(Observer observer) {
        // The overridden signature supplies a raw Observer; narrow it once here.
        @SuppressWarnings("unchecked")
        Observer<R> downstream = observer;
        // Original author's note: this wrapping resembles the adapter pattern.
        observable.subscribe(new MapObserver(downstream, function));
    }

    /**
     * Observer placed between upstream and downstream: converts each received
     * item with the function and forwards the result.
     */
    final class MapObserver extends Observer<T> {
        final Observer<R> observer;
        final Function<T, R> function;

        /**
         * @param observer downstream receiver of the mapped items
         * @param function transformation applied to each received item
         */
        public MapObserver(Observer<R> observer, Function<T, R> function) {
            this.observer = observer;
            this.function = function;
        }

        /**
         * Maps the upstream item and forwards the result downstream.
         *
         * @param str the upstream item
         */
        @Override
        public void onNext(T str) {
            R mapped = function.apply(str);
            observer.onNext(mapped);
        }
    }
}
/**
 * Receiver of the items pushed through an observable chain.
 *
 * @param <T> type of the received items
 */
public abstract class Observer<T> {

    /**
     * Called once for every item delivered to this observer.
     *
     * @param str the delivered item
     */
    public abstract void onNext(T str);
}
/**
 * {@link ObservableEmitter} that forwards every emitted value directly to a
 * single {@link Observer}.
 *
 * @param <T> type of the emitted values
 */
public class CreateEmitter<T> implements ObservableEmitter<T> {

    /** Receiver of the forwarded values; never null. */
    final Observer<? super T> observer;

    /**
     * @param observer receiver of every value passed to {@link #onNext(Object)}
     * @throws NullPointerException if {@code observer} is null
     */
    CreateEmitter(Observer<? super T> observer) {
        // Reject null here; otherwise the failure would only appear on the first emission.
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        this.observer = observer;
    }

    /**
     * Forwards the value to the wrapped observer.
     *
     * @param value the value to deliver
     */
    @Override
    public void onNext(T value) {
        observer.onNext(value);
    }
}
输出: