Import RDBMS Data Using Sqoop
Sqoop is often used to load data from an RDBMS into Hadoop. Following are some important commands. We will use MySQL as the example RDBMS source; you can follow a similar approach to import data from Oracle, SQL Server, Teradata, etc.
Common Use Cases of Sqoop
Import/export data between HDFS and any RDBMS that provides a JDBC driver
Create the Hive schema as part of the import
Specify the data format on HDFS - Avro, Parquet
Enable compression and specify the compression codec
Save a Sqoop command under an alias as a Sqoop job
Incremental load (merge mode and append mode)
Import directly into HBase
Specify the field or record delimiter when importing as plain text (several of these options are combined in the example right after this list)
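Several of these options can be combined in a single import. The command below is a minimal sketch against the retail_db.customers table used later in this tutorial; the customers_txt directory name and the Snappy codec choice are just illustrative. It writes tab-delimited plain text and compresses the output.
$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table customers --target-dir '/staging/mysql/retail_db/customers_txt' --fields-terminated-by '\t' --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec -m 1
To land Avro or Parquet instead of delimited text, replace the delimiter option with --as-avrodatafile or --as-parquetfile; adding --hive-import asks Sqoop to also create and load a matching Hive table.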
First, check connectivity to MySQL.
$ mysql -u root -p
<enter password, cloudera, when prompted>
If you are using the Cloudera QuickStart VM, a MySQL database called retail_db is already present.
mysql> use retail_db
Show the databases:
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| retail_db |
| mysql |
| performance_schema |
| sys |
+--------------------+
5 rows in set (0.01 sec)
Show the tables.
mysql> show tables;
+---------------------+
| Tables_in_retail_db |
+---------------------+
| categories |
| customers |
| departments |
| order_items |
| orders |
| products |
+---------------------+
Now that you have verified the database, open another terminal and test MySQL connectivity through Sqoop. If you are using a MySQL instance hosted elsewhere, use its server name or IP address in place of localhost.
$ sqoop list-databases --connect jdbc:mysql://localhost:3306 --username root --password cloudera
You should be able to see retail_db. Now list the tables in the retail_db database.
$ sqoop list-tables --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera
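You can also run an ad hoc query against the source database with sqoop eval as a quick sanity check; any SQL the source accepts will work, and the count query below is just an example.
$ sqoop eval --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --query "SELECT COUNT(*) FROM customers"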
Importing a Table
Create a directory in HDFS to stage the tables during import
$ hadoop fs -mkdir -p /staging/mysql/retail_db
Import the customers table from MySQL to the above location.
$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table customers --target-dir '/staging/mysql/retail_db/customers' -m 1
Verify the data in HDFS
$ hadoop fs -text /staging/mysql/retail_db/customers/part-m-00000 | head
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725
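The import above uses a single mapper (-m 1), which is fine for small tables. For larger tables you can import in parallel by raising -m and pointing --split-by at a numeric key column; the sketch below assumes customer_id is the primary key of customers and writes to an illustrative customers_parallel directory.
$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table customers --target-dir '/staging/mysql/retail_db/customers_parallel' --split-by customer_id -m 4
Each mapper writes its own part-m-NNNNN file under the target directory.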
Importing All Tables
$ sqoop import-all-tables --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --warehouse-dir '/staging/mysql/retail_db' -m 1
See the folder structure.
$ hadoop fs -ls -R /staging/mysql/retail_db/
drwxr-xr-x - cloudera supergroup 0 2016-08-30 04:09 /staging/mysql/retail_db/categories
-rw-r--r-- 1 cloudera supergroup 0 2016-08-30 04:09 /staging/mysql/retail_db/categories/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 1029 2016-08-30 04:09 /staging/mysql/retail_db/categories/part-m-00000
drwxr-xr-x - cloudera supergroup 0 2016-08-30 04:09 /staging/mysql/retail_db/customers
-rw-r--r-- 1 cloudera supergroup 0 2016-08-30 04:09 /staging/mysql/retail_db/customers/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 953525 2016-08-30 04:09 /staging/mysql/retail_db/customers/part-m-00000
drwxr-xr-x - cloudera supergroup 0 2016-08-30 04:09 /staging/mysql/retail_db/departments
-rw-r--r-- 1 cloudera supergroup 0 2016-08-30 04:09 /staging/mysql/retail_db/departments/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 60 2016-08-30 04:09 /staging/mysql/retail_db/departments/part-m-00000
drwxr-xr-x - cloudera supergroup 0 2016-08-30 04:10 /staging/mysql/retail_db/order_items
-rw-r--r-- 1 cloudera supergroup 0 2016-08-30 04:10 /staging/mysql/retail_db/order_items/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 5408880 2016-08-30 04:10 /staging/mysql/retail_db/order_items/part-m-00000
drwxr-xr-x - cloudera supergroup 0 2016-08-30 04:10 /staging/mysql/retail_db/orders
-rw-r--r-- 1 cloudera supergroup 0 2016-08-30 04:10 /staging/mysql/retail_db/orders/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 2999944 2016-08-30 04:10 /staging/mysql/retail_db/orders/part-m-00000
drwxr-xr-x - cloudera supergroup 0 2016-08-30 04:10 /staging/mysql/retail_db/products
-rw-r--r-- 1 cloudera supergroup 0 2016-08-30 04:10 /staging/mysql/retail_db/products/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 173993 2016-08-30 04:10 /staging/mysql/retail_db/products/part-m-00000
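If you want everything except a few tables, import-all-tables also accepts --exclude-tables with a comma-separated list; the tables excluded below are just an example.
$ sqoop import-all-tables --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --warehouse-dir '/staging/mysql/retail_db' --exclude-tables orders,order_items -m 1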
Incremental Load and Building a Sqoop Job
Create a sample employee table and insert a couple of rows.
mysql> DROP TABLE IF EXISTS employee;
mysql> CREATE TABLE employee(
emp_id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(100),
age INT,
delete_flag TINYINT(1),
LastModifiedDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (emp_id)
);
mysql> INSERT INTO employee (name, age) VALUES
("Employee 1", 34),
("Employee 2", 40);
mysql> SELECT * FROM employee;
+--------+------------+------+-------------+---------------------+
| emp_id | name | age | delete_flag | LastModifiedDate |
+--------+------------+------+-------------+---------------------+
| 1 | Employee 1 | 34 | NULL | 2016-08-30 04:25:53 |
| 2 | Employee 2 | 40 | NULL | 2016-08-30 04:25:53 |
+--------+------------+------+-------------+---------------------+
Import employee table into HDFS and verify.
$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table employee --target-dir '/staging/mysql/retail_db/employee' -m 1
$ hadoop fs -text /staging/mysql/retail_db/employee/part*
1,Employee 1,34,null,2016-08-30 04:25:53.0
2,Employee 2,40,null,2016-08-30 04:25:53.0
Incremental load has two modes:
merge mode: old and new data are merged into a single dataset based on --merge-key
append mode: old data remains in separate files; you need a separate logic layer (Hive is a popular choice) to get the latest version of a record
Incremental Load with Merge Mode
In MySQL, insert two more rows and update an existing row. Verify that the data reflects the update and that the timestamps have changed too.
mysql> INSERT INTO employee (name, age) values ("Employee 3", 40), ("Employee 4", 42);
mysql> UPDATE employee SET age=31 WHERE emp_id = 1;
mysql> SELECT * FROM employee;
+--------+------------+------+-------------+---------------------+
| emp_id | name | age | delete_flag | LastModifiedDate |
+--------+------------+------+-------------+---------------------+
| 1 | Employee 1 | 31 | NULL | 2016-08-30 04:48:34 |
| 2 | Employee 2 | 40 | NULL | 2016-08-30 04:25:53 |
| 3 | Employee 3 | 40 | NULL | 2016-08-30 04:48:29 |
| 4 | Employee 4 | 42 | NULL | 2016-08-30 04:48:29 |
+--------+------------+------+-------------+---------------------+
Do an incremental load using Sqoop. You can optionally add a --where clause to restrict the rows imported.
$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table employee --target-dir '/staging/mysql/retail_db/employee' -m 1 --incremental lastmodified --check-column LastModifiedDate --last-value '2016-08-30 04:25:54' --merge-key emp_id
Verify the data in HDFS. Because you specified --merge-key, the old and new datasets are merged into a single set that reflects the latest state of the RDBMS table. Note that if you delete a row from the RDBMS table, the row will still remain in HDFS.
$ hadoop fs -text /staging/mysql/retail_db/employee/part*
1,Employee 1,31,null,2016-08-30 04:48:34.0
2,Employee 2,40,null,2016-08-30 04:25:53.0
3,Employee 3,40,null,2016-08-30 04:48:29.0
4,Employee 4,42,null,2016-08-30 04:48:29.0
Incremental Load with Append Mode
In MySQL, insert two more rows and update an existing row. Verify that the data reflects the update and that the timestamps have changed too.
mysql> INSERT INTO employee (name, age) values ("Employee 5", 40), ("Employee 6", 42);
mysql> UPDATE employee SET age=32 WHERE emp_id = 1;
mysql> SELECT * FROM employee;
+--------+------------+------+-------------+---------------------+
| emp_id | name | age | delete_flag | LastModifiedDate |
+--------+------------+------+-------------+---------------------+
| 1 | Employee 1 | 32 | NULL | 2016-08-30 05:17:34 |
| 2 | Employee 2 | 40 | NULL | 2016-08-30 04:25:53 |
| 3 | Employee 3 | 40 | NULL | 2016-08-30 04:48:29 |
| 4 | Employee 4 | 42 | NULL | 2016-08-30 04:48:29 |
| 5 | Employee 5 | 40 | NULL | 2016-08-30 05:17:28 |
| 6 | Employee 6 | 42 | NULL | 2016-08-30 05:17:28 |
+--------+------------+------+-------------+---------------------+
Import using Sqoop; notice the --append flag.
$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table employee --target-dir '/staging/mysql/retail_db/employee' -m 1 --incremental lastmodified --check-column LastModifiedDate --last-value '2016-08-30 04:53:33.0' --append
View the folder structure.
$ hadoop fs -ls -R /staging/mysql/retail_db/employee
-rw-r--r-- 1 cloudera cloudera 0 2016-08-30 04:54 /staging/mysql/retail_db/employee/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 129 2016-08-30 05:20 /staging/mysql/retail_db/employee/part-m-00001
-rw-r--r-- 1 cloudera cloudera 164 2016-08-30 04:54 /staging/mysql/retail_db/employee/part-r-00000
View the content of the files.
$ hadoop fs -text /staging/mysql/retail_db/employee/part*
1,Employee 1,32,null,2016-08-30 05:17:34.0
5,Employee 5,40,null,2016-08-30 05:17:28.0
6,Employee 6,42,null,2016-08-30 05:17:28.0
1,Employee 1,31,null,2016-08-30 04:48:34.0
2,Employee 2,40,null,2016-08-30 04:25:53.0
3,Employee 3,40,null,2016-08-30 04:48:29.0
4,Employee 4,42,null,2016-08-30 04:48:29.0
Sqoop Job for Incremental Load
Passing --last-value in the sqoop import statement manually is error prone. We can automate this by creating a Sqoop job, which tracks the last value between runs.
$ sqoop job --create import-employee -- import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table employee --target-dir '/staging/mysql/retail_db/employee' -m 1 --incremental lastmodified --check-column LastModifiedDate --append
Note: Sqoop can save the password with the job if sqoop.metastore.client.record.password is set to true in the configuration file /etc/sqoop/conf/sqoop-site.xml.
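If you prefer not to record the password at all, you can keep it in a restricted file and reference it with --password-file instead of --password; the HDFS path below is an arbitrary choice. Use echo -n so the file does not contain a trailing newline, which Sqoop would otherwise treat as part of the password.
$ echo -n "cloudera" > .mysql-pass
$ hadoop fs -put .mysql-pass /user/cloudera/.mysql-pass
$ hadoop fs -chmod 400 /user/cloudera/.mysql-pass
Then replace --password cloudera with --password-file /user/cloudera/.mysql-pass in the job definition.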
View the job details
$ sqoop job --list
$ sqoop job --show import-employee
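If you need to recreate a job with different arguments, delete the existing one first:
$ sqoop job --delete import-employee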
Delete existing HDFS location.
$ hadoop fs -rm -r /staging/mysql/retail_db/employee
Run the Sqoop job for the first load.
$ sqoop job --exec import-employee
Verify HDFS
$ hadoop fs -text /staging/mysql/retail_db/employee/part*
1,Employee 1,32,null,2016-08-30 05:17:34.0
2,Employee 2,40,null,2016-08-30 04:25:53.0
3,Employee 3,40,null,2016-08-30 05:50:18.0
4,Employee 4,42,null,2016-08-30 05:50:23.0
5,Employee 5,40,null,2016-08-30 05:17:28.0
6,Employee 6,42,null,2016-08-30 05:17:28.0
Now make a few changes to the MySQL table and run the Sqoop job again. The changes should appear in HDFS.
mysql> INSERT INTO employee (name, age) values ("Employee 7", 45), ("Employee 8", 46);
mysql> UPDATE employee SET age=32 WHERE emp_id = 4;
mysql> SELECT * FROM employee;
+--------+------------+------+-------------+---------------------+
| emp_id | name | age | delete_flag | LastModifiedDate |
+--------+------------+------+-------------+---------------------+
| 1 | Employee 1 | 32 | NULL | 2016-08-30 05:17:34 |
| 2 | Employee 2 | 40 | NULL | 2016-08-30 04:25:53 |
| 3 | Employee 3 | 40 | NULL | 2016-08-30 05:50:18 |
| 4 | Employee 4 | 32 | NULL | 2016-08-30 05:55:54 |
| 5 | Employee 5 | 40 | NULL | 2016-08-30 05:17:28 |
| 6 | Employee 6 | 42 | NULL | 2016-08-30 05:17:28 |
| 7 | Employee 7 | 45 | NULL | 2016-08-30 05:55:48 |
| 8 | Employee 8 | 46 | NULL | 2016-08-30 05:55:48 |
+--------+------------+------+-------------+---------------------+
Run the Sqoop job to import the data.
$ sqoop job --exec import-employee
Verify the changes are present.
$ hadoop fs -text /staging/mysql/retail_db/employee/part*
1,Employee 1,32,null,2016-08-30 05:17:34.0
2,Employee 2,40,null,2016-08-30 04:25:53.0
3,Employee 3,40,null,2016-08-30 05:50:18.0
4,Employee 4,42,null,2016-08-30 05:50:23.0
5,Employee 5,40,null,2016-08-30 05:17:28.0
6,Employee 6,42,null,2016-08-30 05:17:28.0
4,Employee 4,32,null,2016-08-30 05:55:54.0
7,Employee 7,45,null,2016-08-30 05:55:48.0
8,Employee 8,46,null,2016-08-30 05:55:48.0
As noted above, append mode keeps every imported version of a record, so you need a separate layer to expose only the latest one. A common approach (shown here in Hive/Impala SQL) is to define an external table over the imported files and then deduplicate into a Parquet table, keeping the most recent row per key:
create external table employee(
  id string,
  name string,
  age int,
  deleted boolean,
  lmd string
)
row format delimited
fields terminated by ','
stored as textfile
location '/staging/mysql/retail_db/employee';
-- keep only the latest version of each employee, picked by last-modified timestamp
create table employee_parquet like employee stored as parquet;
insert into employee_parquet
select id, name, age, deleted, lmd from
(select
  *,
  row_number() over (partition by id order by lmd desc) r
from employee) t
where r = 1;
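To spot-check the deduplicated result, a simple query is enough (the cast is only for numeric ordering, since id was declared as a string):
select * from employee_parquet order by cast(id as int);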
Delete Operations in RDBMS
While an OLTP database handles all CRUD operations efficiently, data in HDFS is immutable: at best you can append to an existing file or recreate the file with the updates, and recreating files in HDFS can be time consuming and costly. As we have seen, updates are maintained as versioned records; delete operations are typically handled with a delete flag (a soft delete) in the source table rather than a physical delete.
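For illustration, a soft delete on the sample employee table would look like the statement below; this is an assumed convention for how a source application might mark deletes, not something retail_db does out of the box. Because LastModifiedDate is declared with ON UPDATE CURRENT_TIMESTAMP, the flagged row is picked up by the next incremental import, and the Hive deduplication query shown earlier can simply filter out rows where the flag is set.
mysql> UPDATE employee SET delete_flag = 1 WHERE emp_id = 2;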
Using an Options File with the Sqoop Command
You can save the Sqoop import arguments in a file and execute the Sqoop command from that file.
Create a file and name it option.txt (you can give it any arbitrary name).
# sqoop command
import
--connect
jdbc:mysql://localhost:3306/retail_db
# Username
--username
root
# Password
--password
cloudera
# Name of the table
--table
employee
# Target directory where imported files will be stored
--target-dir
/staging/mysql/retail_db/employee
# Number of parallel thread for importing
-m
1
Run the Sqoop command using option.txt:
$ sqoop --options-file option.txt
If you want to control the resources (vcores or RAM) for each mapper or reducer task, you can pass the Hadoop configuration properties as below.
$ sqoop import -D mapreduce.map.memory.mb=4096 -D mapreduce.map.java.opts=-Xmx3000m ....