使用 RxJava 构建一个常用的 UseCase 工具类,主要提供两种功能:
1、持续订阅 RxJava 对象(Flowable),每次有新数据都会回调
2、只订阅一次 RxJava 对象,仅接收第一条数据
FlowableUseCase 代码如下:
package com.example.commonui.utils;
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;
/**
 * Static helpers that attach a callback to a {@link Flowable}, either for every item
 * ({@code listener}) or for the first item only ({@code single}).
 *
 * <p>Every pipeline built here is subscribed with {@code subscribeOn(Schedulers.io())} and
 * delivers with {@code observeOn(AndroidSchedulers.mainThread())}. Errors are reported with
 * {@link Throwable#printStackTrace()}, matching the rest of this file.
 */
public class FlowableUseCase {

    /**
     * Subscribes {@code callback} to every item of {@code flowable} and additionally runs
     * {@code initiator} as a side effect of the subscription.
     *
     * @param flowable  source to observe (raw type kept for caller compatibility)
     * @param initiator action to run when the pipeline is subscribed; when {@code null} this
     *                  behaves exactly like {@link #listener(Flowable, Consumer)}
     * @param callback  receives each item
     * @return handle that cancels the subscription
     */
    public static <T> Disposable listener(Flowable flowable, Action initiator, Consumer<T> callback) {
        // The raw Flowable is part of the public signature; the element type is the caller's promise.
        @SuppressWarnings("unchecked")
        Flowable<T> source = (Flowable<T>) flowable;
        return subscribeListener(withInitiator(source, initiator), callback);
    }

    /**
     * Subscribes {@code callback} to every item of {@code flowable}.
     *
     * @param flowable source to observe (raw type kept for caller compatibility)
     * @param callback receives each item
     * @return handle that cancels the subscription
     */
    public static <T> Disposable listener(Flowable flowable, Consumer<T> callback) {
        @SuppressWarnings("unchecked")
        Flowable<T> source = (Flowable<T>) flowable;
        return subscribeListener(source, callback);
    }

    /**
     * Delivers only the first item of {@code flowable} to {@code callback}, then cancels the
     * subscription. {@code initiator}, when present, is run as a side effect of subscribing.
     *
     * @param flowable  source to observe (raw type kept for caller compatibility)
     * @param initiator action to run when the pipeline is subscribed; may be {@code null}
     * @param callback  receives the first item; may be {@code null}, in which case the item is dropped
     * @return handle that cancels the subscription
     */
    public static <T> Disposable single(Flowable flowable, Action initiator, Consumer<T> callback) {
        @SuppressWarnings("unchecked")
        Flowable<T> source = (Flowable<T>) flowable;
        return subscribeSingle(withInitiator(source, initiator), callback);
    }

    /**
     * Evaluates {@code initiator} once on the io scheduler and hands its result to
     * {@code callback} on the main thread.
     *
     * @param initiator produces the value to deliver
     * @param callback  receives the value; may be {@code null}, in which case the value is dropped
     * @return handle that cancels the pending work
     */
    public static <T> Disposable single(Supplier<T> initiator, Consumer<T> callback) {
        Flowable<T> source = Flowable.create(emitter -> {
            emitter.onNext(initiator.get());
            // Nothing else will ever be emitted, so terminate the stream explicitly.
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);
        return subscribeSingle(source, callback);
    }

    /**
     * Combines {@code source} via {@code ambWith} with a companion Flowable that emits nothing
     * and only runs {@code initiator} when it is subscribed. Returns {@code source} unchanged
     * when there is no initiator.
     */
    private static <T> Flowable<T> withInitiator(Flowable<T> source, Action initiator) {
        if (initiator == null) {
            return source;
        }
        Flowable<T> trigger = Flowable.create(emitter -> initiator.run(), BackpressureStrategy.BUFFER);
        return source.ambWith(trigger);
    }

    /** Applies the shared schedulers and subscribes {@code callback} with an error logger. */
    private static <T> Disposable subscribeListener(Flowable<T> source, Consumer<T> callback) {
        return source
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                // Without an error consumer a failure would be routed to the global RxJava error handler.
                .subscribe(callback, FlowableUseCase::logError);
    }

    /** Applies the shared schedulers and subscribes a one-item subscriber wrapping {@code callback}. */
    private static <T> Disposable subscribeSingle(Flowable<T> source, Consumer<T> callback) {
        SingleSubscriber<T> subscriber = new SingleSubscriber<>(callback);
        source
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subscriber);
        return subscriber;
    }

