1. 首页
  2. 大数据
  3. Sqoop教程

【Sqoop教程】(三)Sqoop import详解之RDBMS数据导入HDFS

本文讲解Sqoop Import命令的详细使用方式,讲解如何通过Import命令将RDBMS中的数据导入到HDFS中,以8个案例让读者迅速掌握Import命令的使用,重点是掌握如何指定导入条件(如指定表、列、分隔符、输出目录等)以及增量导入。

import参数概览


[root@master bin]# ./sqoop import --help Common arguments: --connect <jdbc-uri> 指定JDBC的URL --password <password> 数据库用户名 --username <username> 数据库密码 Import control arguments: --append 以追加的方式导入 --as-parquetfile 把表导出成parquet文件 --as-sequencefile 把表导出成SequenceFile文件 --as-textfile 把表导出成text文件(默认) --columns <col,col,col...> 导入的时候指定列 --compression-codec <codec> 指定压缩格式 --delete-target-dir 导入的时候删除目标目录 --direct 以direct方式导入速度更快 -e,--query <statement> 以sql查询语句方式导入 -m,--num-mappers <n> map任务的并行度 --split-by <column-name> 指定切分的列(一般是并行度大于1时) --table <table-name> 指定要导入的表 --target-dir <dir> 导入到HDFS的目标目录 --where <where clause> 使用过滤条件 -z,--compress 启用压缩 Incremental import arguments: --check-column <column> 以哪一列作为增量导入的依据,和RDBMS中的自增字段及时间戳类似 --incremental <import-type> 说明增量导入有两种模式 append和lastmodified --last-value <value> 指定上一次导入中检查列指定字段最大值 Output line formatting arguments: --fields-terminated-by <char> 指定导出文件的分隔符 --lines-terminated-by <char> 指定导出文件的行分隔符

参数非常多,大概总结一下就是:首先必须要有connect username password,毕竟你是要连接数据库的。导入的时候可以指定table,可以指定column,可以指定过滤条件、目录、并行度、分隔符等。还可以压缩导入、增量导入等等。下面具体讲解,蛮简单的,跟着代码敲一遍自己感受下就学会了。

案例1:指定Table+column

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/hive --username root --password 123456 --table TBLS  --columns "TBL_NAME"
18/05/04 21:07:31 INFO mapreduce.ImportJobBase: Beginning import of TBLS
18/05/04 21:07:31 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
18/05/04 21:07:35 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
18/05/04 21:07:47 INFO db.IntegerSplitter: Split size: 8; Num splits: 4 from: 41 to: 76
18/05/04 21:07:47 INFO mapreduce.JobSubmitter: number of splits:4
18/05/04 21:07:49 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1525436279077_0001/
18/05/04 21:07:49 INFO mapreduce.Job: Running job: job_1525436279077_0001
18/05/04 21:08:10 INFO mapreduce.Job: Job job_1525436279077_0001 running in uber mode : false
18/05/04 21:08:10 INFO mapreduce.Job:  map 0% reduce 0%
18/05/04 21:08:28 INFO mapreduce.Job:  map 25% reduce 0%
18/05/04 21:08:35 INFO mapreduce.Job:  map 50% reduce 0%
18/05/04 21:08:37 INFO mapreduce.Job:  map 75% reduce 0%
18/05/04 21:08:38 INFO mapreduce.Job:  map 100% reduce 0%
18/05/04 21:08:39 INFO mapreduce.Job: Job job_1525436279077_0001 completed successfully
18/05/04 21:08:39 INFO mapreduce.ImportJobBase: Retrieved 4 records.

我拷贝下来了部分重要的日志信息,通过日志信息我们可以知道:

  • Sqoop任务是转为了MapReduce的map任务,之前讲架构的时候也说过,sqoop底层就是用map。
    【Sqoop教程】(三)Sqoop import详解之RDBMS数据导入HDFS
  • Sqoop map任务个数:“Num splits: 4”。4是默认的,我们其实是可以设置的,通过-m这个参数。
    【Sqoop教程】(三)Sqoop import详解之RDBMS数据导入HDFS
    至于为什么是在/user/root/TBLS目录下呢?这也是默认的,我们也是可以设置的,通过–target-dir

