RxJava详解之线程调度原理(六)

RxJava详解之线程调度原理(六)

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onCompleted();
        Log.i("@@@", "call" + Thread.currentThread().getName());
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.i("@@@", "onNext : " + Thread.currentThread().getName());
            }
        });

执行结果:

07-26 17:16:49.284 7266-7309/? I/@@@: callRxIoScheduler-2
07-26 17:16:49.368 7266-7266/? I/@@@: onNext : main

subscribeOn()是指定被观察者事件源的执行线程。
observeOn()是指定观察者的处理时间的线程。

subscribeOn源码分析:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

/**
 * A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
 * class in {@link Schedulers}.
 * 线程调度器
 */
public abstract class Scheduler {
    ...
}

它内部也是通过create()创建一个Observable但是唯一不同的是OnSubscribe传递的是OperatorSubscribeOn对象:

/**
 * Subscribes Observers on the specified {@code Scheduler}.
 */
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}

它内部原理其实和map是一样一样的。我们就从call()方法说起,首先看一下SubscriberWorker类:

/**
 * A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
 * class in {@link Schedulers}.
 */
public abstract class Scheduler {
    /**
     * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
     * <p>
     * When work is completed it should be unsubscribed using {@link Scheduler.Worker#unsubscribe()}.
     * <p>
     * Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
     *
     * @return a Worker representing a serial queue of actions to be executed
     */
    public abstract Worker createWorker();

    /**
     * Sequential Scheduler for executing actions on a single thread or event loop.
     * <p>
     * Unsubscribing the {@link Worker} cancels all outstanding work and allows resources cleanup.
     */
    public abstract static class Worker implements Subscription {

        /**
         * Schedules an Action for execution.
         */
        public abstract Subscription schedule(Action0 action);

        /**
         * Schedules an Action for execution at some point in the future.
         */
        public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
    }
}    

而上面的schedulers是通过Schedulers.io()创建的,这里看一下它的源码:

public final class Schedulers {

    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final Scheduler newThreadScheduler;

    private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();

    private static Schedulers getInstance() {
        for (;;) {
            Schedulers current = INSTANCE.get();
            if (current != null) {
                return current;
            }
            current = new Schedulers();
            if (INSTANCE.compareAndSet(null, current)) {
                return current;
            } else {
                current.shutdownInstance();
            }
        }
    }

    public static Scheduler io() {
        return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
    }

    private Schedulers() {
        @SuppressWarnings("deprecation")
        RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();

        Scheduler c = hook.getComputationScheduler();
        if (c != null) {
            computationScheduler = c;
        } else {
            computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
        }

        // 下面几时创建ioScheduler的地方
        Scheduler io = hook.getIOScheduler();
        if (io != null) {
            ioScheduler = io;
        } else {
            ioScheduler = RxJavaSchedulersHook.createIoScheduler();
        }

        Scheduler nt = hook.getNewThreadScheduler();
        if (nt != null) {
            newThreadScheduler = nt;
        } else {
            newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
        }
    }
    ...
}    

继续onIOScheduler的源码:

/**
 * 和create()方法类似,这里简单的理解为直接返回参数即可
 * Hook to call when the Schedulers.io() is called.
 * @param scheduler the default io scheduler
 * @return the default of alternative scheduler
 */
public static Scheduler onIOScheduler(Scheduler scheduler) {
    Func1<Scheduler, Scheduler> f = onIOScheduler;
    if (f != null) {
        return f.call(scheduler);
    }
    return scheduler;
}

而上面getInstance().ioScheduler的地方最终会调用到RxJavaSchedulersHook.createIoScheduler():

@Experimental
public static Scheduler createIoScheduler() {
    return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}

public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory == null");
    }
    // 看到了吗? 这里最终的Scheduler是CachedThreadScheduler
    return new CachedThreadScheduler(threadFactory);
}

而在CachedThreadScheduler类中:

public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {

    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    ...
}    

所以我们通过Schedulers.io()的源码可以发现这里的具体实现类是CachedThreadScheduler。而对应的Worker的实现类是EventLoopWorker:

   static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
        private final CompositeSubscription innerSubscription = new CompositeSubscription();
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;
        final AtomicBoolean once;

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.once = new AtomicBoolean();
            this.threadWorker = pool.get();
        }

        @Override
        public void unsubscribe() {
            if (once.compareAndSet(false, true)) {
                // unsubscribe should be idempotent, so only do this once

                // Release the worker _after_ the previous action (if any) has completed
                threadWorker.schedule(this);
            }
            innerSubscription.unsubscribe();
        }

        @Override
        public void call() {
            pool.release(threadWorker);
        }

        @Override
        public boolean isUnsubscribed() {
            return innerSubscription.isUnsubscribed();
        }

        @Override
        public Subscription schedule(Action0 action) {
            return schedule(action, 0, null);
        }

        @Override
        public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
            if (innerSubscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            ScheduledAction s = threadWorker.scheduleActual(new Action0() {
                @Override
                public void call() {
                    if (isUnsubscribed()) {
                        return;
                    }
                    // 内部会调用外面传递过来的Action对象的call方法
                    action.call();
                }
            }, delayTime, unit);
            innerSubscription.add(s);
            s.addParent(innerSubscription);
            return s;
        }

