MapReduce

MapReduce介绍

Hadoop中计算模型使用的是MapReduceHadoop MapReduce计算编程模型可以用于处理大数据集,它的一般思路是将输入dataset切分为多个split,每个split交由一个mapper处理,mapper处理之后再交由reduce处理,MapReduce作业通常由成百上千的机器组成的集群并行处理。

MapReduce是基于<k,v>的编程。

理解MapReduce模型并不难,实际上MapReduce是一个很全面的抽象,本质思想就是仅靠map和list加上sort排序就可完成所有的离线分析工作 。比如Hive中的计算就是基于MapReduce的,它可以向SQL一样完成对大数据进行分析,其工作原理是通过Hive词法分析语法分析等引擎将SQL转化为MapReduce作业。这也说明了MapReduce编程模型的通用型,几乎所有的sql等离线分析都可以转化为MapReduce

虽然MapReduce简单通用但是MapReduce编程范式也存在一些问题,由于MapReduce只是简单的KV处理,对应大数据集或者复杂的算法比如机器学习,需要多个阶段的MapReduce,多次MapReduce迭代和写磁盘操作带来了时间性能上的问题。Spark内存计算模型,Tez引擎等计算框架的在大部分分析领域处理速度都要优于MapReduce。所以MapReduce引擎也逐渐被这些引擎所替换。但是还是有必要学习MapReduce引擎,因为它的很多思想在随后诞生的计算框架中都有借鉴和继承。并且由于MapReduce相对比较简单,比较适合大数据入门学习

输入输出

k1,v1, k2,v2,k3,v3代表的是数据类型

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

WorldCount

那么应该如何编写Hadoop Mapreduce程序呢?Hadoop官网给出了一个通过Hadoop MapReduce统计单词数据的程序。

编程流程一般是实现MapperReducer接口,并实现对应的mapreduce方法,然后在main函数中设置相关参数,提交作业。

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Mapper

@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Mapper} implementations.
   */
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }
  
  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}

Mapper 用于将输入健值对(<k1,v1>)转换为中间健值对(<k2,v2>)

map数的确定

Hadoop MapReduce框架为InputFormat产生的每个InputSplit生成一个map

mapper的实现通过Job.setMapperClass(Class) 传递给Job。
在框架内部调用的是Mapperrun方法,流程如下:

  1. 调用setup(context)
  2. InputSplit的每个健值对调用map(context.getCurrentKey(), context.getCurrentValue(), context)
  3. 调用cleanup(context)
  • mapreduce是多对多的关系,也有可能没有reduce
    每个map产生的数据发往reduce之前都需要分区,分区数取决于reduce的个数

  • map端产生的数据按照分区发送给ReduceReduce端对接受到的数据merge为一个大文件,然后分组(grouping)传递给Reducer。用户可以使用job.setGroupingComparatorClass(Class)自定义分组

  • Mapper的输出是排序并且按Reducer个数分区的,排序分区在map的环形内存中完成。用户可以自己实现Partitioner控制keyReducer的规则

map端可以使用combiner对产生的中间结果本地聚合,减少shufflereduce的数据量

Reducer

Reducer有三个主要阶段:shufflesortreduce
作业的reduce数量由用户通过Job.setNumReduceTasks(int)设置。

job通过 Job.setReducerClass(Class)设置reducer的实现类。然后框架对每一个已分组的 <key, (list of values)>调用 reduce(WritableComparable, Iterable<Writable>, Context),应用程序可以覆盖cleanup(Context)方法来执行任何所需的清除

Shuffle

Reducer的输入在map端已经排好序了。在这个阶段框架通过HTTP获取mapper输出中对应的分区数据

Sort

框架在这个阶段按key对Reducer输入进行分组(因为不同的mapper可能输出相同的key)
Shuffle 和Sort是同时发生的,在取到mapper数据时就会进行merge操作

Secondary Sort

当reduce的分组判断规则和map传过来的中间结果相等分组判断规则不同时,可以通过Job.setSortComparatorClass(Class)指定一个ComparatorJob.setGroupingComparatorClass(Class)可以控制map中间结果的分组,配合Job.setSortComparatorClass(Class)可以实现基于值的二次排序。

Reduce

在这个阶段对每一个已分组的 <key, (list of values)>调用
reduce(WritableComparable, Iterable<Writable>, Context)。reduce任务的输出一般通过Context.write(WritableComparable, Writable)写入FileSystem
应用程序可以使用Counter报告其统计信息。
Reducer的输出没有排序。

排序过程

Hadoop MapReduce中发往reduce的数据默认是按照key排序的,可以说排序一直贯穿于整个过程中

  • map阶段数据首先在内存中分区排序(排序使用quicksort),然后对多个溢写的文件进行归并排序(mergesort),在归并过程中应用combiner本地聚合,产生map端最终输出
  • reduce阶段向map端拉取自己分区的数据,同时对从多个map端拉取的数据同样使用归并排序合成一个文件,然后分组调用reduce

