Query Cassandra Tables using Spark
Objective:
Query Cassandra tables using Spark.
Description:
Cassandra is a distributed database in which you define tables with a schema. It supports simple SELECT queries but not join queries. Instead, it relies on pre-computed joins (denormalized tables) for fast response times, at the cost of duplicating data across tables. A simple join capability helps avoid that duplication, and Spark can supply it. You can connect Spark to Cassandra, define Spark tables against Cassandra tables, and write join queries. Spark can also expose this query capability through its JDBC channel. Each query reaches out to Cassandra for the latest data. You can cache the data in Spark to serve it from memory without contacting Cassandra, but cached data can become stale.
Software:
1. Oracle Java 1.8
2. Apache Cassandra v3.9
3. Apache Spark v2.1
Dataset: MovieLens movies dataset from the GroupLens project. http://grouplens.org/datasets/movielens/latest/
- We will use the movies and ratings datasets to find the average rating for each movie.
Check the Java version. Version 1.8 is required.
$ java -version
Check the local IP address. In cassandra.yaml, set rpc_address to that address so that Cassandra listens on it for client (RPC) connections.
Start the Cassandra process in the foreground.
$ bin/cassandra -f
Launch Cassandra query shell.
$ bin/cqlsh <ip address>
Create a keyspace and switch to it.
cqlsh> create keyspace lab with replication = {'class': 'SimpleStrategy', 'replication_factor': 1} ;
cqlsh> use lab;
Create a table for the movies data.
cqlsh:lab> create table lab.movies (movie_id int primary key, title text, genres text);
Load the movies data into the movies table.
cqlsh:lab> copy movies(movie_id, title, genres) from '~/Downloads/apps/ml-latest-small/movies.csv' with header = true;
Similarly, create a table for the ratings data and load the data into it.
cqlsh:lab> create table lab.ratings (user_id int, movie_id int, rating double, timestamp bigint, primary key((user_id), movie_id));
cqlsh:lab> copy ratings(user_id, movie_id, rating, timestamp) from '~/Downloads/apps/ml-latest-small/ratings.csv' with header = true;
Find the latest Cassandra connector for Spark in the Maven repository. Launch the Spark shell with the connector package and the Cassandra host configured; replace 192.168.1.3 below with the address you set as rpc_address.
$ bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 --conf spark.cassandra.connection.host=192.168.1.3
Create a DataFrame for the movies table in Cassandra.
scala> val movies = spark.
read.
format("org.apache.spark.sql.cassandra").
options(Map( "table" -> "movies", "keyspace" -> "lab")).
load()
scala> movies.show()
Create a DataFrame for the ratings table in Cassandra.
scala> val ratings = spark.
read.
format("org.apache.spark.sql.cassandra").
options(Map( "table" -> "ratings", "keyspace" -> "lab")).
load()
scala> ratings.show()
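The description mentions caching data in Spark so that queries do not go to Cassandra every time. A minimal sketch of that pattern follows. It uses a small in-memory DataFrame (sampleRatings, a hypothetical stand-in for the ratings DataFrame above) so that it runs in spark-shell without a Cassandra connection; the same cache() and unpersist() calls should apply to the Cassandra-backed DataFrames.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session that already exists.
val session = SparkSession.builder().appName("cache-sketch").master("local[*]").getOrCreate()
import session.implicits._

// Hypothetical sample rows standing in for the Cassandra-backed ratings DataFrame.
val sampleRatings = Seq((10, 1, 4.0), (11, 1, 5.0), (10, 2, 3.0)).toDF("user_id", "movie_id", "rating")

// cache() only marks the DataFrame for caching; the first action materializes it.
val cachedRatings = sampleRatings.cache()
val rowCount = cachedRatings.count()

// Later queries are served from the cached copy and do not see new writes to the source.
cachedRatings.groupBy("movie_id").count().show()

// Drop the cached copy so the next query reads from the source again.
cachedRatings.unpersist()
```

Calling unpersist() and then cache() again is one simple way to refresh data that has gone stale.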
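Register the movies and ratings DataFrames as temporary views. (registerTempTable is deprecated as of Spark 2.0; createOrReplaceTempView replaces it.)
scala> movies.createOrReplaceTempView("movies")
scala> ratings.createOrReplaceTempView("ratings")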
Because movies and ratings are now registered under those names, you can run SQL queries, including joins, against them. The query below computes the average rating for each movie.
scala> sql("select t1.movie_id, t1.title, avg(t2.rating) rating_avg from movies t1 left join ratings t2 on t1.movie_id = t2.movie_id group by t1.movie_id, t1.title").show()
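The same aggregation can also be written with the DataFrame API instead of SQL. The sketch below is one possible equivalent, run on hypothetical in-memory rows (sampleMovies and sampleRatings) rather than the Cassandra tables so that it works without a cluster. Substituting the movies and ratings DataFrames defined earlier should give the per-movie averages for the real data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.avg

// In spark-shell, getOrCreate() returns the session that already exists.
val session = SparkSession.builder().appName("join-sketch").master("local[*]").getOrCreate()
import session.implicits._

// Hypothetical sample rows standing in for lab.movies and lab.ratings.
val sampleMovies = Seq((1, "Movie A"), (2, "Movie B"), (3, "Movie C")).toDF("movie_id", "title")
val sampleRatings = Seq((10, 1, 4.0), (11, 1, 5.0), (10, 2, 3.0)).toDF("user_id", "movie_id", "rating")

// Left outer join on movie_id, then average the ratings per movie.
val avgRatings = sampleMovies
  .join(sampleRatings, Seq("movie_id"), "left_outer")
  .groupBy("movie_id", "title")
  .agg(avg("rating").as("rating_avg"))

avgRatings.orderBy("movie_id").show()
```

Because this is a left join, a movie with no ratings (Movie C in the sample) still appears in the result, with a null rating_avg.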