一、什么是Sqoop
Sqoop是一个在结构化数据和Hadoop之间进行批量数据迁移的工具,结构化数据可以是Mysql、Oracle等RDBMS。Sqoop底层用MapReduce程序实现抽取、转换、加载,MapReduce天生的特性保证了并行化和高容错率,而且相比Kettle等传统ETL工具,任务跑在Hadoop集群上,减少了ETL服务器资源的使用情况。在特定场景下,抽取过程会有很大的性能提升。
如果要用Sqoop,必须正确安装并配置Hadoop,因依赖于本地的hadoop环境启动MR程序;mysql、oracle等数据库的JDBC驱动也要放到Sqoop的lib目录下。本文针对的是Sqoop1,不涉及到Sqoop2,两者有大区别,感兴趣的读者可以看下官网说明。
二、import
import是数据从RDBMS导入到Hadoop的工具
2.1、split
Sqoop并行化是启多个map task实现的,-m(或--num-mappers)参数指定map task数,默认是四个。并行度不是设置的越大越好,map task的启动和销毁都会消耗资源,而且过多的数据库连接对数据库本身也会造成压力。在并行操作里,首先要解决输入数据是以什么方式负债均衡到多个map的,即怎么保证每个map处理的数据量大致相同且数据不重复。--split-by指定了split column,在执行并行操作时(多个map task),Sqoop需要知道以什么列split数据,其思想是:
1、先查出split column的最小值和最大值
2、然后根据map task数对(max-min)之间的数据进行均匀的范围切分
例如id作为split column,其最小值是0、最大值1000,如果设置4个map数,每个map task执行的查询语句类似于:SELECT * FROM sometable WHERE id >= lo AND id < hi,每个task里(lo,hi)的值分别是 (0, 250), (250, 500), (500, 750), and (750, 1001)。
Sqoop不能在多列字段上进行拆分,如果没有索引或者有组合键,必须显示设置splitting column;默认的主键作为split column,如果表里没有主键或者没有指定--split-by,就要设置num-mappers 1或者--autoreset-to-one-mapper,这样就只会启动一个task。
从上面的分析过程可以看到Sqoop以理想化方式根据split column将数据切分成多个范围,如果split键的值不是均匀分布,每个任务分配的数据量可能相差很大、导致数据倾斜。
2.2、参数
--driver:指定JDBC驱动,默认Mysql
--table:指定查询的表
--columns:指定从源数据库导入的列。当没有设置--table参数,就默认查询表中所有字段,实现方式是在数据库执行一个查询语句,就可得到每个字段及其对应的类型.
--where:查询条件.如果设置了table参数,就以table、columns、where三个参数拼接成的SQL查询数据
--query:自定义查询SQL,语句要有$CONDITIONS关键字,作用是动态替换,当获取默认boundary query时,$CONDITIONS会替换成(1=1);获取查询的数据列和其对应的字段类型时$CONDITIONS会替换成(1=0)。table和query不能同时设置
--boundary-query:指定split的sql,如果没有设置,且有--table参数,生成的split sql是根据table、where条件拼出来的。
如果设置了--query参数,split sql是基于query sql的子查询:
需要特别注意的是,有的数据库对子查询没有进行优化(如Mysql),查询性能会很低,这就要自定义boundary-query,提高查询效率。
2.3、HDFS
数据直接导入到HDFS,按行读取并写入到HDFS文件,源表里的每一行数据在HDFS里作为单独记录,记录可以是文本格式(每行一个记录)或Avro、SequenceFile二进制格式。导入过程可以并行,因此可能生成多文件。
--append:生成的文件追加到目标目录里
--delete-target-dir:如果目标目录已经存在,会先把目录删掉,类似overwrite
执行上面的命令后,可以看到详细的日志:输入数据是怎么split的、mapreduce执行进度、mapreduce的URL等
2.4、Hive
--hive-import参数指定数据导入到hive表:
--target-dir:需要指定该参数,数据首先写入到该目录,过程和直接导入HDFS是一样
--hive-drop-import-delims:删除string字段内的特殊字符,如果Hive使用这些字符作为分隔符,hive的字段会解析错误、出现错位的情况。它的内部是用正则表达式替换的方式把\n, \r, \01替换成""
--null-string/--null-non-string:指定空字段的值。Sqoop默认空数据存的是“NULL"字符串,但hive把空解析成\N,因此当文件存储的空是默认的"NULL"字符串,hive就不能正常读取文件中的空值了
数据import到hive表的过程:完成源数据写入到hdfs后,就执行LOAD DATA INPATH命令把target-dir里的文件LOAD到hive表:
2.5、Hbase
--hbase-table指定数据直接导入到Hbase表而不是HDFS,对于每个输入行都会转换成HBase的put操作,每行的key取自输入的列,值转换成string、UTF-8格式存储在单元格里;所有的列都在同一列簇下,不能指定多个个列簇。
然后通过Hbase shell查看表数据量、数据,
参数详细说明:
--hbase-create-table:当HTable不存在或列簇不存在,Sqoop根据HBase的默认配置自动新建表;如果没有指定该参数,就会报异常
--hbase-row-key:指定row key使用的列。默认是split-by作为row key,如果没有指定,会把源表的主键作为row key;如果row key是多个列组成的,多个列必须用逗号隔开,生成的row key是由用下划线隔开(`ID`_`RUN_ID`)的字段组合
--column-family:指定列簇名,所有的输出列都在同一列簇下
三、export
export是HDFS里的文件导出到RDBMS的工具,不能从hive、hbase导出数据,且HDFS文件只能是文本格式。如果要把hive表数据导出到RDBMS,可以先把hive表通过查询写入到一个临时表,临时用文本格式,然后再从该临时表目录里export数据。
3.1、task数
Sqoop从HDFS目录里读取文件,所以启动的map task数依赖于-m参数、文件大小、文件数量、块大小等,可以参考这篇文章
3.2、插入/更新
默认情况下Sqoop在数据导出到RDBMS时,每行记录都转换成数据库的INSERT语句,但也支持插入/更新模式,即根据一定规则判断,如果是新记录用INSERT语句,否则就UPDATE,设置--update-mode allowinsert参数启用该功能,插入/更新操作依赖于目标数据库。
对于Mysql,使用INSERT INTO … ON DUPLICATE KEY UPDATE语法,用户不能指定列来判断是插入还是更新,而是依赖于表的唯一约束,Mysql在插入数据时,如果是因唯一约束引起的错误,就更新数据行。Sqoop会忽略--update-key参数,但要至少指定一个有效列,才能启用更新模式
3.3、参数
--columns:指定插入目标数据库的字段,sqoop直接读取hdfs文件并把记录解析成多个字段,此时解析后的记录是没有字段名的,是通过位置和columns列表对应的,数据库插入的sql类似于:insert into _table (c1,c2...) value(v1,v2...)
--export-dir:指定HDFS输入文件的目录
--input-fields-terminated-by:字段之间分隔符
--input-lines-terminated-by:行分隔符
四、问题及优化
4.1、Hive不支持的数据类型
关系型数据库的字段类型和hive的字段类型还是有差别的,所以Sqoop有一个映射关系,把RDBMS中的类型映射到Hive类型。在create hive表时,会根据RDBMS类型和hive类型进行映射、进而设置hive表字段类型,如果没有匹配到,就会报异常,如VARBINARY:
其解决方案有三种:
1、在--query参数内显示对字段进行转换,如VARBINARY转换成VARCHAR,而Sqoop会默认的把VARCHAR转换成Hive的STRING.
2、增加--map-column-hive参数,显示把字段映射到Hive指定的类型
3、修改HiveTypes类,使Sqoop支持对特定类型(如:VARBINARY)的映射,这种方案相对以上两种可以一劳永逸,但要重编译sqoop源码
类型映射逻辑如下:
4.2、Java不支持的类型
Sqoop创建ORM对象,数据库中的字段映射到Java属性,用于读取数据库ResultSet对象并解析字段,需要把数据库的类型映射到java类型,如果没有映射到,就会报错。解决方案也有三种:
1、在--query参数内显示对字段进行转换
2、设置--map-column-java参数
3、修改ConnManager类
映射逻辑代码:
4.3、特殊字符
当\t特殊字符导入到hive后,hive字段可能解析出错。解决方法是修改FieldFormatter类,使Sqoop可以删除或替换掉字段数据中包含\t的特殊字符:
4.4、字段错位
使用--query和--columns参数时,如果columns设置的列顺序和query列顺序不同,会有个疑惑是import后的字段和实际字段的值不一样,这是因为从数据库查询的ResultSet对象序列化到实体对象时,column的值是根据索引取的。
例如readInteger的代码:
如果要改为columns的字段值是根据字段名取而不是根据索引位置取,可以更改一下几个地方的代码:ClassWriter、JdbcWritableBridge
本文首发于公众号:data之道