一些基本术语

  • application这个可以简单理解为编写完成相关MapReduce程序,它是集群提交job的最小单位,一个application 里面可以有多个job,一般一个application有一个job
  • job 用于描述MapReduce的配置组成 ,包括输入输出规范,指定mapperreducer实现类,分区,分组以及提交job
  • task 分为maptaskreductask ,具体的执行任务

Job

Job是用户向Hadoop框架描述MapReduce作业以供执行的主要接口,Job用于配置提交MapReduce作业,但是:

  • 一些配置参数可能已经被管理员标记为final(参见Final Parameters),不能更改

Job可以用于设置非常多的参数,如下是一些主要的配置:

  • InputFormat 输入规范,包括分片计算InputSplit和面向记录的阅读器RecordReader
  • Mapper
  • Partitioner Mapper中间结果的分区方式
  • combiner(如果有的话) Mapper分区结果本地聚合
  • Reducer
  • OutputFormat 输出规范

job input

InputFormat
InputFormat类图
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputFormat<K, V> {

  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}
RecordReader
RecordReader

InputFormat描述了MapReduce作业的输入规范,MapReduce框架依赖于作业的InputFormat完成
1 . 验证作业的输入规范
2 . 将输入文件分割成逻辑InputSplit实例,然后将每个实例分配给单个Mapper

  1. 提供RecordReader实现,用于从逻辑InputSplit中收集输入记录,以供mapper程序处理

基于文件的默认InputFormat 实现为FileInputFormat,它根据输入文件的总大小(以字节为单位)将输入拆分为逻辑InputSplit实例。一般输入文件的FileSystemblocksize 作为分片的上界。可以通过mapreduce.input.fileinputformat.split.minsize设置分片下界。
显然,对于许多应用程序来说,基于输入大小的逻辑分割是不够的,因为必须考虑记录边界。在这种情况下,应用程序应该实现RecordReader,它会考虑记录边界,并向单个任务提供面向记录的逻辑InputSplit视图

job.setInputFormatClass()可以用于设置InputFormat,如果没有设置默认为TextInputFormat

如果TextInputFormat是给定作业的InputFormat,框架将检测带有.gz扩展名的输入文件,并使用适当的CompressionCodec自动解压它们。但是,必须注意的是,不能分割具有上述扩展名的压缩文件,每个压缩文件都由单个mapper完整地处理。

InputSplit

InputSplit表示由单个mapper处理的数据
InputSplit通常呈现一个面向字节的输入视图,RecordReader负责处理并呈现一个面向记录的视图。
FileSplit是默认的InputSplit

public interface InputSplit extends Writable {
  long getLength() throws IOException;
  String[] getLocations() throws IOException;
}

job output

OutputFormat描述MapReduce作业的输出规范。
MapReduce框架依赖于作业的OutputFormat完成:

  • 验证作业的输出规范;例如,检查输出目录不存在。
  • 提供用于写入作业输出文件的RecordWriter实现。输出文件存储在FileSystem

TextOutputFormat是默认的OutputFormat

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputFormat<K, V> {

  public abstract RecordWriter<K, V> 
    getRecordWriter(TaskAttemptContext context
                    ) throws IOException, InterruptedException;

  public abstract void checkOutputSpecs(JobContext context
                                        ) throws IOException, 
                                                 InterruptedException;

  public abstract 
  OutputCommitter getOutputCommitter(TaskAttemptContext context
                                     ) throws IOException, InterruptedException;
}

OutputCommitter

OutputCommitter描述MapReduce作业的任务输出的提交。
MapReduce框架依赖于作业的OutputCommitter完成:

  • 在初始化期间设置作业。例如,在作业初始化期间为作业创建临时输出目录
  • 在作业完成后清理作业。例如,在作业完成后删除临时输出目录。
  • 设置任务临时输出。任务设置是在任务初始化期间作为同一任务的一部分完成的
  • 检查任务是否需要提交。这是为了在任务不需要提交时避免提交过程
  • 任务输出的提交。任务完成后,如果需要,任务将提交它的输出
  • 丢弃任务提交。如果任务失败/终止,输出将被清理。如果任务无法清除(在异常块中),将使用相同的attempt-id启动一个单独的任务来执行清除

FileOutputCommitter是默认的OutputCommitter

RecordWriter

RecordWriter将输出<key,value>对写入输出文件。
RecordWriter实现将作业输出写入文件系统。

MapReduce - User Interfaces

应用程序通常实现MapperReducer接口来提供mapreduce方法。这些构成了工作的核心

Mapper

Mapper将输入键/值对映射为一组中间键/值对。

Mapper类图

image.png

map是将输入记录转换为中间记录的单个任务。转换后的中间记录不需要与输入记录具有相同的类型。给定的输入对可以映射到零或多个输出对

Hadoop MapReduce通过InputFormat生成InputSplit,每个生成的InputSplit由一个map任务处理

