Migrate data from RDBMS
Goal: migrate data from MySQL to Cassandra.
Steps:
1. Load data into MySQL
2. Create a schema for each table in Cassandra
3. Launch Spark with Cassandra and MySQL support
4. Using code, pull data from the MySQL tables and push it to the Cassandra tables.
A test database named employees has been loaded into MySQL. Log in to MySQL to verify the data.
[training@localhost test_db]$ mysql -u root -p
Enter password:
Welcome to the MariaDB monitor. Commands end with ; or \g.
Your MariaDB connection id is 16
Server version: 10.0.31-MariaDB MariaDB Server
Copyright (c) 2000, 2017, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
MariaDB [(none)]>
MariaDB [(none)]>
MariaDB [(none)]>
MariaDB [(none)]> show databases;
+--------------------+
| Database |
+--------------------+
| employees |
| information_schema |
| mysql |
| performance_schema |
| test |
+--------------------+
5 rows in set (0.00 sec)
MariaDB [(none)]> use employees;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
MariaDB [employees]> show tables;
+----------------------+
| Tables_in_employees |
+----------------------+
| current_dept_emp |
| departments |
| dept_emp |
| dept_emp_latest_date |
| dept_manager |
| employees |
| salaries |
| titles |
+----------------------+
8 rows in set (0.00 sec)
MariaDB [employees]> select count(*) from employees;
+----------+
| count(*) |
+----------+
| 300024 |
+----------+
1 row in set (0.09 sec)
MariaDB [employees]> select count(*) from salaries;
+----------+
| count(*) |
+----------+
| 2844047 |
+----------+
1 row in set (0.47 sec)
View the schema of the employees table; this will help in creating the schema of the matching table in Cassandra. The output below is slightly modified.
MariaDB [employees]> show create table employees;
CREATE TABLE `employees` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
)
Create a new keyspace named employees.
cqlsh> create keyspace employees WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> use employees ;
Create a matching table in Cassandra for each MySQL table to be migrated.
cqlsh:employees> CREATE TABLE employees (
emp_no int,
birth_date date,
first_name text,
last_name text,
gender text,
hire_date date,
PRIMARY KEY (emp_no)
);
cqlsh:employees> CREATE TABLE salaries (
emp_no int,
salary int,
from_date date,
to_date date,
PRIMARY KEY (emp_no, from_date)
) WITH CLUSTERING ORDER BY (from_date DESC);
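Before loading any data, it is worth double-checking the new tables in cqlsh. The salaries layout stores each employee's history in a single partition, newest entry first, which is what makes per-employee history queries cheap later on. A quick sanity check:

```
cqlsh:employees> describe table employees;
cqlsh:employees> describe table salaries;
```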
Now launch Spark with Cassandra and MySQL driver support. First, download Spark from the Apache site into the Downloads folder.
$ cd ~/Downloads/
$ tar xf spark-2.1.1-bin-hadoop2.7.tgz
$ cd spark-2.1.1-bin-hadoop2.7
$ bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2,mysql:mysql-connector-java:5.1.42 --conf spark.cassandra.connection.host=127.0.0.1
Create a DataFrame in Spark that points to the employees table in MySQL.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val employees = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost/employees")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "employees")
.option("user", "root")
.option("password", "training")
.load()
// Exiting paste mode, now interpreting.
employees: org.apache.spark.sql.DataFrame = [emp_no: int, birth_date: date ... 4 more fields]
View sample records from the employees DataFrame.
scala> employees.show()
+------+----------+----------+-----------+------+----------+
|emp_no|birth_date|first_name| last_name|gender| hire_date|
+------+----------+----------+-----------+------+----------+
| 10001|1953-09-02| Georgi| Facello| M|1986-06-26|
| 10002|1964-06-02| Bezalel| Simmel| F|1985-11-21|
| 10003|1959-12-03| Parto| Bamford| M|1986-08-28|
| 10004|1954-05-01| Chirstian| Koblick| M|1986-12-01|
| 10005|1955-01-21| Kyoichi| Maliniak| M|1989-09-12|
| 10006|1953-04-20| Anneke| Preusig| F|1989-06-02|
| 10007|1957-05-23| Tzvetan| Zielinski| F|1989-02-10|
| 10008|1958-02-19| Saniya| Kalloufi| M|1994-09-15|
| 10009|1952-04-19| Sumant| Peac| F|1985-02-18|
| 10010|1963-06-01| Duangkaew| Piveteau| F|1989-08-24|
| 10011|1953-11-07| Mary| Sluis| F|1990-01-22|
| 10012|1960-10-04| Patricio| Bridgland| M|1992-12-18|
| 10013|1963-06-07| Eberhardt| Terkki| M|1985-10-20|
| 10014|1956-02-12| Berni| Genin| M|1987-03-11|
| 10015|1959-08-19| Guoxiang| Nooteboom| M|1987-07-02|
| 10016|1961-05-02| Kazuhito|Cappelletti| M|1995-01-27|
| 10017|1958-07-06| Cristinel| Bouloucos| F|1993-08-03|
| 10018|1954-06-19| Kazuhide| Peha| F|1987-04-03|
| 10019|1953-01-23| Lillian| Haddadi| M|1999-04-30|
| 10020|1952-12-24| Mayuko| Warwick| M|1991-01-26|
+------+----------+----------+-----------+------+----------+
only showing top 20 rows
In the same way, create a Spark DataFrame on the salaries table in MySQL.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val salaries = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost/employees")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "salaries")
.option("user", "root")
.option("password", "training")
.load()
salaries.show()
// Exiting paste mode, now interpreting.
+------+------+----------+----------+
|emp_no|salary| from_date| to_date|
+------+------+----------+----------+
| 10001| 60117|1986-06-26|1987-06-26|
| 10001| 62102|1987-06-26|1988-06-25|
| 10001| 66074|1988-06-25|1989-06-25|
| 10001| 66596|1989-06-25|1990-06-25|
| 10001| 66961|1990-06-25|1991-06-25|
| 10001| 71046|1991-06-25|1992-06-24|
| 10001| 74333|1992-06-24|1993-06-24|
| 10001| 75286|1993-06-24|1994-06-24|
| 10001| 75994|1994-06-24|1995-06-24|
| 10001| 76884|1995-06-24|1996-06-23|
| 10001| 80013|1996-06-23|1997-06-23|
| 10001| 81025|1997-06-23|1998-06-23|
| 10001| 81097|1998-06-23|1999-06-23|
| 10001| 84917|1999-06-23|2000-06-22|
| 10001| 85112|2000-06-22|2001-06-22|
| 10001| 85097|2001-06-22|2002-06-22|
| 10001| 88958|2002-06-22|9999-01-01|
| 10002| 65828|1996-08-03|1997-08-03|
| 10002| 65909|1997-08-03|1998-08-03|
| 10002| 67534|1998-08-03|1999-08-03|
+------+------+----------+----------+
only showing top 20 rows
salaries: org.apache.spark.sql.DataFrame = [emp_no: int, salary: int ... 2 more fields]
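With roughly 2.8 million rows, salaries is read over a single JDBC connection by default. If that becomes a bottleneck, Spark's JDBC partitioning options can split the read across concurrent tasks. A hedged sketch; the emp_no bounds below assume the stock employees sample data set and should be adjusted (for example via SELECT MIN/MAX) for other data:

```scala
// Sketch: parallel JDBC read of salaries, partitioned on emp_no.
val salariesPartitioned = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost/employees")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "salaries")
  .option("user", "root")
  .option("password", "training")
  .option("partitionColumn", "emp_no")  // numeric column to split on
  .option("lowerBound", "10001")        // assumed min emp_no in the sample data
  .option("upperBound", "499999")       // assumed max emp_no in the sample data
  .option("numPartitions", "4")         // 4 concurrent JDBC connections
  .load()
```

Each of the four tasks reads one emp_no range, so the load is spread across four MySQL connections instead of one.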
Since we have verified that we can read data from MySQL in Spark, we are ready to load the data from Spark into Cassandra.
scala> :paste
// Entering paste mode (ctrl-D to finish)
employees
.write
.format("org.apache.spark.sql.cassandra")
.option("table", "employees")
.option("keyspace", "employees")
.save();
// Exiting paste mode, now interpreting.
Load the salaries table into Cassandra.
scala> :paste
// Entering paste mode (ctrl-D to finish)
salaries
.repartition(4)
.write
.format("org.apache.spark.sql.cassandra")
.option("table", "salaries")
.option("keyspace", "employees")
.mode("append")
.save();
// Exiting paste mode, now interpreting.
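To sanity-check the load before moving on, you can read the Cassandra tables back into Spark and compare row counts with the MySQL source. A minimal sketch, assuming the same spark-shell session:

```scala
// Read the Cassandra copy back and compare counts with the JDBC DataFrame.
val cassSalaries = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .option("table", "salaries")
  .option("keyspace", "employees")
  .load()

println(s"mysql: ${salaries.count()}, cassandra: ${cassSalaries.count()}")
```

The counts should match as long as the Cassandra primary key covers the MySQL primary key; if it did not, Cassandra's upsert semantics would silently collapse rows that share a key.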
If you want to be selective about which columns or rows to migrate, follow the steps below.
scala> salaries.createOrReplaceTempView("salaries")
scala> :paste
// Entering paste mode (ctrl-D to finish)
sql("select * from salaries where year(from_date) = 1986")
.repartition(4)
.write
.format("org.apache.spark.sql.cassandra")
.option("table", "salaries")
.option("keyspace", "employees")
.mode("append")
.save();
// Exiting paste mode, now interpreting.
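Columns can be pruned the same way. A sketch that migrates only the salary amounts, leaving to_date unset in Cassandra (note that the full Cassandra primary key, emp_no and from_date, must still be included in the write):

```scala
// Migrate a subset of columns; the primary key columns
// (emp_no, from_date) must always be present in the write.
salaries
  .select("emp_no", "from_date", "salary")
  .write
  .format("org.apache.spark.sql.cassandra")
  .option("table", "salaries")
  .option("keyspace", "employees")
  .mode("append")
  .save()
```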
While Spark processes the data, you can monitor progress in the Spark Web UI at http://localhost:4040.
Once the load is complete, switch to the cqlsh session and query the tables in Cassandra.
cqlsh:employees> select * from employees limit 10;
emp_no | birth_date | first_name | gender | hire_date | last_name
--------+------------+------------+--------+------------+-------------
62693 | 1953-06-08 | Parviz | F | 1991-03-21 | Cappelletti
488360 | 1954-09-01 | Roselyn | M | 1985-03-30 | Fioravanti
492756 | 1953-05-03 | Shigeu | M | 1994-02-25 | Rajaraman
265902 | 1952-09-18 | Barna | M | 1991-11-23 | Pepe
263741 | 1964-08-06 | Huiqun | M | 1989-02-10 | Basagni
482725 | 1959-06-24 | Miomir | M | 1985-05-30 | Wielonsky
51678 | 1963-07-07 | Sanjiv | M | 1993-10-21 | Kowalchuk
77328 | 1962-06-01 | Valeri | M | 1985-02-04 | Rissland
296870 | 1954-02-27 | Sanjeeva | M | 1986-11-10 | Bahl
251574 | 1958-04-09 | Sham | M | 1989-09-21 | Delgrange
(10 rows)
cqlsh:employees> select * from salaries limit 10;
emp_no | from_date | salary | to_date
--------+------------+--------+------------
62693 | 2002-03-18 | 61780 | 9999-01-01
62693 | 2001-03-18 | 61891 | 2002-03-18
62693 | 2000-03-18 | 61977 | 2001-03-18
62693 | 1999-03-19 | 57808 | 2000-03-18
62693 | 1998-03-19 | 57610 | 1999-03-19
62693 | 1997-03-19 | 53333 | 1998-03-19
62693 | 1996-03-19 | 51603 | 1997-03-19
62693 | 1995-03-20 | 50146 | 1996-03-19
62693 | 1994-03-20 | 45842 | 1995-03-20
62693 | 1993-03-20 | 43779 | 1994-03-20
(10 rows)
Let's find the salary records of employee 62693 in the year 2000, based on from_date.
cqlsh:employees> select * from salaries where emp_no = 62693 and from_date > '2000-01-01' and from_date <= '2000-12-31';
emp_no | from_date | salary | to_date
--------+------------+--------+------------
62693 | 2000-03-18 | 61977 | 2001-03-18
Suppose we want to find employee records by last_name. One option is to create a secondary index on the last_name field.
cqlsh:employees> create index on employees (last_name);
cqlsh:employees> select * from employees where last_name = 'Basagni' limit 5;
emp_no | birth_date | first_name | gender | hire_date | last_name
--------+------------+------------+--------+------------+-----------
263741 | 1964-08-06 | Huiqun | M | 1989-02-10 | Basagni
412507 | 1962-11-28 | Rasiah | M | 1989-06-23 | Basagni
254486 | 1963-08-02 | Arie | F | 1989-08-30 | Basagni
25485 | 1960-09-18 | Rasikan | M | 1992-04-01 | Basagni
240283 | 1960-04-09 | Yunming | M | 1994-06-09 | Basagni
A better solution is to build a skinny table with last_name as the partition key for fast lookups; a materialized view serves this purpose. Using the result of a query against this view, issue a second query by emp_no, which is the primary key of the employees table.
cqlsh:employees> CREATE MATERIALIZED VIEW employees_by_lastname AS
SELECT emp_no, last_name, first_name FROM employees
WHERE last_name IS NOT NULL
PRIMARY KEY (last_name, emp_no);
cqlsh:employees> select * from employees_by_lastname limit 10;
last_name | emp_no | first_name
-----------+--------+------------
Schahn | 14229 | Remko
Schahn | 14633 | Uli
Schahn | 14996 | Jianhao
Schahn | 15140 | Yonghong
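The two-step lookup described above then works as follows: resolve emp_no values through the view, and fetch the full rows from the base table by primary key. A sketch using the Basagni records from the sample output:

```
cqlsh:employees> select emp_no from employees_by_lastname where last_name = 'Basagni';
cqlsh:employees> select * from employees where emp_no = 263741;
```

Both queries hit a single partition by its key, so this pattern scales much better than the secondary-index scan shown earlier.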