【go-libp2p源码剖析】Swarm拨号

1. 简介

libp2p swarm 是用于 libp2p 网络的“低级”接口,可以更精细地控制系统的各个方面。swarm 可以建立监听,也可以向其他主机拨号建立新的连接(比如和某个主机建立 tcp 连接),而这里所指的拨号其实就是建立出站连接的过程,它的实现逻辑较为复杂,我们在这里做一个梳理。

2. 代码结构

仓库地址:https://github.com/libp2p/go-libp2p-swarm.git
拨号相关代码主要分布在swarm_dial.go,limiter.go,dial_sync.go这三个文件,它们包含的结构体:
swarm_dial.go:DialBackoff,backoffAddr
DialBackoff 主要用于拨号失败后再次拨号时间限制
dial_sync.go:DialSync、activeDial
DialSync 同步拨号帮助程序,同一时刻只有一个到指定 Peer 的拨号处于活跃状态
limiter.go:dialLimiter、dialJob、dialResult
dialLimiter 主要对拨号并发数限制

3.时序图

在这里插入图片描述

通过上图可以看出拨号其实是对并发拨号、同步、重试做了一系列检查,最后再调用 Transport 进行拨号。假设有1000个 Peer,每个 Peer 有5个不同的地址,如果一个一个同步拨势必影响效率,所以需要启动多个协程并发拨号,但也不能完全不限制,dialLimiter 实现了对并发拨号的限制。如果一个地址拨号失败,也不能立即就去再次尝试拨号,这样大概率会失败,这里需要等一段时间再去拨,否则就是浪费资源,所以等多久这里有个算法,DialBackoff 实现了这些功能。那为什么还需要 DialSync?外部程序在调用 DialPeer 的时候,可能启动了多个协程对同一个 Peer 进行并发拨号,因为无法限制外面是怎么调用方式,所以只有在拨号源头进行限制(内部已经实现了并发拨号)。
这里借用 swarm_dial.go 里的一张图,看看 DialSync 是如何工作的:

 Diagram of dial sync:

   many callers of Dial()   synched w.  dials many addrs       results to callers
  ----------------------\    dialsync    use earliest          /--------------
  -----------------------\             |----------\           /----------------
  ------------------------>------------<------------>---------<-----------------
  -----------------------|              \----x                 \----------------
  ----------------------|                \-----x                \---------------
                                         any may fail          if no addr at end retry dialAttempt x

3.调用入口

swarm 对外暴露了一个 DialPeer 方法,应用程序可以直接通过它对 Peer 拨号,有两处用到了它。

// DialPeer connects to a peer.
func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (network.Conn, error) {
    if s.gater != nil && !s.gater.InterceptPeerDial(p) {
        log.Debugf("gater disallowed outbound connection to peer %s", p.Pretty())
        return nil, &DialError{Peer: p, Cause: ErrGaterDisallowedConnection}
    }

    return s.dialPeer(ctx, p)
}

1.BasicHost 的 Connect 方法调用了 DialPeer 方法

func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
    // absorb addresses into peerstore
    h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

    if h.Network().Connectedness(pi.ID) == network.Connected {
        return nil
    }

    resolved, err := h.resolveAddrs(ctx, h.Peerstore().PeerInfo(pi.ID))
    if err != nil {
        return err
    }
    h.Peerstore().AddAddrs(pi.ID, resolved, peerstore.TempAddrTTL)

    return h.dialPeer(ctx, pi.ID)
}

func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error {
    log.Debugf("host %s dialing %s", h.ID(), p)
    c, err := h.Network().DialPeer(ctx, p)
    if err != nil {
        return err
    }
    select {
    case <-h.ids.IdentifyWait(c):
    case <-ctx.Done():
        return ctx.Err()
    }

    log.Debugf("host %s finished dialing %s", h.ID(), p)
    return nil
}

最后 IpfsDHT 调用了 BasicHost 的 Connect

