Query data using CQL with a dynamic LIMIT

Query data using CQL with a dynamic LIMIT

| October 26, 2014 | 0 Comments

Apache Cassandra, a top level Apache project born at Facebook and built on Amazon’s Dynamo and Google’s BigTable, is a distributed storage system for managing very large amounts of structured data spread out across many commodity servers, while providing highly available service with no single point of failure. Cassandra aims to run on top of an infrastructure of hundreds of nodes (possibly spread across different data centers in multiple geographic areas).  Working with database distributed storage systems like Cassandra suppose  a huge challenge for developers because it needs a complete change on the way of design your data model. With distributed databases you have to consider aspects like the replication factor of your data, forget about have a traditional backup of all your database, number of nodes needed to complete a read operation, etc…

At the beginning stages of Cassandra, the developers could execute queries against their clusters using the thrift API (called in tha way because it is based on the Apache Thrift Project). At some point CQL appear, a query language with great similarities with SQL to make the life of the developers easier when they have to work with Cassandra. With the cassandra version 1.2, a new protocol was released to face some of the limitations of the thrift interface and the specification 3 of CQL (the last one until now) was supported.

One of the first limitation that every cassandra developer face on their initial steps is the implicit limit on the number of rows you can retrieve from a cql select query.

Why my select query does not return all the results?

Imagine we are developing a software which allow the user to visualize the genetic marker differences between two individuals of a particular crop. For example we have the genetic information of a certain variety of tomato which have a normal behaviour against cold temperatures. On the other hand, we also have the genetic information of another variety of tomato  which is especially resistant to cold temperatures and can be breed in more extreme conditions. With bioinformatics processes we can obtain a list of all the genetic marker differences between both species and use that information to identify the gene which make possible the resistant to extreme temperatures.

This is a very simplified representation of a normal use case in biotechonology, but the main point here is that the difference between the two individuals could be formed by billions of elements and each element has their own parameter values. This differences are stored in Cassandra in a data model like these:

1
2
3
4
5
6
7
8
9
CREATE TABLE markerlist (id uuid,
idindividual1 VARCHAR,
idindividual2 VARCHAR,
parameter1 FLOAT,
parameter2 FLOAT,
parameter3 FLOAT,
paremeter4 FLOAT,
PRIMARY KEY((idindividual1,idindividual2),id)
);

The partition key is formed by the two individuals because we are going to query always by a pair of individuals. Because there are many differences between two specific individuals we add an id to the clustering key so each row represent a single and unique difference.

In order to simplify the example I only include 4 parameters for each marker  but could be a lot more. Because of the cardinality of the parameters we can not create secondary indexes on all the parameters, so we perform the filtering on the parameter values in our business logic. In this situation the query we want to execute is:

1
SELECT * FROM markerlist WHERE idindividual1 = 'normalTomato' AND idindividual2 = 'resistantTomato';

This query should give to us more than 1 million of rows for example according to the data we previously inserted on the column family. But, what happen if we executed this query using the cql shell. The output show only 10000 results, how is that possible?

Query using the shell

Looking the documentation of the select cql clause, we can see the next sentence:

The LIMIT option to a SELECT expression limits the number of rows returned by a query. LIMIT defaults to 10,000 when left unset.

We can increase the limit, for example like this:

1
2
SELECT * FROM markerlist WHERE idindividual1 = 'normaltomato' AND idindividual2 = 'resistanttomato' 
LIMIT 1000000;

Then the cql shell get unresponsive for a long time because the query need to get a lot more rows. At some point, we can get a time out on the query or even getting the results back the general performance of the column family is affected.

Then we tried to execute the same query inside our application using the datastax java driver:

1
2
3
4
SimpleConnect connection = new SimpleConnect();
        Session session = connection.connect("localhost");
 
        ResultSet rs = session.execute("SELECT * from markerlist WHERE idindividual1 = 'normaltomato' AND idindividual2 = 'resistanttomato' LIMIT 1000000;");

