以下基于Rxjava1.x, 主要面向目标是有多种Callback的改造。
现有功能
假设现有功能如下:
public class MockService {
void getName(String url, StringCallBack stringCallBack){
stringCallBack.callString("Hello world!");
}
void getAge(String url, IntCallback intCallback){
intCallback.callInt(18);
}
}
public abstract class IntCallback {
public abstract void callInt(int value);
}
public abstract class StringCallback {
public abstract void callString(String string);
}
现在希望可以把这些callback
方式,变成返回Observable<T>
的方式。
思考
下面的Observable 都指Rxjava中的Observable
方案一
Callback与Observable的结合就是需要在Callback调用Observable绑定Observer的onNext之类的方法。所以我们需要生成两部分内容,一个特殊的Callback和一个特殊的Observable。用一个类来存储这两个信息:
public class ResultHolder<T, R> {
public Observable<T> observable;
public R callback;
}
// 使用的时候大概这种感觉
public static void main(String[] args){
MockService mockService = new MockService();
ResultHolder<MockData, StringCallback> holder = ...;
mockService.getName("xxxx", holder.callback);
holder.observable.subscribe(mockData -> {});
}
下面思考如何生成这个holder。我们有多种Callback返回的类型也是多种的所以需要设计一个如下的东西:
public <T,R> ResultHolder<T,R> create(Type resultType, Type callbackType){
...
}
我们需要得到返回Observable<T>
和R表示的callback
,这两个类型是不确定的。所以需要在参数中传入这两个类型,这样函数内部就可以确定外部想要的期望结果,并进行强制类型转换。
public <T,R> ResultHolder<T,R> create(Type resultType, Type callbackType){
ResultHolder<T, R> resultHolder = new ResultHolder<>();
PublishSubject<T> subject = PublishSubject.create();
resultHolder.observable = subject;
TypeAdapter<T> typeAdapter = getAdapter(callbackType);
resultHolder.callback = typeAdapter.getCallback(getConverter(resultType), subject);
return resultHolder;
}
Observable就使用Subject对象, 这样它既是Observable又是Observer。为了能让Observable和Callback做出关联,需要把Observer传入到Callback的具体实现中,这里Callback的具体实现是有限个的。所以可以把有限的Callback实现先存起来,用的时候再找到使用就可以了。根据这个要求完成该类:
public class RxJavaCallbackHelper {
private final Converter.Factory mConverterFactory;
private Map<TypeToken<?>, TypeAdapter<?>> mCached;
public RxJavaCallbackHelper(Converter.Factory factory){
mConverterFactory = factory;
mCached = new HashMap<>();
mCached.put(TypeToken.get(StringCallback.class), new StringCallbackAdapter<>());
}
public <T,R> ResultHolder<T,R> create(Type resultType, Type callbackType){
ResultHolder<T, R> resultHolder = new ResultHolder<>();
PublishSubject<T> subject = PublishSubject.create();
resultHolder.observable = subject;
TypeAdapter<T> typeAdapter = getAdapter(callbackType);
resultHolder.callback = typeAdapter.getCallback(getConverter(resultType), subject);
return resultHolder;
}
@SuppressWarnings("unchecked")
public <T> TypeAdapter<T> getAdapter(Type type){
return (TypeAdapter<T>) mCached.get(TypeToken.get(type));
}
@SuppressWarnings("unchecked")
public <T> Converter<T> getConverter(Type type){
return (Converter<T>) mConverterFactory.createConverter(type);
}
}
public interface TypeAdapter<T>{
<R> R getCallback(Converter<T> converter, Observer<T> observer);
}
public interface Converter<T> {
T convert(String str);
interface Factory{
Converter<?> createConverter(Type type);
}
}
TypeToken是Gson里的类,使用TypeToken是因为其实现了判断两个Type是否相等的方法。这里有两处使用了强制类型转换,不过这两个地方都通过参数传入了期望的Type所以强制类型转换也是按预期进行的。
public class GsonConverterFactory implements Converter.Factory {
private Gson mGson;
public GsonConverterFactory(Gson gson){
mGson = gson;
}
@Override
public Converter<?> createConverter(Type type) {
return new GsonConverter<>(mGson, type);
}
}
public class GsonConverter<T> implements Converter<T> {
private final Gson mGson;
private final Type mType;
public GsonConverter(Gson gson, Type type){
mGson = gson;
mType = type;
}
@Override
public T convert(String str) {
return mGson.fromJson(str, mType);
}
}
public class StringCallbackAdapter<T> implements TypeAdapter<T> {
@SuppressWarnings("unchecked")
@Override
public <R> R getCallback(Converter<T> converter, Observer<T> observer) {
return (R) new StringCallbackWrapped<>(converter, observer);
}
}
class StringCallbackWrapped<T> extends StringCallback {
private final Converter<T> mConverter;
private final Observer<T> mObserver;
StringCallbackWrapped(Converter<T> converter, Observer<T> observer){
mConverter = converter;
mObserver = observer;
}
@Override
public void callString(String string) {
try {
mObserver.onNext(mConverter.convert(string));
mObserver.onCompelete();
}catch (Exception e){
mObserver.onError(e);
}
}
}
写个Test测试下:
public static void main(String[] args){
MockService mockService = new MockService();
RxJavaCallbackHelper helper = new RxJavaCallbackHelper(new GsonConverterFactory(new Gson()));
ResultHolder<MockData, StringCallback> holder = helper.create(MockData.class, StringCallback.class);
mockService.getName("", holder.callback);
holder.observable.subscribe(mockData -> System.out.println("mock data code:" + mockData.code),
(e)->{}, ()->System.out.println("completed!"));
}
然后没有结果输出,因为这里的MockService是同步操作,在Observable订阅之前就已经结束操作了。需要对上面的代码进行一些改动
// RxJavaCallbackHelper
public <T,R> ResultHolder<T,R> create(Type resultType, Type callbackType){
ResultHolder<T, R> resultHolder = new ResultHolder<>();
BehaviorSubject<T> subject = BehaviorSubject.create(); // 可以把最近一次的结果发送给订阅者
resultHolder.observable = rx.Observable.defer(() -> subject.take(1));// 订阅的时候才创建一个只能发送一次数据的Observable
TypeAdapter<T> typeAdapter = getAdapter(callbackType);
resultHolder.callback = typeAdapter.getCallback(getConverter(resultType), subject);
return resultHolder;
}
// StringCallbackWrapped
public void callString(String string) {
try {
mObserver.onNext(mConverter.convert(string));
// mObserver.onCompleted(); 删除这行,结束通过上面的take(1) 处理
}catch (Exception e){
mObserver.onError(e);
}
}
方案二
//直接返回一个Observable
public <T, R> CallbackObservable<T, R> create(Type resultType, Type callbackType){
...
}
//获取Callback用来实际执行
CallbackObservable<T, R> callbackObservable;
Callback callback = callbackObservable.getCallback();
需要看看如何从Subject类里进行扩展吧,还没研究
方案三
public <T,R> rx.Observable<T> create(Type resultType, Type callbackType, OnCallbackCreated<R> onCallbackCreated){
...
}
在OnCallbackCreated里把封装好的Callback传出来,这种形式还有个好处是,可以真正做到在subscribe的时候才去执行,上面两种只适合拿到Observable就马上订阅的, 不适合先持有等到某个时间再订阅。
结语
主要就是为了封装细节,减小外部调用时的学习成本。