rxjava2 flowable/observable操作符汇总

Rxjava2 操作符汇总


构造

  • create/generate ?

  • just/empty/error/never/range(Long)

  • from(Array/Callable/Future/Iterable/Publisher)

  • startWith/repeat/cache/defaultIfEmpty 增加默认值 开头/重复/缓存/返回默认值

加时

  • defer/timer/interval(Range) 延迟执行
  • delay/timeout/timestamp 延时、超时、时间戳

异常

  • onError(ResumeNext/Return/ReturnItem) 错误处理
  • onExceptionResumeNext 放行error、throwable
  • retry(Until/When)

并发与并行

  • parallel、serialize 并行
  • observeOn、subscribeOn 指定线程

获取结果

  • blocking[First(T)/Last(T)/ForEach/Iterable(int)/Latest/MostRecent(T)/Single/Subscribe] 获取结果

  • blockingNext ?

  • forEach(While) 直接消费结果

  • subscribe 直接消费结果

限流

  • distinct(UntilChanged)去重

  • filter 按条件过滤

  • ignoreElements 忽略所有

  • limit 限制个数

  • (take/skip)(Last/Until/While)

  • elementAt(OrError)

  • (first/last/single)(Element/OrError)

  • debounce 过滤,防止抖动,如果指定时间间接内没有其他数据,则正常。

  • sample 抽样

  • throttle(First/Last/WithtimeOut) ?

  • window 窗口

  • onBackpressureBuffer/Drop/Latest 背压

合并源

  • amb(Array) with 多个流取最快的一个

  • concat(Array)(Eager/DelayError) with 多个流顺序合并

  • switchOnNext(DelayError) 不断切换到最近开始的流

  • merge(Array)(DelayError) with 多个流程乱序合并

  • combineLastest(DelayError) 多个流流速不同时,流速快的与流速慢的最新一个合并。 A1 B1 C2….

  • withLatestFrom

  • zip(Array/Iterable) with 多个流一一对应,合并处理,最先结束为截止点 A1 B2 C3 …

  • join ?

转换

  • buffer(count,skip)、(timespan,timeskip, unit)分批

  • map/cast 转换

  • flatMap/concatMap(Eager)(DelayError) 流转换, 乱序、顺序

  • switchMap(DelayError)?

  • all/any/contains 条件判断

  • isEmpty/sequenceEqual 两个流一一比较是否相同,可以先后顺序不同?

  • collect/reduce/scan 归并

  • groupBy 分组

  • sorted 排序

  • toList/Map/Multimap/SortedList 集合

日志

  • doOnSubscribe/Request/Next/Complete/Error/Cancel
  • doOnTerminate = complete/error
  • doFinally = terminate/cancel
  • doOnLifecycle = subscribe/request/cancel
  • doOnEach = all
  • doAfterNext/AfterTerminate

新增操作

  • compose 组合多个操作
  • lift 创建新的操作

实例

maven依赖如下:

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
        </dependency>
    </dependencies>

代码示例如下:

import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.util.StopWatch;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;

import static java.util.stream.Collectors.joining;

@Slf4j
public class RxjavaTest {
  
  
  static void sleep(long delayed, TimeUnit unit){
    try {
      unit.sleep(delayed);
    } catch (InterruptedException e) {
    }
  }
  
  String findById(Long id){
    log.info("called findById {}", id);
    sleep(200, TimeUnit.MILLISECONDS);
    return id + "-name";
  }
  
  String findById(String id){
    log.info("called findById {}", id);
    sleep(200, TimeUnit.MILLISECONDS);
    return id + "-name";
  }
  
  String findByIdWithError(Long id){
    String result = findById(id);
    if(id%3==0){
      throw new RuntimeException("id.not.exist");
    }
    return result;
  }
  
  static void trace(Runnable runnable){
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    try {
      runnable.run();
    }finally {
      stopWatch.stop();
      System.out.printf("trace = %s", stopWatch.prettyPrint());
    }
  }
  
