Elasticsearch源码分析-HotThreads分析

0.前言

当elasticsearch机器CPU过高时,集群会出现拒绝响应EsRejectedExecutionException异常,甚至引起集群down掉。

[2018-11-11 08:38:34,867][DEBUG][action.search.type] [127.0.0.1] [12439276468] Failed to execute fetch phase
org.elasticsearch.transport.RemoteTransportException: [127.0.0.1][inet[/127.0.0.1:9300]][indices:data/read/search[phase/fetch/id]]
Caused by: org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler@7cf77b77
        at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
        at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:79)
        at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:224)
        at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:114)
        at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
        at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
        at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
        at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

遇到这种情况,我们可以使用elasticsearch的hot_threads api来显示当前耗时较高的线程栈,请求的示例:

curl -XGET 127.0.0.1:9200/_cluster/nodes/hotthreads

返回的结果示例:

::: [127.0.0.1][pGtNLemNQNq2P1TYQPUqag][localhost][inet[/127.0.0.1:9300]]{master=true}
   Hot threads at 2018-12-28T12:12:48.046Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:
   
    1.2% (6.2ms out of 500ms) cpu usage by thread 'elasticsearch[127.0.0.1][search][T#18]'
     9/10 snapshots sharing following 2 elements
       java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       java.lang.Thread.run(Thread.java:745)

1.请求映射

在elasticsearch启动时,会添加Rest模块,guice注入框架会执行模块的configure()方法进行配置

public class RestModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(RestController.class).asEagerSingleton();
        new RestActionModule(restPluginsActions).configure(binder());
    }
}

在elasticsearch的Rest模块中,会绑定RestNodesHotThreadsAction

public class RestActionModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(RestNodesHotThreadsAction.class).asEagerSingleton();
    }
}

在RestNodesHotThreadsAction的构造方法中,会将url注入到当前对象中,其中nodeId参数为节点id,类似于响应中的pGtNLemNQNq2P1TYQPUqag。如果有url中包含nodeId参数,则只分析指定node的线程栈。

public class RestNodesHotThreadsAction extends BaseRestHandler {
    @Inject
    public RestNodesHotThreadsAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/hotthreads", this);
        controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/hot_threads", this);
        controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/{nodeId}/hotthreads", this);
        controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/{nodeId}/hot_threads", this);

        controller.registerHandler(RestRequest.Method.GET, "/_nodes/hotthreads", this);
        controller.registerHandler(RestRequest.Method.GET, "/_nodes/hot_threads", this);
        controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/hotthreads", this);
        controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/hot_threads", this);
    }
}

RestNodesHotThreadsAction重写了父类BaseRestHandler的handleRequest()方法,用来处理与url匹配的请求。
RestRequest支持如下参数:
nodeId: 要分析的节点id线程栈,如果为空,则分析整个集群所有节点的线程栈
threads: 需要分析的线程数,默认3
ignore_idle_threads: 是否忽略空闲线程,默认为true
type:需要检查的线程状态的类型,默认是cpu,支持block、wait
interval:前后两次检查的时间间隔,默认500ms
snapshots:需要生产堆栈跟踪快照的数量
最终调用AbstractClusterAdminClient的nodesHotThreads()方法分析热点线程

public class RestNodesHotThreadsAction extends BaseRestHandler {
    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
        String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
        NodesHotThreadsRequest nodesHotThreadsRequest = new NodesHotThreadsRequest(nodesIds);
        nodesHotThreadsRequest.threads(request.paramAsInt("threads", nodesHotThreadsRequest.threads()));
        nodesHotThreadsRequest.ignoreIdleThreads(request.paramAsBoolean("ignore_idle_threads", nodesHotThreadsRequest.ignoreIdleThreads()));
        nodesHotThreadsRequest.type(request.param("type", nodesHotThreadsRequest.type()));
        nodesHotThreadsRequest.interval(TimeValue.parseTimeValue(request.param("interval"), nodesHotThreadsRequest.interval()));
        nodesHotThreadsRequest.snapshots(request.paramAsInt("snapshots", nodesHotThreadsRequest.snapshots()));
        client.admin().cluster().nodesHotThreads(nodesHotThreadsRequest, new RestResponseListener<NodesHotThreadsResponse>(channel) {
            @Override
            public RestResponse buildResponse(NodesHotThreadsResponse response) throws Exception {
                StringBuilder sb = new StringBuilder();
                for (NodeHotThreads node : response) {
                    sb.append("::: ").append(node.getNode().toString()).append("\n");
                    Strings.spaceify(3, node.getHotThreads(), sb);
                    sb.append('\n');
                }
                return new BytesRestResponse(RestStatus.OK, sb.toString());
            }
        });
    }
}