这里会继续执行到threadWorker.scheduleActual:

public class NewThreadWorker extends Scheduler.Worker implements Subscription {
    private final ScheduledExecutorService executor;

    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }
...
}

ScheduledAction的源码:

/**
 * A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the
 * {@code Subscriber} in respect of an {@code Observer}.
 */
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
    /** */
    private static final long serialVersionUID = -3962399486978279857L;
    final SubscriptionList cancel;
    final Action0 action;

    public ScheduledAction(Action0 action) {
        this.action = action;
        this.cancel = new SubscriptionList();
    }
    ...
}    

ScheduledAction实现了Runnable接口,通过线程池executor最终实现了线程切换。上面便是subscribeOn(Schedulers.io())实现线程切换的全部过程。

observeOn源码分析

直接上源码:

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
    return observeOn(scheduler, false, bufferSize);
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

observeOn内部是通过lift来实现的,看一下lift的源码:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

归根到底还是使用了create(),那就和之前分析的基本都差不多了,直接看OnSubscribeLift:

/**
 * Transforms the downstream Subscriber into a Subscriber via an operator
 * callback and calls the parent OnSubscribe.call() method with it.
 */
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}

也实现了OnSubscribe,通过前面的分析我们知道一旦调用了subscribe()将观察者与被观察绑定后就会触发被观察者所对应的OnSubscribecall()方法,所以这里会触发OnSubscribeLift.call()。在call()中调用了OperatorObserveOn.call()并返回了一个新的观察者Subscriber st,接着调用了前一级Observable对应OnSubscriber.call(st)
所以这里要看一下OperatorObserveOn类及它的call()方法:

public final class OperatorObserveOn<T> implements Operator<T, T> {
    private final Scheduler scheduler;
    private final boolean delayError;
    private final int bufferSize;

    public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
        this(scheduler, delayError, RxRingBuffer.SIZE);
    }

    public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

这里会走到创建ObserveOnSubscriber然后调用其init()方法:

static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
    final Subscriber<? super T> child;
    final Scheduler.Worker recursiveScheduler;
    final boolean delayError;
    final Queue<Object> queue;
    /** The emission threshold that should trigger a replenishing request. */
    final int limit;

    // the status of the current stream
    volatile boolean finished;

    final AtomicLong requested = new AtomicLong();

    final AtomicLong counter = new AtomicLong();

    /**
     * The single exception if not null, should be written before setting finished (release) and read after
     * reading finished (acquire).
     */
    Throwable error;

    /** Remembers how many elements have been emitted before the requests run out. */
    long emitted;

    // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
    // not prevent anything downstream from consuming, which will happen if the Subscription is chained
    public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
        this.child = child;
        // 通过参数传递过来的Schedule创建对应的worker
        this.recursiveScheduler = scheduler.createWorker();
        this.delayError = delayError;
        int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
        // this formula calculates the 75% of the bufferSize, rounded up to the next integer
        this.limit = calculatedSize - (calculatedSize >> 2);
        if (UnsafeAccess.isUnsafeAvailable()) {
            queue = new SpscArrayQueue<Object>(calculatedSize);
        } else {
            queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
        }
        // signal that this is an async operator capable of receiving this many
        request(calculatedSize);
    }

    void init() {
        // don't want this code in the constructor because `this` can escape through the
        // setProducer call
        Subscriber<? super T> localChild = child;

        localChild.setProducer(new Producer() {

            @Override
            public void request(long n) {
                if (n > 0L) {
                    BackpressureUtils.getAndAddRequest(requested, n);
                    schedule();
                }
            }

        });
        localChild.add(recursiveScheduler);
        localChild.add(this);
    }

    @Override
    public void onNext(final T t) {
        if (isUnsubscribed() || finished) {
            return;
        }
        if (!queue.offer(NotificationLite.next(t))) {
            onError(new MissingBackpressureException());
            return;
        }
        // 有该方法
        schedule();
    }

    @Override
    public void onCompleted() {
        if (isUnsubscribed() || finished) {
            return;
        }
        finished = true;
        // 有该方法
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        if (isUnsubscribed() || finished) {
            RxJavaHooks.onError(e);
            return;
        }
        error = e;
        finished = true;
        // 有该方法
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            // 调用recursiveScheduler,而recursiveScheduler是通过参数传递过来的
            recursiveScheduler.schedule(this);
        }
    }

    // only execute this from schedule()
    // 最终会执行到该方法
    @Override
    public void call() {
        long missed = 1L;
        long currentEmission = emitted;

        // these are accessed in a tight loop around atomics so
        // loading them into local variables avoids the mandatory re-reading
        // of the constant fields
        final Queue<Object> q = this.queue;
        final Subscriber<? super T> localChild = this.child;

        // requested and counter are not included to avoid JIT issues with register spilling
        // and their access is is amortized because they are part of the outer loop which runs
        // less frequently (usually after each bufferSize elements)

        for (;;) {
            long requestAmount = requested.get();

            while (requestAmount != currentEmission) {
                boolean done = finished;
                Object v = q.poll();
                boolean empty = v == null;

                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }

                if (empty) {
                    break;
                }
                // 该方法会调用onNext方法
                localChild.onNext(NotificationLite.<T>getValue(v));

                currentEmission++;
                if (currentEmission == limit) {
                    requestAmount = BackpressureUtils.produced(requested, currentEmission);
                    request(currentEmission);
                    currentEmission = 0L;
                }
            }

            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }

            emitted = currentEmission;
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }
}