On this case we get a timeout executing our query in the best of the cases or even a big penalty in performance terms in our query.

What is the cause?

The cause is that queries which retrieves that amount of data in cassandra are not recommended, some people consider this an anti pattern over a cassandra data model. But, there will be situations in which you need to solve this kind of uses cases and the cost of using a different technology ( a different database in this case) is bigger than looking for some solution that could work with your existing database.

A possible solution…

The obvious solution is look for a way to obtain the results in batches of a reduced number of rows. It is possible to use the token to do this, but you need to know beforehand an indication about the rows you are trying to get from the database.

The solution we take is add a new variable to the clustering key called partid:

1
2
3
4
5
6
7
8
9
10
CREATE TABLE IF NOT EXISTS markerlistwithpartition (id uuid,
partid INT,
idindividual1 VARCHAR,
idindividual2 VARCHAR,
parameter1 FLOAT,
parameter2 FLOAT,
parameter3 FLOAT,
paremeter4 FLOAT,
PRIMARY KEY((idindividual1,idindividual2),partid,id)
);

Together with the new component in the key we have to maintain a registry of the number of partitions and the total number of markers available:

1
2
3
4
5
CREATE TABLE IF NOT EXISTS markerliststats(id uuid,
numbermarkers BIGINT,
numberpartitions BIGINT,
PRIMARY KEY(id)irst
);

By this way if we want to get all the markers between the normaltomato and the resistanttomato as we did previously, we have lo query first the number of partitions available:

1
SELECT numberpartitions FROM markerliststats WHERE id=756716f7-2e54-4715-9f00-91dcbea6cf50;
NOTE: the id used is only an example, in our cases we have our data logically separated by client and we use the id of the client as primary for the column families which contains the partitions.

And the for each partition, query the number of markers presented in that partition:

1
2
3
4
5
6
7
SELECT * FROM markerlist WHERE idindividual1 = 'normaltomato' AND idindividual2 = 'resistanttomato' AND partid=0;
 
SELECT * FROM markerlist WHERE idindividual1 = 'normaltomato' AND idindividual2 = 'resistanttomato' AND partid=1; .
 
...
 
SELECT * FROM markerlist WHERE idindividual1 = 'normaltomato' AND idindividual2 = 'resistanttomato' AND partid=200;

This means that if we set a size for our partition of 10000 markers, to retrieve 2 million of markers we will need 200 queries. If we parallelize the execution of some of these queries, the performance will be improved a lot.

Conclusion

The conclusion here is that it is always better to query small pieces of data than a biggest one. The architecture of cassandra makes possible to execute a lot of queries at the same time with a very low latency. For this reason, we create our own manual partitions system for the queries that retrieve a lot of rows.

Instead of executing a single expensive query which retrieve million of rows, we execute hundreds of queries which retrieves thousands of rows and we combine the results in our application. Using a cluster with three nodes and a replication factor of 1, here are the metrics I get from the microtest covered in this blog post:

Query WITHOUT manual partitions

Query WITH manual partitions

Result  < 10000 rows   3ms  3.4 ms
Result > 10000 rowsResult < 1 Million rows 12 min 6 min
Result > 1 Million rowsResult < 2 Million rows 25 min 8 min
NOTE: the result of this test are the average of the results obtained under a repetition of 100 times the both queries systems.

As you can see in the table results, query a number of rows which is less than double the size of the partitions does not suppose a big improvement in the use of the manual partitions system. But if you need to query million of results, the differences in time performance between both methods are bigger than 100%. The counter part of this is you need to add to your code an extra logic to support this manual partition system. I will present in a future post the solution we have been using in our spring applications.

———————————————————–

References:

http://stackoverflow.com/questions/22792260/does-cassandra-read-the-whole-row-when-limiting-the-number-of-requested-results

http://www.wentnet.com/blog/?p=24

http://www.datastax.com/documentation/cql/3.0/cql/cql_reference/select_r.html0