2、Action子类及实现

nodesHotThreads()方法中指定当前操作的action为NodesHotThreadsAction.INSTANCE,elasticsearch会将此action绑定到TransportNodesHotThreadsAction类上

public class ActionModule extends AbstractModule {
    @Override
    protected void configure() {
        registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
    }
}

因此在执行TransportAction.execute()方法时,会执行TransportNodesHotThreadsAction父类TransportNodesOperationAction的doExecute()方法

public abstract class TransportNodesOperationAction<Request extends NodesOperationRequest, Response extends NodesOperationResponse, NodeRequest extends NodeOperationRequest, NodeResponse extends NodeOperationResponse> extends TransportAction<Request, Response> {
    @Override
    protected void doExecute(Request request, ActionListener<Response> listener) {
        new AsyncAction(request, listener).start();
    }
    
    private class AsyncAction {
        private AsyncAction(Request request, ActionListener<Response> listener) {
            this.request = request;
            this.listener = listener;
            clusterState = clusterService.state();
            String[] nodesIds = resolveNodes(request, clusterState);
            this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
            this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
        }

        private void start() {
            if (nodesIds.length == 0) { // 集群中的节点
                // nothing to notify
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                        listener.onResponse(newResponse(request, responses));
                    }
                });
                return;
            }
            TransportRequestOptions transportRequestOptions = TransportRequestOptions.options();
            if (request.timeout() != null) {
                transportRequestOptions.withTimeout(request.timeout());
            }
            // 不启用压缩
            transportRequestOptions.withCompress(transportCompress());
            for (int i = 0; i < nodesIds.length; i++) { // 遍历集群中的节点
                final String nodeId = nodesIds[i];
                final int idx = i;
                final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
                try {
                    // 如果是local节点
                    if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {
                        threadPool.executor(executor()).execute(new Runnable() {
                            @Override
                            public void run() {
                                try { // nodeOperation() -> TransportNodesStatsAction.nodeOperation()
                                    onOperation(idx, nodeOperation(newNodeRequest(clusterState.nodes().localNodeId(), request)));
                                } catch (Throwable e) {
                                    onFailure(idx, clusterState.nodes().localNodeId(), e);
                                }
                            }
                        });
                    } else if (nodeId.equals("_master")) {
                        // 如果是master节点
                        threadPool.executor(executor()).execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    onOperation(idx, nodeOperation(newNodeRequest(clusterState.nodes().masterNodeId(), request)));
                                } catch (Throwable e) {
                                    onFailure(idx, clusterState.nodes().masterNodeId(), e);
                                }
                            }
                        });
                    } else {
                        // 节点为null
                        if (node == null) {
                            onFailure(idx, nodeId, new NoSuchNodeException(nodeId));
                        } else if (!clusterService.localNode().shouldConnectTo(node)) {
                            // 连不上local节点
                            onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
                        } else {
                            NodeRequest nodeRequest = newNodeRequest(nodeId, request);
                            // 向节点发送nodeRequest请求
                            transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
                                @Override
                                public void handleResponse(NodeResponse response) {
                                    onOperation(idx, response);
                                }
                            });
                        }
                    }
                } catch (Throwable t) {
                    onFailure(idx, nodeId, t);
                }
            }
        }
    }
}

resolveNodes()方法用来解析传入的节点id,并返回节点集合,可以包含如下节点:
(1)节点id为空(null或者"")或者为"_all",则为集群中的所有节点
(2)节点id为"_local",则为当前节点
(3)节点id为"_master",则为master节点
(4)节点id为集群中已经存在的节点id,则为nodeId
(5)节点id和集群中节点名称能用通配符*匹配到,则为匹配到节点的nodeId
(6)节点id匹配到集群中节点的ip或者主机名,则为匹配到节点的nodeId
(7)如果节点id中包含":",如果为"data:true"为添加集群中的数据节点,"data:false"为删除集群中的数据节点;如果为"master:true"为添加集群中的master节点,"master:false"为删除集群中的master节点;否则用key和value匹配集群中节点的属性信息,添加匹配到的nodeId
接着会遍历上一步中得到的所有节点,如果目的节点是"_local"或者"master",则直接执行nodeOperation()方法,否则使用TransportService将请求发送到目的节点中执行