func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
    // short-circuit if we're already connected.
    if dht.host.Network().Connectedness(p) == network.Connected {
        return nil
    }

    logger.Debug("not connected. dialing.")
    routing.PublishQueryEvent(ctx, &routing.QueryEvent{
        Type: routing.DialingPeer,
        ID:   p,
    })

    pi := peer.AddrInfo{ID: p}
    if err := dht.host.Connect(ctx, pi); err != nil {
        logger.Debugf("error connecting: %s", err)
        routing.PublishQueryEvent(ctx, &routing.QueryEvent{
            Type:  routing.QueryError,
            Extra: err.Error(),
            ID:    p,
        })

        return err
    }
    logger.Debugf("connected. dial success.")
    return nil
}

2.另外 Swarm 的 NewStream 也调用了 dialPeer,如果还没有建立连接则先拨号

func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error) {
    log.Debugf("[%s] opening stream to peer [%s]", s.local, p)
    dials := 0
    for {
        c := s.bestConnToPeer(p)
        if c == nil {
            if nodial, _ := network.GetNoDial(ctx); nodial {
                return nil, network.ErrNoConn
            }

            if dials >= DialAttempts {
                return nil, errors.New("max dial attempts exceeded")
            }
            dials++

            var err error
            c, err = s.dialPeer(ctx, p)
            if err != nil {
                return nil, err
            }
        }
        s, err := c.NewStream()
        if err != nil {
            if c.conn.IsClosed() {
                continue
            }
            return nil, err
        }
        return s, nil
    }
}

4.拨号程序初始化

Swarm{
    ....
    // dialing helpers
    dsync   *DialSync
    backf   DialBackoff
    limiter *dialLimiter
}

func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc metrics.Reporter, extra ...interface{}) *Swarm {
    s := &Swarm{
        local: local,
        peers: peers,
        bwc:   bwc,
    }
    .....
    s.dsync = NewDialSync(s.doDial)
    s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr)
    s.proc = goprocessctx.WithContext(ctx)
    s.ctx = goprocessctx.OnClosingContext(s.proc)
    s.backf.init(s.ctx)

    return s
}

type DialFunc func(context.Context, peer.ID) (*Conn, error)

// NewDialSync constructs a new DialSync
func NewDialSync(dfn DialFunc) *DialSync {
    return &DialSync{
        dials:    make(map[peer.ID]*activeDial),
        dialFunc: dfn,
    }
}

type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)
type isFdConsumingFnc func(ma.Multiaddr) bool

func newDialLimiter(df dialfunc, fdFnc isFdConsumingFnc) *dialLimiter {
    fd := ConcurrentFdDials
    if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
        if n, err := strconv.ParseInt(env, 10, 32); err == nil {
            fd = int(n)
        }
    }
    return newDialLimiterWithParams(fdFnc, df, fd, DefaultPerPeerRateLimit)
}

func newDialLimiterWithParams(isFdConsumingFnc isFdConsumingFnc, df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
    return &dialLimiter{
        isFdConsumingFnc:   isFdConsumingFnc,
        fdLimit:            fdLimit,
        perPeerLimit:       perPeerLimit,
        waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
        activePerPeer:      make(map[peer.ID]int),
        dialFunc:           df,
    }
}

func (db *DialBackoff) init(ctx context.Context) {
    if db.entries == nil {
        db.entries = make(map[peer.ID]map[string]*backoffAddr)
    }
    go db.background(ctx)
}

在 NewSwarm 实例的时候对 DialBackoff、dialLimiter、DialBackoff 三个拨号帮助程序进行了初始化。
NewDialSync 需要传入一个拨号函数做参数(实际调用 Swarm 的 doDial 函数)
newDialLimiter 则需要传入两个函数:一个为拨号函数(实际调用 Swarm 的 dialAddr 函数),一个是判断协议是否需要消耗 FD(UNIX/TCP)
DialBackoff 的 init 后台还会启动一个协程做 Backoff 清理工作

5.涉及协程

1、针对每一个 peer,在 DialSync 启动了一个协程去拨号

func (ad *activeDial) start(ctx context.Context) {
    ad.conn, ad.err = ad.ds.dialFunc(ctx, ad.id)

    // This isn't the user's context so we should fix the error.
    switch ad.err {
    case context.Canceled:
        // The dial was canceled with `CancelDial`.
        ad.err = errDialCanceled
    case context.DeadlineExceeded:
        // We hit an internal timeout, not a context timeout.
        ad.err = ErrDialTimeout
    }
    close(ad.waitch)
    ad.cancel()
}