我们看到上面的onNextonComplete()onError方法里面都调用了schedule()方法,所以这个方法就是具体的切换线程的部分:

protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        // 调用recursiveScheduler,而recursiveScheduler是通过参数传递过来的,该方法将所有的事件都切换到了recursiveScheduler对应的线程
        // 这里将this传递进去了,所以最后会执行到当前的call()方法
        recursiveScheduler.schedule(this);
    }
}

里面会调用recursiveScheduler,而该recursiveScheduler是通过构造函数初始化的:

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
    this.child = child;
    // 通过参数传递过来的Schedule创建对应的worker
    this.recursiveScheduler = scheduler.createWorker();
}

而在此处我们的示例代码中用到的SchedulerAndroidSchedulers.mainThread():

public final class AndroidSchedulers {
    private static final AtomicReference<AndroidSchedulers> INSTANCE = new AtomicReference<>();

    private final Scheduler mainThreadScheduler;

    private static AndroidSchedulers getInstance() {
        for (;;) {
            AndroidSchedulers current = INSTANCE.get();
            if (current != null) {
                return current;
            }
            current = new AndroidSchedulers();
            if (INSTANCE.compareAndSet(null, current)) {
                return current;
            }
        }
    }

    private AndroidSchedulers() {
        RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();

        Scheduler main = hook.getMainThreadScheduler();
        if (main != null) {
            mainThreadScheduler = main;
        } else {
            // 具体的实现类是LooperScheduler
            mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
        }
    }

    /** A {@link Scheduler} which executes actions on the Android UI thread. */
    public static Scheduler mainThread() {
        return getInstance().mainThreadScheduler;
    }
    ...
}    

我们看到这里的具体的Scheduler的实现类是LooperScheduler:

class LooperScheduler extends Scheduler {
    private final Handler handler;

    LooperScheduler(Looper looper) {
        handler = new Handler(looper);
    }

    LooperScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    static class HandlerWorker extends Worker {
        private final Handler handler;
        private final RxAndroidSchedulersHook hook;
        private volatile boolean unsubscribed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
            this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
        }

        @Override
        public void unsubscribe() {
            unsubscribed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed;
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (unsubscribed) {
                return Subscriptions.unsubscribed();
            }

            action = hook.onSchedule(action);

            ScheduledAction scheduledAction = new ScheduledAction(action, handler);

            Message message = Message.obtain(handler, scheduledAction);
            message.obj = this; // Used as token for unsubscription operation.

            handler.sendMessageDelayed(message, unit.toMillis(delayTime));

            if (unsubscribed) {
                handler.removeCallbacks(scheduledAction);
                return Subscriptions.unsubscribed();
            }

            return scheduledAction;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }
    }

    static final class ScheduledAction implements Runnable, Subscription {
        private final Action0 action;
        private final Handler handler;
        private volatile boolean unsubscribed;

        ScheduledAction(Action0 action, Handler handler) {
            this.action = action;
            this.handler = handler;
        }

        @Override public void run() {
            try {
                action.call();
            } catch (Throwable e) {
                // nothing to do but print a System error as this is fatal and there is nowhere else to throw this
                IllegalStateException ie;
                if (e instanceof OnErrorNotImplementedException) {
                    ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
                } else {
                    ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
                }
                RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override public void unsubscribe() {
            unsubscribed = true;
            handler.removeCallbacks(this);
        }

        @Override public boolean isUnsubscribed() {
            return unsubscribed;
        }
    }
}

他里面对应的WorkerHandlerWorker,他里面也是通过ScheduledAction来实现的,他实现了Runnable接口。通过主线程的MainLooper创建一个Handler然后去执行ScheduledAction中的run()方法。然后在run()方法中调用了ObserveOnSubscriber.call(),这便是它实现线程切换的原理。

更多内容请看下一篇文章RxJava详解(七)


更多精彩文章请见:Github AndroidNote,欢迎Star

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,159评论 6 151
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,460评论 7 62
  • 最近项目里面有用到Rxjava框架,感觉很强大的巨作,所以在网上搜了很多相关文章,发现一片文章很不错,今天把这篇文...
    Scus阅读 6,868评论 2 50
  • 在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github: https://...
    松江野人阅读 5,883评论 0 1
  • 一大早, 爸爸就把我叫了起来,说要去看牙。为什么要看牙呢?因为,几周前我和几个朋友正在玩儿的时候掉了一块儿小...
    Monica莫阅读 286评论 2 1