3. Action操作细节

TransportNodesOperationAction类中的nodeOperation()是个抽象方法,因此会执行子类TransportNodesHotThreadsAction的nodeOperation()方法。
主要是构造HotThreads对象,然后调用其detect()方法。

public class TransportNodesHotThreadsAction extends TransportNodesOperationAction<NodesHotThreadsRequest, NodesHotThreadsResponse, TransportNodesHotThreadsAction.NodeRequest, NodeHotThreads> {
    @Override
    protected NodeHotThreads nodeOperation(NodeRequest request) throws ElasticsearchException {
        HotThreads hotThreads = new HotThreads()
                .busiestThreads(request.request.threads)
                .type(request.request.type)
                .interval(request.request.interval)
                .threadElementsSnapshotCount(request.request.snapshots)
                .ignoreIdleThreads(request.request.ignoreIdleThreads);
        try {
            return new NodeHotThreads(clusterService.localNode(), hotThreads.detect());
        } catch (Exception e) {
            throw new ElasticsearchException("failed to detect hot threads", e);
        }
    }
}

探测耗时线程的具体逻辑主要在innerDetect()方法中,主要逻辑如下:
(1)首先探测一下线程,然后隔interval时间(默认500ms)再次探测一下线程
(2)根据线程类型(cpu、wait和block)对线程集合按cpu时间进行排序
(3)根据线程类型计算cpu时间,并以interval为基础计算时间占比

public class HotThreads {
    public String detect() throws Exception {
        synchronized (mutex) {
            return innerDetect();
        }
    }
    
