Migrate data from RDBMS

Goal: migrate data from MySQL to Cassandra.

Steps:

1. Load data into MySQL

2. Create a schema for the tables in Cassandra

3. Launch Spark with Cassandra and MySQL support

4. Using code, pull data from the MySQL tables and push it to the Cassandra tables

A test database, employees, has been loaded into MySQL; it contains an employees table among others.

Log in to MySQL to verify the data.

[training@localhost test_db]$ mysql -u root -p
Enter password:
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 16
Server version: 10.0.31-MariaDB MariaDB Server

Copyright (c) 2000, 2017, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> show databases;
+--------------------+
| Database           |
+--------------------+
| employees          |
| information_schema |
| mysql              |
| performance_schema |
| test               |
+--------------------+
5 rows in set (0.00 sec)

MariaDB [(none)]> use employees;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
MariaDB [employees]> show tables;
+----------------------+
| Tables_in_employees  |
+----------------------+
| current_dept_emp     |
| departments          |
| dept_emp             |
| dept_emp_latest_date |
| dept_manager         |
| employees            |
| salaries             |
| titles               |
+----------------------+
8 rows in set (0.00 sec)

MariaDB [employees]> select count(*) from employees;
+----------+
| count(*) |
+----------+
|   300024 |
+----------+
1 row in set (0.09 sec)

MariaDB [employees]> select count(*) from salaries;
+----------+
| count(*) |
+----------+
|  2844047 |
+----------+
1 row in set (0.47 sec)

View the schema of the employees table; this will help in creating the schema of the corresponding table in Cassandra. The output below is slightly modified.

MariaDB [employees]> show create table employees;

CREATE TABLE `employees` (
  `emp_no` int(11) NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(14) NOT NULL,
  `last_name` varchar(16) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
)

Create a new keyspace, employees.

cqlsh> create keyspace employees WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> use employees;

Create a matching table in Cassandra for each table being migrated from the employees database in MySQL.

cqlsh:employees> CREATE TABLE employees (
    emp_no int,
    birth_date date,
    first_name text,
    last_name text,
    gender text,
    hire_date date,
    PRIMARY KEY (emp_no)
);

cqlsh:employees> CREATE TABLE salaries (
    emp_no int,
    salary int,
    from_date date,
    to_date date,
    PRIMARY KEY (emp_no, from_date)
) WITH CLUSTERING ORDER BY (from_date DESC);

Now launch Spark with Cassandra and MySQL driver support. First, download Spark from the Apache site into the Downloads folder.

$ cd ~/Downloads/
$ tar xf spark-2.1.1-bin-hadoop2.7.tgz
$ cd spark-2.1.1-bin-hadoop2.7
$ bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2,mysql:mysql-connector-java:5.1.42 --conf spark.cassandra.connection.host=127.0.0.1
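Before reading from MySQL, it can be worth confirming inside the shell that the Cassandra connector is wired up correctly. A minimal sketch, assuming the empty employees keyspace and table created earlier:

```scala
// Sanity check: read the (still empty) Cassandra employees table through the
// connector. A failure here points at the --packages coordinates or the
// spark.cassandra.connection.host setting rather than at the migration code.
val probe = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("table", "employees")
  .option("keyspace", "employees")
  .load()

probe.printSchema()  // should mirror the CQL schema defined above
```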

Create a DataFrame in Spark that points at the employees table in MySQL.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val employees = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost/employees")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "employees")
  .option("user", "root")
  .option("password", "training")
  .load()

// Exiting paste mode, now interpreting.

employees: org.apache.spark.sql.DataFrame = [emp_no: int, birth_date: date ... 4 more fields]

View sample records from the employees DataFrame.

scala> employees.show()
+------+----------+----------+-----------+------+----------+
|emp_no|birth_date|first_name|  last_name|gender| hire_date|
+------+----------+----------+-----------+------+----------+
| 10001|1953-09-02|    Georgi|    Facello|     M|1986-06-26|
| 10002|1964-06-02|   Bezalel|     Simmel|     F|1985-11-21|
| 10003|1959-12-03|     Parto|    Bamford|     M|1986-08-28|
| 10004|1954-05-01| Chirstian|    Koblick|     M|1986-12-01|
| 10005|1955-01-21|   Kyoichi|   Maliniak|     M|1989-09-12|
| 10006|1953-04-20|    Anneke|    Preusig|     F|1989-06-02|
| 10007|1957-05-23|   Tzvetan|  Zielinski|     F|1989-02-10|
| 10008|1958-02-19|    Saniya|   Kalloufi|     M|1994-09-15|
| 10009|1952-04-19|    Sumant|       Peac|     F|1985-02-18|
| 10010|1963-06-01| Duangkaew|   Piveteau|     F|1989-08-24|
| 10011|1953-11-07|      Mary|      Sluis|     F|1990-01-22|
| 10012|1960-10-04|  Patricio|  Bridgland|     M|1992-12-18|
| 10013|1963-06-07| Eberhardt|     Terkki|     M|1985-10-20|
| 10014|1956-02-12|     Berni|      Genin|     M|1987-03-11|
| 10015|1959-08-19|  Guoxiang|  Nooteboom|     M|1987-07-02|
| 10016|1961-05-02|  Kazuhito|Cappelletti|     M|1995-01-27|
| 10017|1958-07-06| Cristinel|  Bouloucos|     F|1993-08-03|
| 10018|1954-06-19|  Kazuhide|       Peha|     F|1987-04-03|
| 10019|1953-01-23|   Lillian|    Haddadi|     M|1999-04-30|
| 10020|1952-12-24|    Mayuko|    Warwick|     M|1991-01-26|
+------+----------+----------+-----------+------+----------+
only showing top 20 rows
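As a quick cross-check, the DataFrame count should match the count(*) seen in MySQL earlier (this scans the full table over JDBC, so it takes a moment):

```scala
// MySQL reported 300024 rows in employees; the JDBC DataFrame should agree.
println(employees.count())  // expect 300024
```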

In the same way, create a Spark DataFrame on the salaries table from MySQL.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val salaries = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost/employees")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "salaries")
  .option("user", "root")
  .option("password", "training")
  .load()

salaries.show()

// Exiting paste mode, now interpreting.

+------+------+----------+----------+
|emp_no|salary| from_date|   to_date|
+------+------+----------+----------+
| 10001| 60117|1986-06-26|1987-06-26|
| 10001| 62102|1987-06-26|1988-06-25|
| 10001| 66074|1988-06-25|1989-06-25|
| 10001| 66596|1989-06-25|1990-06-25|
| 10001| 66961|1990-06-25|1991-06-25|
| 10001| 71046|1991-06-25|1992-06-24|
| 10001| 74333|1992-06-24|1993-06-24|
| 10001| 75286|1993-06-24|1994-06-24|
| 10001| 75994|1994-06-24|1995-06-24|
| 10001| 76884|1995-06-24|1996-06-23|
| 10001| 80013|1996-06-23|1997-06-23|
| 10001| 81025|1997-06-23|1998-06-23|
| 10001| 81097|1998-06-23|1999-06-23|
| 10001| 84917|1999-06-23|2000-06-22|
| 10001| 85112|2000-06-22|2001-06-22|
| 10001| 85097|2001-06-22|2002-06-22|
| 10001| 88958|2002-06-22|9999-01-01|
| 10002| 65828|1996-08-03|1997-08-03|
| 10002| 65909|1997-08-03|1998-08-03|
| 10002| 67534|1998-08-03|1999-08-03|
+------+------+----------+----------+
only showing top 20 rows

salaries: org.apache.spark.sql.DataFrame = [emp_no: int, salary: int ... 2 more fields]
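Note that by default this JDBC read runs as a single partition, which is slow for a table of roughly 2.8 million rows. A sketch of a partitioned read follows; the lowerBound and upperBound values are assumptions and should be taken from min(emp_no) and max(emp_no) in MySQL:

```scala
// Partitioned JDBC read: Spark issues 4 concurrent range queries on emp_no
// instead of one full-table scan. Bounds below are assumed, not measured.
val salariesParallel = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost/employees")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "salaries")
  .option("user", "root")
  .option("password", "training")
  .option("partitionColumn", "emp_no")
  .option("lowerBound", "10001")   // assumed min(emp_no)
  .option("upperBound", "500000")  // assumed max(emp_no)
  .option("numPartitions", "4")
  .load()
```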

Since we have verified that we can read data from MySQL in Spark, we are ready to load the data from Spark into Cassandra.

scala> :paste
// Entering paste mode (ctrl-D to finish)

employees
  .write
  .format("org.apache.spark.sql.cassandra")
  .option("table", "employees")
  .option("keyspace", "employees")
  .save()

// Exiting paste mode, now interpreting.

Load the salaries table into Cassandra.

scala> :paste
// Entering paste mode (ctrl-D to finish)

salaries
  .repartition(4)
  .write
  .format("org.apache.spark.sql.cassandra")
  .option("table", "salaries")
  .option("keyspace", "employees")
  .mode("append")
  .save()

// Exiting paste mode, now interpreting.
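To confirm the load before leaving the shell, the Cassandra table can be read back into Spark and its count compared with the JDBC source. A sketch, assuming the same spark-shell session:

```scala
// Read salaries back from Cassandra; the count should match the MySQL
// source (2844047 rows, as seen with count(*) earlier).
val cassSalaries = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("table", "salaries")
  .option("keyspace", "employees")
  .load()

println(cassSalaries.count())
```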

If you want to be selective about which columns or rows you migrate, follow the steps below.

scala> salaries.createOrReplaceTempView("salaries")

scala> :paste
// Entering paste mode (ctrl-D to finish)

sql("select * from salaries where year(from_date) = 1986")
  .repartition(4)
  .write
  .format("org.apache.spark.sql.cassandra")
  .option("table", "salaries")
  .option("keyspace", "employees")
  .mode("append")
  .save()

// Exiting paste mode, now interpreting.
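Restricting columns works much the same way as restricting rows. A sketch that migrates only a projection of salaries; the projection must still include every primary key column of the target table (emp_no and from_date here):

```scala
// Migrate a column subset; to_date is omitted from the projection and is
// simply left unset in Cassandra for the written rows.
sql("select emp_no, from_date, salary from salaries")
  .write
  .format("org.apache.spark.sql.cassandra")
  .option("table", "salaries")
  .option("keyspace", "employees")
  .mode("append")
  .save()
```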

While Spark is processing the data, you can monitor progress in the Spark Web UI at http://localhost:4040

Once the load is complete, switch to the cqlsh session and verify the tables in Cassandra.

cqlsh:employees> select * from employees limit 10;

 emp_no | birth_date | first_name | gender |  hire_date |   last_name
--------+------------+------------+--------+------------+-------------
  62693 | 1953-06-08 |     Parviz |      F | 1991-03-21 | Cappelletti
 488360 | 1954-09-01 |    Roselyn |      M | 1985-03-30 |  Fioravanti
 492756 | 1953-05-03 |     Shigeu |      M | 1994-02-25 |   Rajaraman
 265902 | 1952-09-18 |      Barna |      M | 1991-11-23 |        Pepe
 263741 | 1964-08-06 |     Huiqun |      M | 1989-02-10 |     Basagni
 482725 | 1959-06-24 |     Miomir |      M | 1985-05-30 |   Wielonsky
  51678 | 1963-07-07 |     Sanjiv |      M | 1993-10-21 |   Kowalchuk
  77328 | 1962-06-01 |     Valeri |      M | 1985-02-04 |    Rissland
 296870 | 1954-02-27 |   Sanjeeva |      M | 1986-11-10 |        Bahl
 251574 | 1958-04-09 |       Sham |      M | 1989-09-21 |   Delgrange

(10 rows)

cqlsh:employees> select * from salaries limit 10;

 emp_no |  from_date | salary |    to_date
--------+------------+--------+------------
  62693 | 2002-03-18 |  61780 | 9999-01-01
  62693 | 2001-03-18 |  61891 | 2002-03-18
  62693 | 2000-03-18 |  61977 | 2001-03-18
  62693 | 1999-03-19 |  57808 | 2000-03-18
  62693 | 1998-03-19 |  57610 | 1999-03-19
  62693 | 1997-03-19 |  53333 | 1998-03-19
  62693 | 1996-03-19 |  51603 | 1997-03-19
  62693 | 1995-03-20 |  50146 | 1996-03-19
  62693 | 1994-03-20 |  45842 | 1995-03-20
  62693 | 1993-03-20 |  43779 | 1994-03-20

(10 rows)

Let's find the salary records from the year 2000 (based on from_date) for the employee with emp_no 62693.

cqlsh:employees> select * from salaries where emp_no = 62693 and from_date > '2000-01-01' and from_date <= '2000-12-31';

 emp_no |  from_date | salary |    to_date
--------+------------+--------+------------
  62693 | 2000-03-18 |  61977 | 2001-03-18

Suppose we want to find employee records by last_name. One option is to create a secondary index on the last_name column.

cqlsh:employees> create index on employees (last_name);
cqlsh:employees> select * from employees where last_name = 'Basagni' limit 5;

 emp_no | birth_date | first_name | gender |  hire_date | last_name
--------+------------+------------+--------+------------+-----------
 263741 | 1964-08-06 |     Huiqun |      M | 1989-02-10 |   Basagni
 412507 | 1962-11-28 |     Rasiah |      M | 1989-06-23 |   Basagni
 254486 | 1963-08-02 |       Arie |      F | 1989-08-30 |   Basagni
  25485 | 1960-09-18 |    Rasikan |      M | 1992-04-01 |   Basagni
 240283 | 1960-04-09 |    Yunming |      M | 1994-06-09 |   Basagni

A better solution is to build a skinny table with last_name as the partition key for fast lookups. Using the emp_no values returned from that table, send a follow-up query against the employees table, whose primary key is emp_no.

cqlsh:employees> CREATE MATERIALIZED VIEW employees_by_lastname AS
    SELECT emp_no, last_name, first_name FROM employees
    WHERE last_name IS NOT NULL AND emp_no IS NOT NULL
    PRIMARY KEY (last_name, emp_no);

cqlsh:employees> select * from employees_by_lastname limit 10;

 last_name | emp_no | first_name
-----------+--------+------------
    Schahn |  14229 |      Remko
    Schahn |  14633 |        Uli
    Schahn |  14996 |    Jianhao
    Schahn |  15140 |   Yonghong