  /**
   * 各种简单创建接口
   * @throws Exception
   */
  @Test
  public void test_create() throws Exception{
    //列举
    Observable.just(1,2).blockingSubscribe(System.out::println);
    //空
    Observable.empty().blockingSubscribe(System.out::println);
    //错误
    Observable.error(new RuntimeException("error")).onErrorReturnItem("default").blockingSubscribe(System.out::println);
    //范围
    Observable.rangeLong(1, 10).blockingSubscribe(System.out::println);
    //列表
    Observable.fromIterable(Arrays.asList(1,2)).blockingSubscribe(System.out::println);
    //数组
    Observable.fromArray(new Integer[]{1,2}).blockingSubscribe(System.out::println);
    //重复
    Observable.just(1,2).repeat(3).blockingSubscribe(System.out::println);
  }
  
  
  /**
   * 将分页接口包装成流
   *
   * @throws Exception
   */
  @Test
  public void test_page_observable() throws Exception{
    Observable<String> observable = Observable.<String>create(emitter -> {
      try {
        int pageNo = 1;
  
        while (pageNo < 5) {
          int current = pageNo;
          
          log.info("start read pageNo={}", pageNo);
          
          IntStream.range(0, 10).mapToObj(i -> "sku" + String.valueOf((current-1)*10000+i)).forEach(emitter::onNext);
          
          sleep(10, TimeUnit.MILLISECONDS);
          
          log.info("end read pageNo={}", pageNo);
          
          ++pageNo;
        }
        emitter.onComplete();
      } catch (Exception e) {
        log.error("{}", e);
      } finally {
        emitter.onComplete();
      }
    });
    
    observable
        .blockingSubscribe(System.out::println);
    
    
    TimeUnit.SECONDS.sleep(1);
    log.info("将读取放在单独的线程中执行:");
    observable
        .subscribeOn(Schedulers.io())
        .blockingSubscribe(System.out::println);
  
    
    TimeUnit.SECONDS.sleep(1);
    log.info("将读取放在单独的线程中, 并且分批执行:");
    observable
        .subscribeOn(Schedulers.io())
        .buffer(10) //分批处理
        .blockingSubscribe(System.out::println);
  }
  
  /**
   08:50:15.564 [main] INFO top.zhacker.rxjava.RxjavaTest - called findById 1
   08:50:15.854 [main] INFO top.zhacker.rxjava.RxjavaTest - subscribe observable
   08:50:15.914 [main] INFO top.zhacker.rxjava.RxjavaTest - subscribe observable
   08:50:15.914 [main] INFO top.zhacker.rxjava.RxjavaTest - called findById 1
   08:50:16.115 [main] INFO top.zhacker.rxjava.RxjavaTest - subscribe observable
   08:50:16.116 [main] INFO top.zhacker.rxjava.RxjavaTest - called findById 1
   */
  @Test
  public void test_defer(){
    Observable<String> observableNotDeferred = Observable.just(findById(1L));
    log.info("subscribe observable");
    observableNotDeferred.blockingSubscribe();
  
    Observable<String> observableDeferred = Observable.defer(()-> Observable.just(findById(1L)));
    log.info("subscribe observable");
    observableDeferred.blockingSubscribe();
  
    Observable<String> observable = Observable.fromCallable(()-> findById(1L));
    log.info("subscribe observable");
    observable.blockingSubscribe();
  }
  
  
  /**
   * Timed[time=1530762785174, unit=MILLISECONDS, value=1]
   * Timed[time=1530762785175, unit=MILLISECONDS, value=2]
   * Timed[time=1530762785175, unit=MILLISECONDS, value=3]
   * Timed[time=1530762787179, unit=MILLISECONDS, value=5]
   * Timed[time=1530762788180, unit=MILLISECONDS, value=6]
   * Timed[time=1530762789179, unit=MILLISECONDS, value=7]
   * Timed[time=1530762790182, unit=MILLISECONDS, value=8]
   * Timed[time=1530762791180, unit=MILLISECONDS, value=9]
   */
  @Test
  public void test_delay(){
    Observable
        .just(1,2,3)
        .delay(2,TimeUnit.SECONDS) //延迟2s
        .timestamp() //加时间戳 Timed<T>
        .blockingSubscribe(System.out::println);
    Observable
        .intervalRange(5, 5, 2, 1, TimeUnit.SECONDS) //延迟2s,之后每个延迟1s
        .timestamp()
        .blockingSubscribe(System.out::println);
  }
  
  
  /**
   *
   * @throws Exception
   */
  @Test
  public void test_error_retry() throws Exception{
    
    Observable.fromCallable(()-> findByIdWithError(3L))
        .doOnError(e-> log.warn("", e))
        .retry(2, e-> e.getMessage().equals("id.not.exist"))
        .blockingSubscribe();
  }
  
