Speculative queries, tangible benefits

How to perform speculative query executions in Apache Cassandra using GoCql

Alex Lourie
Apache Cassandra

Dealing with response times in Apache Cassandra can be tricky. In this article, Instaclustr’s Alex Lourie looks at speculative queries and three use cases where the potential benefits are hard to ignore.

When certain issues with Cassandra databases arise (such as faulty, slow, or unresponsive nodes, or network interruptions), speculative query execution offers a compelling solution. It enables a client to send the same database request to multiple endpoints simultaneously, and lets those requests compete for the fastest response.

The purpose of speculative query execution is most often not to improve performance (although it may deliver this result by streamlining execution time using responses from the fastest node), but instead to better ensure that queries do in fact reliably receive a server response. It’s also important to recognize that any reliability or performance improvements due to speculative queries don’t come for free, but require increased CPU and network resource usage to achieve.

Before we dive in, let’s have a quick look at some use cases.

Use cases

Here are three use cases where speculative queries will offer you particularly compelling benefits:

  1. You’re querying a node that’s down, but your SLA obligates you to deliver a server response sooner than the Cassandra timeout.
  2. An unreliable node is dropping queries or providing inconsistent response times.
  3. A node returns timeout errors such that a client application needs to retry the query on a different node.

One note here: this technique only works when the query is marked as idempotent, meaning that it produces the same result when run repeatedly. A guide covering the idempotence aspect of using speculative query execution is available here.
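
To see why idempotence matters, consider a minimal sketch in plain Go. The in-memory `store` type and its two methods are hypothetical stand-ins for a table and two kinds of writes; they are not part of GoCql. Because a speculative execution can cause the same statement to be applied more than once, the sketch imitates that by running each write twice.

```go
package main

import "fmt"

// store is a hypothetical in-memory stand-in for a table of integer values.
type store map[string]int

// set overwrites the value: running it twice leaves the same state (idempotent).
func (s store) set(key string, v int) { s[key] = v }

// increment adds to the value: running it twice changes the result (not idempotent).
func (s store) increment(key string, by int) { s[key] += by }

func main() {
	s := store{}
	// Imitate a speculative execution whose statement is applied twice.
	s.set("a", 5)
	s.set("a", 5)
	s.increment("b", 5)
	s.increment("b", 5)
	fmt.Println(s["a"], s["b"]) // prints: 5 10
}
```

In CQL terms, an INSERT with fixed values behaves like `set`, while writes such as counter updates or list appends behave like `increment` and should generally not be marked idempotent.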


Time to check out some examples

The example below demonstrates defining a query as idempotent. Note that it also uses the SimpleSpeculativeExecution policy, implemented in the GoCql driver, to define the number of executions in addition to the original request, and the constant delay between each execution:

  ...
    cluster := gocql.NewCluster("192.168.1.1", "192.168.1.2", "192.168.1.3")
    // One execution in addition to the original request, with a constant 200ms delay
    sp := &gocql.SimpleSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}
    session, err := cluster.CreateSession()
    if err != nil {
        log.Fatal(err)
    }
    // Build the query
    qry := session.Query("speculative").SetSpeculativeExecutionPolicy(sp).Idempotent(true)

The following example shows how to create a policy that pauses incrementally longer between each additional execution:

type IncreasingSpeculativeExecution struct {
    NumAttempts  int
    TimeoutDelay time.Duration
}

func (sp *IncreasingSpeculativeExecution) Attempts() int { return sp.NumAttempts }
func (sp *IncreasingSpeculativeExecution) Delay() time.Duration {
    sp.TimeoutDelay += 50 * time.Millisecond
    return sp.TimeoutDelay
}

You can then use that policy in the query execution:

    ...
    cluster := gocql.NewCluster("192.168.1.1", "192.168.1.2", "192.168.1.3")
    sp := &IncreasingSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}
    session, err := cluster.CreateSession()
    // Build the query
    qry := session.Query("speculative").SetSpeculativeExecutionPolicy(sp).Idempotent(true)
    …..

To show speculative query execution in action, the following example uses a three-node Cassandra cluster with one slow node, which can be simulated using the tc tool that ships with the iproute2 package. The example is somewhat extreme, but serves to illustrate how speculative queries can provide value.

First, simulate a slow node by running this command on one of the nodes, which will add a 250ms delay to outbound packets for the eth0 physical device:

sudo tc qdisc add dev eth0 root netem delay 250ms

Then, use the client code below to run the test queries. This code inserts 10,000 entries into the cluster, and uses random UUIDs in the key column (id) to distribute them close to evenly among the nodes. You can also find this code here.

/* Before you execute the program, launch `cqlsh` and execute:
create keyspace example with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
create table example.tweet(timeline text, id UUID, data int, PRIMARY KEY(id));
create index on example.tweet(timeline);
*/
package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/gocql/gocql"
)

type hostMetrics struct {
    attempts int
    latency  int
}

// The observer type to watch the queries data
type testQueryObserver struct {
    metrics map[string]*hostMetrics
    verbose bool
}

// ObserveQuery accumulates attempts and latency (in milliseconds) per host.
// Note: the metrics map is not guarded by a mutex; add one if ObserveQuery
// can be called from several goroutines at once.
func (o *testQueryObserver) ObserveQuery(ctx context.Context, q gocql.ObservedQuery) {
    host := q.Host.ConnectAddress().String()
    curMetric := o.metrics[host]
    curAttempts := 0
    curLatency := 0
    if curMetric != nil {
        curAttempts = curMetric.attempts
        curLatency = curMetric.latency
    }
    // Only successful queries are counted
    if q.Err == nil {
        o.metrics[host] = &hostMetrics{attempts: q.Metrics.Attempts + curAttempts, latency: curLatency + int(q.Metrics.TotalLatency/1000000)}
    }
    if o.verbose {
        fmt.Printf("Observed query %q. Returned %v rows, took %v on host %q with %v attempts and total latency %v. Error: %v\n",
            q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.Attempts, q.Metrics.TotalLatency, q.Err)
    }
}

// GetMetrics prints the per-host totals and the average latency per attempt
func (o *testQueryObserver) GetMetrics() {
    for h, m := range o.metrics {
        fmt.Printf("Host: %s, Attempts: %v, Avg Latency: %vms\n", h, m.attempts, m.latency/m.attempts)
    }
}

// Simple retry policy for attempting the connection to 1 host only per query
type RT struct {
    num int
}

func (rt *RT) Attempt(q gocql.RetryableQuery) bool {
    return q.Attempts() <= rt.num
}

func (rt *RT) GetRetryType(err error) gocql.RetryType {
    return gocql.Rethrow
}

func main() {
    specExec := flag.Bool("specExec", false, "Speculative execution")
    flag.Parse()

    // the number of entries to insert
    cycles := 10000

    // connect to the cluster
    cluster := gocql.NewCluster("...")
    cluster.Keyspace = "example"

    // the latency of one of the nodes is very high, so let's make sure we wait long enough
    cluster.Timeout = 10 * time.Second
    cluster.RetryPolicy = &RT{num: 3}
    session, err := cluster.CreateSession()
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), verbose: false}
    rand.Seed(time.Now().UnixNano())
    for i := 0; i < cycles; i++ {
        r := rand.Intn(10000)
        u, _ := gocql.RandomUUID()
        query := session.Query(`INSERT INTO example.tweet (id, timeline, data) VALUES (?, 'me', ?)`, u, r).Observer(observer)
        // Create speculative execution policy with the timeout delay between following executions set to 10ms
        sp := &gocql.SimpleSpeculativeExecution{NumAttempts: 2, TimeoutDelay: 10 * time.Millisecond}
        // Specifically set Idempotence to either true or false to control normal/speculative execution
        query.SetSpeculativeExecutionPolicy(sp).Idempotent(*specExec)
        query.Exec()
    }

    // wait a sec before everything finishes
    <-time.After(1 * time.Second)

    // Print results
    fmt.Print("\n==========\n\n")
    observer.GetMetrics()
}

Next, start the cluster and execute the client. Notice the latency:

admin@ip-10-0-17-222:~/go$ time go run spectest.go
 
==========
 
Host1: <ip>, Attempts: 3334, Avg Latency: 502ms
Host2: <ip>, Attempts: 3333, Avg Latency: 2ms
Host3: <ip>, Attempts: 3333, Avg Latency: 2ms
 
real 28m21.859s
user 0m2.920s
sys 0m1.828s

Because one node has a consistent delay of just over half a second, it takes nearly half an hour to run the queries. In comparison, using speculative execution offers the following results:

admin@ip-10-0-17-222:~/go$ time go run spectest.go --specExec
 
==========
 
Host2: <ip>, Attempts: 5000, Avg Latency: 1ms
Host3: <ip>, Attempts: 4999, Avg Latency: 2ms
 
real 1m24.493s
user 0m3.900s
sys 0m3.072s

Here the impaired node isn’t shown as responding to the query. Instead, queries are divided between the two more effective nodes, resulting in a completion time of about a minute and a half.

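Now let’s look at the overhead cost of realizing this faster execution. Going by the time output of the two runs above, user CPU time rose from 2.920s to 3.900s and system CPU time from 1.828s to 3.072s with speculative execution enabled.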

Some concluding thoughts

While the resource costs were relatively favorable in this test case – which used a single simple client on a performant host – your use case may involve much more complexity and greater impact on resources. For this reason, it’s recommended you test speculative queries in your pre-production environment and fully assess resource costs before proceeding with deployment in production.

The feature for performing speculative query executions using GoCql has been well supported by contributors in the community, and is currently ready for general use.

Author
apache cassandra

Alex Lourie

Alex Lourie is a Senior Software Engineer at Instaclustr, which provides a managed service platform of open source technologies such as Apache Cassandra, Apache Spark, Elasticsearch and Apache Kafka. Find Alex on Twitter, @alourie.

