Import RDBMS Data Using Sqoop

Sqoop is often used to load data from an RDBMS into Hadoop. Following are some important commands. We use MySQL as the example RDBMS source for illustration; you can follow a similar technique to import data from Oracle, SQL Server, Teradata, etc.

Common Use Cases of Sqoop

  • Import/export data between HDFS and any RDBMS that provides a JDBC driver
  • Create the Hive schema while importing data
  • Specify the data format in HDFS (Avro, Parquet)
  • Enable compression and specify the compression codec
  • Create an alias for a sqoop command using a Sqoop job
  • Incremental load (merge mode and append mode)
  • Import directly into HBase
  • Specify the field delimiter or record delimiter when importing plain text
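As a sketch of how several of these options combine in practice (the flags are standard Sqoop import options; the target directory here is just an illustrative path, not one used later in this guide):

```shell
# Import the orders table as Snappy-compressed Avro files
sqoop import \
  --connect jdbc:mysql://localhost:3306/retail_db \
  --username root --password cloudera \
  --table orders \
  --target-dir /staging/mysql/retail_db/orders_avro \
  --as-avrodatafile \
  --compress \
  --compression-codec org.apache.hadoop.io.compress.SnappyCodec \
  -m 1
```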

First, check connectivity with mysql

$ mysql -u root -p
<enter password, cloudera, when prompted>

If you are using the Cloudera QuickStart VM, a MySQL database called retail_db is already present.

mysql> use retail_db


Show the databases:

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| retail_db          |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
5 rows in set (0.01 sec)



Show the tables.

mysql> show tables;
+---------------------+
| Tables_in_retail_db |
+---------------------+
| categories          |
| customers           |
| departments         |
| order_items         |
| orders              |
| products            |
+---------------------+

Now that you have verified the database, open another terminal and test MySQL connectivity through Sqoop. If you are using a MySQL instance hosted elsewhere, use the server name or server IP address instead of localhost.

$ sqoop list-databases --connect jdbc:mysql://localhost:3306 --username root --password cloudera

You should be able to see retail_db. Now list the tables in the retail_db database.

$ sqoop list-tables --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera 
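You can also run ad-hoc SQL against the source database without importing anything, using sqoop eval (the query below is arbitrary, for illustration):

```shell
# Quick row count against MySQL, evaluated on the database side
sqoop eval \
  --connect jdbc:mysql://localhost:3306/retail_db \
  --username root --password cloudera \
  --query "SELECT COUNT(*) FROM customers"
```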

Importing a Table

Create a directory in HDFS to stage the tables during import

$ hadoop fs -mkdir -p /staging/mysql/retail_db

Import the customers table from MySQL into the above location.

$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table customers --target-dir '/staging/mysql/retail_db/customers' -m 1

Verify the data in HDFS

$ hadoop fs -text /staging/mysql/retail_db/customers/part-m-00000 | head
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725

Importing All Tables

$ sqoop import-all-tables --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --warehouse-dir '/staging/mysql/retail_db' -m 1
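If you only want a subset of the tables, import-all-tables accepts an --exclude-tables flag with a comma-separated list. As a sketch, to skip the two largest tables:

```shell
sqoop import-all-tables \
  --connect jdbc:mysql://localhost:3306/retail_db \
  --username root --password cloudera \
  --warehouse-dir /staging/mysql/retail_db \
  --exclude-tables order_items,orders \
  -m 1
```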

See the folder structure.

$ hadoop fs -ls -R /staging/mysql/retail_db/
drwxr-xr-x   - cloudera supergroup          0 2016-08-30 04:09 /staging/mysql/retail_db/categories
-rw-r--r--   1 cloudera supergroup          0 2016-08-30 04:09 /staging/mysql/retail_db/categories/_SUCCESS
-rw-r--r--   1 cloudera supergroup       1029 2016-08-30 04:09 /staging/mysql/retail_db/categories/part-m-00000
drwxr-xr-x   - cloudera supergroup          0 2016-08-30 04:09 /staging/mysql/retail_db/customers
-rw-r--r--   1 cloudera supergroup          0 2016-08-30 04:09 /staging/mysql/retail_db/customers/_SUCCESS
-rw-r--r--   1 cloudera supergroup     953525 2016-08-30 04:09 /staging/mysql/retail_db/customers/part-m-00000
drwxr-xr-x   - cloudera supergroup          0 2016-08-30 04:09 /staging/mysql/retail_db/departments
-rw-r--r--   1 cloudera supergroup          0 2016-08-30 04:09 /staging/mysql/retail_db/departments/_SUCCESS
-rw-r--r--   1 cloudera supergroup         60 2016-08-30 04:09 /staging/mysql/retail_db/departments/part-m-00000
drwxr-xr-x   - cloudera supergroup          0 2016-08-30 04:10 /staging/mysql/retail_db/order_items
-rw-r--r--   1 cloudera supergroup          0 2016-08-30 04:10 /staging/mysql/retail_db/order_items/_SUCCESS
-rw-r--r--   1 cloudera supergroup    5408880 2016-08-30 04:10 /staging/mysql/retail_db/order_items/part-m-00000
drwxr-xr-x   - cloudera supergroup          0 2016-08-30 04:10 /staging/mysql/retail_db/orders
-rw-r--r--   1 cloudera supergroup          0 2016-08-30 04:10 /staging/mysql/retail_db/orders/_SUCCESS
-rw-r--r--   1 cloudera supergroup    2999944 2016-08-30 04:10 /staging/mysql/retail_db/orders/part-m-00000
drwxr-xr-x   - cloudera supergroup          0 2016-08-30 04:10 /staging/mysql/retail_db/products
-rw-r--r--   1 cloudera supergroup          0 2016-08-30 04:10 /staging/mysql/retail_db/products/_SUCCESS
-rw-r--r--   1 cloudera supergroup     173993 2016-08-30 04:10 /staging/mysql/retail_db/products/part-m-00000

Incremental Load and Building a Sqoop Job

Create a sample employee table and insert a couple of sample rows.

mysql> DROP TABLE IF EXISTS employee;
mysql> CREATE TABLE employee(
emp_id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(100),
age INT,
delete_flag TINYINT(1),
LastModifiedDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (emp_id)
);
mysql> INSERT INTO employee (name, age) VALUES 
("Employee 1", 34), 
("Employee 2", 40); 
mysql> SELECT * FROM employee;
+--------+------------+------+-------------+---------------------+
| emp_id | name       | age  | delete_flag | LastModifiedDate    |
+--------+------------+------+-------------+---------------------+
|      1 | Employee 1 |   34 |        NULL | 2016-08-30 04:25:53 |
|      2 | Employee 2 |   40 |        NULL | 2016-08-30 04:25:53 |
+--------+------------+------+-------------+---------------------+

Import employee table into HDFS and verify.

$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table employee --target-dir '/staging/mysql/retail_db/employee' -m 1
$ hadoop fs -text /staging/mysql/retail_db/employee/part*
1,Employee 1,34,null,2016-08-30 04:25:53.0
2,Employee 2,40,null,2016-08-30 04:25:53.0

Incremental load has two modes:

  1. Merge mode: old and new data are merged based on --merge-key.
  2. Append mode: old data remains in separate files. You need a separate logic layer (Hive is a popular choice) to get the latest version of each record.

Incremental Load with Merge-mode

In MySQL, insert 2 more rows and update an existing row. Verify that the data reflects the update and that the timestamp changed too.

mysql> INSERT INTO employee (name, age) values ("Employee 3", 40), ("Employee 4", 42);
mysql> UPDATE employee SET age=31 WHERE emp_id = 1;
mysql> SELECT * FROM employee;
+--------+------------+------+-------------+---------------------+
| emp_id | name       | age  | delete_flag | LastModifiedDate    |
+--------+------------+------+-------------+---------------------+
|      1 | Employee 1 |   31 |        NULL | 2016-08-30 04:48:34 |
|      2 | Employee 2 |   40 |        NULL | 2016-08-30 04:25:53 |
|      3 | Employee 3 |   40 |        NULL | 2016-08-30 04:48:29 |
|      4 | Employee 4 |   42 |        NULL | 2016-08-30 04:48:29 |
+--------+------------+------+-------------+---------------------+

Do an incremental load using Sqoop. You can optionally add a "--where" clause.

$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table employee --target-dir '/staging/mysql/retail_db/employee' -m 1 --incremental lastmodified --check-column LastModifiedDate --last-value '2016-08-30 04:25:54' --merge-key emp_id
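For instance, a --where clause restricts which source rows are considered (the predicate below is arbitrary, for illustration):

```shell
sqoop import \
  --connect jdbc:mysql://localhost:3306/retail_db \
  --username root --password cloudera \
  --table employee \
  --target-dir /staging/mysql/retail_db/employee \
  -m 1 \
  --incremental lastmodified \
  --check-column LastModifiedDate \
  --last-value '2016-08-30 04:25:54' \
  --merge-key emp_id \
  --where "age >= 40"
```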

Verify the data in HDFS. Because you specified --merge-key, the old and new datasets are merged into a single set that reflects the latest RDBMS table state. Note, however, that if you delete a row from the RDBMS table, the row will remain in HDFS.

$ hadoop fs -text /staging/mysql/retail_db/employee/part*

1,Employee 1,31,null,2016-08-30 04:48:34.0
2,Employee 2,40,null,2016-08-30 04:25:53.0
3,Employee 3,40,null,2016-08-30 04:48:29.0
4,Employee 4,42,null,2016-08-30 04:48:29.0

Incremental Load with append-mode

In MySQL, insert 2 more rows and update an existing row. Verify that the data reflects the update and that the timestamp changed too.

mysql> INSERT INTO employee (name, age) values ("Employee 5", 40), ("Employee 6", 42);
mysql> UPDATE employee SET age=32 WHERE emp_id = 1;
mysql> SELECT * FROM employee;
+--------+------------+------+-------------+---------------------+
| emp_id | name       | age  | delete_flag | LastModifiedDate    |
+--------+------------+------+-------------+---------------------+
|      1 | Employee 1 |   32 |        NULL | 2016-08-30 05:17:34 |
|      2 | Employee 2 |   40 |        NULL | 2016-08-30 04:25:53 |
|      3 | Employee 3 |   40 |        NULL | 2016-08-30 04:48:29 |
|      4 | Employee 4 |   42 |        NULL | 2016-08-30 04:48:29 |
|      5 | Employee 5 |   40 |        NULL | 2016-08-30 05:17:28 |
|      6 | Employee 6 |   42 |        NULL | 2016-08-30 05:17:28 |
+--------+------------+------+-------------+---------------------+

Import using Sqoop; notice the --append flag.

$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table employee --target-dir '/staging/mysql/retail_db/employee' -m 1 --incremental lastmodified --check-column LastModifiedDate --last-value '2016-08-30 04:53:33.0' --append

View the folder structure.

$ hadoop fs -ls -R /staging/mysql/retail_db/employee
-rw-r--r--   1 cloudera cloudera          0 2016-08-30 04:54 /staging/mysql/retail_db/employee/_SUCCESS
-rw-r--r--   1 cloudera cloudera        129 2016-08-30 05:20 /staging/mysql/retail_db/employee/part-m-00001
-rw-r--r--   1 cloudera cloudera        164 2016-08-30 04:54 /staging/mysql/retail_db/employee/part-r-00000

View the content of the files.

$ hadoop fs -text /staging/mysql/retail_db/employee/part*
1,Employee 1,32,null,2016-08-30 05:17:34.0
5,Employee 5,40,null,2016-08-30 05:17:28.0
6,Employee 6,42,null,2016-08-30 05:17:28.0
1,Employee 1,31,null,2016-08-30 04:48:34.0
2,Employee 2,40,null,2016-08-30 04:25:53.0
3,Employee 3,40,null,2016-08-30 04:48:29.0
4,Employee 4,42,null,2016-08-30 04:48:29.0

Sqoop Job for Incremental Load

Passing --last-value manually in each sqoop import statement is error prone. We can automate this by creating a Sqoop job, which records the last value after each run.

$ sqoop job --create import-employee -- import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table employee --target-dir '/staging/mysql/retail_db/employee' -m 1 --incremental lastmodified --check-column LastModifiedDate --append

Note: Sqoop can save the password if sqoop.metastore.client.record.password is set to true in the configuration file /etc/sqoop/conf/sqoop-site.xml.
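Alternatively, you can keep the password off the command line entirely with --password-file (a sketch; the HDFS path below is just an example location):

```shell
# Store the password in an HDFS file readable only by you
echo -n "cloudera" | hadoop fs -put - /user/cloudera/.mysql-password
hadoop fs -chmod 400 /user/cloudera/.mysql-password

# Reference the file instead of passing --password
sqoop import \
  --connect jdbc:mysql://localhost:3306/retail_db \
  --username root \
  --password-file /user/cloudera/.mysql-password \
  --table employee \
  --target-dir /staging/mysql/retail_db/employee \
  -m 1
```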

View the job details

$ sqoop job --list
$ sqoop job --show import-employee

Delete existing HDFS location.

$ hadoop fs -rm -r /staging/mysql/retail_db/employee

Run the sqoop job ... first load.

$ sqoop job --exec import-employee

Verify HDFS

$ hadoop fs -text /staging/mysql/retail_db/employee/part*
1,Employee 1,32,null,2016-08-30 05:17:34.0
2,Employee 2,40,null,2016-08-30 04:25:53.0
3,Employee 3,40,null,2016-08-30 05:50:18.0
4,Employee 4,42,null,2016-08-30 05:50:23.0
5,Employee 5,40,null,2016-08-30 05:17:28.0
6,Employee 6,42,null,2016-08-30 05:17:28.0

Now, make a few changes in the MySQL table and run the sqoop job again. They should appear in HDFS.

mysql> INSERT INTO employee (name, age) values ("Employee 7", 45), ("Employee 8", 46);
mysql> UPDATE employee SET age=32 WHERE emp_id = 4;
mysql> SELECT * FROM employee;
+--------+------------+------+-------------+---------------------+
| emp_id | name       | age  | delete_flag | LastModifiedDate    |
+--------+------------+------+-------------+---------------------+
|      1 | Employee 1 |   32 |        NULL | 2016-08-30 05:17:34 |
|      2 | Employee 2 |   40 |        NULL | 2016-08-30 04:25:53 |
|      3 | Employee 3 |   40 |        NULL | 2016-08-30 05:50:18 |
|      4 | Employee 4 |   32 |        NULL | 2016-08-30 05:55:54 |
|      5 | Employee 5 |   40 |        NULL | 2016-08-30 05:17:28 |
|      6 | Employee 6 |   42 |        NULL | 2016-08-30 05:17:28 |
|      7 | Employee 7 |   45 |        NULL | 2016-08-30 05:55:48 |
|      8 | Employee 8 |   46 |        NULL | 2016-08-30 05:55:48 |
+--------+------------+------+-------------+---------------------+

Run the sqoop job to import the data.

$ sqoop job --exec import-employee

Verify the changes are present.

$ hadoop fs -text /staging/mysql/retail_db/employee/part*
1,Employee 1,32,null,2016-08-30 05:17:34.0
2,Employee 2,40,null,2016-08-30 04:25:53.0
3,Employee 3,40,null,2016-08-30 05:50:18.0
4,Employee 4,42,null,2016-08-30 05:50:23.0
5,Employee 5,40,null,2016-08-30 05:17:28.0
6,Employee 6,42,null,2016-08-30 05:17:28.0
4,Employee 4,32,null,2016-08-30 05:55:54.0
7,Employee 7,45,null,2016-08-30 05:55:48.0
8,Employee 8,46,null,2016-08-30 05:55:48.0
Notice that the append-mode output now contains two versions of emp_id 4. As mentioned earlier, with append mode you need a separate logic layer to resolve the latest version of each record. In Hive, you can create an external table over the staged files and deduplicate with a window function:

create external table employee(
   id string,
   name string,
   age int,
   deleted boolean,
   lmd string
)
row format delimited
fields terminated by ','
stored as textfile
location '/staging/mysql/retail_db/employee';
create table employee_parquet like employee stored as parquet;
insert into employee_parquet
   select id, name, age, deleted, lmd from
       (select
           *,
           row_number() over (partition by id order by lmd desc) r
       from employee) t
    where r = 1;

Delete Operation in RDBMS

While an OLTP database handles all CRUD operations efficiently, data in HDFS is immutable: at best you can append to an existing file or recreate the file with the updates applied. Recreating files in HDFS can be time consuming and costly. As we have seen, updates are maintained as versioned data; delete operations are typically handled with a delete flag (a soft delete) rather than a physical delete.
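For example (a sketch using the delete_flag column defined earlier and the employee_parquet Hive table built above), a soft delete in the source is just an ordinary update, so the next incremental import picks it up, and the Hive layer filters flagged rows out:

```sql
-- In MySQL: mark the row deleted instead of removing it
UPDATE employee SET delete_flag = 1 WHERE emp_id = 2;

-- In Hive: read the latest versions, excluding soft-deleted rows
SELECT * FROM employee_parquet WHERE deleted IS NULL OR NOT deleted;
```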

Using Option File in Sqoop Command

You can save the sqoop import statement in a file and execute the sqoop command based on that file.

Create a file and name it option.txt (you can give it any arbitrary name).

# sqoop command
import
--connect
jdbc:mysql://localhost:3306/retail_db
# Username
--username
root
# Password
--password
cloudera
# Name of the table
--table
employee
# Target directory where imported files will be stored
--target-dir
/staging/mysql/retail_db/employee
# Number of parallel threads for importing
-m
1 

Run the sqoop statement using the option file:

$ sqoop --options-file option.txt

If you want to control the resources (v-cores or RAM) for each mapper or reducer task, you can pass the configuration as shown below.

$ sqoop import -D mapreduce.map.memory.mb=4096 -D mapreduce.map.java.opts=-Xmx3000m ....