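/*
 * Study materials (学习资料)
 * Youku video: http://v.youku.com/v_show/id_XMTUzMDQ5ODUyMA==.html?f=27049801&from=y1.7-1.3
 * Further reading (original article titles):
 *   - Android RxJava使用介绍(二) RxJava的操作符  (Introduction to using RxJava on Android, part 2: RxJava operators)
 *   - Android RxJava使用介绍(三) RxJava的操作符  (Introduction to using RxJava on Android, part 3: RxJava operators)
 *   - Android RxJava使用介绍(四) RxJava的操作符  (Introduction to using RxJava on Android, part 4: RxJava operators)
 */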
package com.example.paydemo;
import static org.junit.Assert.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.notification.RunListener;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
/**
* 学习资料 视频地址:http://v.youku.com/v_show/id_XMTUzMDQ5ODUyMA==.html?f=27049801&from=y1.7-1.3
*/
public class RxJavaTest {
@Test
public void test1() {
//可观察的
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
});
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
myObservable.subscribe(mySubscriber);
}
@Test
public void test2() {
Observable<String> observable = Observable.just("hello ");
observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
Action1 action = new Action1() {
@Override
public void call(Object o) {
System.out.println(o);
}
};
observable.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + " -Dan";
}
}).subscribe(action); //要连惯着写,map返回的是个新的对象
observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Action:" + s);
}
});
}
@Test
public void test3() {
Observable.just("Hello, world!")
.map(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s + "objdect";
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(o);
}
});
}
@Test
public void test4() {
Action1<Integer> onNext = new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
};
Observable.range(10, 10).repeat(5).subscribe(onNext);
}
@Test
public void test5() {
Observable.just(1, 2, 2, 4)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 1;
}
})
.distinct() //不重复的,2只会执行一次
// .observeOn(Schedulers.newThread())
// .subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
System.out.println(Thread.currentThread().getName());
}
});
}
@Test
public void test6() {
System.out.println(Thread.currentThread().getName());
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String hello = "hello";
System.out.println(hello);
System.out.println(Thread.currentThread().getName());
subscriber.onNext(hello);
}
}).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread()).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
System.out.println(Thread.currentThread().getName());
}
});
//要让线程休眠,否则不会打印出日志的
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void test7() {
String[][] temps = new String[][]{{"22", "33"}, {"44", "55", "66"}};
Observable.from(temps).flatMap(new Func1<String[], Observable<String>>() {
@Override
public Observable<String> call(String[] strings) {
return Observable.from(strings);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
}
@Test
public void test8() {
String[] str = new String[]{"1", "1", "2", "3"};
Observable.from(str).filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.equals("1"); //只有true的时候才会执行
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
}
@Test
public void test9() {
String[] str = new String[]{"1", "1", "2", "3"};
Observable.from(str)
// .take(1) //从起始开始取几个
.takeLast(1) //从结束的位置取
// .filter(new Func1<String, Boolean>() {
// @Override
// public Boolean call(String s) {
// return s.equals("1"); //只有true的时候才会执行
// }
// })
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
}
@Test
public void test10() {
Integer[] t = new Integer[]{1, 1, 2, 2, 1, 1, 2, 2};
Observable.from(t)
.distinctUntilChanged() //连续重复的只执行一次
// .distinctUntilChanged(new Func1<Integer, Boolean>() { //用法还不明白
// @Override
// public Boolean call(Integer integer) {
// return integer==2;
// }
// })
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
}
}