CQL (Cassandra Query Language)

Objectives: learn how to

  1. View the system keyspaces
  2. Create a keyspace with the SimpleStrategy replication strategy
  3. Create a table with a simple primary key and with a composite primary key
  4. Load CSV data into a Cassandra table using the COPY command in cqlsh
  5. Create a secondary index on a Cassandra table
  6. Understand the need for materialized views
  7. Override the "writetime" of a column value
  8. Set an expiry (TTL) on a row so that it is deleted after the TTL period
  9. Use lightweight transactions
  10. Create a custom data type
  11. Use UUID and TIMEUUID
  12. Delete records from a Cassandra table

Download Cassandra (this tutorial uses apache-cassandra-3.11.4) and the test dataset (MovieLens ml-latest-small).

$ cd ~/Downloads/
$ tar xf apache-cassandra-3.11.4-bin.tar.gz 
$ cd apache-cassandra-3.11.4
$ export CASSANDRA_HOME=$(pwd)

Start Cassandra in the foreground and keep it running. If you terminate this process, the Cassandra node stops as well.

$ bin/cassandra -f

Start a cqlsh prompt in another terminal to run queries.

$ cd $CASSANDRA_HOME
$ bin/cqlsh

View existing keyspaces. The output may be different on your system.

cqlsh> select * from system_schema.keyspaces ;
 keyspace_name      | durable_writes | replication
--------------------+----------------+-------------------------------------------------------------------------------------
        system_auth |           True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1'}
      system_schema |           True |                             {'class': 'org.apache.cassandra.locator.LocalStrategy'}
 system_distributed |           True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '3'}
             system |           True |                             {'class': 'org.apache.cassandra.locator.LocalStrategy'}
      system_traces |           True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '2'}
(6 rows)


Create a keyspace called demo with replication factor 1 and the SimpleStrategy replication strategy.

cqlsh> create KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};


Open another terminal to view the datasets. We will create a table for each dataset and load the data into it.

$ cd ~/Downloads/datasets/ml-latest-small

Sample from the dataset

$ head movies.csv 
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
...

$ head ratings.csv

userId,movieId,rating,timestamp
1,31,2.5,1260759144
1,1029,3.0,1260759179
1,1061,3.0,1260759182
1,1129,2.0,1260759185
...



Create a table in the demo keyspace.

cqlsh> use demo;
cqlsh:demo> create table movies (movieId int primary key, title text, genres text);


View existing tables

cqlsh:demo> DESC tables;

Describe movies table

cqlsh:demo> DESC table movies ;

Load movies.csv into the movies table.

cqlsh:demo> COPY movies from '~/Downloads/datasets/ml-latest-small/movies.csv' WITH HEADER = true;
cqlsh:demo> select * from movies limit 10;

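The COPY command above contains an intentional mistake: it does not list the columns, so the fields in the file are not matched to the right table columns. COPY FROM matches CSV fields to table columns by position, not by the names in the header. If you do not specify the column names, cqlsh uses the table's column order: primary key columns first, then the remaining columns in alphabetical order (here movieid, genres, title). As a result, the title and genres values end up swapped. As a best practice, always list the columns in the order they appear in the CSV file.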

Let's truncate the table and reload the data.

cqlsh:demo> TRUNCATE movies ;
cqlsh:demo> COPY movies (movieId, title, genres) from '~/Downloads/datasets/ml-latest-small/movies.csv' WITH HEADER = true;
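To avoid the positional mismatch, you can generate the column list from the CSV header instead of typing it by hand. The Python sketch below is only an illustration. `copy_command`, `read_header` and the `renames` parameter are hypothetical helpers, not part of cqlsh. The sketch assumes the header names match the table's column names, except where a rename is given.

```python
import csv
import os
from typing import Dict, Optional


def copy_command(table: str, csv_path: str, header_line: str,
                 renames: Optional[Dict[str, str]] = None) -> str:
    """Build a cqlsh COPY FROM statement whose column list follows the CSV header.

    `renames` (hypothetical convenience) maps a CSV header name to a table
    column with a different name, e.g. {"timestamp": "rated_on"}.
    """
    renames = renames or {}
    # Parse the single header line with csv so quoted names are handled.
    header = next(csv.reader([header_line]))
    columns = [renames.get(name.strip(), name.strip()) for name in header]
    return f"COPY {table} ({', '.join(columns)}) FROM '{csv_path}' WITH HEADER = true;"


def read_header(path: str) -> str:
    """Return the first line (the header) of a CSV file, without the line ending."""
    with open(os.path.expanduser(path), newline="") as f:
        return f.readline().rstrip("\r\n")


if __name__ == "__main__":
    movies_csv = "~/Downloads/datasets/ml-latest-small/movies.csv"
    print(copy_command("movies", movies_csv, "movieId,title,genres"))
```

For ratings.csv, the header field `timestamp` maps to the `rated_on` column, so you would pass `renames={"timestamp": "rated_on"}`.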


Now let's filter on title. Note that title is not part of the primary key, so the query is expected to fail. To run this query we have 3 options: specify the ALLOW FILTERING clause, create a secondary index on title, or create a materialized view. Avoid ALLOW FILTERING in production, since it can scan large amounts of data and hurt cluster performance. The latter two options are recommended.

cqlsh:demo> select * from movies where title = 'City Hall (1996)';
You get:
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"

Option a: Use ALLOW FILTERING. The query works as expected, but this option should not be used in general.

cqlsh:demo> select * from movies where title = 'City Hall (1996)' ALLOW FILTERING;


Option b: Create a secondary index on the column.

cqlsh:demo> create INDEX on movies (title);
cqlsh:demo> select * from movies where title = 'City Hall (1996)';

Option c: Create a materialized view

cqlsh:demo> create MATERIALIZED VIEW demo.movies_by_title AS SELECT * from movies where title is not null primary key (title, movieid);

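A materialized view creates another table that holds a copy of the data from the base table (movies), organized by the view's primary key. Cassandra updates the view automatically each time you write to the base table. The view's primary key must include all primary key columns of the base table, which is why movieid is part of it. Materialized views do not support aggregate operations.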

cqlsh:demo> select * from movies_by_title where title = 'City Hall (1996)';
cqlsh:demo> select * from movies where movieid in (100);
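The bookkeeping Cassandra does for the view on each base-table write can be pictured with a toy in-memory model. This is a hypothetical Python sketch, not how Cassandra implements views on the replicas. The base "table" is keyed by movieid and the view by (title, movieid). Keeping movieid in the view key matters because two movies can share a title; keyed by title alone, one would overwrite the other.

```python
from typing import Dict, List, Optional


class MoviesWithTitleView:
    """Toy model (hypothetical) of the movies table plus movies_by_title.

    base:     movieid -> row
    by_title: title -> {movieid: row}   (view key: title, then movieid)
    """

    def __init__(self) -> None:
        self.base: Dict[int, dict] = {}
        self.by_title: Dict[str, Dict[int, dict]] = {}

    def upsert(self, movieid: int, title: Optional[str], genres: Optional[str]) -> None:
        old = self.base.get(movieid)
        # Remove the stale view entry first, in case the title changed.
        if old is not None and old["title"] is not None:
            bucket = self.by_title[old["title"]]
            bucket.pop(movieid, None)
            if not bucket:
                del self.by_title[old["title"]]
        row = {"movieid": movieid, "title": title, "genres": genres}
        self.base[movieid] = row
        # Mirrors "where title is not null": rows without a title stay out of the view.
        if title is not None:
            self.by_title.setdefault(title, {})[movieid] = row

    def select_by_title(self, title: str) -> List[dict]:
        # Within one title partition, rows are ordered by the clustering column movieid.
        return [row for _, row in sorted(self.by_title.get(title, {}).items())]
```

The key design point is the delete-then-insert on every write. Because a base row's title can change, the view entry under the old title has to be removed before the new one is added.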

Create the ratings table.

cqlsh:demo> create table ratings(
    userid int, 
    movieid int, 
    rating float, 
    rated_on bigint,
    primary key ((userId), rated_on, movieId)
) with clustering order by (rated_on desc);

Load ratings.csv into the ratings table.

cqlsh:demo> COPY ratings (userid, movieid, rating, rated_on) from '~/Downloads/datasets/ml-latest-small/ratings.csv' WITH HEADER = true;

Create a table for the tags.csv dataset. We want to see the unique tags that users have associated with each movie, sorted by tag name.

cqlsh:demo> create table movie_by_tag(
    movieid int,
    tag text,
    userid int, 
    tagged_on bigint,
    primary key (movieId, tag)
);
cqlsh:demo> COPY movie_by_tag (userid, movieid, tag, tagged_on) from '~/Downloads/datasets/ml-latest-small/tags.csv' WITH HEADER = true;


Notice that the new table ends up with fewer rows than the original dataset. Why is that?

cqlsh:demo> COPY movie_by_tag (userid, movieid, tag, tagged_on) from '~/Downloads/datasets/ml-latest-small/tags.csv' WITH HEADER = true;
Using 1 child processes

Starting copy of demo.movie_by_tag with columns [userid, movieid, tag, tagged_on].
Processed: 1296 rows; Rate:     756 rows/s; Avg. rate:    1353 rows/s
1296 rows imported from 1 files in 0.958 seconds (0 skipped).
cqlsh:demo> select count(*) from demo.movie_by_tag;

 count
-------
  1272

(1 rows)

The reason for this mismatch is that the dataset contains multiple records with the same primary key values (movieId, tag). Cassandra does not complain when you import more than one record with the same primary key: every write is an upsert, so a later record overwrites the earlier one. Below are some of the duplicated keys.


movieId  tag                    Count
260      Science Fiction        4
1089     Quentin Tarantino      2
1089     organized crime        2
1258     cult film              2
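You can find the duplicated keys before loading by counting (movieId, tag) pairs in the CSV. The Python sketch below assumes the tags.csv header is `userId,movieId,tag,timestamp`, matching the COPY column list above. `duplicate_keys` and `rows_overwritten` are hypothetical helper names.

```python
import csv
from collections import Counter
from typing import Dict, Iterable, Tuple


def duplicate_keys(lines: Iterable[str]) -> Dict[Tuple[str, str], int]:
    """Return (movieId, tag) pairs that occur more than once, with their counts.

    Tags are compared exactly (case-sensitive), as Cassandra compares text
    clustering values.
    """
    counts = Counter((row["movieId"], row["tag"]) for row in csv.DictReader(lines))
    return {key: n for key, n in counts.items() if n > 1}


def rows_overwritten(lines: Iterable[str]) -> int:
    """Number of CSV rows that will be upserted over an earlier row with the same key."""
    return sum(n - 1 for n in duplicate_keys(lines).values())
```

To check the real file, open it with `open(os.path.expanduser(".../tags.csv"), newline="")` and pass the file object. Then compare the result with the gap between the rows imported and `count(*)` in the run above.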

Using Timestamp

Every cell (the value of one column in one row) carries a write timestamp. Cassandra tables have no row locking mechanism; Cassandra uses the write time to resolve the state of a cell, and the last write wins. You can view the writetime of a cell in the table.

cqlsh:demo> create table user (id int primary key, name text);
cqlsh:demo> insert into user (id, name) values (1, 'user 1');
cqlsh:demo> select * from user;
 id | name
----+--------
  1 | user 1
cqlsh:demo> insert into user (id, name) values (2, 'user 2');
cqlsh:demo> select id, name, writetime(name) from user;
 id | name   | writetime(name)
----+--------+------------------
  1 | user 1 | 1499685866217511
  2 | user 2 | 1499685905696619

Write time is the number of microseconds since 1970-01-01 00:00 UTC. The writetime of the name column shows when that value was written. writetime() cannot be used on primary key columns. You can use this site to convert the long integer to a date and time: https://www.epochconverter.com/

You can optionally specify the write time of a cell with USING TIMESTAMP (in microseconds).

cqlsh:demo> UPDATE user USING TIMESTAMP 1499685905696619 set name = 'user 4' where id = 2;
cqlsh:demo> select id, name, writetime(name) from user;
 id | name   | writetime(name)
----+--------+------------------
  1 | user 2 | 1499685866217511
  2 | user 4 | 1499685905696619

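Observe that the name for user 2 has been updated even though the timestamp stayed the same. The new write carries exactly the same timestamp as the existing value. When timestamps tie, Cassandra keeps the greater value ('user 4' sorts after 'user 2').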

Now, let's write an update statement with timestamp earlier than the existing timestamp.

cqlsh:demo> UPDATE user USING TIMESTAMP 1499685905696618 set name = 'user 5' where id = 2;
cqlsh:demo> select id, name, writetime(name) from user;
 id | name   | writetime(name)
----+--------+------------------
  1 | user 2 | 1499685866217511
  2 | user 4 | 1499685905696619

Because this write's timestamp (1499685905696618) is earlier than the existing one (1499685905696619), the new value was ignored.
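The last-write-wins rule in the examples above can be modelled in a few lines. This Python sketch is a simplified model, not Cassandra's code. It assumes the tie-break compares values, which for ASCII text matches Python's string ordering (Cassandra compares serialized bytes). Tombstones and TTLs are deliberately left out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Cell:
    value: str
    timestamp: int  # write time in microseconds since the Unix epoch


def reconcile(existing: Optional[Cell], incoming: Cell) -> Cell:
    """Pick the surviving cell under last-write-wins (simplified model)."""
    if existing is None:
        return incoming
    if incoming.timestamp != existing.timestamp:
        # Higher timestamp wins, regardless of arrival order.
        return incoming if incoming.timestamp > existing.timestamp else existing
    # Exact tie: the greater value wins.
    return incoming if incoming.value > existing.value else existing


if __name__ == "__main__":
    cell = Cell("user 2", 1499685905696619)
    cell = reconcile(cell, Cell("user 4", 1499685905696619))  # tie: 'user 4' wins
    cell = reconcile(cell, Cell("user 5", 1499685905696618))  # older: ignored
    print(cell)
```

Replaying the two UPDATE statements from this section leaves `user 4` with timestamp 1499685905696619, as in the cqlsh output.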

Use of TTL (time to live)

Use TTL when you want a record or a cell value to be deleted after some time. Common use cases are session IDs and cached query results. The TTL value is specified in seconds and indicates after how many seconds the row or cell expires. The following example expires the value of the name column after 60 seconds.

cqlsh:demo> UPDATE user USING TTL 60 SET name = 'user 10' where id = 2;
cqlsh:demo> select id, name, ttl(name) from user;
 id | name    | ttl(name)
----+---------+-----------
  1 |  user 2 |      null
  2 | user 10 |        51
(2 rows)

Run the select again after 60 seconds.

cqlsh:demo> select id, name, ttl(name) from user;
 id | name   | ttl(name)
----+--------+-----------
  1 | user 2 |      null

Observe the record with id = 2 is automatically deleted.


You can also set a TTL at row level. Below, a record is inserted with TTL = 10 seconds; after 10 seconds the whole record is deleted automatically.

cqlsh:demo> alter table user add age int;
cqlsh:demo> insert into user (id, name, age) values (1, 'user 1', 30) using ttl 10;
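The TTL bookkeeping described above can be sketched as follows. The Python model is hypothetical and uses whole seconds. A cell written with a TTL is live until write time + TTL. `remaining_ttl` mimics what `ttl(column)` reports. A row-level TTL (INSERT ... USING TTL) is modelled as the same TTL on every cell the INSERT writes.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ExpiringCell:
    value: object
    written_at: int            # server time of the write, in whole seconds
    ttl: Optional[int] = None  # seconds; None means the cell never expires

    def is_live(self, now: int) -> bool:
        return self.ttl is None or now < self.written_at + self.ttl

    def remaining_ttl(self, now: int) -> Optional[int]:
        """Seconds left, or None (shown as null) when the cell has no TTL."""
        if self.ttl is None:
            return None
        return max(0, self.written_at + self.ttl - now)


def write_row(values: Dict[str, object], now: int,
              ttl: Optional[int] = None) -> Dict[str, ExpiringCell]:
    """Row-level TTL modelled as the same TTL on every written cell."""
    return {col: ExpiringCell(v, now, ttl) for col, v in values.items()}
```

For example, a name written with TTL 60 reports 51 seconds left when read 9 seconds later, like the `ttl(name)` output above.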


Lightweight transaction

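Cassandra implements lightweight transactions with the Paxos protocol, which provides linearizable consistency, that is, sequential consistency with real-time constraints. Linearizable consistency ensures transaction isolation at a level similar to the serializable level offered by RDBMSs. This type of transaction is known as compare and set (CAS): replica data is compared, and any data found to be out of date is set to the most consistent value.

In the example below, the UPDATE is not applied because the current name for id = 1 is 'user 2', not 'user 1'. Cassandra returns [applied] = False together with the current value. Likewise, INSERT ... IF NOT EXISTS inserts the row only if no row with that primary key exists yet.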

cqlsh:demo> UPDATE user SET name = 'user 10' where id = 1 if name = 'user 1' ;
 [applied] | name
-----------+--------
     False | user 2
     
cqlsh:demo> INSERT INTO user (id, name) VALUES (1, 'user 101') IF NOT EXISTS;
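The compare-and-set decision behind these two statements can be sketched on a single in-memory copy of the data. The Python model below is hypothetical. It models only the condition check and the result shape (`[applied]` plus current values on failure). It does not model the Paxos rounds Cassandra runs among replicas to make the check-and-write atomic.

```python
from typing import Dict, Optional, Tuple

Table = Dict[int, Dict[str, object]]  # primary key -> {column: value}


def update_if(table: Table, key: int, column: str, expected: object,
              new_value: object) -> Tuple[bool, Optional[object]]:
    """UPDATE ... SET column = new_value WHERE id = key IF column = expected.

    Returns (applied, current_value); the current value is reported only
    when the condition fails, like the "[applied] | name" result in cqlsh.
    """
    row = table.get(key)
    if row is None:
        return False, None
    current = row.get(column)
    if current != expected:
        return False, current
    row[column] = new_value
    return True, None


def insert_if_not_exists(table: Table, key: int, values: Dict[str, object]
                         ) -> Tuple[bool, Optional[Dict[str, object]]]:
    """INSERT ... IF NOT EXISTS: insert only when no row has this primary key."""
    if key in table:
        return False, dict(table[key])
    table[key] = dict(values)
    return True, None
```

With `users = {1: {"name": "user 2"}}`, the conditional update expecting 'user 1' returns `(False, "user 2")` and leaves the row unchanged, matching the cqlsh result above.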


Complex data types

Cassandra supports 5 complex data types: list, set, map, tuple, and user-defined type (UDT).


List columns

Add a new column emails of list type.

cqlsh:demo> alter table user add emails list<text>;
cqlsh:demo> update user set emails = emails + ['user@email.com'] where id = 1;
cqlsh:demo> select * from user;
 id | emails             | name
----+--------------------+--------
  1 | ['user@email.com'] | user 2

Append another email address for user 1.

cqlsh:demo> update user set emails = emails + ['user2@gmail.com'] where id = 1;
cqlsh:demo> select * from user;
 id | emails                                | name
----+---------------------------------------+--------
  1 | ['user@email.com', 'user2@gmail.com'] | user 2

Remove an email address for user 1.

cqlsh:demo> update user set emails = emails - ['user@email.com'] where id = 1;
cqlsh:demo> select * from user;
 id | emails              | name
----+---------------------+--------
  1 | ['user2@gmail.com'] | user 2
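The list operators used above can be modelled on plain Python lists. This is a hypothetical sketch of the semantics. `+ [...]` appends at the end, and a null list behaves like an empty one. `- [...]` removes every occurrence of each listed value. An empty non-frozen collection reads back as null, which the sketch represents as `None`.

```python
from typing import List, Optional


def list_append(current: Optional[List[str]], items: List[str]) -> Optional[List[str]]:
    """emails = emails + [...]: append to the end; a null list acts like []."""
    result = (current or []) + items
    return result or None


def list_remove(current: Optional[List[str]], items: List[str]) -> Optional[List[str]]:
    """emails = emails - [...]: remove every occurrence of each listed value."""
    result = [x for x in (current or []) if x not in items]
    # An empty (non-frozen) collection is stored and read back as null.
    return result or None
```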


Custom Types

cqlsh:demo> CREATE TYPE Address (street text, city text, postal_code text, state text);

Add an address column to the user table.

cqlsh:demo> alter table user add address Address;
cqlsh:demo> update user set address = {street: '100 main street', city: 'ny'} where id = 1;
cqlsh:demo> select * from user;
 id | address                                                                 | emails              | name
----+-------------------------------------------------------------------------+---------------------+--------
  1 | {street: '100 main street', city: 'ny', postal_code: null, state: null} | ['user2@gmail.com'] | user 2
(1 rows)
cqlsh:demo> update user set address.street = '100 market street' where id = 1;
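The two kinds of update above behave differently. Assigning a whole UDT literal replaces the value, so omitted fields become null. Setting `address.street` changes one field and keeps the others. Field-level updates work because address is a non-frozen UDT (supported since Cassandra 3.6); a `frozen<Address>` column can only be replaced as a whole. The Python model below is hypothetical. It assumes that updating a field of a null address starts from all-null fields.

```python
from typing import Dict, Optional

# Field names of the Address type created above.
ADDRESS_FIELDS = ("street", "city", "postal_code", "state")
Address = Dict[str, Optional[str]]


def assign_address(literal: Dict[str, str]) -> Address:
    """SET address = {...}: replace the whole value; omitted fields become null."""
    unknown = set(literal) - set(ADDRESS_FIELDS)
    if unknown:
        raise ValueError(f"unknown Address fields: {sorted(unknown)}")
    return {f: literal.get(f) for f in ADDRESS_FIELDS}


def set_address_field(current: Optional[Address], field: str, value: str) -> Address:
    """SET address.<field> = ...: change one field and keep the others."""
    if field not in ADDRESS_FIELDS:
        raise ValueError(f"unknown Address field: {field}")
    # Assumption: a null address is treated as all-null fields.
    updated = dict(current) if current is not None else {f: None for f in ADDRESS_FIELDS}
    updated[field] = value
    return updated
```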


UUID vs TimeUUID

Both UUID and TimeUUID are 128-bit (16-byte) values. A TimeUUID is a version 1 UUID: its first 64 bits hold a 60-bit timestamp plus the 4-bit version number, so TimeUUIDs can be sorted by time. A random UUID is generated with uuid() and a TimeUUID with now(). Because the table below declares a descending clustering order on id, the most recent orders come first.

cqlsh:demo> create table if not exists orders (
  user_id uuid, 
  id timeuuid,
  price double,
  tax_rate double,
  order_date date,
  status text,
  billing_address Address,
  shipping_address Address,
  primary key (user_id, id)
) with clustering order by (id desc);
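Python's standard `uuid` module makes the difference visible. `uuid.uuid1()` produces time-based (version 1) UUIDs like now(), and `uuid.uuid4()` produces random ones like uuid(). The hypothetical helper below orders TimeUUID strings newest first, as `clustering order by (id desc)` does. It is a simplification: it sorts only by the embedded timestamp, whereas Cassandra also compares the remaining bytes when timestamps are equal.

```python
import uuid
from typing import Iterable, List


def newest_first(ids: Iterable[str]) -> List[str]:
    """Order TimeUUID strings by their embedded timestamp, most recent first."""
    parsed = [uuid.UUID(s) for s in ids]
    if any(u.version != 1 for u in parsed):
        raise ValueError("TimeUUIDs are version 1 UUIDs")
    # UUID.time is the 60-bit timestamp (100-ns intervals since 1582-10-15).
    return [str(u) for u in sorted(parsed, key=lambda u: u.time, reverse=True)]
```

Applied to the three order ids that appear later in this section, it returns them in the same order as the cqlsh output of the partition query.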

Insert a record and view it.

cqlsh:demo> insert into orders (user_id, id, price, tax_rate, order_date, status, billing_address) values (uuid(), now(), 200, 0.18, toDate(now()), 'Submitted', {street: '100 main st', city: 'nyc'});
cqlsh:demo> select * from orders;

 user_id                              | id                                   | billing_address                                                      | order_date | price | shipping_address | status    | tax_rate
--------------------------------------+--------------------------------------+----------------------------------------------------------------------+------------+-------+------------------+-----------+----------
 b35fd35f-58a0-4f8c-a9a3-42052f9876fb | 1dbda110-53df-11e9-b4ea-97c098c1b9e6 | {street: '100 main st', city: 'nyc', postal_code: null, state: null} | 2019-03-31 |   200 |             null | Submitted |     0.18


Use user_id to filter the orders.

cqlsh:demo> select * from orders where user_id = b35fd35f-58a0-4f8c-a9a3-42052f9876fb;


You can extract the datetime and timestamp from the timeuuid.

cqlsh:demo> select id, unixTimestampOf(id), dateOf(id) from orders;

 id                                   | system.unixtimestampof(id) | system.dateof(id)
--------------------------------------+----------------------------+---------------------------------
 1dbda110-53df-11e9-b4ea-97c098c1b9e6 |              1554055335329 | 2019-03-31 18:02:15.329000+0000
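The arithmetic behind unixTimestampOf()/toUnixTimestamp() can be reproduced client-side. A version 1 UUID stores 100-nanosecond intervals since 1582-10-15. Subtracting the offset to 1970-01-01 and dividing by 10,000 gives Unix milliseconds. This Python sketch uses only the standard `uuid` and `datetime` modules; the function names are illustrative.

```python
import uuid
from datetime import datetime, timedelta, timezone

# 100-ns intervals between the UUID epoch (1582-10-15) and the Unix epoch (1970-01-01).
UUID_EPOCH_OFFSET = 0x01B21DD213814000


def unix_millis(timeuuid: str) -> int:
    """Milliseconds since the Unix epoch embedded in a version 1 UUID."""
    u = uuid.UUID(timeuuid)
    if u.version != 1:
        raise ValueError("not a time-based (version 1) UUID")
    return (u.time - UUID_EPOCH_OFFSET) // 10_000


def to_datetime(timeuuid: str) -> datetime:
    """UTC datetime with millisecond precision, like toTimestamp()."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(milliseconds=unix_millis(timeuuid))


if __name__ == "__main__":
    tid = "1dbda110-53df-11e9-b4ea-97c098c1b9e6"
    print(unix_millis(tid))  # 1554055335329, as in the cqlsh output above
    print(to_datetime(tid))
```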
  


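Following are other available functions. toTimestamp() and toUnixTimestamp() replace dateOf() and unixTimestampOf(), which are deprecated since Cassandra 2.2.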

  • toDate(timeuuid)

Converts timeuuid to date in YYYY-MM-DD format.

  • toTimestamp(timeuuid)

Converts timeuuid to timestamp format.

  • toUnixTimestamp(timeuuid)

Converts timeuuid to a Unix timestamp in milliseconds (a bigint).


User Defined Functions

UDF support is disabled by default in Cassandra. You can enable UDFs by setting enable_user_defined_functions: true in the cassandra.yaml file and then restarting the Cassandra node and cqlsh.

User defined functions let you create simple functions that can be used in queries, for example in a SELECT clause:

CREATE FUNCTION IF NOT EXISTS with_tax_amount (amount double, tax_rate double) 
   CALLED ON NULL INPUT 
   RETURNS double 
   LANGUAGE java AS '
     return amount * (1 + tax_rate);
     ';

cqlsh:demo> select price, tax_rate, with_tax_amount(price, tax_rate) from orders;

 price | tax_rate | demo.with_tax_amount(price, tax_rate)
-------+----------+---------------------------------------
   200 |     0.18 |                                   236
   200 |     0.18 |                                   236
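The UDF body is plain arithmetic. The Python equivalent below is only for checking values client-side; it is not how Cassandra runs UDFs. The sketch returns None when either argument is null, which is what `RETURNS NULL ON NULL INPUT` would do in CQL. With `CALLED ON NULL INPUT`, as declared above, the function body itself has to cope with nulls.

```python
from typing import Optional


def with_tax_amount(amount: Optional[float], tax_rate: Optional[float]) -> Optional[float]:
    """Same arithmetic as the Java UDF body: amount * (1 + tax_rate)."""
    if amount is None or tax_rate is None:
        # Mirrors RETURNS NULL ON NULL INPUT semantics.
        return None
    return amount * (1 + tax_rate)
```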


Aggregate functions

You can compute partition-level aggregates such as sum, min, max, avg and count.

Let's insert a few more records into the same partition (i.e. for the same user_id).

cqlsh:demo> insert into orders (user_id, id, price, tax_rate) values (b35fd35f-58a0-4f8c-a9a3-42052f9876fb, now(), 300, 0.18);
cqlsh:demo> insert into orders (user_id, id, price, tax_rate) values (b35fd35f-58a0-4f8c-a9a3-42052f9876fb, now(), 400, 0.18);


By this time we should have 3 records.

cqlsh:demo> select id,price,tax_rate  from orders where user_id = b35fd35f-58a0-4f8c-a9a3-42052f9876fb;
 id                                   | price | tax_rate
--------------------------------------+-------+----------
 c6650ca0-53e8-11e9-aefd-afa2a8d20ca2 |   400 |     0.18
 c1cb6040-53e8-11e9-aefd-afa2a8d20ca2 |   300 |     0.18
 1dbda110-53df-11e9-b4ea-97c098c1b9e6 |   200 |     0.18


Find out min, max, avg, sum of the price for a given user.

cqlsh:demo> select sum(price),min(price),max(price),avg(price),count(*) from orders where user_id = b35fd35f-58a0-4f8c-a9a3-42052f9876fb;

 system.sum(price) | system.min(price) | system.max(price) | system.avg(price) | count
-------------------+-------------------+-------------------+-------------------+-------
               900 |               200 |               400 |               300 |     3
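Because the WHERE clause fixes user_id, the query reads a single partition and aggregates only its rows. The Python sketch below mimics that filter-then-aggregate step on in-memory tuples. The `(user_id, id, price)` tuple layout is an assumption made for the illustration.

```python
from statistics import mean
from typing import Dict, List, Tuple

Order = Tuple[str, str, float]  # (user_id, id, price): illustrative layout


def partition_aggregates(orders: List[Order], user_id: str) -> Dict[str, float]:
    """sum/min/max/avg/count of price over one partition (one user_id)."""
    prices = [price for uid, _, price in orders if uid == user_id]
    if not prices:
        # Simplification: report only the count for an empty partition.
        return {"count": 0}
    return {"sum": sum(prices), "min": min(prices), "max": max(prices),
            "avg": mean(prices), "count": len(prices)}
```

Fed the three orders shown above, it returns sum 900, min 200, max 400, avg 300 and count 3.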




Deleting Records from Cassandra table

Cassandra treats a delete as an insert or upsert. The data that a DELETE command adds to the partition is a deletion marker called a tombstone. Tombstones go through Cassandra's write path and are written to SSTables on one or more nodes. The key distinguishing feature of a tombstone is its built-in expiration. Once gc_grace_seconds (10 days by default) have passed, the tombstone can be removed as part of Cassandra's normal compaction process.

You can delete data in the following ways:

A. Delete a record by primary key

B. Delete records by partition key

C. Delete a record or column by using TTL

D. Delete records older than a certain timestamp

E. Drop a keyspace or table; this removes the data immediately, without tombstones or a GC grace period.
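The tombstone lifecycle described above can be pictured with a greatly simplified Python model. A tombstone is a version with no value. Reads return the newest version, and tombstones win timestamp ties. Compaction keeps only the winning version and purges a winning tombstone once gc_grace_seconds have passed since the delete. This is a hypothetical sketch. Real compaction also checks that no other SSTable still holds older data for the key before purging.

```python
from dataclasses import dataclass
from typing import List, Optional

GC_GRACE_SECONDS = 864_000  # default gc_grace_seconds: 10 days


@dataclass(frozen=True)
class Version:
    value: Optional[str]  # None marks a tombstone
    timestamp: int        # write timestamp (microseconds)
    deleted_at: int = 0   # server time of the delete, in seconds (tombstones only)

    @property
    def is_tombstone(self) -> bool:
        return self.value is None


def winner(versions: List[Version]) -> Version:
    # Latest timestamp wins; on a tie the tombstone wins over live data.
    return max(versions, key=lambda v: (v.timestamp, v.is_tombstone))


def read(versions: List[Version]) -> Optional[str]:
    """What a SELECT sees: the winning value, or None if it was deleted."""
    return winner(versions).value


def compact(versions: List[Version], now: int,
            gc_grace: int = GC_GRACE_SECONDS) -> List[Version]:
    """Drop shadowed versions; purge a winning tombstone after gc_grace."""
    w = winner(versions)
    if w.is_tombstone and now > w.deleted_at + gc_grace:
        return []
    return [w]
```

Option D above corresponds to giving the tombstone a chosen timestamp: only versions written at or before that timestamp are shadowed.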