学习笔记,自用,整理中
摘要:
--------------------------------------------------------------------------------------------------------------------------------------------------
内容:1
/**
* TODO subscribeOn() 源码分析
*/
public class RxJavaThreadSource1extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
// TODO Hook IO 传递进去的Hook
RxJavaPlugins.setIoSchedulerHandler(new Function() {
@Override
public Schedulerapply(Scheduler scheduler)throws Exception {
Log.d(L.TAG, "apply: 全局 监听 scheduler:" +scheduler);
return scheduler;
}
});
// TODO hook 给 IO 初始化
RxJavaPlugins.setInitIoSchedulerHandler(new Function, Scheduler>() {
@Override
public Schedulerapply(Callable schedulerCallable)throws Exception {
Log.d(L.TAG, "apply: 全局 监听 init scheduler:" +schedulerCallable.call());
return schedulerCallable.call();
}
});
Observable.create(
// 自定义source
new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e)throws Exception {
e.onNext("Derry");
Log.d(L.TAG, "自定义source: " + Thread.currentThread().getName());
}
})
// ObservbaleCreate.subscribeOn
// TODO 第二步 new IOScheduler ---> 线程池 传递进去
.subscribeOn(
// TODO 第一步 到底干了什么 ( new IOScheduler ---> 线程池)
Schedulers.io()// 耗时读取的异步
// Schedulers.newThread() // 开启新线程
)
// A线程. subscribe
// ObservableSubscribeOn.subscribe
.subscribe(
// 终点
new Observer() {
@Override
public void onSubscribe(Disposable d) {
Disposable disposable = d;
Log.d(L.TAG, "onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d(L.TAG, "onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
--------------------------------------------------------------------------------------------------------------------------------------------------
/**
* TODO ObserveOn() 源码分析
*/
public class RxJavaThreadSource2extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
new Thread(){
@Override
public void run() {
super.run();
test();
}
}.start();
}
private void test() {
Observable.create(
// 自定义source
new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e)throws Exception {
e.onNext("Derry");
Log.d(L.TAG, "自定义source: " + Thread.currentThread().getName());
}
})
// TODO 第二步骤
.observeOn(
// TODO 第一步 主线程的Handlnr
AndroidSchedulers.mainThread()
)
// subsceOn(1) // 会显示是这个线程的原因,上一层卡片是被1线程执行
// subsceOn(2)
// subsceOn(3)
// observeOn(A)
// observeOn(B)
// observeOn(C) // 终点是被C执行
// ObservableObserveOn.subscribe
.subscribe(
// 终点
new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.d(L.TAG, "onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d(L.TAG, "onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
private void t() {
new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
// UI 操作...
}
};
new Thread(){
@Override
public void run() {
super.run();
new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
// UI 操作 ... 百分之百 Ui线程 没有问题
}
};
}
}.start();
new Handler(new Handler.Callback() {
@Override
public boolean handleMessage(@NonNull Message msg) {
return false;
}
});
}
}
--------------------------------------------------------------------------------------------------------------------------
自定义事件
public class RxView {
private final static StringTAG = RxView.class.getSimpleName();
// 我们自己的操作符 == 函数
public static Observableclicks(View view) {
return new ViewClickObservable(view);
}
}
--------------------------------------------------------------------------------------------------------------------------
public class ViewClickObservableextends Observable {
private final Viewview;
// 事件 第一节课 防抖 事件 没用
private static final ObjectEVENT =new Object();
private static ObjectEVENT2;
public ViewClickObservable(View view) {
this.view = view;
EVENT2 = view;
}
@Override
protected void subscribeActual(Observer observer) {
// 可以干自己的
MyListener myListener =new MyListener(view, observer);
observer.onSubscribe(myListener);
this.view.setOnClickListener(myListener);
}
// 我们的包裹
static final class MyListenerimplements View.OnClickListener, Disposable {
private final Viewview;
private Observerobserver; // 存一份 下一层
// 原子性,同学们自行看看文章
// https://www.jianshu.com/p/8a44d4a819bc
// boolean == AtomicBoolean
private final AtomicBooleanisDisposable =new AtomicBoolean();
public MyListener(View view, Observer observer) {
this.view = view;
this.observer = observer;
}
@Override
public void onClick(View v) {
if (isDisposed() ==false) {
observer.onNext(EVENT);
}
}
// 如果用调用了 中断
@Override
public void dispose() {
// 如果没有中断过,才有资格, 取消view.setOnClickListener(null);
if (isDisposable.compareAndSet(false, true)) {
// 主线程 很好的中断
if (Looper.myLooper() == Looper.getMainLooper()) {
view.setOnClickListener(null);
}else {// 主线程,通过Handler的切换
/*new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
view.setOnClickListener(null);
}
};*/
// HandlerScheduler.scheduleDirect
AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
@Override
public void run() {
view.setOnClickListener(null);
}
});
}
}
}
@Override
public boolean isDisposed() {
return isDisposable.get();
}
}
}
--------------------------------------------------------------------------------------------------------------------------
public class RxActivityextends AppCompatActivity {
@SuppressLint("CheckResult")
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rx);
// 给这个控件做防抖
Button button = findViewById(R.id.button);
RxView.clicks(button)
.throttleFirst(2000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer() {
@Override
public void accept(Object o)throws Exception {
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e)throws Exception {
e.onNext("Derry");
}
})
.subscribe(new Consumer() {
@Override
public void accept(String s)throws Exception {
Log.d(L.TAG, "accept: 终点:" + s);
}
});
}
});
}
}