MapReduce如何处理不可分割文件

有时会有这样的逻辑需求,一个    map   任务需要处理一个文件中的所有内容或是

把整个文件作为一条记录处理。

即使不分割文件,仍然需要一个    RecordReader    来读取文件的所有内容作为record的值。

Hadoop-version    :    2.7.1

Jdk-version            :    1.8

Maven-version      :    3.3.3

完成此功能需要重写两个类

1.    继承    FileInputFormat    类  重写其中的两个方法以下代码为列子

public class WholeFileInputFormatextends FileInputFormat {

//    createRecordReader    方法是返回一个定制的RecordReader实现

@Override

    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {

WholeFileRecordReader reader =new WholeFileRecordReader();

reader.initialize(split, context);

return reader;

}

//    isSplitable    方法是说明此文件是否分割    返回 true    为分割    返回 false 为不分割

@Override

    protected boolean isSplitable(JobContext context, Path filename) {

return false;

}

}

2.继承    RecordReader    类    重写其中的三个方法

public class WholeFileRecordReader extends RecordReader {

private FileSplitfileSplit;

private Configurationconf;

private Textkey =new Text();

private BytesWritablevalue =new BytesWritable();

private boolean processed =false;

@Override

    public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {

this.fileSplit = (FileSplit) split;

this.conf = context.getConfiguration();

}

/**

*  这个方法会被调用两次

*  第一次读取整个文件流 将其转换为    BytesWritable对象

*  并且将 processed  设置为 true

*  第二次 在  processed 为true时

*  将不会对数据进行加载

*  并且返回    false 代表这个文件已经加载完成

*

    * @return

    * @throws IOException

    * @throws InterruptedException

*/

    @Override

    public boolean nextKeyValue()throws IOException, InterruptedException {

if (!processed){

byte[] contents =new byte[(int)fileSplit.getLength()];

Path file =fileSplit.getPath();

FileSystem fs = file.getFileSystem(conf);

FSDataInputStream in =null;

try {

in = fs.open(file);

IOUtils.readFully(in, contents,0, contents.length);

key.set(file.toString());

value.set(contents,0 ,contents.length);

}finally {

IOUtils.closeStream(in);

}

processed =true;

return true;

}

return false;

}

@Override

    public Text getCurrentKey()throws IOException, InterruptedException {

return key;

}

@Override

    public BytesWritable getCurrentValue()throws IOException, InterruptedException {

return value;

}

@Override

    public float getProgress()throws IOException, InterruptedException {

return processed ?1.0f :0.0f;

}

@Override

    public void close()throws IOException {

}

}

WholeFileRecordReader    类负责将FileSplit转换成一条记录,该记录的键是文件的路径

如果不需要可以不对Key进行初始化返回    Null    即可,值是这个文件的内容,因为只有一条记录

WholeFileRecordReader 要么处理这条数据,要么不处理,所以他维护一个名称为    processed

的布尔变来表示这个文件是否被处理过。如果当    nextKeyValue()    方法被调用时,文件没有被处理过,

就打开文件,产生一个长度是文件长度的字节数组,并用    Hadoop    的    IOUtils    类把文件的内容放入字节数组。

然后在被传递到    next()    方法的    BytesWritable    实例上设置数组,返回    true    则表示成功读取记录。

                以上为所有的代码实例以及说明,下面来说一下所遇到的问题

1.此程序无法    Dubug    也就是说无法调试    (至今未解决)

2.在mapper阶段获取    value    时这个值不能直接使用,因为在调用    nextKeyValue()    方法时已经转换为    BytesWritable

  类型值的长度变长了,这是由于    Hadoop    里面    BytesWritable    的实现机制造成的,BytesWritable    的实现中,

保存了一个    byte[]    存放内容,一个    int size    表示    byte    数组里面前多少位是有效的,后面的是无效的,但是    ByteWritable    的getBytes()    方法返回的确实    byte    数组的全部内容(长度很可能大于size),所以在    Mapper    中进行处理的时候应该只操纵size大小的内容后面的应该无视掉,

转换为String类型如下:String str = new String(value.getBytes(),0,value.getLength());   

或是                                value.setCapacity(value.getLength());

如果文件过大程序将不能调试,会卡死文件晓得话可以调试,具体原因不清楚

刚刚开始写简书,有很多地方还不是很懂,有不好的地方还请您指出!

在此感谢您能看完此篇文章!!!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,282评论 19 139
  • 摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-defini...
    wangliang938阅读 3,798评论 0 1
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 33,777评论 18 399
  • HashMap 是 Java 面试必考的知识点,面试官从这个小知识点就可以了解我们对 Java 基础的掌握程度。网...
    野狗子嗷嗷嗷阅读 11,690评论 9 107
  • 做某件事情之前,如果说没有想 万一失败了怎么办?以我目前的认知是不信的。这个问题该不该想?应该怎么想?我觉得是要有...
    刘员外__阅读 1,238评论 0 1

友情链接更多精彩内容