案例2:指定过滤条件、分区、目标路径

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/hive --username root --password 123456 --table TBLS  --where "TBL_ID>70" -m 1 --target-dir /user/sqoop/import/0504

可以看到我们-m 1生效了 只有1个map任务了
【Sqoop教程】(三)Sqoop import详解之RDBMS数据导入HDFS
过滤出了id大于70的记录

[root@master bin]# hadoop fs -cat /user/sqoop/import/0504/part-m-00000
71,1518778031,11,0,root,0,81,weblogs,EXTERNAL_TABLE,null,null
76,1519129543,11,0,root,0,86,emp,MANAGED_TABLE,null,null

案例3:通过select查询导入

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/hive --username root --password  123456 --target-dir /user/sqoop/import/050401 --query 'select * from TBLS where TBL_ID >70 order by id'
When importing query results in parallel, you must specify --split-by.

报错了!这里是需要注意的地方,当使用–query的时候 若并行度大于1必须指定split-by。

解释sqoop是如何根据–split-by进行分区的 比如-m 10,首先sqoop会向关系型数据库比如mysql发送一个命令:select max(id),min(id) from test。然后会把max、min之间的区间平均分为10分,最后10个并行的map去找数据库。–split-by对非数字类型的字段支持不好。一般用于主键及数字类型的字段。

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/hive --username root --password  123456 --target-dir /user/sqoop/import/050401 --query 'select * from TBLS where TBL_ID >70 order by id' --split-by TBL_ID
18/05/04 21:58:10 ERROR tool.ImportTool: Import failed: java.io.IOException: Query [select * from TBLS where TBL_ID >70 order by id] must contain '$CONDITIONS' in WHERE clause.

又报错了,根据提示信息我们可以知道,where子句中必须使用$CONDITIONS,另外需要注意,双引号$CONDITIONS要有转义,单引号不需要。

正确语句:

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/hive --username root --password  123456 --target-dir /user/sqoop/import/050401 --query 'select * from TBLS where TBL_ID >70 and $CONDITIONS order by TBL_ID' --split-by TBL_ID

案例4:输出压缩

–compress代表输出文件需要压缩 –compression-codec指定压缩格式 比如有Snappy,lzo,gzip,bzip2

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/hive \
> --username root --password 123456  --table TBLS \
> --target-dir /user/sqoop/import/050402 \
> --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec

【Sqoop教程】(三)Sqoop import详解之RDBMS数据导入HDFS
把SnappyCodec改为GzipCodec:
【Sqoop教程】(三)Sqoop import详解之RDBMS数据导入HDFS

案例5:指定分隔符

–fields-terminated-by指定输出文件的分隔符,在mysql中是制表符分割的,而输出文件我们可以自定义,把制表符换成其他字符。

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/hive \
> --username root --password 123456  --table TBLS \
> --target-dir /user/sqoop/import/050403 --fields-terminated-by ':' \
[root@master bin]# hadoop fs -cat /user/sqoop/import/050403/part-m-00000
41:1517148697:1:0:root:0:51:departments:MANAGED_TABLE:null:null

案例6:导入时若目标文件存在则删除目标文件

我们经常会碰见,由于目标文件已经存在,而报错的情况。

18/05/04 22:26:04 ERROR tool.ImportTool: Import failed: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://mycluster/user/sqoop/import/050405 already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:270)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:143)

如果希望杜绝这种情况,干脆直接–delete-target-dir,太简单就不做演示了。

案例7:增量导入

增量导入在企业中用的非常广泛,比如我每天都需要进行RDBMS–>HDFS的互导,总不能每次都全部导入吧,假设昨天导入了1000条,今天肯定是要导入1000之后的数据。那么就需要用到增量导入。

前面我们执行help命令的时候,就有增量导入相关参数:Incremental import arguments,回顾下,主要是三个字段,check-column,incremental,last-value。

  • append模式
    在数据库的表字段中常常会设置一个自增的字段来作为数据表的主键,比如id。那么我们就设置check-column为id,作为增量导入的依据.last-value设置上次导入的id的最大值,如果不指定last-value值,将会将表的所有数据进行导入,便发生了数据的冗余。
