Rxjava前一章节重点内容介绍的差不多,现在可以看下Rxbinding重点代码,其实Rxbinding就是将事件发送封装一下;来先看如何使用RxBinding:
implementation 'com.jakewharton.rxbinding3:rxbinding:3.1.0'
TextView tv = findViewById(R.id.tv);
RxView.clicks(tv).throttleFirst(2,TimeUnit.SECONDS)
.subscribe(new Consumer<Unit>() {
@Override
public void accept(Unit unit) throws Exception {
Log.d(TAG,"click TextView");
}
});
分析其源码:
@CheckResult
fun View.clicks(): Observable<Unit> {
return ViewClickObservable(this)
}
private class ViewClickObservable(
private val view: View
) : Observable<Unit>() {
override fun subscribeActual(observer: Observer<in Unit>) {
if (!checkMainThread(observer)) {
return
}
val listener = Listener(view, observer)
observer.onSubscribe(listener)
view.setOnClickListener(listener)
}
private class Listener(
private val view: View,
private val observer: Observer<in Unit>
) : MainThreadDisposable(), OnClickListener {
override fun onClick(v: View) {
if (!isDisposed) {
observer.onNext(Unit)
}
}
override fun onDispose() {
view.setOnClickListener(null)
}
}
}
到这里其实已经看出来这里只是将事件发送封装一下调用onSubscribe/onNext事件;
对比下ObservableCreate.java源码基本相同:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
那么这里可以自己去实现一个类似功能,将ObservableCreate#subscribeActual事件创建过程替换为RxView.clicks生成的Observable即可:
public class RxView {
public static Observable clicks(View view){
return new ViewClickObservable(view);
}
}
public class ViewClickObservable extends Observable<Object> {
private View view;
public ViewClickObservable(View view){
this.view = view;
}
@Override
protected void subscribeActual(Observer<? super Object> observer) {
if (Looper.myLooper()!=Looper.getMainLooper()){
return;
}
Listener listener = new Listener(view,observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}
private class Listener extends AtomicReference<Disposable>
implements View.OnClickListener,Disposable {
private View view;
private Observer<Object> observer;
private Object object;
public Listener(View view, Observer<Object> observer) {
this.view = view;
this.observer = observer;
object = new Object();
}
@Override
public void onClick(View view){
if (!isDisposed()){
observer.onNext(object);
}
}
@Override
public void dispose() {
view.setOnClickListener(null);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
这里需要注意,可以不用有subcrible过程,因为click事件主要是想调用下游过程,即View#Click-->事件处理,而建立事件监听过程不关心,只要有一个onNext过程即可,这里就可以省去 source.subscribe(parent)过程;
来看代码:
@Override
protected void subscribeActual(Observer<? super Object> observer) {
if (Looper.myLooper()!=Looper.getMainLooper()){
return;
}
Listener listener = new Listener(view,observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}
这个Listener就和ObservableCreate#CreateEmitter一样功能,
observer.onSubscribe(listener)方法主要是触发onSubscribe回调,然后可以在其中拿到Dispose去取消事件发送,当然如果不想取消事件发送,这个代码直接去掉都是可以的;
然后建立OnClick监听器,OnClick回调代码以及Dispose代码如下:
@Override
public void onClick(View view){
if (!isDisposed()){
observer.onNext(object);
}
}
@Override
public void dispose() {
view.setOnClickListener(null);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
然后调用如下代码自己就实现了RxBinding功能了:
//功能防抖 com.mi.learn.rxbinding.RxView.clicks(tv).throttleFirst(2,TimeUnit.SECONDS)
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG,"click TextView");
}
});
类似还可以实现一个监听EditText的输入:
//联想搜索 com.mi.learn.rxbinding.RxView.addEditTextChanges(editText).debounce(1,TimeUnit.SECONDS).subscribe(new Consumer<String>() {
@Override
public void accept(String string) throws Exception {
Log.d(TAG,"editextChanges:"+string);
}
});
public static Observable addEditTextChanges(EditText editText){
return new EditTextChangeObservable(editText);
}
public class EditTextChangeObservable extends Observable<String> {
private EditText editText;
public EditTextChangeObservable(EditText editText){
this.editText = editText;
}
@Override
protected void subscribeActual(Observer<? super String> observer) {
if (Looper.myLooper() != Looper.getMainLooper()) {
return;
}
EditTextChangeListener listener = new EditTextChangeListener(editText, observer);
observer.onSubscribe(listener);
editText.addTextChangedListener(listener);
}
private class EditTextChangeListener extends AtomicReference<Disposable>
implements TextWatcher,Disposable {
private EditText editText;
private Observer<? super String> observer;
public EditTextChangeListener(EditText editText, Observer<? super String> observer) {
this.editText = editText;
this.observer = observer;
}
@Override
public void dispose() {
editText.addTextChangedListener(null);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public void beforeTextChanged(java.lang.CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(java.lang.CharSequence s, int start, int before, int count) {
if (!isDisposed()){
observer.onNext(s.toString());
}
}
@Override
public void afterTextChanged(Editable s) {
}
}
}