压缩
文件压缩有两个好处:减少了存储文件所需的空间,并加速数据在网络和磁盘间的传输
常见的压缩格式:
所有的压缩算法都需要在空间和时间上进行权衡,其中splitable
表明是否可分,即是否可以搜索数据流的任意位置,并从该位置开始读取数据,可分的数据更适合MapReduce框架处理,如果使用gzip
文件作为输入,hadoop通过文件扩展名确定文件不可分,将会使用一个map任务处理所有的数据块,在这种情况下最好在之前的数据处理中,选择合适的数据块大小,将文件切分成块,然后对每个块建立压缩文件
通用压缩解压缩算法接口CompressionCodec
public interface CompressionCodec {
// 对写入的数据进行压缩,输出到out
CompressionOutputStream createOutputStream(OutputStream out)
// 对输入数据流in中的数据进行解压缩
CompressionInputStream createInputStream(InputStream in) throws IOException;
...
}
public interface SplittableCompressionCodec extends CompressionCodec {
...
SplitCompressionInputStream createInputStream(InputStream seekableIn,
Decompressor decompressor, long start, long end, READ_MODE readMode)
throws IOException;
}
CompressionCodecFactory
根据文件名称找到正确的codec
String uri;
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
默认情况下会优先使用native类库实现codec,如果查找不到才会使用纯Java实现的算法
hadoop支持的压缩格式有DEFLATE,gzip,bzip2,LZ4,Snappy,此外还可以通过设定io.compression.codecs
设定添加额外的编解码器
串行化
Writable
Hadoop 的序列化格式是Writable
,对应接口定义了两个方法,用来将数据写入DataOutput二进制流以及从DataInput二进制流读出状态
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
WritableComparable
WritableComparable
同时实现Writable
和Comparable
接口
public interface WritableComparable<T> extends Writable, Comparable<T>
类型比较对MapReduce来说是至关重要,因为中间有一个基于键值的排序阶段
任何在Hadoop Map-Reduce框架中用作key的类型都应该实现这个接口
RawComparator
是一个优化的比较器,可以直接比较对象的字节流,不需要反序列化
public interface RawComparator<T> extends Comparator<T> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
WritableComparator
WritableComparator
是一个比较器,用来比较WritableComparable
类型的数据,实现了RawComparator
接口,提供compare(byte[],int,int,byte[],int,int)
方法,默认方法是反序列化对象,调用对象本身的compareTo
方法
可以通过继承WritableComparator
对比较函数compare
进行重载优化,同时注册相关类对应的比较器define(Class c, WritableComparator comparator)
,之后可以直接获取WritableComparator get(Class<? extends WritableComparable> c)
Writable 类
Java 基本类型
Java 基本类型都有实现Writable接口的包装类型,char类型除外
通过get和set方法读取存储封装的值
int 和 long 类型有对应的变长格式封装,方便节省空间
Text 类型
使用标准UTF-8编码存储文本
索引:指的是编码后字节序列中的位置
getlength
:字节的长度
charAt
:Unicode编码的一个码点,类似于String的codePointAt
方法
find
:返回第一个查找到的位置对应的字节偏移量
Unicode
- 码点 code point : 一个编码表中的某个字符对应的代码值,书写方式
U+
后边跟十六进制数字 - Unicode :字符集,码点从U+0000到U+10FFFF一共21位,1,112,064个码位,分为17个代码级别,第一个平面称为基本多语言平面(Basic Multilingual Plane, BMP),或称第零平面(Plane 0),其他平面称为辅助平面(Supplementary Planes),基本多语言平面内,从U+D800到U+DFFF之间的码位区块永久保留不映射Unicode字符
- UTF-8:变长编码规则,把Unicode字符集编码为1到4个字节
- UTF-16:变长编码规则,把Unicode字符集编码为2个或者4个字节,Java的char类型就是UTF-16编码的一个代码单元
BytesWritable
对字节序列的封装,存储格式为字节数目(4 bytes整数字段),后面跟字节内容本身,内容可以通过set方法改变
NullWritable
不可变单例类型,调用NullWritable.get()
获取,序列化长度为0,不从数据流中读取,也不写入数据
ObjectWritable / GenericWritable
ObjectWritable
是一个通用的封装,用来处理String,arrays,primitive types,null等等,适合用在一个字段有多种数据类型的情况
在Hadoop RPC中,用来封装和解封装方法参数和返回类型
序列化时,每次要写出封装的类名称,更为高效的方法是使用GenericWritable
,声明可能需要的类型,用相应的索引来标识类别,消耗一个字节
Writable 集合类
有6个集合类,分别是ArrayWritable, ArrayPrimitiveWritable, TwoDArrayWritable, MapWritable, SortedMapWritable, EnumSetWritable
,
- ArrayWritable / TwoDArrayWritable :对应数组和二维数组的Writabe实例
- ArrayPrimitiveWritable:对Java 基本数据类型和其数组的包装器类,没有涉及底层复制
- MapWritable /SortedMapWritable:实现
java.util.Map<Writable, Writable>
/java.util.SortedMap<WritableComparable, Writable>
,K/V的类型信息是串行化格式的一部分,占用一个字节,作为对应类型信息的一个索引,可以存放127种自定义的类型,可以使用NullWritable作为V,存储set集合 - EnumSetWritable:枚举类型
自定义Writable
可以控制二进制表示形式和排序顺序,更好的提升性能
自定义的实现必须有一个默认构造函数,以便 MapReduce 框架可以实例化,并调用readFields
填充字段
自定义比较函数器,直接对字节序列进行比较
//内嵌一个Comparator内部类
public static class Comparator extends WritableComparator{
public Comparator() {
super(外部类.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
.....例如
WritableUtils.decodeVIntSize(b1[s1]) //解析vint / vlong的第一个字节来确定字节长度
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
.... //两种比较方法应该有相同的语义
}
}
static {
WritableComparator.define(外部类.class, new Comparator());
}
序列化框架
序列化框架都必须实现org.apache.hadoop.io.serializer.Serialization
接口,通过io.serializations
属性指定使用的框架,可以通过代码,也可以通过core-default.xml
文件指定
默认属性如下:意味着只有Writable 或 Avro 对象可以在外部序列化/反序列化
<property>
<name>io.serializations</name>
<value>
org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization
</value>
<source></source>
</property>
另外还提供了一个Java Serialization序列化形式对应的框架org.apache.hadoop.io.serializer.JavaSerialization