[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/hive \
> --username root \
> --password 123456 \
> --table TBLS \
> --target-dir /user/sqoop/import/050401 \
> -m 1 \
> --check-column TBL_ID \
> --incremental append \
> --last-value 71

之前我们已经在050401中导入过数据了,现在的是增量数据,即id为71以后的数据(不包含71哦)。
【Sqoop教程】(三)Sqoop import详解之RDBMS数据导入HDFS

  • lastmodified模式
    上面讲的append模式可以自己指定一个字段。而lastmodified模式是以时间戳为依据的,使用lastmodified模式时要指定是append还是merge-key,merge-key这种模式是进行了一次完整的mapreduce操作。
    创建表的时候,指定时间戳列:
mysql> create table stu(id int,name varchar(20),last_mod timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP);
Query OK, 0 rows affected (0.01 sec)

mysql> insert into stu(id,name) values(1,'lyl');
Query OK, 1 row affected (0.00 sec)

mysql> insert into stu(id,name) values(2,'lyl');
Query OK, 1 row affected (0.00 sec)

mysql> insert into stu(id,name) values(3,'lyl');
Query OK, 1 row affected (0.00 sec)

mysql> select * from stu;
+------+------+---------------------+
| id   | name | last_mod            |
+------+------+---------------------+
|    1 | lyl  | 2018-05-04 22:50:37 |
|    2 | lyl  | 2018-05-04 22:50:39 |
|    3 | lyl  | 2018-05-04 22:50:43 |
+------+------+---------------------+
3 rows in set (0.00 sec)

先进行一次全量导入:

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/weblog --username root --password 123456 --table stu --target-dir /user/sqoop/import/050406 -m 1

查看文件内容:

[root@master bin]# hadoop fs -cat /user/sqoop/import/050406/part-m-00000
1,lyl,2018-05-04 22:50:37.0
2,lyl,2018-05-04 22:50:39.0
3,lyl,2018-05-04 22:50:43.0

往mysql插入一条新数据

mysql> insert into stu(id,name) values(4,'lyl');
Query OK, 1 row affected (0.00 sec)

再进行增量导入:

./sqoop import --connect jdbc:mysql://master:3306/weblog \
--username root --password 123456 --table stu \
--check-column last_mod \
--incremental lastmodified \
--last-value "2018-05-04 22:50:43" \
-m 1 \
--target-dir /user/sqoop/import/050406 \
--append

查看文件内容

[root@master bin]# hadoop fs -cat /user/sqoop/import/050406/part-m-00001
3,lyl,2018-05-04 22:50:43.0
4,lyl,2018-05-04 22:59:38.0

怎么又导入了一条id为3的数据,注意一个问题!会将大于等于last-value的值导入!
【Sqoop教程】(三)Sqoop import详解之RDBMS数据导入HDFS

使用lastmodified模式之–merge-key

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/weblog --username root --password 123456 --table stu --check-column last_mod --incremental lastmodified --last-value "2018-05-04 22:59:38" -m 1 --target-dir /user/sqoop/import/050406 --merge-key id

–merge-key是一次完整的mapreduce操作,查看Web ui可以发现只有一个文件,而之前append模式是有两个文件的哦!
【Sqoop教程】(三)Sqoop import详解之RDBMS数据导入HDFS
而且不会出现上面重复的问题

[root@master bin]# hadoop fs -cat /user/sqoop/import/050406/part-r-00000
1,lyl,2018-05-04 23:05:53.0
2,lyl,2018-05-04 22:50:39.0
3,lyl,2018-05-04 22:50:43.0
4,lyl,2018-05-04 22:59:38.0
5,lyl,2018-05-04 23:03:43.0
6,lyl,2018-05-04 23:03:45.0

注意,假如不是插入操作,而是更新操作,last_mod时间戳也是会变化的哦,增量导入的时候,如果是append模式 这个也会重复导入,而–merge-key模式则不会。

案例8:指定输出文件格式

如果不指定默认是textFile。

[root@master bin]# ./sqoop import --connect jdbc:mysql://master:3306/weblog \
--username root --password 123456 --table stu \
--target-dir /user/sqoop/import/050505 \
-m 1 \
--as-parquetfile

查看WEB UI可以看到输出文件为:315a204a-9a48-4ef5-aed6-196f60201a1a.parquet 是parquet格式了

BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/949

发表评论

登录后才能评论

联系我们

562373081

在线咨询:点击这里给我发消息

邮件:562373081@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code