观察者中的最小单位Consumer
package com.deity.rxjavasample;
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
public class ExampleUnitTest {
@Test
public void testConsumer(){
String[] names = new String[]{"曹操","典韦","司马懿","荀"};
helloConsumer(names);
}
public void helloConsumer(String... names){
Observable.fromArray(names).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("name:"+s);
}
});
}
}
看看当前的签名,我们实现的Consumer接口实现了onNext方法
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext,
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION,
Functions.emptyConsumer());
}
跟踪进去看看,我们发现在内部,RxJava 封装了默认的onError、onComplete、onSubscribe方法.以上实现其实跟接下来的代码实现是同样一个套路
package com.deity.rxjavasample;
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
public class ExampleUnitTest {
@Test
public void testConsumer(){
String[] names = new String[]{"曹操","典韦","司马懿","荀"};
helloObserver(names);
}
public void helloObserver(String... names){
Observable.fromArray(names).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("onNext>>>"+s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
}
map
很多时候,上游下发的数据需要经过转换才能显示,比如:
package com.deity.rxjavasample;
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
public class ExampleUnitTest {
@Test
public void testConsumer(){
String[] names = new String[]{"曹操","典韦","司马懿","荀"};
// 1.该方法实现不优雅,
modifyResult(names);
helloObserverMap(names);
}
public void modifyResult(String... names){
Observable.fromArray(names).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("魏将["+s+"]在此,蜀军还不快快投降!");//对结果进行转化
}
});
}
public void helloObserverMap(String... names){
Observable.fromArray(names).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {//对返回结果进行变换
return "魏将["+s+"]在此,蜀军还不快快投降!";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});
}
}
/**
* 检查当前设备是否登录
*/
private void checkLogin(){
Observable<Boolean> observable = Observable.create(new ObservableOnSubscribe<Boolean>() {
/**
* Called for each Observer that subscribes.
*
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
@Override
public void subscribe(@NonNull ObservableEmitter<Boolean> emitter) throws Exception {
emitter.onNext(HuanXinHelper.getInstance().isLoggedIn());
}
});
Consumer<Boolean> consumer = new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean aBoolean) throws Exception {
if (aBoolean){//当前已经登录
EMClient.getInstance().chatManager().loadAllConversations();
EMClient.getInstance().groupManager().loadAllGroups();
//直接进入首页
startActivity(new Intent(SplashActivity.this, MainActivity.class));
}else {
startActivity(new Intent(SplashActivity.this, LoginActivity.class));
}
}
};
//主题订阅处理,在后台线程上进行处理逻辑
observable.subscribeOn(Schedulers.io()).subscribe(consumer);
}