    /** Reports a pipeline failure in the same way the rest of this file reports errors. */
    private static void logError(Throwable error) {
        error.printStackTrace();
    }

    /**
     * Subscriber that requests exactly one item, forwards it to the wrapped callback and then
     * disposes itself so the upstream subscription is released.
     */
    private static final class SingleSubscriber<T> extends DisposableSubscriber<T> {
        private final Consumer<T> callback;

        SingleSubscriber(Consumer<T> callback) {
            this.callback = callback;
        }

        @Override
        protected void onStart() {
            request(1);
        }

        @Override
        public void onNext(T t) {
            try {
                if (callback != null) {
                    callback.accept(t);
                }
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            } finally {
                // Only one item was requested; cancel upstream instead of leaving it subscribed.
                dispose();
            }
        }

        @Override
        public void onError(Throwable t) {
            logError(t);
        }

        @Override
        public void onComplete() {
            // Nothing to do: the single item, if any, has already been delivered.
        }
    }
}
FlowableUserCaseCaller 代码如下:
package com.example.commonui.utils;
import android.util.ArrayMap;
import androidx.lifecycle.Lifecycle;
import androidx.lifecycle.LifecycleEventObserver;
import androidx.lifecycle.LifecycleOwner;
import javax.inject.Inject;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Supplier;
/**
 * Fluent front end for {@link FlowableUseCase}: pick a mode with {@link #listener(Flowable)},
 * {@link #singleListener(Flowable)} or {@link #load(Supplier)}, set a callback with
 * {@link #callback(Consumer)}, then call {@link #start()}.
 *
 * <p>Every subscription created by {@link #start()} is collected in one
 * {@link CompositeDisposable} and released by {@link #dispose()}.
 *
 * <p>Configuration is not reset by {@link #start()}: a source, action, supplier or callback set
 * earlier stays in effect for later calls on the same instance until it is overwritten.
 */
public class FlowableUserCaseCaller {
    /** Mode selected by {@link #listener(Flowable)}. */
    private static final int MODE_LISTENER = 1;
    /** Mode selected by {@link #singleListener(Flowable)}. */
    private static final int MODE_SINGLE_LISTENER = 2;
    /** Mode selected by {@link #load(Supplier)}. */
    private static final int MODE_LOAD = 3;

    // Raw types are kept so the generic builder methods can accept any element type.
    Flowable source;
    Action action;
    Supplier supplier;
    Consumer consumer;
    /** One of the MODE_* values, or 0 while no mode has been chosen. */
    int flag;
    /** Maps each mode to the action that creates and registers its subscription. */
    final ArrayMap<Integer, Action> calls = new ArrayMap<>();
    final CompositeDisposable disposable = new CompositeDisposable();

    /** Creates a caller with no mode selected and registers the three mode handlers. */
    @Inject
    public FlowableUserCaseCaller() {
        calls.put(MODE_LISTENER, () -> disposable.add(FlowableUseCase.listener(source, action, consumer)));
        calls.put(MODE_SINGLE_LISTENER, () -> disposable.add(FlowableUseCase.single(source, action, consumer)));
        calls.put(MODE_LOAD, () -> disposable.add(FlowableUseCase.single(supplier, consumer)));
    }

