Migrate data from RDBMS

Goal: migrate data from MySQL to Cassandra.

Steps:

1. Load the data into MySQL

2. Create the schema for the tables in Cassandra

3. Launch Spark with Cassandra and MySQL support

4. Using Spark code, pull data from the MySQL tables and push it to the Cassandra tables.

A test database named employees has been loaded into MySQL.

Log in to MySQL to verify the data.

[training@localhost test_db]$ mysql -u root -p 
Enter password: 
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 16
Server version: 10.0.31-MariaDB MariaDB Server
Copyright (c) 2000, 2017, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
MariaDB [(none)]> show databases;
+--------------------+
| Database           |
+--------------------+
| employees          |
| information_schema |
| mysql              |
| performance_schema |
| test               |
+--------------------+
5 rows in set (0.00 sec)
MariaDB [(none)]> use employees;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
MariaDB [employees]> show tables;
+----------------------+
| Tables_in_employees  |
+----------------------+
| current_dept_emp     |
| departments          |
| dept_emp             |
| dept_emp_latest_date |
| dept_manager         |
| employees            |
| salaries             |
| titles               |
+----------------------+
8 rows in set (0.00 sec)
MariaDB [employees]> select count(*) from employees;
+----------+
| count(*) |
+----------+
|   300024 |
+----------+
1 row in set (0.09 sec)
MariaDB [employees]> select count(*) from salaries;
+----------+
| count(*) |
+----------+
|  2844047 |
+----------+
1 row in set (0.47 sec)

View the schema of the employees table. This will help when creating the matching table schema in Cassandra. The output below is slightly trimmed.

MariaDB [employees]> show create table employees;
CREATE TABLE `employees` (
  `emp_no` int(11) NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(14) NOT NULL,
  `last_name` varchar(16) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
)
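When translating this DDL to CQL, each MySQL type needs a CQL counterpart. There is no single official mapping; the correspondence below is an illustrative assumption for the types used in this guide, sketched in plain Scala:

```scala
// Rough MySQL-to-CQL type correspondence for the columns in this guide.
// Illustrative only, not an exhaustive or official mapping.
val mysqlToCql: Map[String, String] = Map(
  "int(11)"       -> "int",
  "date"          -> "date",
  "varchar(14)"   -> "text",   // CQL text carries no length limit
  "varchar(16)"   -> "text",
  "enum('M','F')" -> "text"    // CQL has no enum type; store as text
)

// The employees columns translated with the mapping above:
val employeesCql = Seq(
  "emp_no"     -> mysqlToCql("int(11)"),
  "birth_date" -> mysqlToCql("date"),
  "first_name" -> mysqlToCql("varchar(14)"),
  "last_name"  -> mysqlToCql("varchar(16)"),
  "gender"     -> mysqlToCql("enum('M','F')"),
  "hire_date"  -> mysqlToCql("date")
)
```

The CREATE TABLE statements in the next section follow this correspondence.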

In cqlsh, create a new keyspace named employees.

cqlsh> create keyspace employees WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

cqlsh> use employees ;

Create a matching Cassandra table for each MySQL table you want to migrate.

cqlsh:employees> CREATE TABLE employees (
  emp_no int,
  birth_date date,
  first_name text,
  last_name text,
  gender text,
  hire_date date,
  PRIMARY KEY (emp_no)
);
cqlsh:employees> CREATE TABLE salaries (
  emp_no int,
  salary int,
  from_date date,
  to_date date,
  PRIMARY KEY (emp_no, from_date)
) WITH CLUSTERING ORDER BY (from_date DESC);
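The CLUSTERING ORDER BY (from_date DESC) clause stores each employee's salary rows newest-first within the partition, so the most recent salary is the first row returned. The effect on read order can be illustrated in plain Scala (values taken from the sample data shown later):

```scala
// Salary history rows for one partition (emp_no 10001), as (from_date, salary).
// ISO date strings sort correctly lexicographically.
val history = Seq(
  ("1986-06-26", 60117),
  ("1987-06-26", 62102),
  ("2002-06-22", 88958)
)

// With CLUSTERING ORDER BY (from_date DESC), rows come back newest-first:
val newestFirst = history.sortBy(_._1)(Ordering[String].reverse)

// so the current salary is simply the first row (LIMIT 1 in CQL):
val current = newestFirst.head
```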

Now launch Spark with Cassandra and MySQL driver support. First download Spark from the Apache site into the Downloads folder, then extract it and launch spark-shell with the connector packages.

$ cd ~/Downloads/
$ tar xf spark-2.1.1-bin-hadoop2.7.tgz 
$ cd spark-2.1.1-bin-hadoop2.7
$ bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2,mysql:mysql-connector-java:5.1.42 --conf spark.cassandra.connection.host=127.0.0.1

Create a DataFrame in Spark that reads the employees table from MySQL.

scala> :paste
// Entering paste mode (ctrl-D to finish)
val employees = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost/employees")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "employees")
.option("user", "root")
.option("password", "training")
.load()
// Exiting paste mode, now interpreting.
employees: org.apache.spark.sql.DataFrame = [emp_no: int, birth_date: date ... 4 more fields]
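By default this JDBC read uses a single connection and a single partition. For larger tables you can parallelize it with the partitionColumn, lowerBound, upperBound and numPartitions options; Spark then splits the key range into equal strides, issuing one query per partition. A simplified pure-Scala sketch of that stride logic (the real implementation also handles NULL keys and open-ended first/last partitions):

```scala
// Simplified sketch of how Spark's JDBC source splits a numeric key range
// (partitionColumn/lowerBound/upperBound/numPartitions) into per-task ranges.
def strideRanges(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
  val stride = (upper - lower) / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lower + i * stride
    // the last partition absorbs any remainder from integer division
    val hi = if (i == numPartitions - 1) upper else lo + stride
    (lo, hi)
  }
}

// e.g. emp_no runs from 10001 to 499999 in this sample data set:
val ranges = strideRanges(10001L, 499999L, 4)
```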

View sample records from the employees DataFrame.

scala> employees.show()
+------+----------+----------+-----------+------+----------+
|emp_no|birth_date|first_name|  last_name|gender| hire_date|
+------+----------+----------+-----------+------+----------+
| 10001|1953-09-02|    Georgi|    Facello|     M|1986-06-26|
| 10002|1964-06-02|   Bezalel|     Simmel|     F|1985-11-21|
| 10003|1959-12-03|     Parto|    Bamford|     M|1986-08-28|
| 10004|1954-05-01| Chirstian|    Koblick|     M|1986-12-01|
| 10005|1955-01-21|   Kyoichi|   Maliniak|     M|1989-09-12|
| 10006|1953-04-20|    Anneke|    Preusig|     F|1989-06-02|
| 10007|1957-05-23|   Tzvetan|  Zielinski|     F|1989-02-10|
| 10008|1958-02-19|    Saniya|   Kalloufi|     M|1994-09-15|
| 10009|1952-04-19|    Sumant|       Peac|     F|1985-02-18|
| 10010|1963-06-01| Duangkaew|   Piveteau|     F|1989-08-24|
| 10011|1953-11-07|      Mary|      Sluis|     F|1990-01-22|
| 10012|1960-10-04|  Patricio|  Bridgland|     M|1992-12-18|
| 10013|1963-06-07| Eberhardt|     Terkki|     M|1985-10-20|
| 10014|1956-02-12|     Berni|      Genin|     M|1987-03-11|
| 10015|1959-08-19|  Guoxiang|  Nooteboom|     M|1987-07-02|
| 10016|1961-05-02|  Kazuhito|Cappelletti|     M|1995-01-27|
| 10017|1958-07-06| Cristinel|  Bouloucos|     F|1993-08-03|
| 10018|1954-06-19|  Kazuhide|       Peha|     F|1987-04-03|
| 10019|1953-01-23|   Lillian|    Haddadi|     M|1999-04-30|
| 10020|1952-12-24|    Mayuko|    Warwick|     M|1991-01-26|
+------+----------+----------+-----------+------+----------+
only showing top 20 rows

In the same way, create a Spark DataFrame on the salaries table from MySQL.

scala> :paste
// Entering paste mode (ctrl-D to finish)
val salaries = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost/employees")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "salaries")
.option("user", "root")
.option("password", "training")
.load()
salaries.show()
// Exiting paste mode, now interpreting.
+------+------+----------+----------+
|emp_no|salary| from_date|   to_date|
+------+------+----------+----------+
| 10001| 60117|1986-06-26|1987-06-26|
| 10001| 62102|1987-06-26|1988-06-25|
| 10001| 66074|1988-06-25|1989-06-25|
| 10001| 66596|1989-06-25|1990-06-25|
| 10001| 66961|1990-06-25|1991-06-25|
| 10001| 71046|1991-06-25|1992-06-24|
| 10001| 74333|1992-06-24|1993-06-24|
| 10001| 75286|1993-06-24|1994-06-24|
| 10001| 75994|1994-06-24|1995-06-24|
| 10001| 76884|1995-06-24|1996-06-23|
| 10001| 80013|1996-06-23|1997-06-23|
| 10001| 81025|1997-06-23|1998-06-23|
| 10001| 81097|1998-06-23|1999-06-23|
| 10001| 84917|1999-06-23|2000-06-22|
| 10001| 85112|2000-06-22|2001-06-22|
| 10001| 85097|2001-06-22|2002-06-22|
| 10001| 88958|2002-06-22|9999-01-01|
| 10002| 65828|1996-08-03|1997-08-03|
| 10002| 65909|1997-08-03|1998-08-03|
| 10002| 67534|1998-08-03|1999-08-03|
+------+------+----------+----------+
only showing top 20 rows
salaries: org.apache.spark.sql.DataFrame = [emp_no: int, salary: int ... 2 more fields]

Now that we have verified we can read data from MySQL in Spark, we are ready to load it from Spark into Cassandra.

scala> :paste
// Entering paste mode (ctrl-D to finish)
employees
.write
.format("org.apache.spark.sql.cassandra")
.option("table", "employees")
.option("keyspace", "employees")
.mode("append") // the default ErrorIfExists mode fails if the table already holds data
.save();
// Exiting paste mode, now interpreting.

Load the salaries table into Cassandra. Repartitioning spreads the write across more tasks.

scala> :paste
// Entering paste mode (ctrl-D to finish)
salaries
.repartition(4)
.write
.format("org.apache.spark.sql.cassandra")
.option("table", "salaries")
.option("keyspace", "employees")
.mode("append")
.save();
// Exiting paste mode, now interpreting.

If you want to be selective about which columns or rows to migrate, follow the steps below.

scala> salaries.createOrReplaceTempView("salaries")
scala> :paste
// Entering paste mode (ctrl-D to finish)
sql("select * from salaries where year(from_date) = 1986")
.repartition(4)
.write
.format("org.apache.spark.sql.cassandra")
.option("table", "salaries")
.option("keyspace", "employees")
.mode("append")
.save();

// Exiting paste mode, now interpreting.
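The year(from_date) = 1986 predicate keeps only the salary rows whose from_date falls in 1986. In plain Scala terms, the filter amounts to (the rows below are a hypothetical sample, not the full table):

```scala
import java.time.LocalDate

// Hypothetical (emp_no, from_date) rows; the SQL predicate
// year(from_date) = 1986 keeps only dates within 1986.
val rows = Seq(
  (10001, "1986-06-26"),
  (10001, "1987-06-26"),
  (10003, "1986-08-28")
)

val in1986 = rows.filter { case (_, d) => LocalDate.parse(d).getYear == 1986 }
```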

As Spark processes the data, you can monitor progress using the Spark web UI at http://localhost:4040

Once the load is complete, switch to the cqlsh screen and verify the tables in Cassandra.

cqlsh:employees> select * from employees limit 10;
 emp_no | birth_date | first_name | gender | hire_date  | last_name
--------+------------+------------+--------+------------+-------------
  62693 | 1953-06-08 |     Parviz |      F | 1991-03-21 | Cappelletti
 488360 | 1954-09-01 |    Roselyn |      M | 1985-03-30 |  Fioravanti
 492756 | 1953-05-03 |     Shigeu |      M | 1994-02-25 |   Rajaraman
 265902 | 1952-09-18 |      Barna |      M | 1991-11-23 |        Pepe
 263741 | 1964-08-06 |     Huiqun |      M | 1989-02-10 |     Basagni
 482725 | 1959-06-24 |     Miomir |      M | 1985-05-30 |   Wielonsky
  51678 | 1963-07-07 |     Sanjiv |      M | 1993-10-21 |   Kowalchuk
  77328 | 1962-06-01 |     Valeri |      M | 1985-02-04 |    Rissland
 296870 | 1954-02-27 |   Sanjeeva |      M | 1986-11-10 |        Bahl
 251574 | 1958-04-09 |       Sham |      M | 1989-09-21 |   Delgrange
(10 rows)
cqlsh:employees> select * from salaries  limit 10;
 emp_no | from_date  | salary | to_date
--------+------------+--------+------------
  62693 | 2002-03-18 |  61780 | 9999-01-01
  62693 | 2001-03-18 |  61891 | 2002-03-18
  62693 | 2000-03-18 |  61977 | 2001-03-18
  62693 | 1999-03-19 |  57808 | 2000-03-18
  62693 | 1998-03-19 |  57610 | 1999-03-19
  62693 | 1997-03-19 |  53333 | 1998-03-19
  62693 | 1996-03-19 |  51603 | 1997-03-19
  62693 | 1995-03-20 |  50146 | 1996-03-19
  62693 | 1994-03-20 |  45842 | 1995-03-20
  62693 | 1993-03-20 |  43779 | 1994-03-20
(10 rows)

Let's find the salary records with a from_date in 2000 for the employee with emp_no 62693.

cqlsh:employees> select * from salaries where emp_no = 62693 and from_date > '2000-01-01' and from_date <= '2000-12-31';
 emp_no | from_date  | salary | to_date
--------+------------+--------+------------
  62693 | 2000-03-18 |  61977 | 2001-03-18

Suppose we want to find employee records by last_name. One option is to create a secondary index on the last_name column.

cqlsh:employees> create index on employees (last_name);
cqlsh:employees> select * from employees where last_name = 'Basagni' limit 5;
 emp_no | birth_date | first_name | gender | hire_date  | last_name
--------+------------+------------+--------+------------+-----------
 263741 | 1964-08-06 |     Huiqun |      M | 1989-02-10 |   Basagni
 412507 | 1962-11-28 |     Rasiah |      M | 1989-06-23 |   Basagni
 254486 | 1963-08-02 |       Arie |      F | 1989-08-30 |   Basagni
  25485 | 1960-09-18 |    Rasikan |      M | 1992-04-01 |   Basagni
 240283 | 1960-04-09 |    Yunming |      M | 1994-06-09 |   Basagni

A better solution is to build a skinny table with last_name as the partition key for fast lookups. Using the result of that query, issue another query on emp_no, the primary key column of the employees table. A materialized view keeps such a table in sync automatically.

cqlsh:employees> CREATE MATERIALIZED VIEW employees_by_lastname AS
       SELECT emp_no, last_name, first_name FROM employees
       WHERE last_name IS NOT NULL AND emp_no IS NOT NULL
       PRIMARY KEY (last_name, emp_no);
      
cqlsh:employees> select * from employees_by_lastname limit 10;
 last_name | emp_no | first_name
-----------+--------+------------
    Schahn |  14229 |      Remko
    Schahn |  14633 |        Uli
    Schahn |  14996 |    Jianhao
    Schahn |  15140 |   Yonghong