Working with MySQL from Spark SQL

Step 1. Add a user "spark" to MySQL. If you already have a user that can log in from any client, skip this step. You may have to run these commands on the machine that hosts MySQL.

$ mysql -u root -p 
<enter your root password>
mysql> SELECT user, host FROM mysql.user;
mysql> CREATE USER 'spark'@'%' IDENTIFIED BY 'spark';
mysql> REVOKE ALL PRIVILEGES, GRANT OPTION FROM 'spark'@'%';
mysql> GRANT DROP,CREATE,ALTER,SELECT,INSERT,UPDATE,DELETE,LOCK TABLES,EXECUTE ON *.* TO 'spark'@'%';
mysql> FLUSH PRIVILEGES;

Step 2. Download the MySQL JDBC connector (for example, mysql-connector-java-5.1.38-bin).

You can test the connection to MySQL from a remote machine using a tool like SQL Workbench.

Step 3. Launch spark-shell with the MySQL driver jar.

$ cd $SPARK_HOME
$ bin/spark-shell --master local[2] --verbose --jars /Volumes/SONY/Software/Jars/mysql-connector-java-5.1.38-bin.jar

Step 4. Now you are ready to connect to MySQL from Spark SQL. Note: when running this on a cluster, replace localhost in the url with the actual server name or IP address, so that tasks running on different nodes of the cluster can reach the MySQL server over the network.

scala> val customers = sqlContext.read.format("jdbc")
.option("url", "jdbc:mysql://localhost/retail_db")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "customers")
.option("user", "root")
.option("password", "root")
.load()
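
Since the host name is the only part that changes between a local run and a cluster run, it can help to build the connection options in one place. Below is a minimal sketch. The helper name jdbcOptions and the host dbhost.example.com are assumptions made for illustration; neither is part of Spark or of this setup.

```scala
// Hypothetical helper (not a Spark API): collects the JDBC options used in
// Step 4 into a Map so that only the host name changes on a cluster.
def jdbcOptions(host: String, database: String, table: String,
                user: String, password: String): Map[String, String] = Map(
  "url"      -> s"jdbc:mysql://$host/$database",
  "driver"   -> "com.mysql.jdbc.Driver",
  "dbtable"  -> table,
  "user"     -> user,
  "password" -> password
)

// dbhost.example.com is a placeholder for the real MySQL server name or IP.
val clusterOptions = jdbcOptions("dbhost.example.com", "retail_db", "customers", "root", "root")
```

The map can then be passed in a single call, for example sqlContext.read.format("jdbc").options(clusterOptions).load().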

You can also load the result of a query. Pass the query as dbtable, wrapped in parentheses and given an alias.

scala> val customers = sqlContext.read.format("jdbc")
.option("url", "jdbc:mysql://localhost/retail_db")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "(select * from customers) as c2")
.option("user", "root")
.option("password", "root")
.load()
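
The subquery form is most useful when it restricts the rows or columns that MySQL sends to Spark. Below is a small sketch that builds such a dbtable value. The helper asDbTable and the alias ny_customers are hypothetical names chosen for this example.

```scala
// Hypothetical helper (not a Spark API): wraps a SELECT statement in the
// "(query) as alias" form that the dbtable option accepts. A trailing
// semicolon is dropped because it is not valid inside a subquery.
def asDbTable(query: String, alias: String): String =
  s"(${query.trim.stripSuffix(";")}) as $alias"

// Only three columns of the New York customers are requested from MySQL.
val nyCustomers = asDbTable(
  "select customer_id, customer_fname, customer_lname from customers where customer_state = 'NY'",
  "ny_customers")
```

The resulting string would be passed as .option("dbtable", nyCustomers) in place of the literal used above.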

Step 5. Explore the schema

scala> customers.printSchema
root
 |-- customer_id: integer (nullable = false)
 |-- customer_fname: string (nullable = false)
 |-- customer_lname: string (nullable = false)
 |-- customer_email: string (nullable = false)
 |-- customer_password: string (nullable = false)
 |-- customer_street: string (nullable = false)
 |-- customer_city: string (nullable = false)
 |-- customer_state: string (nullable = false)
 |-- customer_zipcode: string (nullable = false)

Step 6. Select some sample columns and rows.

scala> customers.select("customer_fname", "customer_lname", "customer_city").show(5)
+--------------+--------------+-------------+
|customer_fname|customer_lname|customer_city|
+--------------+--------------+-------------+
|       Richard|     Hernandez|  Brownsville|
|          Mary|       Barrett|    Littleton|
|           Ann|         Smith|       Caguas|
|          Mary|         Jones|   San Marcos|
|        Robert|        Hudson|       Caguas|
+--------------+--------------+-------------+
only showing top 5 rows

Step 7. Register the customers dataframe as a temporary table and query that temp table.

scala> customers.registerTempTable("customers")
scala> sql("SELECT customer_id, customer_fname, customer_lname FROM customers").show(5)
+-----------+--------------+--------------+
|customer_id|customer_fname|customer_lname|
+-----------+--------------+--------------+
|          1|       Richard|     Hernandez|
|          2|          Mary|       Barrett|
|          3|           Ann|         Smith|
|          4|          Mary|         Jones|
|          5|        Robert|        Hudson|
+-----------+--------------+--------------+
only showing top 5 rows

Data Change in MySQL

Now let's see how a data change in MySQL shows up in Spark SQL. Change customer_lname for customer 1 in MySQL, then rerun the show function or the sql statement.

mysql> update customers set customer_lname = 'Hernandez1' where customer_id = 1;
Query OK, 1 row affected (0.08 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Verify the data in MySQL. The customer_lname of customer 1 is now Hernandez1.

mysql> SELECT customer_id, customer_fname, customer_lname FROM customers LIMIT 5;
+-------------+----------------+----------------+
| customer_id | customer_fname | customer_lname |
+-------------+----------------+----------------+
|           1 | Richard        | Hernandez1     |
|           2 | Mary           | Barrett        |
|           3 | Ann            | Smith          |
|           4 | Mary           | Jones          |
|           5 | Robert         | Hudson         |
+-------------+----------------+----------------+
5 rows in set (0.02 sec)

Now, run the query in Spark SQL. The updated value, Hernandez1, shows up in Spark SQL as well.

scala> sql("SELECT customer_id, customer_fname, customer_lname FROM customers").show(5)
+-----------+--------------+--------------+
|customer_id|customer_fname|customer_lname|
+-----------+--------------+--------------+
|          1|       Richard|    Hernandez1|
|          2|          Mary|       Barrett|
|          3|           Ann|         Smith|
|          4|          Mary|         Jones|
|          5|        Robert|        Hudson|
+-----------+--------------+--------------+
only showing top 5 rows

Now let's see the impact of a schema change. To experiment, add a column to the MySQL table.

mysql> alter table customers add column(temp_col varchar(1));
mysql> describe customers;
+-------------------+--------------+------+-----+---------+----------------+
| Field             | Type         | Null | Key | Default | Extra          |
+-------------------+--------------+------+-----+---------+----------------+
| customer_id       | int(11)      | NO   | PRI | NULL    | auto_increment |
| customer_fname    | varchar(45)  | NO   |     | NULL    |                |
...
| customer_zipcode  | varchar(45)  | NO   |     | NULL    |                |
| temp_col          | varchar(1)   | YES  |     | NULL    |                |
+-------------------+--------------+------+-----+---------+----------------+

Now, verify whether the new column shows up in Spark SQL. It does not.

scala> sql("describe customers").show
+-----------------+---------+-------+
|         col_name|data_type|comment|
+-----------------+---------+-------+
|      customer_id|      int|       |
|   customer_fname|   string|       |
|   customer_lname|   string|       |
|   customer_email|   string|       |
|customer_password|   string|       |
|  customer_street|   string|       |
|    customer_city|   string|       |
|   customer_state|   string|       |
| customer_zipcode|   string|       |
+-----------------+---------+-------+

Conclusion:

  • Data changes in MySQL are reflected in Spark SQL, because Spark SQL has not cached the data. If the data were cached, you would need to uncache and reload the table to see the changes.
  • Changes to the schema are not reflected in Spark SQL. The schema is read once, when the dataframe is created.
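
To pick up a schema change, or to refresh cached data, the dataframe can be re-created and registered again. Below is a sketch. It assumes the spark-shell session from the steps above, so sqlContext is predefined and the temp table customers is registered. The name customersReloaded is illustrative.

```scala
// Assumes the spark-shell session above: sqlContext exists and the temp table
// "customers" has been registered.

// If the table was cached, drop the cached copy so stale rows are not served.
if (sqlContext.isCached("customers")) sqlContext.uncacheTable("customers")

// Re-create the dataframe; the table schema is read from MySQL again here.
// Trailing dots let the shell treat the chain as a single statement.
val customersReloaded = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://localhost/retail_db").
  option("driver", "com.mysql.jdbc.Driver").
  option("dbtable", "customers").
  option("user", "root").
  option("password", "root").
  load()

// Re-register under the same name so later sql(...) calls use the new schema.
customersReloaded.registerTempTable("customers")
customersReloaded.printSchema
```

After this, sql("describe customers") should list temp_col along with the original columns.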

Exporting Dataframe to file

Now, we want to export the data to a csv file. In Spark 1.x, CSV support is not available by default; you have to add it when launching spark-shell. To enable the CSV file format, relaunch spark-shell with the csv package.

$ bin/spark-shell --master local[2] --verbose --jars /Volumes/SONY/Software/Jars/mysql-connector-java-5.1.38-bin.jar --packages com.databricks:spark-csv_2.10:1.4.0 

Then rerun the statements to create the "customers" data frame.

Now, save the dataframe as csv.

scala> customers.write.format("csv").mode("overwrite").option("header","true").save("/Users/spark/Downloads/tmp")

Verify the output files in the location you specified. Note that the number of "part" files equals the number of partitions of the dataframe. Use the following statements to check the partition count or to create a repartitioned dataframe.

scala> customers.rdd.partitions.size
scala> customers.repartition(3)
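
Note that repartition does not modify customers; it returns a new dataframe, which then has to be the one that is written. Below is a short sketch. It assumes the spark-shell session above with the csv package loaded. The name customersSingle and the output path are illustrative.

```scala
// Assumes the spark-shell session above, with "customers" loaded and the
// spark-csv package on the classpath.

// repartition returns a new dataframe; customers keeps its own partitioning.
val customersSingle = customers.repartition(1)

// With one partition, the output directory should hold a single "part" file.
// The path below is a placeholder.
customersSingle.write.format("csv").mode("overwrite").option("header", "true").save("/Users/spark/Downloads/tmp_single")
```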

If you want to save a subset of customers, you can write the result of a query instead.

scala> sql("SELECT customer_id, customer_fname, customer_lname FROM customers WHERE customer_state = 'NY'").write.format("csv").mode("overwrite").option("header","true").save("/Users/spark/Downloads/tmp")

Save Dataframe to MySQL

Create a dataframe from a json file

scala> val employees = sqlContext.read.format("json").load("/Volumes/SONY/Data/people.json")
employees: org.apache.spark.sql.DataFrame = [dob: string, firstname: string, gender: string, lastname: string, middlename: string, salary: bigint, ssn: string]
scala> employees.printSchema
root
 |-- dob: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- ssn: string (nullable = true)
scala> employees.show(5)
+----------+---------+------+---------+----------+------+-----------+
|       dob|firstname|gender| lastname|middlename|salary|        ssn|
+----------+---------+------+---------+----------+------+-----------+
|1940-08-06|    Dacia|     F|Samborski|   Rosella|274357|932-39-7400|
|1964-01-31|    Loria|     F|  Cassino|     Suzie|166618|940-40-2137|
|1936-06-02|Lashaunda|     F| Rockhill|   Markita|185766|923-83-5563|
|1971-09-25|  Candace|     F|    Goike|     Marcy| 92497|935-40-2967|
|1926-06-29|   Marhta|     F|    Bonin|  Filomena| 40013|968-22-1158|
+----------+---------+------+---------+----------+------+-----------+

We want to save the values to MySQL. Assume that there is no corresponding table in MySQL yet.

import java.util.Properties
val properties = new Properties();
properties.setProperty("driver", "com.mysql.jdbc.Driver");
properties.setProperty("user", "spark");
properties.setProperty("password", "masterkey");
employees.write.jdbc("jdbc:mysql://localhost/retail_db", "employees", properties)

You can specify a "mode" (overwrite, append, error (default)) to control how the dataframe records are handled in the MySQL table. It is not possible to update records of an existing table using this API as of Spark release 1.6 or 2.0. To get "update" functionality, you can write to a temporary table in MySQL and run an upsert manually. Alternatively, you can hand-write code that updates records using an RDD operation. Below is a code snippet.

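import java.sql.{Connection,DriverManager}

employees.rdd.foreachPartition { emp_iter =>
  // One connection per partition, opened on the executor that runs the task.
  val url = "jdbc:mysql://localhost:3306/mysql"
  val driver = "com.mysql.jdbc.Driver"
  val username = "spark"
  val password = "masterkey"
  // A local var cannot use the "_" default initializer, so start with null.
  var connection: Connection = null
  try {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, username, password)
    val statement = connection.createStatement
    statement.executeUpdate("UPDATE employees ...");
  } finally {
    // getConnection may have failed, leaving connection as null.
    if (connection != null) connection.close
  }
}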
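
The temporary-table approach mentioned above can be sketched as a helper that builds the upsert statement. This is an illustration, not a tested recipe. The helper upsertSql, the staging table employees_staging, and the choice of ssn as the key are all assumptions. The statement uses MySQL's INSERT ... ON DUPLICATE KEY UPDATE, which only takes effect if the target table has a PRIMARY KEY or UNIQUE index on the key column. A table created by Spark's jdbc writer has neither, so the index would have to be added in MySQL first.

```scala
// Hypothetical helper: builds a MySQL upsert that copies rows from a staging
// table into the target table. Rows whose key already exists in the target
// are updated; all other rows are inserted.
def upsertSql(target: String, staging: String, key: String, columns: Seq[String]): String = {
  val colList = columns.mkString(", ")
  // Every column except the key is overwritten with the staged value.
  val updates = columns.filterNot(_ == key).map(c => s"$c = VALUES($c)").mkString(", ")
  s"INSERT INTO $target ($colList) SELECT $colList FROM $staging " +
    s"ON DUPLICATE KEY UPDATE $updates"
}

// employees_staging and the key column ssn are assumptions for this example.
val sqlText = upsertSql("employees", "employees_staging", "ssn",
  Seq("ssn", "firstname", "lastname", "salary"))
```

One way to use it: write the new records to the staging table with employees.write.mode("overwrite").jdbc("jdbc:mysql://localhost/retail_db", "employees_staging", properties), then run sqlText once through statement.executeUpdate on a JDBC connection opened from the driver program.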