func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
    ds.dialsLk.Lock()
    defer ds.dialsLk.Unlock()

    actd, ok := ds.dials[p]
    if !ok {
        adctx, cancel := context.WithCancel(context.Background())
        actd = &activeDial{
            id:     p,
            cancel: cancel,
            waitch: make(chan struct{}),
            ds:     ds,
        }
        ds.dials[p] = actd

        go actd.start(adctx)
    }

    // increase ref count before dropping dialsLk
    actd.incref()

    return actd
}

2、针对每一个 Peer 的每个地址,在 dialLimiter 启动了一个协程去拨号

func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
    if dl.shouldConsumeFd(dj.addr) {
        if dl.fdConsuming >= dl.fdLimit {
            log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+
                "limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd))
            dl.waitingOnFd = append(dl.waitingOnFd, dj)
            return
        }

        log.Debugf("[limiter] taking FD token: peer: %s; addr: %s; prev consuming: %d",
            dj.peer, dj.addr, dl.fdConsuming)
        // take token
        dl.fdConsuming++
    }

    log.Debugf("[limiter] executing dial; peer: %s; addr: %s; FD consuming: %d; waiting: %d",
        dj.peer, dj.addr, dl.fdConsuming, len(dl.waitingOnFd))
    go dl.executeDial(dj)
}

func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
    if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
        log.Debugf("[limiter] blocked dial waiting on peer limit; peer: %s; addr: %s; active: %d; "+
            "peer limit: %d; waiting: %d", dj.peer, dj.addr, dl.activePerPeer[dj.peer], dl.perPeerLimit,
            len(dl.waitingOnPeerLimit[dj.peer]))
        wlist := dl.waitingOnPeerLimit[dj.peer]
        dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
        return
    }
    dl.activePerPeer[dj.peer]++

    dl.addCheckFdLimit(dj)
}

// executeDial calls the dialFunc, and reports the result through the response channel when finished. Once the response is sent it also releases all tokens it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
    defer dl.finishedDial(j)
    if j.cancelled() {
        return
    }

    dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout())
    defer cancel()

    con, err := dl.dialFunc(dctx, j.peer, j.addr)
    select {
    case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
    case <-j.ctx.Done():
        if err == nil {
            con.Close()
        }
    }
}

3、Backoff 清理

func (db *DialBackoff) init(ctx context.Context) {
    if db.entries == nil {
        db.entries = make(map[peer.ID]map[string]*backoffAddr)
    }
    go db.background(ctx)
}

func (db *DialBackoff) background(ctx context.Context) {
    ticker := time.(BackoffMax)NewTicker
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            db.cleanup()
        }
    }
}

func (db *DialBackoff) cleanup() {
    db.lock.Lock()
    defer db.lock.Unlock()
    now := time.Now()
    for p, e := range db.entries {
        good := false
        for _, backoff := range e {
            backoffTime := BackoffBase + BackoffCoef*time.Duration(backoff.tries*backoff.tries)
            if backoffTime > BackoffMax {
                backoffTime = BackoffMax
            }
            if now.Before(backoff.until.Add(backoffTime)) {
                good = true
                break
            }
        }
        if !good {
            delete(db.entries, p)
        }
    }
}

6.一些重要规则和算法

1、拨号地址的过滤

// filterKnownUndialables takes a list of multiaddrs, and removes those that we definitely don't want to dial: addresses configured to be blocked, IPv6 link-local addresses, addresses without a dial-capable transport, and addresses that we know to be our own. This is an optimization to avoid wasting time on dials that we know are going to fail.
func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Multiaddr {
    lisAddrs, _ := s.InterfaceListenAddresses()
    var ourAddrs []ma.Multiaddr
    for _, addr := range lisAddrs {
        protos := addr.Protocols()
        // we're only sure about filtering out /ip4 and /ip6 addresses, so far
        if len(protos) == 2 && (protos[0].Code == ma.P_IP4 || protos[0].Code == ma.P_IP6) {
            ourAddrs = append(ourAddrs, addr)
        }
    }

    return addrutil.FilterAddrs(addrs,
        addrutil.SubtractFilter(ourAddrs...),
        s.canDial,
        // TODO: Consider allowing link-local addresses
        addrutil.AddrOverNonLocalIP,
        func(addr ma.Multiaddr) bool {
            return s.gater == nil || s.gater.InterceptAddrDial(p, addr)
        },
    )
}

