整理自:
http://blog.chinaunix.net/uid-20771867-id-5187376.html
1、使用subscriber的如下方式来创建Obserable
Observable.create(new Observable.OnSubscribe< ArrayList<Song> >() {
@Override
public void call(Subscriber<? super ArrayList<Song>> subscriber){···}
Subscriber的onNext方法不会自动执行,需要在call方法中手动调用
Observable observable = Observable.create(new Observable.OnSubscribe< ArrayList<Song> >() {
@Override
public void call(Subscriber<? super ArrayList<Song>> subscriber)
{
SongScanInteractor songScanInteractor=new SongScanInteractor(mContext);
songs=songScanInteractor.scanSong();
subscriber.onNext(songs);
subscriber.onCompleted();
}
});
2、应当在Obserable中设置当Subscriber取消对Obserable的监听之后,Obserable不再调用Subscriber的onNext()方法(即发送消息),如下有一段代码示意
private Observable<Integer> createObserver() {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (!subscriber.isUnsubscribed()) {
for (int i = 0; i < 5; i++) {
int temp = new Random().nextInt(10);
if (temp > 8) {
//if value>8, we make an error
subscriber.onError(new Throwable("value >8"));
break;
} else {
subscriber.onNext(temp);
}
// on error,complete the job
if (i == 4) {
subscriber.onCompleted();
}
}
}
}
});
}
3、Obserable发送的信号阻塞了Subscriber的信号处理,导致只有信号发送无信息处理
现在发现Observable的一个特性,那就是Observable不间断发送信号(这里体现为手动调用onNext()),Subscriber的onNext()方法根本不会得到执行,因为来不及执行(我是这么理解的),所以我们需要加上Thread.sleep(400);
这样的代码减缓Obserable发送请求的频率。
同时我还发现,如果这个while(!subscriber.isUnsubscribed())
里面的条件一直设为true,即写成while(true)
,当你把绑定的subscriber解绑之后再与该Obserable绑定,Obserable的onNext()方法依然无法得到执行,与上述不加Thread.sleep(400);
的情况是一样的,即没有信号处理,只有信号发送。
我的理解是,在解绑这段时间里Obserable不断发出的信号没有处理一直被积压,所以自然新加入的Subsciber自然没有能力处理这些积压的发送信号,所以瘫痪了···
Observable observable=Observable.create(new Observable.OnSubscribe<double[]>()
{
@Override
public void call(Subscriber<? super double[]> subscriber)
{
while(!subscriber.isUnsubscribed())
{
try
{
Thread.sleep(400);
} catch (InterruptedException e)
{
e.printStackTrace();
}
double[] doubles =new double[2];
doubles[0] = DeviceUtils.getDeviceWidth(mContext); //屏幕总长
doubles[1] =getMusicCurPos()/getDuration(); //歌曲播放比例
subscriber.onNext(doubles);
Log.d("PlayerProgress","onNext");
}
}
});