hdfs写之打开文件流<一>

一、客户端创建写文件的流程图

namenode主要创建客户端RPC请求的准写的文件,namenode会把文件名保存到namespace中,然后返回给客户端HdfsFileStatus,最后客户端在获取hdfsFileStatus时候new DFSOutputStream实例,并启动DFSOutputStream中的 DataStreamer Demon线程并且返回给客户端DFSOutputStream对象。

image.png

二、客户端FileSystem创建输出流

1、客户端通过FileSystem.create方法创建out输出流

客户端调用create方法获取FSDataOutputStream,其实调用的是DistributedFileSystem实例,因为该方法是FileSystem子类

/**
   * Create an FSDataOutputStream at the indicated Path.
   * @param f the file name to open
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file. 
   */
  public FSDataOutputStream create(Path f, 
                                   boolean overwrite,
                                   int bufferSize,
                                   short replication,
                                   long blockSize
                                   ) throws IOException {
    return create(f, overwrite, bufferSize, replication, blockSize, null);
  }

调用DistributedFileSystem的create方法

  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * @param f the file name to open
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file. 
   */
  public FSDataOutputStream create(Path f,
                                            boolean overwrite,
                                            int bufferSize,
                                            short replication,
                                            long blockSize,
                                            Progressable progress
                                            ) throws IOException {

    //1.DistributedFileSystem.create
    return this.create(f, FsPermission.getFileDefault().applyUMask(
        FsPermission.getUMask(getConf())), overwrite, bufferSize,
        replication, blockSize, progress);
  }
2、客户端DistributedFileSystem.create方法创建out输出流

根据bufferSize,客户端名称创建输出流

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
//    1. DistributedFileSystem DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1307090268_1, ugi=lixun_XXX (auth:SIMPLE)]]
    return this.create(f, permission,
        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
            : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
        blockSize, progress, null);
  }

然后调用FileSystemLinkResolver. resolve()方法,该方法中调用了doCall方法,如果创建流失败,会调用next方法继续调用doCall直到流成功为止,当然默认只能尝试创建32次。

 @Override
  public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      //resolve方法中会调用doCall,如果有异常才会调用next继续尝试create数据流,默认尝试32次不成功抛出IO异常
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        //1.调用DFSClient.create,
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
                cflags, replication, blockSize, progress, bufferSize,
                checksumOpt);
        //2.包装成HdfsDataOutputStream流返回
        return dfs.createWrappedOutputStream(dfsos, statistics);
      }
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF);
  }
3、通过DFSClient.create方法创建out输出流
  /**
   * Call {@link #create(String, FsPermission, EnumSet, boolean, short, 
   * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
   *  set to true.
   */
  public DFSOutputStream create(String src, 
                             FsPermission permission,
                             EnumSet<CreateFlag> flag, 
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize,
                             ChecksumOpt checksumOpt)
      throws IOException {
    return create(src, permission, flag, true,
        replication, blockSize, progress, buffersize, checksumOpt, null);
  }

调用newStreamForCreate方法

public DFSOutputStream create(String src, 
                             FsPermission permission,
                             EnumSet<CreateFlag> flag, 
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize,
                             ChecksumOpt checksumOpt,
                             InetSocketAddress[] favoredNodes) throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getFileDefault();
    }
    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
    if(LOG.isDebugEnabled()) {
      LOG.debug(src + ": masked=" + masked);
    }
     // 通过rpc请求namenode创建hdfsfile之后,然后在创建FSDataOutputStream流
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        buffersize, dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes));
    //获取输出流之后,开始对这个文件进行租约lease,过期的租约会导致租约持有者不能写数据,也客户防止多个客户端写一个文件。
    beginFileLease(result.getFileId(), result);
    return result;
  }

4、通过DFSOutputStream.newStreamForCreate()向namenode远程RPC