// FilterAddrs is a filter that removes certain addresses, according to the given filters.
// If all filters return true, the address is kept.
func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr {
    b := make([]ma.Multiaddr, 0, len(a))
    for _, addr := range a {
        good := true
        for _, filter := range filters {
            good = good && filter(addr)
        }
        if good {
            b = append(b, addr)
        }
    }
    return b
}

// AddrOverNonLocalIP returns whether the addr uses a non-local ip link
func AddrOverNonLocalIP(a ma.Multiaddr) bool {
    split := ma.Split(a)
    if len(split) < 1 {
        return false
    }
    if manet.IsIP6LinkLocal(split[0]) {
        return false
    }
    return true
}

2、拨号地址排序

// ranks addresses in descending order of preference for dialing   Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server
    rankAddrsFnc := func(addrs []ma.Multiaddr) []ma.Multiaddr {
        var localUdpAddrs []ma.Multiaddr // private udp
        var relayUdpAddrs []ma.Multiaddr // relay udp
        var othersUdp []ma.Multiaddr     // public udp

        var localFdAddrs []ma.Multiaddr // private fd consuming
        var relayFdAddrs []ma.Multiaddr //  relay fd consuming
        var othersFd []ma.Multiaddr     // public fd consuming

        for _, a := range addrs {
            if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil {
                if s.IsFdConsumingAddr(a) {
                    relayFdAddrs = append(relayFdAddrs, a)
                    continue
                }
                relayUdpAddrs = append(relayUdpAddrs, a)
            } else if manet.IsPrivateAddr(a) {
                if s.IsFdConsumingAddr(a) {
                    localFdAddrs = append(localFdAddrs, a)
                    continue
                }
                localUdpAddrs = append(localUdpAddrs, a)
            } else {
                if s.IsFdConsumingAddr(a) {
                    othersFd = append(othersFd, a)
                    continue
                }
                othersUdp = append(othersUdp, a)
            }
        }

        relays := append(relayUdpAddrs, relayFdAddrs...)
        fds := append(localFdAddrs, othersFd...)

        return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...)
    }

3、Backoff 时间设定

// BackoffBase is the base amount of time to backoff (default: 5s).
var BackoffBase = time.Second * 5

// BackoffCoef is the backoff coefficient (default: 1s).
var BackoffCoef = time.Second

// BackoffMax is the maximum backoff time (default: 5m).
var BackoffMax = time.Minute * 5

// AddBackoff lets other nodes know that we've entered backoff with peer p, so dialers should not wait unnecessarily. We still will attempt to dial with one goroutine, in case we get through.
//
// Backoff is not exponential, it's quadratic and computed according to the following formula:
//
//     BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) {
    saddr := string(addr.Bytes())
    db.lock.Lock()
    defer db.lock.Unlock()
    bp, ok := db.entries[p]
    if !ok {
        bp = make(map[string]*backoffAddr, 1)
        db.entries[p] = bp
    }
    ba, ok := bp[saddr]
    if !ok {
        bp[saddr] = &backoffAddr{
            tries: 1,
            until: time.Now().Add(BackoffBase),
        }
        return
    }

    backoffTime := BackoffBase + BackoffCoef*time.Duration(ba.tries*ba.tries)
    if backoffTime > BackoffMax {
        backoffTime = BackoffMax
    }
    ba.until = time.Now().Add(backoffTime)
    ba.tries++
}

7.核心拨号逻辑