    /**
     * Selects continuous listening on {@code source}.
     *
     * @return this caller, for chaining
     */
    public FlowableUserCaseCaller listener(Flowable source) {
        this.flag = MODE_LISTENER;
        this.source = source;
        return this;
    }

    /**
     * Selects one-shot listening on {@code source}.
     *
     * @return this caller, for chaining
     */
    public FlowableUserCaseCaller singleListener(Flowable source) {
        this.flag = MODE_SINGLE_LISTENER;
        this.source = source;
        return this;
    }

    /**
     * Selects the mode that evaluates {@code supplier} once and delivers its result.
     *
     * @return this caller, for chaining
     */
    public <T> FlowableUserCaseCaller load(Supplier<T> supplier) {
        this.flag = MODE_LOAD;
        this.supplier = supplier;
        return this;
    }

    /**
     * Creates a caller that is disposed when {@code owner} reports {@code ON_DESTROY}.
     *
     * @param owner lifecycle to observe
     * @return a new caller bound to that lifecycle
     */
    public static FlowableUserCaseCaller bind(LifecycleOwner owner) {
        FlowableUserCaseCaller caseCaller = new FlowableUserCaseCaller();
        owner.getLifecycle().addObserver((LifecycleEventObserver) (source, event) -> {
            if (event == Lifecycle.Event.ON_DESTROY) {
                caseCaller.dispose();
            }
        });
        return caseCaller;
    }

    /**
     * Sets the action passed as the initiator in the listener and singleListener modes.
     * Does not change the selected mode.
     *
     * @return this caller, for chaining
     */
    public FlowableUserCaseCaller load(Action action) {
        this.action = action;
        return this;
    }

    /**
     * Sets the callback that receives emitted items.
     *
     * @return this caller, for chaining
     */
    public <T> FlowableUserCaseCaller callback(Consumer<T> consumer) {
        this.consumer = consumer;
        return this;
    }

    /**
     * Creates the subscription for the selected mode and registers it for later disposal.
     * Does nothing when no callback has been set. Failures are printed, never thrown.
     */
    public void start() {
        if (consumer == null) {
            return;
        }
        Action call = calls.get(flag);
        if (call == null) {
            // No mode was chosen; report it explicitly instead of relying on a caught NPE.
            new IllegalStateException(
                    "start() called before listener(), singleListener() or load(Supplier)")
                    .printStackTrace();
            return;
        }
        try {
            call.run();
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
    }

    /**
     * Disposes every subscription created through this caller.
     *
     * <p>NOTE(review): per the RxJava documentation a disposed {@code CompositeDisposable}
     * immediately disposes anything added afterwards, so {@link #start()} is presumably
     * ineffective after this call; confirm that single-use is intended.
     */
    public void dispose() {
        if (!disposable.isDisposed()) {
            disposable.dispose();
        }
    }
}
具体使用方式如下:
1、listener
// Continuous listening: the callback runs for each item emitted by userManager.name.
FlowableUserCaseCaller useCase = new FlowableUserCaseCaller();
useCase.listener(userManager.name)
        // Notify the UI of the new value.
        .callback(value -> name.setValue((String) value))
        .start();
2、singleListener
useCase.singleListener(userManager.singleTest)
.load(userManager::test)
.callback(o -> {
name.setValue(o.toString());
}).start();
3、单次执行方法(不监听任何数据源)
// Supplier mode: the result of userManager.getUser() is handed to the callback; no source Flowable is involved.
useCase.load(userManager::getUser)
        .callback((Consumer<Result<List<User>>>) result -> {
            if (!result.isValid()) {
                Log.i("xiaochangyan", "code:" + result.getCode() + " message:" + result.getMessage());
            } else {
                datas.setValue(result.getData());
            }
            // Runs for both valid and invalid results.
            isRefreshing.notifyChange();
        })
        .start();
4、释放资源
// Release every subscription that was started through this caller.
useCase.dispose();