客户端向namenode发起请求,获取写文件的路径,block大小等文件信息,然后客户端通过这些信息创建DataStreamer,封装成DFSOutputStream

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    TraceScope scope =
        dfsClient.getPathTraceScope("newStreamForCreate", src);
    try {
      HdfsFileStatus stat = null;

      // Retry the create if we get a RetryStartFileException up to a maximum
      // number of times
      boolean shouldRetry = true;
      //默认尝试10次连接
      int retryCount = CREATE_RETRY_COUNT;
      while (shouldRetry) {
        shouldRetry = false;
        try {
          //rpc调用namenode创建一个文件并返回该文件的HdfsFileStatus信息,然后通过该信息创建DataStreamer,返回DFSOutputStream
          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
              new EnumSetWritable<CreateFlag>(flag), createParent, replication,
              blockSize, SUPPORTED_CRYPTO_VERSIONS);
          break;
        } catch (RemoteException re) {
          IOException e = re.unwrapRemoteException(
              AccessControlException.class,
              DSQuotaExceededException.class,
              FileAlreadyExistsException.class,
              FileNotFoundException.class,
              ParentNotDirectoryException.class,
              NSQuotaExceededException.class,
              RetryStartFileException.class,
              SafeModeException.class,
              UnresolvedPathException.class,
              SnapshotAccessControlException.class,
              UnknownCryptoProtocolVersionException.class);
          if (e instanceof RetryStartFileException) {
            if (retryCount > 0) {
              shouldRetry = true;
              retryCount--;
            } else {
              throw new IOException("Too many retries because of encryption" +
                  " zone operations", e);
            }
          } else {
            throw e;
          }
        }
      }
      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes);
      //启动DataStreamer Demon线程
      out.start();
      return out;
    } finally {
      scope.close();
    }
  }

在构造方法中创建一个DataStreamer守护线程。该线程主要负责向多少个datanode写block并构建一个pipline,当然也会请求namenode写哪些datanode合适。后面会分析这个线程作用。

 /** Construct a new output stream for creating a file. */
  private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
      EnumSet<CreateFlag> flag, Progressable progress,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    this(dfsClient, src, progress, stat, checksum);
    //是否久化到磁盘,默认false
    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
    //计算chunk大小已经每个packet包含多少个chunk(默认127个),writePacketSize默认65536 bytesPerChecksum默认521,计
    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);

    streamer = new DataStreamer(stat, null);
    //favoredNodes null
    if (favoredNodes != null && favoredNodes.length != 0) {
      streamer.setFavoredNodes(favoredNodes);
    }
  }
5、通过ClientNamenodeProtocolTranslatorPB.create()向namenode远程RPC

通过ClientProtocol协议向namenode server发起请求

  @Override
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize, 
      CryptoProtocolVersion[] supportedVersions)
      throws AccessControlException, AlreadyBeingCreatedException,
      DSQuotaExceededException, FileAlreadyExistsException,
      FileNotFoundException, NSQuotaExceededException,
      ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
      IOException {
    CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
        .setSrc(src)
        .setMasked(PBHelper.convert(masked))
        .setClientName(clientName)
        .setCreateFlag(PBHelper.convertCreateFlag(flag))
        .setCreateParent(createParent)
        .setReplication(replication)
        .setBlockSize(blockSize);
    builder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions));
    CreateRequestProto req = builder.build();
    try {
      //localhost/127.0.0.1:52029,org.apache.hadoop.hdfs.protocol.ClientProtocol
      CreateResponseProto res = rpcProxy.create(null, req);
      return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }

  }

三、服务端处理客户端create请求

1、NameNodeRpcServer.create()处理客户端create请求

namenode接收客户端写请求,调用startFile方法创建HdfsFileStatus对象,通过ClientProtocol协议返回给客户端。

  @Override // ClientProtocol
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize, 
      CryptoProtocolVersion[] supportedVersions)
      throws IOException {
    checkNNStartup();
    //获取发起请求客户端地址
    String clientMachine = getClientMachine();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.create: file "
          +src+" for "+clientName+" at "+clientMachine);
    }
    if (!checkPathLength(src)) {
      throw new IOException("create: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    //cacheEntry.isSuccess() return false
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (HdfsFileStatus) cacheEntry.getPayload();
    }

    HdfsFileStatus status = null;
    try {
      PermissionStatus perm = new PermissionStatus(getRemoteUser()
          .getShortUserName(), null, masked);
      //namesystem=FSNamesystem
        //在namespace中创建hdfs file并返回文件信息状态
      status = namesystem.startFile(src, perm, clientName, clientMachine,
          flag.get(), createParent, replication, blockSize, supportedVersions,
          cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, status != null, status);
    }

    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return status;
  }