  /**
   *
   * @throws Exception
   */
  @Test
  public void test_error_returnItem() throws Exception{
    
    Observable.fromCallable(()-> findByIdWithError(3L))
        .doOnError(e-> log.warn("", e))
        .onErrorReturnItem("default result")
        .blockingSubscribe(System.out::println);
  }
  
  /**
   *
   * @throws Exception
   */
  @Test
  public void test_error_resumeNext() throws Exception{
    
    Observable.fromCallable(()-> findByIdWithError(3L))
        .doOnError(e-> log.warn("", e))
        .onErrorResumeNext(Observable.just("default result"))
        .blockingSubscribe(System.out::println);
  }
  
  
  
  /**
   *
   * @throws Exception
   */
  @Test
  public void test_timeout_retry() throws Exception{
    Observable.fromCallable(()-> findById(1L))
        .timeout(100, TimeUnit.MILLISECONDS)
        .doOnError(e-> log.warn("", e))
        .retry(2, e-> e instanceof TimeoutException)
        .blockingSubscribe();
  }
  
  
  
  
  
  /**
   * start handle id= 1
   * start handle id= 2
   * start handle id= 3
   * start handle id= 4
   * start handle id= 5
   * result for :5
   * result for :1
   * result for :2
   * result for :3
   * result for :4
   * trace = StopWatch '': running time (millis) = 591
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00591  100%
   * @throws Exception
   */
  @Test
  public void test_flatMap_without_Errors() throws Exception{
  
    trace(()->{
      Observable.rangeLong(1,5)
          .doOnNext(id-> System.out.printf("start handle id= %s\n", id))
          .flatMap(id-> Observable.fromCallable(()->{
            return findById(id);
          }).subscribeOn(Schedulers.io()), 5)
          .doOnNext(result-> System.out.printf("%s\n", result))
          .blockingSubscribe();
    });
  }
  
  
  /**
   *
   * start handle id= 1
   * start handle id= 2
   * start handle id= 3
   * start handle id= 4
   * start handle id= 5
   * result for :2
   * result for :4
   * result for :1
   * result for :5
   * trace = StopWatch '': running time (millis) = 457
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00457  100%
   *
   * java.lang.RuntimeException: id.not.exist
   *
   * @throws Exception
   */
  @Test
  public void test_flatMap_throw_Errors_direct() throws Exception{
    
    trace(()->{
      Observable.rangeLong(1,5)
          .doOnNext(id-> System.out.printf("start handle id= %s\n", id))
          .flatMap(id-> Observable.fromCallable(()->{
            return findByIdWithError(id);
          }).subscribeOn(Schedulers.io()), 5)
          .doOnNext(result-> System.out.printf("%s\n", result))
          .blockingSubscribe();
    });
  }
  
  
  /**
   *start handle id= 1
   * start handle id= 2
   * start handle id= 3
   * start handle id= 4
   * start handle id= 5
   * result for :2
   * result for :1
   * result for :4
   * result for :5
   * trace = StopWatch '': running time (millis) = 450
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00450  100%
   *
   * java.lang.RuntimeException: id.not.exist
   */
  @Test
  public void test_flatMap_delayErrors(){
    trace(()->{
      Observable.rangeLong(1,5)
          .doOnNext(id-> System.out.printf("start handle id= %s\n", id))
          .flatMap(id-> Observable.fromCallable(()->{
            return findByIdWithError(id);
          }).subscribeOn(Schedulers.io()), true, 5)
          .doOnNext(result-> System.out.printf("%s\n", result))
          .blockingSubscribe();
    });
  }
  
  
  /**
   * start handle id= 1
   * start handle id= 2
   * start handle id= 3
   * start handle id= 4
   * start handle id= 5
   * result for :1
   * result for :2
   * trace = StopWatch '': running time (millis) = 350
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00350  100%
   *
   * java.lang.RuntimeException: id.not.exist
   * @throws Exception
   */
  @Test
  public void test_concatMap_direct_throw_Errors() throws Exception{
    
    trace(()->{
      Observable.rangeLong(1,5)
          .doOnNext(id-> System.out.printf("start handle id= %s\n", id))
          .concatMapEager(id-> Observable.<String>fromCallable(()->{
            return findByIdWithError(id);
          }).subscribeOn(Schedulers.io()))
          .doOnNext(result-> System.out.printf("%s\n", result))
          .blockingSubscribe();
    });
  }
  
  
  /**
   * start handle id= 1
   * start handle id= 2
   * start handle id= 3
   * start handle id= 4
   * start handle id= 5
   * result for :1
   * result for :2
   * result for :4
   * result for :5
   * trace = StopWatch '': running time (millis) = 410
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00410  100%
   *
   * java.lang.RuntimeException: id.not.exist
   * @throws Exception
   */
  @Test
  public void test_concatMap_delayErrors() throws Exception{
    
    trace(()->{
      Observable.rangeLong(1,5)
          .doOnNext(id-> System.out.printf("start handle id= %s\n", id))
          .concatMapEagerDelayError(id-> Observable.fromCallable(()->{
            return findByIdWithError(id);
          }).subscribeOn(Schedulers.io()), true)
          .doOnNext(result-> System.out.printf("%s\n", result))
          .blockingSubscribe();
    });
  }
  
  
  /**
   21
   22
   23
   trace = StopWatch '': running time (millis) = 120
   -----------------------------------------
   ms     %     Task name
   -----------------------------------------
   00120  100%
   * @throws Exception
   */
  @Test
  public void test_amb() throws Exception{
  
    trace(()->{
      Observable.ambArray(
          Observable.intervalRange(1, 3, 50,100,TimeUnit.MILLISECONDS),
          Observable.intervalRange(11, 3, 40,100,TimeUnit.MILLISECONDS),
          Observable.range(21, 3).delay(30,TimeUnit.MILLISECONDS)
      ).blockingSubscribe(System.out::println);
    });
    
  }
  
  
  /**
   * 21
   * 22
   * 23
   * 11
   * 1
   * 12
   * 2
   * 13
   * 3
   * trace = StopWatch '': running time (millis) = 389
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00389  100%
   * @throws Exception
   */
  @Test
  public void test_merge() throws Exception{
    
    trace(()->{
      Observable.merge(
          Observable.intervalRange(1, 3, 50,100,TimeUnit.MILLISECONDS),
          Observable.intervalRange(11, 3, 40,100,TimeUnit.MILLISECONDS),
          Observable.range(21, 3).delay(30,TimeUnit.MILLISECONDS)
      ).blockingSubscribe(System.out::println);
    });
  }
  
  
  /**
   * 1
   * 2
   * 3
   * 11
   * 12
   * 13
   * 21
   * 22
   * 23
   * trace = StopWatch '': running time (millis) = 670
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00670  100%
   * @throws Exception
   */
  @Test
  public void test_concat() throws Exception{
    trace(()->{
      Observable.concat(
          Observable.intervalRange(1, 3, 50,100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()),
          Observable.intervalRange(11, 3, 40,100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()),
          Observable.range(21, 3).delay(30,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io())
      ).blockingSubscribe(System.out::println);
    });
  }
  
  
  /**
   * 1
   * 2
   * 3
   * 11
   * 12
   * 13
   * 21
   * 22
   * 23
   * trace = StopWatch '': running time (millis) = 407
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00407  100%
   * @throws Exception
   */
  @Test
  public void test_concatEager() throws Exception{
    trace(()->{
      Observable.concatEager(Arrays.asList(
          Observable.intervalRange(1, 3, 50,100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()),
          Observable.intervalRange(11, 3, 40,100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()),
          Observable.range(21, 3).delay(300,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io())),
          3, 3
      ).blockingSubscribe(System.out::println);
    });
  }
  
  
  /**
   * 1,11,21
   * 2,12,22
   * 3,13,23
   * trace = StopWatch '': running time (millis) = 337
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00337  100%
   * @throws Exception
   */
  @Test
  public void test_zip() throws Exception{
    trace(()->{
      Observable.zipIterable(Arrays.asList(
          Observable.intervalRange(1, 4, 50,100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()),
          Observable.intervalRange(11, 3, 40,100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()),
          Observable.range(21, 4).delay(100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io())),
          a-> Arrays.stream(a).map(String::valueOf).collect(joining(",")),true,10
      ).blockingSubscribe(System.out::println);
    });
  }
  
  
  /**
   * 1,11,21
   * 1,11,22
   * 1,11,23
   * 1,11,24
   * 1,12,24
   * 2,12,24
   * 2,13,24
   * trace = StopWatch '': running time (millis) = 361
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 00361  100%
   * @throws Exception
   */
  @Test
  public void test_combineLatest() throws Exception{
    trace(()->{
      Observable.combineLatest(Arrays.asList(
          Observable.intervalRange(1, 2, 50,100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()),
          Observable.intervalRange(11, 3, 40,100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()),
          Observable.range(21, 4).delay(100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io())),
          a-> Arrays.stream(a).map(String::valueOf).collect(joining(",")),10
      ).blockingSubscribe(System.out::println);
    });
  }
  
  
  /**
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 23256  054%  without-buffer
   * 19987  046%  with-buffer
   *
   * @throws Exception
   */
  @Test
  public void test_without_backPressure() throws Exception{
    
    StopWatch stopWatch = new StopWatch();
 
    Observable<String> observable = Observable.<String>create(emitter -> {
      try {
        
        int pageNo = 1;
        
        while (pageNo <= 5) {
          
          final int tmp = pageNo;
          
          log.info("start read pageNo={}", pageNo);
          
          IntStream.range(0,10).mapToObj(i-> String.valueOf((tmp-1)*10000+i)).forEach(sku->{
  
            sleep(10, TimeUnit.MILLISECONDS);
        
            emitter.onNext(sku);
          });
          
          log.info("end read pageNo={}", pageNo);
          
          ++ pageNo;
        }
        
        emitter.onComplete();
      } catch (Exception e) {
        log.error("{}", e);
      } finally {
        emitter.onComplete();
      }
    }).subscribeOn(Schedulers.io());
  
    
    stopWatch.start("without-buffer");
    
    observable
    .doOnNext(list-> log.info("process {}", list))
    .flatMap(a-> Observable
        .fromCallable(()->findById(a))
        .subscribeOn(Schedulers.io()),
        false, 3)
    .blockingSubscribe();
    
    stopWatch.stop();
  
    
    stopWatch.start("with-buffer");
    observable
        .buffer(3)
        .flatMapIterable(list-> Observable.fromIterable(list)
            .flatMap(a-> Observable
                .fromCallable(()-> findById(a))
                .subscribeOn(Schedulers.io()),
                false, list.size())
            .toList()
            .blockingGet())
        .blockingSubscribe();
    stopWatch.stop();
    
    log.info("{}", stopWatch.prettyPrint());
  
  }
  
  
  /**
   * -----------------------------------------
   * ms     %     Task name
   * -----------------------------------------
   * 04152  055%  manual subscribe
   * 03362  045%  with buffer
   * @throws Exception
   */
  @Test
  public void test_flowable_backpressure() throws Exception{
    
    Flowable<String> flowable = Flowable.<String>create(emitter -> {
      try {
        
        int pageNo = 1;
        
        while (pageNo < 5) {
          
          final int tmp = pageNo;
          
          log.info("start pageNo={}", pageNo);
          
          Flowable.range(1,10).map(i-> String.valueOf(tmp*10000+i)).blockingSubscribe(sku-> {
            while(emitter.requested()==0){
              if(emitter.isCancelled()){
                break;
              }
            }
            sleep(10, TimeUnit.MILLISECONDS);
            emitter.onNext(sku);
          });
          
          log.info("end pageNo={}", pageNo);
          
          ++ pageNo;
        }
        
        emitter.onComplete();
      } catch (Exception e) {
        log.error("{}", e);
      } finally {
        emitter.onComplete();
      }
    }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());
  
  
    StopWatch stopWatch = new StopWatch();
    
    stopWatch.start("manual subscribe");
    Subscriber<String> subscriber = new Subscriber<String>() {
      
      Subscription mSubscription;
      
      @Override
      public void onSubscribe(Subscription s) {
        log.info("onSubscribe");
        mSubscription = s;
        s.request(1);
      }
      
      @Override
      public void onNext(String sku) {
        log.info("onNext: {}", sku);
        mSubscription.request(1);
        sleep(100, TimeUnit.MILLISECONDS);
      }
      
      @Override
      public void onError(Throwable t) {
        log.info("onError: ", t);
        mSubscription.cancel();
      }
      
      @Override
      public void onComplete() {
        log.info("onComplete");
        mSubscription.cancel();
      }
    };
    
    flowable.blockingSubscribe(subscriber);
    stopWatch.stop();
    
    stopWatch.start("with buffer");
    flowable
        .buffer(3)
        .doOnNext(list-> log.info("process batch {}", list))
        .map(list-> Flowable.fromIterable(list)
            .flatMap(a-> Flowable
                .fromCallable(()->findById(a))
                .subscribeOn(Schedulers.io()),
                false,
                list.size())
            .toList()
            .blockingGet())
        .flatMap(Flowable::fromIterable)
        .blockingSubscribe();
    stopWatch.stop();
    
    log.info("{}", stopWatch.prettyPrint());
  
  }
  
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350