job通过Job.setMapperClass(Class)来设置mapper的实现类。然后框架为该任务的InputSplit中的每个键/值对调用 map(WritableComparable, Writable, Context)完成相关的处理,应用程序可以覆盖cleanup(Context)方法来执行任何所需的清除

mapper处理输入键值对得到的中间键值对可以和输入键值对的类型不同,一个输入键值对可以产生0到多个中间键值对,中间结果通过context.write(WritableComparable, Writable)保存

应用程序可以使用Counter报告其统计信息。

mapper产生的中间结果最终会按照key分组然后传递给reducer,然后reducer处理mapper产生的中间结果,最后产生结果。用户可以通过Job.setGroupingComparatorClass(Class)指定一个Comparator来控制分组

Mapper的中间结果会被排序然后按reducer个数分区分区数等于reducer的个数。用户可以实现自定义Partitioner控制<key,value>的发送规则

用户可以选择通过 Job.setCombinerClass(Class)指定一个combiner来执行中间输出的本地聚合(mapper端reduce),这有助于减少从Mapper传输到Reducer的数据量

经过排序的中间输出总是以简单的(key-len、key、value-len、value)格式存储。应用程序通过配置使用CompressionCodec可以控制中间输出是否被压缩以及如何被压缩

Mapper InputSplits Records之间的关系

InputFormat 负责计算分片(InputSplits )并将它们划分为记录(Records)。分片是物理概念和记录是逻辑概念

关于Mapper 中setup,map、cleanup、partitioner、combiner的关系

前面已经知道一个split由一个mapper处理,hadoop框架通过反射job configuration中的Mapper类创建mapper实例,调用mappersetup方法,然后在mappermap中使用RecordReader 读取split获取<k,v>迭代处理,并调用partitioner,之后调用cleanup方法,最后调用combiner方法做map端本地归并,减少数据量以减少到reduce的网络IO

简而言之,一个mapper实例对应一个partitioner,对应多个combinercombiner个数为reducer个数

作业提交流程

 childUGI.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // use job-specified working directory
          setEncryptedSpillKeyIfRequired(taskFinal);
          FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
          taskFinal.run(job, umbilical); // run the task
          return null;
        }
      });
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    TaskReporter reporter = startReporter(umbilical);
 
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
      input.initialize(split, mapperContext);
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }

编程步骤

image.png
  1. 配置Configuration
  2. 继承Mapper和Reducer
  3. 创建job 配置job
  4. 提交job

input 和output

mapreduce 数据都是转化为<key,value>键值对
keyvalue类必须是可序列化的,在hadoop中需要实现Writable
接口。此外,key类必须实现 WritableComparable接口,以便可以进行相关的排序操作

MapReduce提交运行整体流程

  1. job.submit()提交
  2. JobSubmitter.submitJobInternal
  3. YarnRunner.submitJob
  4. ResourceMgrDelegate.submitApplication
  5. YarnClient.submitApplication
  6. ApplicationClientProtocolPBClientImpl.submitApplication
JobSubmitter.submitJobInternal主要的代码逻辑
//通过反射实例化OutputFormat,并调用其checkOutputSpecs方法,比如FileOutputFormat实现的checkOutputSpecs需要判断目标路径不存在否则抛出FileAlreadyExistsException
 checkSpecs(job);
 Configuration conf = job.getConfiguration();
//主要是将一些archive文件解压到nodemanager
 addMRFrameworkToDistributedCache(conf);
 JobID jobId = submitClient.getNewJobID();
//获取并设置唯一jobId
 job.setJobID(jobId);
 Path submitJobDir = new Path(jobStagingArea, jobId.toString());
//拷贝配置文件到提交目录
 copyAndConfigureFiles(job, submitJobDir);
//计算分片,分片数决定map数
 int maps = writeSplits(job, submitJobDir);
//写配置 job.xml
 writeConf(conf, submitJobFile);

参考

https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容

  • MapReduce执行流程 MapReduce的执行步骤 1、Map任务处理 1.1 读取HDFS中的文件。每一行...
    依天立业阅读 2,233评论 0 8
  • 预览 Hadoop MapReduce是一个软件框架,用于编写并行处理海量数据的应用程序,应用程序运行在一个通用硬...
    sakersun阅读 1,550评论 0 1
  • 目标 该文档作为一份个人指导全面性得描述了所有用户使用Hadoop Mapreduce框架时遇到的方方面面。 准备...
    和心数据阅读 1,486评论 0 15
  • 今天看了一下Python源码,简单了解获取__dict__的流程,这里做一下简单的总结,为后续回头查看提供方便 C...
    whosemario阅读 1,370评论 0 0
  • 上世纪九十年代,也就是我刚出生的时候,我的村庄还是错落有致的青瓦房,鳞次栉比的屋脊,浓密的树林还有很多现在都已见不...
    羽中文阅读 155评论 0 1