2、FSNamesystem.startFile()开始在namespace创建文件

该方法主要调用startFileInt方法在namespace中创建文件

/**
   * Create a new file entry in the namespace.
   * 
   * For description of parameters and exceptions thrown see
   * {@link ClientProtocol#create}, except it returns valid file status upon
   * success
   */
  HdfsFileStatus startFile(String src, PermissionStatus permissions,
      String holder, String clientMachine, EnumSet<CreateFlag> flag,
      boolean createParent, short replication, long blockSize, 
      CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
      throws AccessControlException, SafeModeException,
      FileAlreadyExistsException, UnresolvedLinkException,
      FileNotFoundException, ParentNotDirectoryException, IOException {

    HdfsFileStatus status = null;
    try {
      status = startFileInt(src, permissions, holder, clientMachine, flag,
          createParent, replication, blockSize, supportedVersions,
          logRetryCache);
    } catch (AccessControlException e) {
      logAuditEvent(false, "create", src);
      throw e;
    }
    return status;
  }

该方法主要校验路径,block等信息,然后处理某些参数,最后调用startFileInternal方法写文件到namespace中,并构建HdfsFileStatus对象返回客户端。HdfsFileStatus主要包含客户端需要的写文件的block大小,权限,group,owner,权限,副本数等信息。

private HdfsFileStatus startFileInt(final String srcArg,
      PermissionStatus permissions, String holder, String clientMachine,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions,
      boolean logRetryCache)
      throws AccessControlException, SafeModeException,
      FileAlreadyExistsException, UnresolvedLinkException,
      FileNotFoundException, ParentNotDirectoryException, IOException {
    String src = srcArg;
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      StringBuilder builder = new StringBuilder();
      builder.append("DIR* NameSystem.startFile: src=" + src
              + ", holder=" + holder
              + ", clientMachine=" + clientMachine
              + ", createParent=" + createParent
              + ", replication=" + replication
              + ", createFlag=" + flag.toString()
              + ", blockSize=" + blockSize);
      builder.append(", supportedVersions=");
      if (supportedVersions != null) {
        builder.append(Arrays.toString(supportedVersions));
      } else {
        builder.append("null");
      }
      NameNode.stateChangeLog.debug(builder.toString());
    }
    if (!DFSUtil.isValidName(src)) {
      throw new InvalidPathException(src);
    }
    //校验副本数是否合法
    blockManager.verifyReplication(src, replication, clientMachine);

    boolean skipSync = false;
    HdfsFileStatus stat = null;
    FSPermissionChecker pc = getPermissionChecker();
    checkOperation(OperationCategory.WRITE);
    //blockSize不能小于1M
    if (blockSize < minBlockSize) {
      throw new IOException("Specified block size is less than configured" +
          " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
          + "): " + blockSize + " < " + minBlockSize);
    }
    //src不是以/.reserved开头的文件名则pathComponents null
    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
    //flag 包含0 = {CreateFlag@5195} "CREATE" 1 = {CreateFlag@5196} "OVERWRITE"
    boolean create = flag.contains(CreateFlag.CREATE);
    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
    //false
    boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);

    //等待5秒
    waitForLoadingFSImage();

    /**
     * If the file is in an encryption zone, we optimistically create an
     * EDEK for the file by calling out to the configured KeyProvider.
     * Since this typically involves doing an RPC, we take the readLock
     * initially, then drop it to do the RPC.
     * 
     * Since the path can flip-flop between being in an encryption zone and not
     * in the meantime, we need to recheck the preconditions when we retake the
     * lock to do the create. If the preconditions are not met, we throw a
     * special RetryStartFileException to ask the DFSClient to try the create
     * again later.
     */
    CryptoProtocolVersion protocolVersion = null;
    CipherSuite suite = null;
    String ezKeyName = null;
    EncryptedKeyVersion edek = null;
    //provider  is null
    if (provider != null) {
      readLock();
      try {
        src = dir.resolvePath(pc, src, pathComponents);
        INodesInPath iip = dir.getINodesInPath4Write(src);
        // Nothing to do if the path is not within an EZ
        final EncryptionZone zone = dir.getEZForPath(iip);
        if (zone != null) {
          protocolVersion = chooseProtocolVersion(zone, supportedVersions);
          suite = zone.getSuite();
          ezKeyName = zone.getKeyName();

          Preconditions.checkNotNull(protocolVersion);
          Preconditions.checkNotNull(suite);
          Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
              "Chose an UNKNOWN CipherSuite!");
          Preconditions.checkNotNull(ezKeyName);
        }
      } finally {
        readUnlock();
      }

      Preconditions.checkState(
          (suite == null && ezKeyName == null) ||
              (suite != null && ezKeyName != null),
          "Both suite and ezKeyName should both be null or not null");

      // Generate EDEK if necessary while not holding the lock
      edek = generateEncryptedDataEncryptionKey(ezKeyName);
      EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
    }

    // Proceed with the create, using the computed cipher suite and 
    // generated EDEK
    BlocksMapUpdateInfo toRemoveBlocks = null;
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);
      //是否安全模式
      checkNameNodeSafeMode("Cannot create file" + src);
      dir.writeLock();
      try {
        //处理一下path
        src = dir.resolvePath(pc, src, pathComponents);
        final INodesInPath iip = dir.getINodesInPath4Write(src);
        //在namespache创建文件,如果是overwrite删除已经存在的文件并返回删除的block
        toRemoveBlocks = startFileInternal(
            pc, iip, permissions, holder,
            clientMachine, create, overwrite,
            createParent, replication, blockSize,
            isLazyPersist, suite, protocolVersion, edek,
            logRetryCache);
        //创建HdfsFileStatus
        stat = FSDirStatAndListingOp.getFileInfo(dir, src, false, FSDirectory.isReservedRawName(srcArg), true);
      } finally {
        dir.writeUnlock();
      }
    } catch (StandbyException se) {
      skipSync = true;
      throw se;
    } finally {
      writeUnlock();
      // There might be transactions logged while trying to recover the lease.
      // They need to be sync'ed even when an exception was thrown.
      if (!skipSync) {
        getEditLog().logSync();
        if (toRemoveBlocks != null) {
          //在blockmanager 删除removeBlocks
          removeBlocks(toRemoveBlocks);
          toRemoveBlocks.clear();
        }
      }
    }

    logAuditEvent(true, "create", srcArg, null, stat);
    return stat;
  }