    private String innerDetect() throws Exception {
        StringBuilder sb = new StringBuilder();

        sb.append("Hot threads at ");
        sb.append(DATE_TIME_FORMATTER.printer().print(System.currentTimeMillis()));
        sb.append(", interval=");
        sb.append(interval);
        sb.append(", busiestThreads=");
        sb.append(busiestThreads);
        sb.append(", ignoreIdleThreads=");
        sb.append(ignoreIdleThreads);
        sb.append(":\n");

        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        boolean enabledCpu = false;
        try {
            if (threadBean.isThreadCpuTimeSupported()) {
                if (!threadBean.isThreadCpuTimeEnabled()) {
                    enabledCpu = true;
                    threadBean.setThreadCpuTimeEnabled(true);
                }
            } else {
                throw new IllegalStateException("MBean doesn't support thread CPU Time");
            }
            Map<Long, MyThreadInfo> threadInfos = new HashMap<>();
            for (long threadId : threadBean.getAllThreadIds()) {
                // ignore our own thread...
                if (Thread.currentThread().getId() == threadId) {
                    continue;
                }
                long cpu = threadBean.getThreadCpuTime(threadId);
                if (cpu == -1) {
                    continue;
                }
                ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
                if (info == null) {
                    continue;
                }
                threadInfos.put(threadId, new MyThreadInfo(cpu, info));
            }
            Thread.sleep(interval.millis());
            for (long threadId : threadBean.getAllThreadIds()) {
                // ignore our own thread...
                if (Thread.currentThread().getId() == threadId) {
                    continue;
                }
                long cpu = threadBean.getThreadCpuTime(threadId);
                if (cpu == -1) {
                    threadInfos.remove(threadId);
                    continue;
                }
                ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
                if (info == null) {
                    threadInfos.remove(threadId);
                    continue;
                }
                MyThreadInfo data = threadInfos.get(threadId);
                if (data != null) {
                    data.setDelta(cpu, info);
                } else {
                    threadInfos.remove(threadId);
                }
            }
            // sort by delta CPU time on thread.
            List<MyThreadInfo> hotties = new ArrayList<>(threadInfos.values());
            final int busiestThreads = Math.min(this.busiestThreads, hotties.size());
            // skip that for now
            CollectionUtil.introSort(hotties, new Comparator<MyThreadInfo>() {
                public int compare(MyThreadInfo o1, MyThreadInfo o2) {
                    if ("cpu".equals(type)) {
                        return (int) (o2.cpuTime - o1.cpuTime);
                    } else if ("wait".equals(type)) {
                        return (int) (o2.waitedTime - o1.waitedTime);
                    } else if ("block".equals(type)) {
                        return (int) (o2.blockedTime - o1.blockedTime);
                    }
                    throw new IllegalArgumentException();
                }
            });
            // analyse N stack traces for M busiest threads
            long[] ids = new long[busiestThreads];
            for (int i = 0; i < busiestThreads; i++) {
                MyThreadInfo info = hotties.get(i);
                ids[i] = info.info.getThreadId();
            }
            ThreadInfo[][] allInfos = new ThreadInfo[threadElementsSnapshotCount][];
            for (int j = 0; j < threadElementsSnapshotCount; j++) {
                // NOTE, javadoc of getThreadInfo says: If a thread of the given ID is not alive or does not exist,
                // null will be set in the corresponding element in the returned array. A thread is alive if it has
                // been started and has not yet died.
                allInfos[j] = threadBean.getThreadInfo(ids, Integer.MAX_VALUE);
                Thread.sleep(threadElementsSnapshotDelay.millis());
            }
            for (int t = 0; t < busiestThreads; t++) {
                long time = 0;
                if ("cpu".equals(type)) {
                    time = hotties.get(t).cpuTime;
                } else if ("wait".equals(type)) {
                    time = hotties.get(t).waitedTime;
                } else if ("block".equals(type)) {
                    time = hotties.get(t).blockedTime;
                }
                String threadName = null;
                for (ThreadInfo[] info : allInfos) {
                    if (info != null && info[t] != null) {
                        if (ignoreIdleThreads && isIdleThread(info[t])) {
                            info[t] = null;
                            continue;
                        }
                        threadName = info[t].getThreadName();
                        break;
                    }
                }
                if (threadName == null) {
                    continue; // thread is not alive yet or died before the first snapshot - ignore it!
                }
                double percent = (((double) time) / interval.nanos()) * 100;
                sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n", percent, TimeValue.timeValueNanos(time), interval, type, threadName));
                // for each snapshot (2nd array index) find later snapshot for same thread with max number of
                // identical StackTraceElements (starting from end of each)
                boolean[] done = new boolean[threadElementsSnapshotCount];
                for (int i = 0; i < threadElementsSnapshotCount; i++) {
                    if (done[i]) continue;
                    int maxSim = 1;
                    boolean[] similars = new boolean[threadElementsSnapshotCount];
                    for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
                        if (done[j]) continue;
                        int similarity = similarity(allInfos[i][t], allInfos[j][t]);
                        if (similarity > maxSim) {
                            maxSim = similarity;
                            similars = new boolean[threadElementsSnapshotCount];
                        }
                        if (similarity == maxSim) similars[j] = true;
                    }
                    // print out trace maxSim levels of i, and mark similar ones as done
                    int count = 1;
                    for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
                        if (similars[j]) {
                            done[j] = true;
                            count++;
                        }
                    }
                    if (allInfos[i][t] != null) {
                        final StackTraceElement[] show = allInfos[i][t].getStackTrace();
                        if (count == 1) {
                            sb.append(String.format(Locale.ROOT, "  unique snapshot%n"));
                            for (int l = 0; l < show.length; l++) {
                                sb.append(String.format(Locale.ROOT, "    %s%n", show[l]));
                            }
                        } else {
                            sb.append(String.format(Locale.ROOT, "  %d/%d snapshots sharing following %d elements%n", count, threadElementsSnapshotCount, maxSim));
                            for (int l = show.length - maxSim; l < show.length; l++) {
                                sb.append(String.format(Locale.ROOT, "    %s%n", show[l]));
                            }
                        }
                    }
                }
            }
            return sb.toString();
        } finally {
            if (enabledCpu) {
                threadBean.setThreadCpuTimeEnabled(false);
            }
        }
    }
}

这种方式可以推广到其他Java服务环境中,引入elasticsearch的程序只要创建HotThreads对象后,都可以调用innerDetect()方法显示热点线程。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容