func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (transport.CapableConn, *DialError) {
    /*
        This slice-to-chan code is temporary, the peerstore can currently provide
        a channel as an interface for receiving addresses, but more thought
        needs to be put into the execution. For now, this allows us to use
        the improved rate limiter, while maintaining the outward behaviour
        that we previously had (halting a dial when we run out of addrs)
    */
    var remoteAddrChan chan ma.Multiaddr
    if len(remoteAddrs) > 0 {
        remoteAddrChan = make(chan ma.Multiaddr, len(remoteAddrs))
        for i := range remoteAddrs {
            remoteAddrChan <- remoteAddrs[i]
        }
        close(remoteAddrChan)
    }

    log.Debugf("%s swarm dialing %s", s.local, p)

    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // cancel work when we exit func

    // use a single response type instead of errs and conns, reduces complexity *a ton*
    respch := make(chan dialResult)
    err := &DialError{Peer: p}

    defer s.limiter.clearAllPeerDials(p)

    var active int
dialLoop:
    for remoteAddrChan != nil || active > 0 {
        // Check for context cancellations and/or responses first.
        select {
        case <-ctx.Done():
            break dialLoop
        case resp := <-respch:
            active--
            if resp.Err != nil {
                // Errors are normal, lots of dials will fail
                if resp.Err != context.Canceled {
                    s.backf.AddBackoff(p, resp.Addr)
                }

                log.Infof("got error on dial: %s", resp.Err)
                err.recordErr(resp.Addr, resp.Err)
            } else if resp.Conn != nil {
                return resp.Conn, nil
            }

            // We got a result, try again from the top.
            continue
        default:
        }

        // Now, attempt to dial.
        select {
        case addr, ok := <-remoteAddrChan:
            if !ok {
                remoteAddrChan = nil
                continue
            }

            s.limitedDial(ctx, p, addr, respch)
            active++
        case <-ctx.Done():
            break dialLoop
        case resp := <-respch:
            active--
            if resp.Err != nil {
                // Errors are normal, lots of dials will fail
                if resp.Err != context.Canceled {
                    s.backf.AddBackoff(p, resp.Addr)
                }

                log.Infof("got error on dial: %s", resp.Err)
                err.recordErr(resp.Addr, resp.Err)
            } else if resp.Conn != nil {
                return resp.Conn, nil
            }
        }
    }

    if ctxErr := ctx.Err(); ctxErr != nil {
        err.Cause = ctxErr
    } else if len(err.DialErrors) == 0 {
        err.Cause = network.ErrNoRemoteAddrs
    } else {
        err.Cause = ErrAllDialsFailed
    }
    return nil, err
}

一个 Peer 可能有多个地址,对能拨号的地址过滤后,再对这些地址排好序,扔到了这里,先把这些地址放进 channel,再遍历 channel 里的地址(dialLoop):

STEP 1. 检查上下文和响应
1.1、如果上下文取消,则跳出循环。
1.2、收到响应,并将 active 计数减1, 如果上个循环拨号出错,则 AddBackoff,并记录错误,如果上个循环拨号成功,则直接将 Conn 返回,两者都不是则继续下一次循环。

STEP 2. 尝试拨号
2.1 从 channel 中取出地址,调用 limitedDial 进行拨号(内部会启动协程去拨号),并将 active 计数加1。
2.2 如果上下文取消,则跳出循环。
2.3 收到响应,并将 active 计数减1, 如果拨号出错,则 AddBackoff,并记录错误,如果拨号成功,则直接将 Conn 返回, 两者都不是则继续下一次循环(下一次先检查上下文和响应)。

STEP 3. 返回错误。
dialLoop 结束也没有返回 Conn 则说明:上下文取消或没有地址可拨 ,要不然就是拨号出错,一个地址都没拨成功。

假设这里有三个地址 ,因为启动了协程去拨号,第一个失败了 ,第二个成功了,在等待第二个成功的响应时,第三个拨号任务可能已经执行了,这时返回 Conn 前,先会执行defer cancel() 第三个拨号任务会收到 cancel 信号,如果第三个拨号任务成功了 ,则将这个 Conn 关闭,详见dialLimiter.executeDial。再执行和 defer s.limiter.clearAllPeerDials(p),这里对waitingOnPeerLimit数据进行清理,不管拨号成功还是失败对这个 Peer 而言拨号已经结束了。

Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有非常丰富的落地经验。Netwarps 目前在深圳、北京均设立了研发中心,团队规模30+,其中大部分为具备十年以上开发经验的技术人员,分别来自互联网、金融、云计算、区块链以及科研机构等专业领域。
Netwarps 专注于安全存储技术产品的研发与应用,主要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具有高可用、低功耗和低网络的技术特点,适用于物联网、工业互联网等场景。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容