如果文件已经存在则删除已存在文件,然后重新建文件,最终由这个dir.addFile方法在namespace上创建新的文件。

/**
   * Create a new file or overwrite an existing file<br>
   * 
   * Once the file is create the client then allocates a new block with the next
   * call using {@link ClientProtocol#addBlock}.
   * <p>
   * For description of parameters and exceptions thrown see
   * {@link ClientProtocol#create}
   */
  private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, 
      INodesInPath iip, PermissionStatus permissions, String holder,
      String clientMachine, boolean create, boolean overwrite, 
      boolean createParent, short replication, long blockSize, 
      boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
      EncryptedKeyVersion edek, boolean logRetryEntry)
      throws IOException {
    assert hasWriteLock();
    // Verify that the destination does not exist as a directory already.
    final INode inode = iip.getLastINode();
    final String src = iip.getPath();
    if (inode != null && inode.isDirectory()) {
      throw new FileAlreadyExistsException(src +
          " already exists as a directory");
    }
    //判断inode中是否已经存在src文件,如果不存在就重建否则overwrite
    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
    if (isPermissionEnabled) {
      if (overwrite && myFile != null) {
        dir.checkPathAccess(pc, iip, FsAction.WRITE);
      }
      /*
       * To overwrite existing file, need to check 'w' permission 
       * of parent (equals to ancestor in this case)
       */
      dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
    }
    if (!createParent) {
      dir.verifyParentDir(iip, src);
    }

    FileEncryptionInfo feInfo = null;

    final EncryptionZone zone = dir.getEZForPath(iip);
    if (zone != null) {
      // The path is now within an EZ, but we're missing encryption parameters
      if (suite == null || edek == null) {
        throw new RetryStartFileException();
      }
      // Path is within an EZ and we have provided encryption parameters.
      // Make sure that the generated EDEK matches the settings of the EZ.
      final String ezKeyName = zone.getKeyName();
      if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
        throw new RetryStartFileException();
      }
      feInfo = new FileEncryptionInfo(suite, version,
          edek.getEncryptedKeyVersion().getMaterial(),
          edek.getEncryptedKeyIv(),
          ezKeyName, edek.getEncryptionKeyVersionName());
    }

    try {
      BlocksMapUpdateInfo toRemoveBlocks = null;
      if (myFile == null) {
        if (!create) {
          throw new FileNotFoundException("Can't overwrite non-existent " +
              src + " for client " + clientMachine);
        }
      } else {
        //重写文件
        if (overwrite) {
          toRemoveBlocks = new BlocksMapUpdateInfo();
          List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
          //先删除文件
          long ret = FSDirDeleteOp.delete(dir, iip, toRemoveBlocks,
                                          toRemoveINodes, now());
          if (ret >= 0) {
            iip = INodesInPath.replace(iip, iip.length() - 1, null);
            FSDirDeleteOp.incrDeletedFileCount(ret);
            removeLeasesAndINodes(src, toRemoveINodes, true);
          }
        } else {
          // If lease soft limit time is expired, recover the lease
          recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
              iip, src, holder, clientMachine, false);
          throw new FileAlreadyExistsException(src + " for client " +
              clientMachine + " already exists");
        }
      }
      // dfs.namenode.max.objects默认0不校验
      checkFsObjectLimit();
      INodeFile newNode = null;

      //递归创建所有目录
      // Always do an implicit mkdirs for parent directory tree.
      Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
          .createAncestorDirectories(dir, iip, permissions);
      if (parent != null) {
        //将文件创建到namespace中
        iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
            replication, blockSize, holder, clientMachine);
        newNode = iip != null ? iip.getLastINode().asFile() : null;
      }

      if (newNode == null) {
        throw new IOException("Unable to add " + src +  " to namespace");
      }
      leaseManager.addLease(newNode.getFileUnderConstructionFeature()
          .getClientName(), src);

      // Set encryption attributes if necessary
      if (feInfo != null) {
        dir.setFileEncryptionInfo(src, feInfo);
        newNode = dir.getInode(newNode.getId()).asFile();
      }

      setNewINodeStoragePolicy(newNode, iip, isLazyPersist);

      // record file record in log, record new generation stamp
      getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added {}" +
          " inode {} holder {}", src, newNode.getId(), holder);
      return toRemoveBlocks;
    } catch (IOException ie) {
      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
          ie.getMessage());
      throw ie;
    }
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • 随着数据量越来越大, 在 一个操作系统管辖的范围存不下了, 那么就 分配到更多的操作系统管理的磁盘中, 但是不方便...
    tracy_668阅读 2,628评论 0 6
  • The Hadoop Distributed Filesystem 1. Why HDFS ? When a da...
    须臾之北阅读 845评论 0 1
  • The Hadoop Distributed Filesystem 1. Why HDFS ? When a da...
    须臾之北阅读 477评论 0 0
  • 我又想你了。 当我走到窗口,视线投向窗外的满天飞雪,思绪却飘到你的身上。 你今天、你现在,穿的什么衣服,在做什么?...
    景雪儿可可阅读 248评论 0 3
  • “憾”在汉语字典里的解释是表示悔恨失望,心中感到不满意。 本义:心里因心酸而难受、憋闷。 而在我看来,“憾”表示无...
    拾柒1212阅读 1,029评论 0 1