Scylla Rust Driver

This book contains documentation for scylla-rust-driver - a driver for the Scylla database written in Rust. Although optimized for Scylla, the driver is also compatible with Apache Cassandra®.

Quick Start

In this chapter we will set up a Rust project and run a few simple queries.

Creating a project

To create a new project run:

cargo new myproject

In Cargo.toml add useful dependencies:

[dependencies]
scylla = "0.3.1"
tokio = { version = "1.1.0", features = ["full"] }
futures = "0.3.6"
uuid = "0.8.1"
bigdecimal = "0.2.0"
num-bigint = "0.3"
tracing = "0.1.25"
tracing-subscriber = "0.2.16"

In main.rs put:

extern crate scylla;
extern crate tokio;
use scylla::Session;

#[tokio::main]
async fn main() {
    println!("Hello scylla!");
}

Now running cargo run should print:

Hello scylla!

Running Scylla using Docker

To make queries we will need a running Scylla instance. The easiest way is to use a Docker image.
Please install Docker if it's not installed.

Running scylla

To start Scylla run:

# on Linux sudo might be required
docker run --rm -it -p 9042:9042 scylladb/scylla --smp 2

Docker will download the image, then after a minute or two there should be a message like:

Starting listening for CQL clients on 172.17.0.2:9042

This means that Scylla is ready to receive queries.

To stop this instance press Ctrl + C.

More information

More information about this image can be found on Docker Hub.

Connecting and running a simple query

Now everything is ready to use the driver. Here is a small example:

extern crate scylla;
extern crate tokio;
use scylla::{IntoTypedRows, Session, SessionBuilder};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create a new Session which connects to node at 127.0.0.1:9042
    // (or SCYLLA_URI if specified)
    let uri = std::env::var("SCYLLA_URI")
        .unwrap_or_else(|_| "127.0.0.1:9042".to_string());

    let session: Session = SessionBuilder::new()
        .known_node(uri)
        .build()
        .await?;

    // Create an example keyspace and table
    session
        .query(
            "CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = \
            {'class' : 'SimpleStrategy', 'replication_factor' : 1}",
            &[],
        )
        .await?;

    session
        .query(
            "CREATE TABLE IF NOT EXISTS ks.extab (a int primary key)",
            &[],
        )
        .await?;

    // Insert a value into the table
    let to_insert: i32 = 12345;
    session
        .query("INSERT INTO ks.extab (a) VALUES(?)", (to_insert,))
        .await?;

    // Query rows from the table and print them
    if let Some(rows) = session.query("SELECT a FROM ks.extab", &[]).await?.rows {
        // Parse each row as a tuple containing single i32
        for row in rows.into_typed::<(i32,)>() {
            let read_row: (i32,) = row?;
            println!("Read a value from row: {}", read_row.0);
        }
    }

    Ok(())
}

Connecting to the cluster

Scylla is a distributed database, which means that it operates on multiple nodes running independently. When creating a Session you can specify a few known nodes to which the driver will try connecting:

extern crate scylla;
extern crate tokio;
use scylla::{Session, SessionBuilder};
use std::error::Error;
use std::time::Duration;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let uri = std::env::var("SCYLLA_URI")
        .unwrap_or_else(|_| "127.0.0.1:9042".to_string());

    let session: Session = SessionBuilder::new()
        .known_node(uri)
        .known_node("127.0.0.72:4321")
        .known_node("localhost:8000")
        .connection_timeout(Duration::from_secs(3))
        .known_node_addr(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            9000,
        ))
        .build()
        .await?;

    Ok(())
}

After successfully connecting to one of the specified nodes, the driver will fetch topology information about the other nodes in the cluster and connect to them as well.

Compression

By default the driver does not use any compression on connections.
It's possible to specify a preferred compression algorithm.
The driver will try using it, but if the database doesn't support it, it will fall back to no compression.

Available compression algorithms:

  • Snappy
  • LZ4

An example enabling the Snappy compression algorithm:

extern crate scylla;
extern crate tokio;
use scylla::{Session, SessionBuilder};
use scylla::transport::Compression;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let uri = std::env::var("SCYLLA_URI")
        .unwrap_or_else(|_| "127.0.0.1:9042".to_string());

    let session: Session = SessionBuilder::new()
        .known_node(uri)
        .compression(Some(Compression::Snappy))
        .build()
        .await?;

    Ok(())
}
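
The fallback rule described in this chapter can be pictured as a small negotiation step. The sketch below is only an illustration using the standard library: the `Algo` enum and the `negotiate` function are hypothetical names made up for this example and are not part of the driver's API.

```rust
/// Local stand-in for the two algorithms listed above (not the driver's type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Algo {
    Snappy,
    Lz4,
}

/// Returns the algorithm to use on a connection: the preferred one if the
/// server supports it, otherwise `None`, meaning no compression.
fn negotiate(preferred: Option<Algo>, server_supported: &[Algo]) -> Option<Algo> {
    preferred.filter(|algo| server_supported.contains(algo))
}
```

With this model, preferring Snappy against a server that only offers LZ4 yields no compression rather than an error, which matches the behaviour described above.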

Authentication

The driver supports authentication by username and password.

To specify credentials use the user method in SessionBuilder:


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate tokio;
use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .user("myusername", "mypassword")
    .build()
    .await?;

Ok(())
}
}

TLS

The driver uses the openssl crate for TLS functionality.
It was chosen because rustls doesn't support certificates for IP addresses (see issue), which is a common use case for Scylla.

Enabling feature

openssl is not a pure Rust library, so you need to enable a feature and install the proper package.

To enable the ssl feature, add to Cargo.toml:

scylla = { version = "0.3.1", features = ["ssl"] }
openssl = "0.10.32"

Then install the package with openssl:

  • Debian/Ubuntu:
    apt install libssl-dev pkg-config
    
  • Fedora:
    dnf install openssl-devel
    
  • Arch:
    pacman -S openssl pkg-config
    

Using TLS

To use TLS you will have to create an openssl SslContext and pass it to SessionBuilder.

For example, if the database certificate is in the file ca.crt:


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate openssl;
use scylla::{Session, SessionBuilder};
use openssl::ssl::{SslContextBuilder, SslMethod, SslVerifyMode};
use std::path::PathBuf;

use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
context_builder.set_ca_file("ca.crt")?;
context_builder.set_verify(SslVerifyMode::PEER);

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9142") // The port is now 9142
    .ssl_context(Some(context_builder.build()))
    .build()
    .await?;

Ok(())
}
}

See the full example for more details

Making queries

This driver supports all query types available in Scylla:

  • Simple queries
    • Easy to use
    • Poor performance
    • Primitive load balancing
  • Prepared queries
    • Need to be prepared before use
    • Fast
    • Properly load balanced
  • Batch statements
    • Run multiple queries at once
    • Can be prepared for better performance and load balancing
  • Paged queries
    • Allow reading the result in multiple pages when it doesn't fit in a single response
    • Can be prepared for better performance and load balancing

Additionally, there is special functionality to enable USE KEYSPACE queries: USE keyspace

Queries are fully asynchronous - you can run as many of them in parallel as you wish.

Simple query

Simple query takes query text and values and simply executes them on a Session:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn simple_query_example(session: &Session) -> Result<(), Box<dyn Error>> {
// Insert a value into the table
let to_insert: i32 = 12345;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;
Ok(())
}
}

Warning
Don't use simple query to receive large amounts of data.
By default the query is unpaged and might cause heavy load on the cluster.
In such cases set a page size and use paged query instead.

When page size is set, query will return only the first page of results.

First argument - the query

As the first argument Session::query takes anything implementing Into<Query>.
You can create a query manually to set custom options. For example to change query consistency:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::statement::Consistency;

// Create a Query manually to change the Consistency to ONE
let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)");
my_query.set_consistency(Consistency::One);

// Insert a value into the table
let to_insert: i32 = 12345;
session.query(my_query, (to_insert,)).await?;
Ok(())
}
}

See Query API documentation for more options

Second argument - the values

Query text is constant, but the values might change. You can pass changing values to a query by specifying a list of variables as bound values.
Each ? in query text will be filled with the matching value.

The easiest way is to pass values using a tuple:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
// Sending an integer and a string using a tuple
session
    .query("INSERT INTO ks.tab (a, b, c) VALUES(?, ?, 'text2')", (2_i32, "Some text"))
    .await?;
Ok(())
}
}

Here the first ? will be filled with 2 and the second with "Some text".

Never pass values by concatenating them into the query string; this could lead to SQL injection.
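
To see why, here is a minimal standard-library sketch that only builds strings and never talks to a database. The table `ks.users`, its columns, and both function names are hypothetical and exist only for this illustration.

```rust
/// Unsafe pattern: the user input is pasted into the query text.
fn concatenated_query(user_input: &str) -> String {
    format!("SELECT * FROM ks.users WHERE name = '{}'", user_input)
}

/// Safe pattern: the query text stays constant and the input travels
/// separately, as a bound value for the `?` marker.
fn bound_query(user_input: &str) -> (&'static str, (String,)) {
    (
        "SELECT * FROM ks.users WHERE name = ?",
        (user_input.to_string(),),
    )
}
```

Given the input `alice' AND role = 'admin`, the first function produces a statement with an extra condition that the author never wrote, while the second keeps the statement text unchanged and treats the whole input as a single value.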

See Query values for more information about sending values in queries

Query result

Session::query returns QueryResult with rows represented as Option<Vec<Row>>.
Each row can be parsed as a tuple of Rust types using into_typed:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Query rows from the table and print them
if let Some(rows) = session.query("SELECT a FROM ks.tab", &[]).await?.rows {
    // Parse each row as a tuple containing single i32
    for row in rows.into_typed::<(i32,)>() {
        let read_row: (i32,) = row?;
        println!("Read a value from row: {}", read_row.0);
    }
}
Ok(())
}
}

In cases where page size is set, simple query returns only a single page of results.
To receive all pages use a paged query instead.

See Query result for more information about handling query results

Performance

Simple queries should not be used in places where performance matters.
If performance matters, use a Prepared query instead.

With a simple query the database has to parse the query text each time it's executed, which worsens performance.

Additionally, token and shard aware load balancing does not work with simple queries. They are sent to random nodes.

Query values

Query text is constant, but the values might change. You can pass changing values to a query by specifying a list of variables as bound values.
Each ? in query text will be filled with the matching value.

Never pass values by concatenating them into the query string; this could lead to SQL injection.

Each list of values to send in a query must implement the trait ValueList.
By default this can be a slice &[], a tuple () (max 16 elements) of values to send, or a custom struct which derives ValueList.

A few examples:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::{Session, ValueList};
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
// Empty slice means that there are no values to send
session.query("INSERT INTO ks.tab (a) VALUES(1)", &[]).await?;

// Empty tuple/unit also means that there are no values to send
session.query("INSERT INTO ks.tab (a) VALUES(1)", ()).await?;

// Sending three integers using a slice:
session
    .query("INSERT INTO ks.tab (a, b, c) VALUES(?, ?, ?)", [1_i32, 2, 3].as_ref())
    .await?;

// Sending an integer and a string using a tuple
session
    .query("INSERT INTO ks.tab (a, b) VALUES(?, ?)", (2_i32, "Some text"))
    .await?;

// Sending an integer and a string using a named struct.
// The values will be passed in the order from the struct definition
#[derive(ValueList)]
struct IntString {
    first_col: i32,
    second_col: String,
}

let int_string = IntString {
    first_col: 42_i32,
    second_col: "hello".to_owned(),
};

session
    .query("INSERT INTO ks.tab (a, b) VALUES(?, ?)", int_string)
    .await?;

// Sending a single value as a tuple requires a trailing comma (Rust syntax):
session.query("INSERT INTO ks.tab (a) VALUES(?)", (2_i32,)).await?;

// Each value can also be sent using a reference:
session
    .query("INSERT INTO ks.tab (a, b) VALUES(?, ?)", &(&2_i32, &"Some text"))
    .await?;
Ok(())
}
}

NULL values

Null values can be sent using Option<> - sending a None will make the value NULL:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
let null_i32: Option<i32> = None;
session
    .query("INSERT INTO ks.tab (a) VALUES(?)", (null_i32,))
    .await?;
Ok(())
}
}

Unset values

When performing an insert with values which might be NULL, it's better to use Unset.
The database treats inserting NULL as a delete operation and will generate a tombstone. Using Unset results in better performance:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::frame::value::{MaybeUnset, Unset};

// Inserting a null results in suboptimal performance
let null_i32: Option<i32> = None;
session
    .query("INSERT INTO ks.tab (a) VALUES(?)", (null_i32,))
    .await?;

// Using MaybeUnset enum is better
let unset_i32: MaybeUnset<i32> = MaybeUnset::Unset;
session
    .query("INSERT INTO ks.tab (a) VALUES(?)", (unset_i32,))
    .await?;

// If we are sure that a value should be unset we can simply use Unset
session
    .query("INSERT INTO ks.tab (a) VALUES(?)", (Unset,))
    .await?;
Ok(())
}
}

See the issue for more information about Unset
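
A common situation is an optional field that should be left untouched when it is missing, instead of being written as NULL. The sketch below models that decision with the standard library only. `Binding` is a local stand-in enum invented for this example, not the driver's MaybeUnset type, and `bind_optional` is a hypothetical helper.

```rust
/// Local stand-in with the two states discussed above; not the driver's type.
#[derive(Debug, PartialEq)]
enum Binding<T> {
    /// Nothing is written for this column, so no tombstone is created.
    Unset,
    /// The given value is written.
    Set(T),
}

/// Maps a missing value to `Unset` instead of NULL.
fn bind_optional<T>(value: Option<T>) -> Binding<T> {
    match value {
        Some(v) => Binding::Set(v),
        None => Binding::Unset,
    }
}
```

The same mapping could presumably be written against the driver's own types, but that is an assumption; check the API documentation for the exact variant names.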

Other data types

See Data Types for instructions on sending other data types

Query result

Session::query and Session::execute return a QueryResult with rows represented as Option<Vec<Row>>.

Basic representation

Row is a basic representation of a received row. It can be used by itself, but it's a bit awkward to use:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
if let Some(rows) = session.query("SELECT a from ks.tab", &[]).await?.rows {
    for row in rows {
        let int_value: i32 = row.columns[0].as_ref().unwrap().as_int().unwrap();
    }
}
Ok(())
}
}

Parsing using into_typed

The driver provides a way to parse a row as a tuple of Rust types:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Parse row as a single column containing an int value
if let Some(rows) = session.query("SELECT a from ks.tab", &[]).await?.rows {
    for row in rows {
        let (int_value,): (i32,) = row.into_typed::<(i32,)>()?;
    }
}

// rows.into_typed() converts a Vec of Rows to an iterator of parsing results
if let Some(rows) = session.query("SELECT a from ks.tab", &[]).await?.rows {
    for row in rows.into_typed::<(i32,)>() {
        let (int_value,): (i32,) = row?;
    }
}

// Parse row as two columns containing an int and text columns
if let Some(rows) = session.query("SELECT a, b from ks.tab", &[]).await?.rows {
    for row in rows.into_typed::<(i32, String)>() {
        let (int_value, text_value): (i32, String) = row?;
    }
}
Ok(())
}
}

NULL values

NULL values will return an error when parsed as a Rust type. To properly handle NULL values parse column as an Option<>:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Parse row as two columns containing an int and text which might be null
if let Some(rows) = session.query("SELECT a, b from ks.tab", &[]).await?.rows {
    for row in rows.into_typed::<(i32, Option<String>)>() {
        let (int_value, str_or_null): (i32, Option<String>) = row?;
    }
}
Ok(())
}
}
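
The difference between the two ways of parsing can be modelled without the driver. In this sketch a received column is represented as `Option<i32>`, where `None` stands for NULL. Both function names are hypothetical, and the error type is a plain `String` chosen only to keep the example short.

```rust
/// Parsing into a plain Rust type: a NULL column is reported as an error.
fn parse_required(column: Option<i32>) -> Result<i32, String> {
    column.ok_or_else(|| "column is NULL".to_string())
}

/// Parsing into Option<>: a NULL column simply becomes None.
fn parse_nullable(column: Option<i32>) -> Result<Option<i32>, String> {
    Ok(column)
}
```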

Parsing row as a custom struct

It is possible to receive a row as a struct with fields matching the columns.
The struct must:

  • have the same number of fields as the number of queried columns
  • have field types matching the columns being received
  • derive FromRow

Field names don't need to match column names.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use scylla::macros::FromRow;
use scylla::frame::response::cql_to_rust::FromRow;

#[derive(FromRow)]
struct MyRow {
    age: i32,
    name: Option<String>
}

// Parse row as two columns containing an int and text which might be null
if let Some(rows) = session.query("SELECT a, b from ks.tab", &[]).await?.rows {
    for row in rows.into_typed::<MyRow>() {
        let my_row: MyRow = row?;
    }
}
Ok(())
}
}
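
Since field names don't need to match column names, the mapping presumably relies on the order of the fields. The sketch below shows that idea by hand with the standard library only. `from_columns` is a hypothetical helper, and the derive macro's actual generated code may differ.

```rust
/// Row type whose field names differ from the queried columns `a` and `b`.
#[derive(Debug, PartialEq)]
struct MyRow {
    age: i32,
    name: Option<String>,
}

/// Builds the struct from column values in query order: first `a`, then `b`.
fn from_columns(columns: (i32, Option<String>)) -> MyRow {
    let (first, second) = columns;
    MyRow {
        age: first,
        name: second,
    }
}
```

Under this assumption, swapping two fields of the same type in the struct definition would silently swap their contents, so it is worth keeping the field order aligned with the SELECT list.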

Other data types

For parsing other data types see Data Types

Prepared query

Prepared queries provide much better performance than simple queries, but they need to be prepared before use.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;

// Prepare the query for later execution
let prepared: PreparedStatement = session
    .prepare("INSERT INTO ks.tab (a) VALUES(?)")
    .await?;

// Run the prepared query with some values, just like a simple query
let to_insert: i32 = 12345;
session.execute(&prepared, (to_insert,)).await?;
Ok(())
}
}

Warning
For token/shard aware load balancing to work properly, all partition key values must be sent as bound values (see performance section)

Warning
Don't use execute to receive large amounts of data.
By default the query is unpaged and might cause heavy load on the cluster. In such cases set a page size and use a paged query instead.

When page size is set, execute will return only the first page of results.

Session::prepare

Session::prepare takes query text and prepares the query on all nodes and shards. If at least one of these attempts succeeds, it returns success.
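
The success rule can be pictured as follows. This is only a model of the rule as stated here, written with the standard library. `prepare_succeeded` is a hypothetical name, and each per-node outcome is represented as a `Result<(), String>`.

```rust
/// Returns true when at least one node prepared the statement successfully.
fn prepare_succeeded(per_node: &[Result<(), String>]) -> bool {
    per_node.iter().any(|outcome| outcome.is_ok())
}
```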

Session::execute

Session::execute takes a prepared query and bound values and runs the query. Passing values and handling the result work the same as in a simple query.

Query options

To specify custom options, set them on the PreparedStatement before execution. For example to change the consistency:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;
use scylla::statement::Consistency;

// Prepare the query for later execution
let mut prepared: PreparedStatement = session
    .prepare("INSERT INTO ks.tab (a) VALUES(?)")
    .await?;

// Set prepared query consistency to One
// This is the consistency with which this query will be executed
prepared.set_consistency(Consistency::One);

// Run the prepared query with some values, just like a simple query
let to_insert: i32 = 12345;
session.execute(&prepared, (to_insert,)).await?;
Ok(())
}
}

See PreparedStatement API documentation for more options.

Note: Prepared statements can be created from Query structs and will inherit the custom options that the Query was created with. This is especially useful when using CachingSession::execute, for example.

Performance

Prepared queries have good performance, much better than simple queries. By default they use shard/token aware load balancing.

Always pass partition key values as bound values. Otherwise the driver can't hash them to compute the partition key and the query will be sent to the wrong node, which worsens performance.

Let's say we have a table like this:

TABLE ks.prepare_table (
    a int,
    b int,
    c int,
    PRIMARY KEY (a, b)
)

#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;

// WRONG - partition key value is passed in query string
// Load balancing will compute the wrong partition key
let wrong_prepared: PreparedStatement = session
    .prepare("INSERT INTO ks.prepare_table (a, b, c) VALUES(12345, ?, 16)")
    .await?;

session.execute(&wrong_prepared, (54321,)).await?;

// GOOD - partition key values are sent as bound values
// Other values can be sent any way you like, it doesn't matter
let good_prepared: PreparedStatement = session
    .prepare("INSERT INTO ks.prepare_table (a, b, c) VALUES(?, ?, 16)")
    .await?;

session.execute(&good_prepared, (12345, 54321)).await?;

Ok(())
}
}
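
The reason bound values matter can be sketched with a toy router. This is a deliberately simplified model: it uses the standard library's `DefaultHasher` and a modulo over the node count, whereas a real cluster uses its own partitioner and token ring. The names `owner_for_key` and `route` are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks a node index for a known partition key value.
/// `node_count` must be non-zero.
fn owner_for_key(partition_key: i32, node_count: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    partition_key.hash(&mut hasher);
    (hasher.finish() % node_count as u64) as usize
}

/// Routing decision: `Some(node)` when the partition key was sent as a bound
/// value, `None` when it is hidden inside the query text and cannot be hashed.
fn route(bound_partition_key: Option<i32>, node_count: usize) -> Option<usize> {
    bound_partition_key.map(|key| owner_for_key(key, node_count))
}
```

In this model the `GOOD` statement above corresponds to `route(Some(12345), n)`, which always names the same node, while the `WRONG` statement corresponds to `route(None, n)`, leaving the router with nothing to hash.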

Batch statement

A batch statement allows running many queries at once.
These queries can be simple queries or prepared queries.
Only queries like INSERT or UPDATE can be in a batch. A batch doesn't return any rows.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::batch::Batch;
use scylla::query::Query;
use scylla::prepared_statement::PreparedStatement;

// Create a batch statement
let mut batch: Batch = Default::default();

// Add a simple query to the batch using its text
batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(?, ?)");

// Add a simple query created manually to the batch
let simple: Query = Query::new("INSERT INTO ks.tab (a, b) VALUES(3, 4)");
batch.append_statement(simple);

// Add a prepared query to the batch
let prepared: PreparedStatement = session
    .prepare("INSERT INTO ks.tab (a, b) VALUES(?, 6)")
    .await?;
batch.append_statement(prepared);

// Specify bound values to use with each query
let batch_values = ((1_i32, 2_i32),
                    (),
                    (5_i32,));

// Run the batch, doesn't return any rows
session.batch(&batch, batch_values).await?;
Ok(())
}
}

Batch options

You can set various options by operating on the Batch object.
For example to change consistency:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::batch::Batch;
use scylla::statement::Consistency;

// Create a batch
let mut batch: Batch = Default::default();
batch.append_statement("INSERT INTO ks.tab(a) VALUES(16)");

// Set batch consistency to One
batch.set_consistency(Consistency::One);

// Run the batch
session.batch(&batch, ((), )).await?;
Ok(())
}
}

See Batch API documentation for more options

Batch values

Batch takes a tuple of values specified just like in simple or prepared queries.

Length of batch values must be equal to the number of statements in a batch.
Each query must have its values specified, even if they are empty.

Values passed to Session::batch must implement the trait BatchValues.
By default this includes tuples () and slices &[] of tuples and slices which implement ValueList.

Example:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::batch::Batch;

let mut batch: Batch = Default::default();

// A query with two bound values
batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(?, ?)");

// A query with one bound value
batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(3, ?)");

// A query with no bound values
batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(5, 6)");

// Batch values is a tuple of 3 tuples containing values for each query
let batch_values = ((1_i32, 2_i32), // Tuple with two values for the first query
                    (4_i32,),       // Tuple with one value for the second query
                    ());            // Empty tuple/unit for the third query

// Run the batch
session.batch(&batch, batch_values).await?;
Ok(())
}
}

For more information about sending values in a query see Query values
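
The two rules above (one value list per statement, and one value per bind marker) can be checked with a few lines of standard-library code. This is a naive sketch: `check_batch_values` is a hypothetical helper, values are limited to `i32` for brevity, and markers are counted by searching for `?`, which would miscount a `?` inside a string literal.

```rust
/// Checks that every statement has a value list and that each list has
/// exactly as many values as the statement has `?` markers.
fn check_batch_values(statements: &[&str], values: &[Vec<i32>]) -> Result<(), String> {
    if statements.len() != values.len() {
        return Err(format!(
            "{} statements but {} value lists",
            statements.len(),
            values.len()
        ));
    }
    for (index, (statement, list)) in statements.iter().zip(values).enumerate() {
        let markers = statement.matches('?').count();
        if markers != list.len() {
            return Err(format!(
                "statement {} expects {} values, got {}",
                index,
                markers,
                list.len()
            ));
        }
    }
    Ok(())
}
```

For the three statements in the example above, the value lists `[1, 2]`, `[4]` and `[]` pass the check, while leaving out the empty list for the third statement does not.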

Performance

Batch statements do not use token/shard aware load balancing. Batches are sent to a random node.

Use prepared queries for best performance.

Paged query

Sometimes query results might not fit in a single page. Paged queries allow receiving the whole result page by page.

Session::query_iter and Session::execute_iter take a simple query or a prepared query and return an async iterator over result Rows.

Examples

Use query_iter to perform a simple query with paging:


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate futures;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use futures::stream::StreamExt;

let mut rows_stream = session
    .query_iter("SELECT a, b FROM ks.t", &[])
    .await?
    .into_typed::<(i32, i32)>();

while let Some(next_row_res) = rows_stream.next().await {
    let (a, b): (i32, i32) = next_row_res?;
    println!("a, b: {}, {}", a, b);
}
Ok(())
}
}

Use execute_iter to perform a prepared query with paging:


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate futures;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use scylla::prepared_statement::PreparedStatement;
use futures::stream::StreamExt;

let prepared: PreparedStatement = session
    .prepare("SELECT a, b FROM ks.t")
    .await?;

let mut rows_stream = session
    .execute_iter(prepared, &[])
    .await?
    .into_typed::<(i32, i32)>();

while let Some(next_row_res) = rows_stream.next().await {
    let (a, b): (i32, i32) = next_row_res?;
    println!("a, b: {}, {}", a, b);
}
Ok(())
}
}

Query values can be passed to query_iter and execute_iter just like in a simple query

Configuring page size

It's possible to configure the size of a single page.

On a Query:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;

let mut query: Query = Query::new("SELECT a, b FROM ks.t");
query.set_page_size(16);

let _ = session.query_iter(query, &[]).await?; // ...
Ok(())
}
}

On a PreparedStatement:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;

let mut prepared: PreparedStatement = session
    .prepare("SELECT a, b FROM ks.t")
    .await?;

prepared.set_page_size(16);

let _ = session.execute_iter(prepared, &[]).await?; // ...
Ok(())
}
}

Passing the paging state manually

It's possible to fetch a single page from the table, extract the paging state from the result and manually pass it to the next query. That way, the next query will start fetching the results from where the previous one left off.

On a Query:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;

let paged_query = Query::new("SELECT a, b, c FROM ks.t").with_page_size(6);
let res1 = session.query(paged_query.clone(), &[]).await?;
let res2 = session
    .query_paged(paged_query.clone(), &[], res1.paging_state)
    .await?;
Ok(())
}
}

On a PreparedStatement:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;

let paged_prepared = session
    .prepare(Query::new("SELECT a, b, c FROM ks.t").with_page_size(7))
    .await?;
let res1 = session.execute(&paged_prepared, &[]).await?;
let res2 = session
    .execute_paged(&paged_prepared, &[], res1.paging_state)
    .await?;
Ok(())
}
}
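
The examples above fetch two pages. To read a whole result this way, the same step is repeated until no paging state comes back. The loop can be sketched against an in-memory stand-in for the table, using the standard library only. Here the paging state is modelled as a plain index, whereas the real paging state is an opaque value returned by the database. `fetch_page` and `fetch_all` are hypothetical names.

```rust
/// Returns one page of `rows` starting at `paging_state`, plus the state to
/// pass to the next call (`None` when there is nothing left).
fn fetch_page(
    rows: &[i32],
    page_size: usize,
    paging_state: Option<usize>,
) -> (Vec<i32>, Option<usize>) {
    let start = paging_state.unwrap_or(0);
    let end = (start + page_size).min(rows.len());
    let page = rows[start..end].to_vec();
    let next = if end < rows.len() { Some(end) } else { None };
    (page, next)
}

/// Fetches every page by feeding each returned state into the next call.
fn fetch_all(rows: &[i32], page_size: usize) -> Vec<Vec<i32>> {
    assert!(page_size > 0, "page size must be positive");
    let mut pages = Vec::new();
    let mut state = None;
    loop {
        let (page, next) = fetch_page(rows, page_size, state);
        pages.push(page);
        match next {
            Some(s) => state = Some(s),
            None => break,
        }
    }
    pages
}
```

With seven rows and a page size of three, this loop produces pages of three, three and one rows.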

Performance

Performance is the same as in non-paged variants.
For the best performance use prepared queries.

Lightweight transaction (LWT) query

A lightweight transaction query can be expressed just like any other query, via Session, with the notable difference of having an additional consistency level parameter - the serial_consistency_level.

Format of the query

A lightweight transaction query is not a separate type - it can be expressed just like any other query: via SimpleQuery, PreparedStatement, batches, and so on. The difference lies in the query string itself - when it contains a condition (e.g. IF NOT EXISTS), it becomes a lightweight transaction. It's important to remember that the CQL specification requires a separate, additional consistency level to be defined for LWT queries - serial_consistency_level. The serial consistency level can only be set to two values: SerialConsistency::Serial or SerialConsistency::LocalSerial. The "local" variant makes the transaction consistent only within the same datacenter. For convenience, Scylla Rust Driver sets the default serial consistency level to LocalSerial, as it's more commonly used. For cross-datacenter consistency, please remember to always override the default with SerialConsistency::Serial.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::statement::{Consistency, SerialConsistency};

// Create a Query manually to change the Consistency to ONE
let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?) IF NOT EXISTS".to_string());
my_query.set_consistency(Consistency::One);
// Use cross-datacenter serial consistency
my_query.set_serial_consistency(Some(SerialConsistency::Serial));

// Insert a value into the table
let to_insert: i32 = 12345;
session.query(my_query, (to_insert,)).await?;
Ok(())
}
}

The rest of the API remains identical for LWT and non-LWT queries.

See Query API documentation for more options

USE keyspace

Using a keyspace allows omitting the keyspace name in queries.

For example in cqlsh one could write:

cqlsh> SELECT * FROM my_keyspace.table;

 a     | b     |
-------+-------+
 12345 | 54321 |

(1 rows)
cqlsh> USE my_keyspace;
cqlsh:my_keyspace> SELECT * FROM table;

 a     | b     |
-------+-------+
 12345 | 54321 |

(1 rows)

Tables from other keyspaces can still easily be accessed by using their keyspace names.

cqlsh:my_keyspace> SELECT * FROM other_keyspace.other_table;

In the driver this can be achieved using Session::use_keyspace:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
session
    .query("INSERT INTO my_keyspace.tab (a) VALUES ('test1')", &[])
    .await?;

session.use_keyspace("my_keyspace", false).await?;

// Now we can omit keyspace name in the query
session
    .query("INSERT INTO tab (a) VALUES ('test2')", &[])
    .await?;
Ok(())
}
}

The first argument is the keyspace name.
The second argument states whether this name is case sensitive.

It is also possible to send a raw USE query with Session::query instead of calling Session::use_keyspace, for example:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
session.query("USE my_keyspace", &[]).await?;
Ok(())
}
}

This method has a slightly worse latency than Session::use_keyspace - there are two roundtrips needed instead of one. Therefore, Session::use_keyspace is the preferred method for setting keyspaces.

Multiple use queries at once

Don't run multiple use_keyspace queries at once. This could end up with half of the connections using one keyspace and the other half using the other.

Case sensitivity

In CQL a keyspace name can be case insensitive (without ") or case sensitive (with ").
If the second argument to use_keyspace is set to true this keyspace name will be wrapped in ".
It is best to avoid the problem altogether and just not create two keyspaces with the same name but different cases.

Let's see what happens when there are two keyspaces with the same name but different cases: my_keyspace and MY_KEYSPACE:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
// lowercase name without case sensitivity will use my_keyspace
session.use_keyspace("my_keyspace", false).await?;

// lowercase name with case sensitivity will use my_keyspace
session.use_keyspace("my_keyspace", true).await?;

// uppercase name without case sensitivity will use my_keyspace
session.use_keyspace("MY_KEYSPACE", false).await?;

// uppercase name with case sensitivity will use MY_KEYSPACE
session.use_keyspace("MY_KEYSPACE", true).await?;
Ok(())
}
}
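
The quoting rule above can be sketched without the driver. Both helpers below are hypothetical (they are not part of the scylla crate and do not claim to mirror its internals); they only illustrate how the text of a USE statement differs between the two modes, and that CQL folds unquoted identifiers to lowercase.

```rust
/// Hypothetical helper, not part of the scylla crate: builds the text of a
/// USE statement following the case-sensitivity rule described above.
fn use_statement(keyspace: &str, case_sensitive: bool) -> String {
    if case_sensitive {
        // Quoted identifiers keep their exact case.
        format!("USE \"{}\"", keyspace)
    } else {
        // Unquoted identifiers are case insensitive.
        format!("USE {}", keyspace)
    }
}

/// Hypothetical helper: the keyspace name the database ends up resolving.
/// CQL folds unquoted identifiers to lowercase and keeps quoted ones as written.
fn resolved_keyspace(keyspace: &str, case_sensitive: bool) -> String {
    if case_sensitive {
        keyspace.to_string()
    } else {
        keyspace.to_lowercase()
    }
}
```

With these helpers, "MY_KEYSPACE" resolves to my_keyspace when the flag is false and to MY_KEYSPACE when it is true, matching the four cases listed above.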

Schema agreement

Sometimes after performing queries some nodes have not been updated yet, so we need a mechanism that checks whether every node has agreed on the schema version. There are four methods in Session that assist us. Every method returns a QueryError if something goes wrong, but this should not happen unless there is a DB or connection malfunction.

Checking schema version

Session::fetch_schema_version returns a Uuid of the local node's schema version.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
println!("Local schema version is: {}", session.fetch_schema_version().await?);
Ok(())
}
}

Awaiting schema agreement

Session::await_schema_agreement returns a Future that stays pending as long as the schema is not in agreement and completes once agreement is reached.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
session.await_schema_agreement().await?;
Ok(())
}
}

Awaiting with timeout

We can also set a timeout with Session::await_timed_schema_agreement. It takes one argument, a std::time::Duration value that tells how long the driver should wait for schema agreement. If the timeout elapses before agreement is reached the return value is false, otherwise it is true.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
use std::time::Duration;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
if session.await_timed_schema_agreement(Duration::from_secs(5)).await? { // wait for 5 seconds
    println!("SCHEMA AGREED");
} else {
    println!("SCHEMA IS NOT IN AGREEMENT - TIMED OUT");
}
Ok(())
}
}

Checking for schema interval

If the schema is not agreed, the driver sleeps for a duration before checking it again. The default value is 200 milliseconds, but it can be changed with SessionBuilder::schema_agreement_interval.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::SessionBuilder;
use std::error::Error;
use std::time::Duration;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .schema_agreement_interval(Duration::from_secs(1))
    .build()
    .await?;
Ok(())
}
}
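
The check, sleep and retry cycle described above can be sketched with the standard library only. This is a blocking toy under stated assumptions: poll_until_agreed and its check closure are hypothetical stand-ins, while the real driver is asynchronous and its internals may differ.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical helper: polls `check` until it reports agreement or `timeout`
/// elapses. Returns true on agreement and false on timeout.
/// `check` stands in for a single schema agreement check.
fn poll_until_agreed<F: FnMut() -> bool>(
    mut check: F,
    interval: Duration,
    timeout: Duration,
) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if check() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Schema is not agreed yet: sleep for the interval before checking again.
        sleep(interval);
    }
}
```

A shorter interval notices agreement sooner at the cost of more checks; a longer one does the opposite.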

Checking if schema is in agreement now

If you want to check whether the schema is in agreement right now, without retrying after a failure, you can use the Session::check_schema_agreement function.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
if session.check_schema_agreement().await? { 
    println!("SCHEMA AGREED");
} else {
    println!("SCHEMA IS NOT IN AGREEMENT");
}
Ok(())
}
}
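
Conceptually, the schema is in agreement when every node reports the same schema version. A minimal sketch of that comparison follows; schema_in_agreement is a hypothetical helper, and versions are plain strings here for simplicity whereas the driver returns Uuid values.

```rust
/// Hypothetical helper: returns true when every node reports the same
/// schema version.
fn schema_in_agreement(versions: &[&str]) -> bool {
    match versions.split_first() {
        // Compare every remaining version with the first one.
        Some((first, rest)) => rest.iter().all(|version| version == first),
        // Nothing to compare: treated as agreed (an assumption of this sketch).
        None => true,
    }
}
```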

Data Types

The driver maps database data types to matching Rust types to achieve seamless sending and receiving of CQL values.

See the following chapters for examples on how to send and receive each data type.

See Query values for more information about sending values in queries.
See Query result for more information about reading values from queries.

Database types and their Rust equivalents:

  • Boolean <----> bool
  • Tinyint <----> i8
  • Smallint <----> i16
  • Int <----> i32
  • BigInt <----> i64
  • Float <----> f32
  • Double <----> f64
  • Ascii, Text, Varchar <----> &str, String
  • Counter <----> value::Counter
  • Blob <----> Vec<u8>
  • Inet <----> std::net::IpAddr
  • Uuid, Timeuuid <----> uuid::Uuid
  • Date <----> chrono::NaiveDate, u32
  • Time <----> chrono::Duration
  • Timestamp <----> chrono::Duration
  • Decimal <----> bigdecimal::BigDecimal
  • Varint <----> num_bigint::BigInt
  • List <----> Vec<T>
  • Set <----> Vec<T>
  • Map <----> std::collections::HashMap<K, V>
  • Tuple <----> Rust tuples
  • UDT (User defined type) <----> Custom user structs with macros

Bool, Tinyint, Smallint, Int, Bigint, Float, Double

Bool

Bool is represented as rust bool


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert a bool into the table
let to_insert: bool = true;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read a bool from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(bool,)>() {
        let (bool_value,): (bool,) = row?;
    }
}
Ok(())
}
}

Tinyint

Tinyint is represented as rust i8


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert a tinyint into the table
let to_insert: i8 = 123;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read a tinyint from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(i8,)>() {
        let (tinyint_value,): (i8,) = row?;
    }
}
Ok(())
}
}

Smallint

Smallint is represented as rust i16


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert a smallint into the table
let to_insert: i16 = 12345;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read a smallint from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(i16,)>() {
        let (smallint_value,): (i16,) = row?;
    }
}
Ok(())
}
}

Int

Int is represented as rust i32


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert an int into the table
let to_insert: i32 = 12345;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read an int from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(i32,)>() {
        let (int_value,): (i32,) = row?;
    }
}
Ok(())
}
}

Bigint

Bigint is represented as rust i64


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert a bigint into the table
let to_insert: i64 = 12345;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read a bigint from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(i64,)>() {
        let (bigint_value,): (i64,) = row?;
    }
}
Ok(())
}
}

Float

Float is represented as rust f32


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert a float into the table
let to_insert: f32 = 123.0;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read a float from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(f32,)>() {
        let (float_value,): (f32,) = row?;
    }
}
Ok(())
}
}

Double

Double is represented as rust f64


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert a double into the table
let to_insert: f64 = 12345.0;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read a double from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(f64,)>() {
        let (double_value,): (f64,) = row?;
    }
}
Ok(())
}
}

Ascii, Text, Varchar

Ascii, Text and Varchar are represented as &str and String


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert some text into the table as a &str
let to_insert_str: &str = "abcdef";
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert_str,))
    .await?;

// Insert some text into the table as a String
let to_insert_string: String = "abcdef".to_string();
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert_string,))
    .await?;

// Read ascii/text/varchar from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(String,)>() {
        let (text_value,): (String,) = row?;
    }
}
Ok(())
}
}

Counter

Counter is represented as struct Counter(pub i64)
Counter can't be inserted, it can only be read or updated.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use scylla::frame::value::Counter;

// Read counter from the table
if let Some(rows) = session.query("SELECT c FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(Counter,)>() {
        let (counter_value,): (Counter,) = row?;
        let counter_int_value: i64 = counter_value.0;
    }
}
Ok(())
}
}

Blob

Blob is represented as Vec<u8>


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert some blob into the table as a Vec<u8>
// We can insert it by reference to not move the whole blob
let to_insert: Vec<u8> = vec![1, 2, 3, 4, 5];
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (&to_insert,))
    .await?;

// Read blobs from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(Vec<u8>,)>() {
        let (blob_value,): (Vec<u8>,) = row?;
    }
}
Ok(())
}
}

Inet

Inet is represented as std::net::IpAddr


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use std::net::{IpAddr, Ipv4Addr};

// Insert some ip address into the table
let to_insert: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read inet from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(IpAddr,)>() {
        let (inet_value,): (IpAddr,) = row?;
    }
}
Ok(())
}
}
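
Because Inet maps to std::net::IpAddr, values can also be built by parsing text with the standard library. The sketch below uses no driver code; parse_inet is a hypothetical helper name.

```rust
use std::net::IpAddr;

/// Hypothetical helper: parses text into an IpAddr.
/// Both IPv4 and IPv6 literals are accepted; anything else yields None.
fn parse_inet(text: &str) -> Option<IpAddr> {
    text.parse::<IpAddr>().ok()
}
```

The resulting IpAddr can be passed as a query value in the same way as to_insert in the example above.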

Uuid, Timeuuid

Uuid and Timeuuid are represented as uuid::Uuid


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate uuid;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use uuid::Uuid;

// Insert some uuid/timeuuid into the table
let to_insert: Uuid = Uuid::parse_str("8e14e760-7fa8-11eb-bc66-000000000001")?;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read uuid/timeuuid from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(Uuid,)>() {
        let (uuid_value,): (Uuid,) = row?;
    }
}
Ok(())
}
}

Date

For most use cases Date can be represented as chrono::NaiveDate.
NaiveDate supports dates from -262145-1-1 to 262143-12-31.

For dates outside of this range you can use the raw u32 representation.

Using chrono::NaiveDate:


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate chrono;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use chrono::naive::NaiveDate;

// Insert some date into the table
let to_insert: NaiveDate = NaiveDate::from_ymd(2021, 3, 24);
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read NaiveDate from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(NaiveDate,)>() {
        let (date_value,): (NaiveDate,) = row?;
    }
}
Ok(())
}
}

Using raw u32 representation

Internally Date is represented as number of days since -5877641-06-23 i.e. 2^31 days before unix epoch.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::frame::value::Date;
use scylla::frame::response::result::CqlValue;

// Insert date using raw u32 representation
let to_insert: Date = Date(2_u32.pow(31)); // 1970-01-01 
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read raw Date from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows {
        let date_value: u32 = match row.columns[0] {
            Some(CqlValue::Date(date_value)) => date_value,
            _ => panic!("Should be a date!")
        };
    }
}
Ok(())
}
}
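
Since the raw value counts days from a point 2^31 days before the unix epoch, converting between the raw u32 and a signed number of days relative to 1970-01-01 is a single offset. The helper names below are hypothetical and only illustrate the arithmetic.

```rust
/// Raw value that corresponds to 1970-01-01 (2^31).
const EPOCH_RAW: i64 = 1 << 31;

/// Hypothetical helper: converts days relative to the unix epoch
/// (negative means before it) into the raw u32 representation.
/// Returns None when the result does not fit in a u32.
fn days_to_raw(days_since_epoch: i64) -> Option<u32> {
    let raw = EPOCH_RAW.checked_add(days_since_epoch)?;
    if (0..=i64::from(u32::MAX)).contains(&raw) {
        Some(raw as u32)
    } else {
        None
    }
}

/// Hypothetical helper: converts the raw u32 representation back into
/// days relative to the unix epoch.
fn raw_to_days(raw: u32) -> i64 {
    i64::from(raw) - EPOCH_RAW
}
```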

Time

Time is represented as chrono::Duration

Internally Time is represented as the number of nanoseconds since midnight. It can't be negative or exceed 86399999999999 (one nanosecond short of 24 hours).

When sending in a query it needs to be wrapped in value::Time to differentiate from Timestamp


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate chrono;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use scylla::frame::value::Time;
use chrono::Duration;

// Insert some time into the table
let to_insert: Duration = Duration::seconds(64);
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (Time(to_insert),))
    .await?;

// Read time from the table, no need for a wrapper here
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(Duration,)>() {
        let (time_value,): (Duration,) = row?;
    }
}
Ok(())
}
}
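
The valid range of a Time value follows directly from the nanoseconds-since-midnight representation. The sketch below uses only the standard library; both function names are hypothetical.

```rust
const NANOS_PER_SECOND: i64 = 1_000_000_000;
const NANOS_PER_DAY: i64 = 86_400 * NANOS_PER_SECOND;

/// Hypothetical helper: converts a wall-clock time into nanoseconds since
/// midnight. Returns None when any component is out of range.
fn time_to_nanos(hour: u32, minute: u32, second: u32, nano: u32) -> Option<i64> {
    if hour >= 24 || minute >= 60 || second >= 60 || nano >= 1_000_000_000 {
        return None;
    }
    let seconds = i64::from(hour) * 3600 + i64::from(minute) * 60 + i64::from(second);
    Some(seconds * NANOS_PER_SECOND + i64::from(nano))
}

/// Hypothetical helper: checks that a nanosecond count is a valid Time,
/// i.e. not negative and below 24 hours.
fn is_valid_time(nanos: i64) -> bool {
    (0..NANOS_PER_DAY).contains(&nanos)
}
```

For instance, the 64 seconds inserted in the example above correspond to 64_000_000_000 nanoseconds.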

Timestamp

Timestamp is represented as chrono::Duration

Internally Timestamp is represented as an i64 describing the number of milliseconds since the unix epoch. The driver converts this to chrono::Duration.

When sending in a query it needs to be wrapped in value::Timestamp to differentiate from Time


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate chrono;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use scylla::frame::value::Timestamp;
use chrono::Duration;

// Insert some timestamp into the table
let to_insert: Duration = Duration::seconds(64);
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (Timestamp(to_insert),))
    .await?;

// Read timestamp from the table, no need for a wrapper here
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(Duration,)>() {
        let (timestamp_value,): (Duration,) = row?;
    }
}
Ok(())
}
}
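
To obtain the millisecond count for a real point in time, the standard library is enough. The sketch below is driver-free and millis_since_epoch is a hypothetical helper; its result could then be turned into a chrono::Duration with Duration::milliseconds before wrapping it in Timestamp.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical helper: milliseconds between the unix epoch and `t`.
/// The result is negative when `t` is earlier than the epoch.
fn millis_since_epoch(t: SystemTime) -> i64 {
    match t.duration_since(UNIX_EPOCH) {
        Ok(after) => after.as_millis() as i64,
        // duration_since fails for earlier times; the error carries the distance.
        Err(before) => -(before.duration().as_millis() as i64),
    }
}
```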

Decimal

Decimal is represented as bigdecimal::BigDecimal


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate bigdecimal;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use bigdecimal::BigDecimal;
use std::str::FromStr;

// Insert a decimal into the table
let to_insert: BigDecimal = BigDecimal::from_str("12345.0")?;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read a decimal from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(BigDecimal,)>() {
        let (decimal_value,): (BigDecimal,) = row?;
    }
}
Ok(())
}
}

Varint

Varint is represented as num_bigint::BigInt


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate num_bigint;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use num_bigint::BigInt;
use std::str::FromStr;

// Insert a varint into the table
let to_insert: BigInt = BigInt::from_str("12345")?;
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read a varint from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(BigInt,)>() {
        let (varint_value,): (BigInt,) = row?;
    }
}
Ok(())
}
}

List, Set, Map

List

List is represented as Vec<T>


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert a list of ints into the table
let to_insert: Vec<i32> = vec![1, 2, 3, 4, 5];
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (&to_insert,))
    .await?;

// Read a list of ints from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(Vec<i32>,)>() {
        let (list_value,): (Vec<i32>,) = row?;
    }
}
Ok(())
}
}

Set

Set is represented as Vec<T>


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert a set of ints into the table
let to_insert: Vec<i32> = vec![1, 2, 3, 4, 5];
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (&to_insert,))
    .await?;

// Read a set of ints from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(Vec<i32>,)>() {
        let (set_value,): (Vec<i32>,) = row?;
    }
}
Ok(())
}
}
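
Because a Set travels as a Vec<T>, code that keeps its data in a Rust set type has to convert at the boundary. A minimal sketch using std::collections::BTreeSet follows; the helper names are hypothetical and nothing here is part of the driver.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper: turns a set into the Vec that is sent as a query value.
/// A BTreeSet iterates in ascending order, so the Vec is sorted and free of duplicates.
fn set_to_vec(set: &BTreeSet<i32>) -> Vec<i32> {
    set.iter().copied().collect()
}

/// Hypothetical helper: turns a Vec read from the database back into a set.
fn vec_to_set(values: Vec<i32>) -> BTreeSet<i32> {
    values.into_iter().collect()
}
```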

Map

Map is represented as std::collections::HashMap<K, V>


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use std::collections::HashMap;

// Insert a map of text and int into the table
let mut to_insert: HashMap<String, i32> = HashMap::new();
to_insert.insert("abcd".to_string(), 16);

session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (&to_insert,))
    .await?;

// Read a map from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(HashMap<String, i32>,)>() {
        let (map_value,): (HashMap<String, i32>,) = row?;
    }
}
Ok(())
}
}

Tuple

Tuple is represented as Rust tuples of at most 16 elements.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;

// Insert a tuple of int and string into the table
let to_insert: (i32, String) = (1, "abc".to_string());
session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read a tuple of int and string from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<((i32, String),)>() {
        let (tuple_value,): ((i32, String),) = row?;

        let int_value: i32 = tuple_value.0;
        let string_value: String = tuple_value.1;
    }
}
Ok(())
}
}

User defined types

Scylla allows users to define their own data types with named fields.
The driver supports this by letting you create a custom struct with derived functionality.

For example let's say my_type was created using this query:

CREATE TYPE ks.my_type (int_val int, text_val text)

To use this type in the driver create a matching struct and derive IntoUserType and FromUserType:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::IntoTypedRows;
use scylla::macros::{FromUserType, IntoUserType};
use scylla::cql_to_rust::FromCqlVal;

// Define custom struct that matches User Defined Type created earlier
// wrapping field in Option will gracefully handle null field values
#[derive(Debug, IntoUserType, FromUserType)]
struct MyType {
    int_val: i32,
    text_val: Option<String>,
}

// Now it can be sent and received like any other value

// Insert my_type into the table
let to_insert = MyType {
    int_val: 17,
    text_val: Some("Some string".to_string()),
};

session
    .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
    .await?;

// Read MyType from the table
if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
    for row in rows.into_typed::<(MyType,)>() {
        let (my_type_value,): (MyType,) = row?;
    }
}
Ok(())
}
}

Load balancing

There are multiple load balancing strategies that the driver can use.
Load balancing can be configured for the whole Session during creation.

Basic load balancing strategies:

  • RoundRobinPolicy - uses all known nodes one after another
  • DcAwareRoundRobinPolicy - uses all known nodes from the local datacenter one after another

Each of these basic load balancing strategies can be wrapped in TokenAwarePolicy to enable token awareness.

Note
Only prepared queries use token aware load balancing

All queries are shard aware; there is no way to turn off shard awareness.
If a token is available the query is sent to the correct shard, otherwise to a random one.

So, the available load balancing policies are:

  • Round robin
  • DC Aware Round robin
  • Token aware Round robin
  • Token aware DC Aware Round robin

By default the driver uses Token aware Round robin.

Round robin

The simplest load balancing policy available.
Takes all nodes in the cluster and uses them one after another.

For example if there are nodes A, B, C in the cluster, this policy will use A, B, C, A, B, ...

Example

To use this policy in Session:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};
use scylla::transport::load_balancing::RoundRobinPolicy;
use std::sync::Arc;

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .load_balancing(Arc::new(RoundRobinPolicy::new()))
    .build()
    .await?;
Ok(())
}
}
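
The A, B, C, A, B, ... rotation can be illustrated with a toy selector. This is not the driver's RoundRobinPolicy, only a sketch of the idea with hypothetical names.

```rust
/// Toy round robin selector; not the driver's RoundRobinPolicy.
struct RoundRobin {
    nodes: Vec<String>,
    next: usize,
}

impl RoundRobin {
    fn new(nodes: &[&str]) -> Self {
        RoundRobin {
            nodes: nodes.iter().map(|node| node.to_string()).collect(),
            next: 0,
        }
    }

    /// Returns the next node in rotation, or None when no nodes are known.
    fn pick(&mut self) -> Option<&str> {
        if self.nodes.is_empty() {
            return None;
        }
        let index = self.next;
        // Advance and wrap around after the last node.
        self.next = (self.next + 1) % self.nodes.len();
        Some(self.nodes[index].as_str())
    }
}
```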

DC Aware Round robin

This is a more sophisticated version of Round robin policy. It takes all nodes in the local datacenter and uses them one after another.
If no nodes from the local datacenter are available it will fall back to other nodes.

For example if there are two datacenters:

  • us_east with nodes: A, B, C
  • us_west with nodes: D, E, F

this policy when set to us_east will only use A, B, C, A, B, ...

Example

To use this policy in Session:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};
use scylla::transport::load_balancing::DcAwareRoundRobinPolicy;
use std::sync::Arc;

let local_dc_name: String = "us_east".to_string();

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .load_balancing(Arc::new(DcAwareRoundRobinPolicy::new(local_dc_name)))
    .build()
    .await?;
Ok(())
}
}
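
The "local datacenter first, others as fallback" rule can be sketched as a filter over the known nodes. The toy below is not the driver's implementation: Node and candidates are hypothetical, and "available" is simplified to "present in the list".

```rust
/// Toy model of a node and the datacenter it belongs to.
#[derive(Debug, Clone, PartialEq)]
struct Node {
    name: &'static str,
    datacenter: &'static str,
}

/// Hypothetical helper: returns the nodes to rotate over. Nodes from
/// `local_dc` are used when there are any, otherwise all nodes are used
/// as a fallback.
fn candidates<'a>(nodes: &'a [Node], local_dc: &str) -> Vec<&'a Node> {
    let local: Vec<&Node> = nodes
        .iter()
        .filter(|node| node.datacenter == local_dc)
        .collect();
    if local.is_empty() {
        nodes.iter().collect()
    } else {
        local
    }
}
```

The returned candidates would then be used one after another, as in plain round robin.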

Token aware Round robin

This policy will try to calculate a token to find replica nodes in which queried data is stored.
After finding the replicas it performs a round robin on them.

Example

To use this policy in Session:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};
use scylla::transport::load_balancing::{RoundRobinPolicy, TokenAwarePolicy};
use std::sync::Arc;

let robin = Box::new(RoundRobinPolicy::new());
let policy = Arc::new(TokenAwarePolicy::new(robin));

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .load_balancing(policy)
    .build()
    .await?;
Ok(())
}
}
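
This chapter does not describe how tokens or replicas are computed, so the following is only a toy model of the general idea: nodes own positions on a ring of tokens, and the replicas for a token are found by walking the ring from that token. The function name, the ring layout and the replication_factor parameter are all assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// Toy token ring lookup (an illustration, not the driver's algorithm).
/// `ring` maps a token to the node owning it. Replicas are the first
/// `replication_factor` distinct nodes found when walking the ring
/// clockwise from `token`, wrapping around at the end.
fn replicas_for_token<'a>(
    ring: &BTreeMap<i64, &'a str>,
    token: i64,
    replication_factor: usize,
) -> Vec<&'a str> {
    let mut replicas: Vec<&'a str> = Vec::new();
    // Entries with a token >= `token` first, then the rest of the ring.
    let walk = ring.range(token..).chain(ring.range(..token));
    for (_, node) in walk {
        if replicas.len() >= replication_factor {
            break;
        }
        if !replicas.contains(node) {
            replicas.push(*node);
        }
    }
    replicas
}
```

A token aware policy would then rotate over the returned replicas instead of over all nodes.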

Token aware DC Aware Round robin

This policy will try to calculate a token to find replica nodes in which queried data is stored.
After finding the replicas it chooses the ones from the local datacenter and performs a round robin on them.

Example

To use this policy in Session:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};
use scylla::transport::load_balancing::{DcAwareRoundRobinPolicy, TokenAwarePolicy};
use std::sync::Arc;

let local_dc: String = "us_east".to_string();
let dc_robin = Box::new(DcAwareRoundRobinPolicy::new(local_dc));
let policy = Arc::new(TokenAwarePolicy::new(dc_robin));

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .load_balancing(policy)
    .build()
    .await?;
Ok(())
}
}

Retry policy configuration

After a query fails the driver might decide to retry it based on its Retry Policy and the query itself. Retry policy can be configured for Session or just for a single query.

Retry policies

The driver provides two retry policies:

  • Fallthrough retry policy
  • Default retry policy

It's possible to implement a custom Retry Policy by implementing the traits RetryPolicy and RetrySession.

Query idempotence

A query is idempotent if it can be applied multiple times without changing the result of the initial application.

Specifying that a query is idempotent increases the chances that it will be retried in case of failure. Idempotent queries can be retried in situations where retrying non idempotent queries would be dangerous.

Idempotence has to be specified manually; the driver is not able to figure it out by itself.


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::prepared_statement::PreparedStatement;

// Specify that a Query is idempotent
let mut my_query: Query = Query::new("SELECT a FROM ks.tab");
my_query.set_is_idempotent(true);


// Specify that a PreparedStatement is idempotent
let mut prepared: PreparedStatement = session
    .prepare("SELECT a FROM ks.tab")
    .await?;

prepared.set_is_idempotent(true);
Ok(())
}
}
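
The definition of idempotence can be made concrete with a toy in-memory table. Nothing below touches the driver; Op, apply and is_idempotent_on are hypothetical names used to contrast an operation that is safe to repeat with one that is not.

```rust
use std::collections::HashMap;

/// Toy operations on an in-memory table that maps keys to integers.
enum Op {
    /// Like `UPDATE ... SET v = 5`: repeating it does not change the result.
    Set(&'static str, i64),
    /// Like a counter update `v = v + 1`: every repetition changes the result.
    Increment(&'static str),
}

fn apply(table: &mut HashMap<&'static str, i64>, op: &Op) {
    match op {
        Op::Set(key, value) => {
            table.insert(*key, *value);
        }
        Op::Increment(key) => {
            *table.entry(*key).or_insert(0) += 1;
        }
    }
}

/// Returns true if applying `op` twice leaves the same state as applying it once.
fn is_idempotent_on(initial: &HashMap<&'static str, i64>, op: &Op) -> bool {
    let mut once = initial.clone();
    apply(&mut once, op);
    let mut twice = once.clone();
    apply(&mut twice, op);
    once == twice
}
```

Retrying the Set operation after an uncertain failure is harmless, while retrying Increment could apply the change twice, which is why only the former kind should be marked as idempotent.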

Fallthrough retry policy

The FallthroughRetryPolicy never retries and returns errors straight to the user. Useful for debugging.

Examples

To use in Session:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};
use scylla::transport::retry_policy::FallthroughRetryPolicy;

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .retry_policy(Box::new(FallthroughRetryPolicy::new()))
    .build()
    .await?;
Ok(())
}
}

To use in a simple query:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::transport::retry_policy::FallthroughRetryPolicy;

// Create a Query manually and set the retry policy
let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)");
my_query.set_retry_policy(Box::new(FallthroughRetryPolicy::new()));

// Run the query using this retry policy
let to_insert: i32 = 12345;
session.query(my_query, (to_insert,)).await?;
Ok(())
}
}

To use in a prepared query:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::retry_policy::FallthroughRetryPolicy;

// Create PreparedStatement manually and set the retry policy
let mut prepared: PreparedStatement = session
    .prepare("INSERT INTO ks.tab (a) VALUES(?)")
    .await?;

prepared.set_retry_policy(Box::new(FallthroughRetryPolicy::new()));

// Run the query using this retry policy
let to_insert: i32 = 12345;
session.execute(&prepared, (to_insert,)).await?;
Ok(())
}
}

Default retry policy

This is the retry policy used by default. It retries when there is a high chance that it might help.
This policy is based on the one in DataStax Java Driver. The behaviour is the same.

Examples

To use in Session:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};
use scylla::transport::retry_policy::DefaultRetryPolicy;

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .retry_policy(Box::new(DefaultRetryPolicy::new()))
    .build()
    .await?;
Ok(())
}
}

To use in a simple query:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::transport::retry_policy::DefaultRetryPolicy;

// Create a Query manually and set the retry policy
let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)");
my_query.set_retry_policy(Box::new(DefaultRetryPolicy::new()));

// Run the query using this retry policy
let to_insert: i32 = 12345;
session.query(my_query, (to_insert,)).await?;
Ok(())
}
}

To use in a prepared query:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::retry_policy::DefaultRetryPolicy;

// Create PreparedStatement manually and set the retry policy
let mut prepared: PreparedStatement = session
    .prepare("INSERT INTO ks.tab (a) VALUES(?)")
    .await?;

prepared.set_retry_policy(Box::new(DefaultRetryPolicy::new()));

// Run the query using this retry policy
let to_insert: i32 = 12345;
session.execute(&prepared, (to_insert,)).await?;
Ok(())
}
}

Speculative execution

Speculative query execution is an optimization technique where a driver pre-emptively starts a second execution of a query against another node, before the first node has replied.
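The idea can be sketched without the driver, using only threads and a channel from the standard library. The node names, the latencies and the `send_request` and `query_with_speculation` helpers are made up for this illustration; the driver's own implementation is asynchronous and is not shown here.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Simulates sending a request to a node that answers after `latency`.
fn send_request(node: &'static str, latency: Duration, tx: mpsc::Sender<&'static str>) {
    thread::spawn(move || {
        thread::sleep(latency);
        // The receiver may already be gone if another node answered first.
        let _ = tx.send(node);
    });
}

/// Returns the name of the simulated node whose response arrived first.
fn query_with_speculation(
    primary_latency: Duration,
    backup_latency: Duration,
    speculation_delay: Duration,
) -> &'static str {
    let (tx, rx) = mpsc::channel();
    send_request("node-a", primary_latency, tx.clone());

    // Give the first node a chance to answer before speculating.
    match rx.recv_timeout(speculation_delay) {
        Ok(node) => node,
        Err(_) => {
            // No reply yet: start a second execution on another node
            // and take whichever response comes back first.
            send_request("node-b", backup_latency, tx);
            rx.recv().expect("at least one simulated node always replies")
        }
    }
}
```

When the first node is slow, the second execution wins the race and the caller sees a lower latency, at the cost of the extra request sent to the cluster.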

There are multiple speculative execution strategies that the driver can use. Speculative execution can be configured for the whole Session during its creation.

Available speculative execution strategies:

  • Simple speculative execution
  • Percentile speculative execution

Speculative execution is not enabled by default, and currently only non-iter session methods use it.

Simple speculative execution

The simplest speculative execution policy available. It starts another execution of a query after a constant delay of retry_interval and performs at most max_retry_count speculative executions (not counting the first, non-speculative one).
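As a rough illustration of how the two parameters interact, the sketch below lists the moments at which executions would start if no node replied in the meantime. `SimplePolicy` and `execution_start_offsets` are hypothetical names that only mirror the two fields described above; they are not part of the driver.

```rust
use std::time::Duration;

/// Mirrors the two parameters of the policy described above (hypothetical type).
struct SimplePolicy {
    max_retry_count: usize,
    retry_interval: Duration,
}

/// Offsets from the start of the request at which each execution would begin,
/// assuming that no response arrives in the meantime.
/// The first entry is the regular, non-speculative execution.
fn execution_start_offsets(policy: &SimplePolicy) -> Vec<Duration> {
    (0..=policy.max_retry_count)
        .map(|n| policy.retry_interval * (n as u32))
        .collect()
}
```

With max_retry_count set to 3 and retry_interval set to 100 ms, this gives four executions in total, starting at 0, 100, 200 and 300 ms.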

Example

To use this policy in Session:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use std::{sync::Arc, time::Duration};
use scylla::{
    Session,
    SessionBuilder,
    speculative_execution::SimpleSpeculativeExecutionPolicy
};

let policy = SimpleSpeculativeExecutionPolicy {
    max_retry_count: 3,
    retry_interval: Duration::from_millis(100),
};

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .speculative_execution(Arc::new(policy))
    .build()
    .await?;
Ok(())
}
}

Percentile speculative execution

This policy has access to the Metrics shared with the session, and triggers speculative execution when the request to the current host takes longer than a given latency percentile.
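The sketch below shows one way such a threshold could be derived from recorded latencies, using the nearest-rank method on a plain slice. This is an assumption made for illustration: the driver keeps its own latency data inside Metrics, and `latency_percentile` and `should_speculate` are hypothetical helpers, not driver functions.

```rust
use std::time::Duration;

/// Nearest-rank percentile of the recorded latencies.
/// Returns None for an empty sample or a percentile outside 0..=100.
fn latency_percentile(latencies: &[Duration], percentile: f64) -> Option<Duration> {
    if latencies.is_empty() || !(0.0..=100.0).contains(&percentile) {
        return None;
    }
    let mut sorted = latencies.to_vec();
    sorted.sort();
    // Nearest rank: the smallest value with at least `percentile` percent
    // of the samples at or below it.
    let rank = ((percentile / 100.0) * sorted.len() as f64).ceil() as usize;
    Some(sorted[rank.max(1) - 1])
}

/// True when the current request has already taken longer than the threshold.
fn should_speculate(elapsed: Duration, latencies: &[Duration], percentile: f64) -> bool {
    match latency_percentile(latencies, percentile) {
        Some(threshold) => elapsed > threshold,
        // Without any measurements there is nothing to compare against.
        None => false,
    }
}
```

A high percentile such as 99.0 means that only requests slower than almost all previously observed ones trigger a speculative execution.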

Example

To use this policy in Session:


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use std::{sync::Arc, time::Duration};
use scylla::{
    Session,
    SessionBuilder,
    speculative_execution::PercentileSpeculativeExecutionPolicy,
};

let policy = PercentileSpeculativeExecutionPolicy  {
    max_retry_count: 3,
    percentile: 99.0,
};

let session: Session = SessionBuilder::new()
    .known_node("127.0.0.1:9042")
    .speculative_execution(Arc::new(policy))
    .build()
    .await?;
Ok(())
}
}

Driver metrics

During operation the driver collects various metrics.

They can be accessed at any moment using Session::get_metrics().

Collected metrics:

  • Query latencies
  • Total number of nonpaged queries
  • Number of errors during nonpaged queries
  • Total number of paged queries
  • Number of errors during paged queries
  • Number of retries
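Counters like these are typically cheap to maintain because they can be updated with atomic operations and without locks. The following is a minimal sketch of that idea; `SketchMetrics` and its methods are hypothetical and far simpler than the driver's Metrics type, which also records latency percentiles and separates paged from nonpaged queries.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for a metrics object; not the driver's `Metrics` type.
#[derive(Default)]
struct SketchMetrics {
    queries_num: AtomicU64,
    errors_num: AtomicU64,
    retries_num: AtomicU64,
    latency_sum_ms: AtomicU64,
}

impl SketchMetrics {
    /// Records a query that succeeded after `latency_ms` milliseconds.
    fn record_success(&self, latency_ms: u64) {
        self.queries_num.fetch_add(1, Ordering::Relaxed);
        self.latency_sum_ms.fetch_add(latency_ms, Ordering::Relaxed);
    }

    /// Records a query that ended with an error.
    fn record_error(&self) {
        self.queries_num.fetch_add(1, Ordering::Relaxed);
        self.errors_num.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one retry of a query.
    fn record_retry(&self) {
        self.retries_num.fetch_add(1, Ordering::Relaxed);
    }

    fn queries_num(&self) -> u64 {
        self.queries_num.load(Ordering::Relaxed)
    }

    fn errors_num(&self) -> u64 {
        self.errors_num.load(Ordering::Relaxed)
    }

    fn retries_num(&self) -> u64 {
        self.retries_num.load(Ordering::Relaxed)
    }

    /// Average latency of successful queries, or None if there are none yet.
    fn average_latency_ms(&self) -> Option<u64> {
        let succeeded = self.queries_num().checked_sub(self.errors_num())?;
        self.latency_sum_ms.load(Ordering::Relaxed).checked_div(succeeded)
    }
}
```

Because every method takes `&self`, one such object can be shared between tasks behind an `Arc` and read at any moment while queries are running.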

Example


#![allow(unused)]
fn main() {
extern crate scylla;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
let metrics = session.get_metrics();

println!("Queries requested: {}", metrics.get_queries_num());
println!("Iter queries requested: {}", metrics.get_queries_iter_num());
println!("Errors occurred: {}", metrics.get_errors_num());
println!("Iter errors occurred: {}", metrics.get_errors_iter_num());
println!("Average latency: {}", metrics.get_latency_avg_ms().unwrap());
println!(
    "99.9 latency percentile: {}",
    metrics.get_latency_percentile_ms(99.9).unwrap()
);
Ok(())
}
}

Logging

The driver uses the tracing crate for all logs.
To view the logs you have to create a tracing subscriber to which all logs will be written.

To just print the logs you can use the default subscriber:

extern crate scylla;
extern crate tokio;
extern crate tracing;
extern crate tracing_subscriber;
use std::error::Error;
use scylla::{Session, SessionBuilder};
use tracing::info;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Install global collector configured based on RUST_LOG env var
    // This collector will receive logs from the driver
    tracing_subscriber::fmt::init();

    let uri = std::env::var("SCYLLA_URI")
        .unwrap_or_else(|_| "127.0.0.1:9042".to_string());

    info!("Connecting to {}", uri);

    let session: Session = SessionBuilder::new().known_node(uri).build().await?;
    session
        .query(
            "CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = \
            {'class' : 'SimpleStrategy', 'replication_factor' : 1}",
            &[],
        )
        .await?;

    // This query should generate a warning message
    session.query("USE ks", &[]).await?;

    Ok(())
}

To start this example execute:

RUST_LOG=info cargo run

The full example is available in the examples folder.

Query tracing

Each query's execution can be traced to see which nodes it was sent to, what operations were performed, etc.

Tracing first has to be enabled for a query. Its result will then contain a tracing id, which can be used to query tracing information.

Queries that support tracing:

  • Simple and prepared queries
  • Batch queries
  • Paged queries
  • Session::prepare

After obtaining the tracing id you can use Session::get_tracing_info() to query tracing information.
TracingInfo contains values that are the same in Scylla and Cassandra®.
If TracingInfo does not contain some needed value, it can be queried manually from the tables system_traces.sessions and system_traces.events.
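For the manual route, both tables can be looked up by tracing id, since each of them uses the session_id column as its partition key. The helper below only builds the CQL text and is a hypothetical convenience for this illustration, not something the driver provides.

```rust
/// The two tracing tables named in the text.
const SESSIONS_TABLE: &str = "system_traces.sessions";
const EVENTS_TABLE: &str = "system_traces.events";

/// Builds a CQL statement that selects every column recorded for one tracing
/// session from `table`. The `?` marker is meant to be bound to the tracing id.
fn trace_lookup_statement(table: &str) -> String {
    format!("SELECT * FROM {} WHERE session_id = ?", table)
}
```

The resulting statement can then be run like any other query, passing the tracing id as the single bound value. Which columns come back depends on the server, so it is worth inspecting the returned rows before relying on a particular one.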

Tracing a simple/prepared query

Both simple and prepared queries return a QueryResult, which contains a tracing_id if tracing was enabled.

Tracing a simple query


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate uuid;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::QueryResult;
use scylla::tracing::TracingInfo;
use uuid::Uuid;

// Create a Query manually and enable tracing
let mut query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(4)");
query.set_tracing(true);

let res: QueryResult = session.query(query, &[]).await?;
let tracing_id: Option<Uuid> = res.tracing_id;

if let Some(id) = tracing_id {
    // Query tracing info from system_traces.sessions and system_traces.events
    let tracing_info: TracingInfo = session.get_tracing_info(&id).await?;
    println!("tracing_info: {:#?}", tracing_info);
}
Ok(())
}
}

Tracing a prepared query


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate uuid;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;
use scylla::QueryResult;
use scylla::tracing::TracingInfo;
use uuid::Uuid;

// Prepare the query
let mut prepared: PreparedStatement = session
    .prepare("SELECT a FROM ks.tab")
    .await?;

// Enable tracing for the prepared query
prepared.set_tracing(true);

let res: QueryResult = session.execute(&prepared, &[]).await?;
let tracing_id: Option<Uuid> = res.tracing_id;

if let Some(id) = tracing_id {
    // Query tracing info from system_traces.sessions and system_traces.events
    let tracing_info: TracingInfo = session.get_tracing_info(&id).await?;
    println!("tracing_info: {:#?}", tracing_info);
}
Ok(())
}
}

Tracing a batch query

Session::batch returns a BatchResult which contains a tracing_id if tracing was enabled.


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate uuid;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::batch::Batch;
use scylla::BatchResult;
use scylla::tracing::TracingInfo;
use uuid::Uuid;

// Create a batch statement
let mut batch: Batch = Default::default();
batch.append_statement("INSERT INTO ks.tab (a) VALUES(4)");

// Enable tracing
batch.set_tracing(true);

let res: BatchResult = session.batch(&batch, ((),)).await?;
let tracing_id: Option<Uuid> = res.tracing_id;

if let Some(id) = tracing_id {
    // Query tracing info from system_traces.sessions and system_traces.events
    let tracing_info: TracingInfo = session.get_tracing_info(&id).await?;
    println!("tracing_info: {:#?}", tracing_info);
}
Ok(())
}
}

Tracing a paged query

A paged query performs multiple simple/prepared queries to fetch subsequent pages.
If tracing is enabled, the row iterator will contain a list of tracing ids for all performed queries.

Tracing Session::query_iter


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate uuid;
extern crate futures;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::transport::iterator::RowIterator;
use scylla::tracing::TracingInfo;
use futures::StreamExt;
use uuid::Uuid;

// Create a Query manually and enable tracing
let mut query: Query = Query::new("SELECT a FROM ks.tab");
query.set_tracing(true);

// Create a paged query iterator and fetch pages
let mut row_iterator: RowIterator = session.query_iter(query, &[]).await?;
while let Some(_row) = row_iterator.next().await {
    // Receive rows
}

// Now there are tracing ids for each performed query
let tracing_ids: &[Uuid] = row_iterator.get_tracing_ids();

for id in tracing_ids {
    // Query tracing info from system_traces.sessions and system_traces.events
    let tracing_info: TracingInfo = session.get_tracing_info(id).await?;
    println!("tracing_info: {:#?}", tracing_info);
}
Ok(())
}
}

Tracing Session::execute_iter


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate uuid;
extern crate futures;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::iterator::RowIterator;
use scylla::tracing::TracingInfo;
use futures::StreamExt;
use uuid::Uuid;

// Prepare the query
let mut prepared: PreparedStatement = session
    .prepare("SELECT a FROM ks.tab")
    .await?;

// Enable tracing for the prepared query
prepared.set_tracing(true);

// Create a paged query iterator and fetch pages
let mut row_iterator: RowIterator = session.execute_iter(prepared, &[]).await?;
while let Some(_row) = row_iterator.next().await {
    // Receive rows
}

// Now there are tracing ids for each performed query
let tracing_ids: &[Uuid] = row_iterator.get_tracing_ids();

for id in tracing_ids {
    // Query tracing info from system_traces.sessions and system_traces.events
    let tracing_info: TracingInfo = session.get_tracing_info(id).await?;
    println!("tracing_info: {:#?}", tracing_info);
}
Ok(())
}
}

Tracing Session::prepare

Session::prepare prepares a query on all connections. If tracing is enabled for the Query to prepare, the resulting PreparedStatement will contain prepare_tracing_ids, a list of the tracing ids of the prepare requests sent on all connections.


#![allow(unused)]
fn main() {
extern crate scylla;
extern crate uuid;
use scylla::Session;
use std::error::Error;
async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::prepared_statement::PreparedStatement;
use scylla::tracing::TracingInfo;
use uuid::Uuid;

// Prepare the query with tracing enabled
let mut to_prepare: Query = Query::new("SELECT a FROM ks.tab");
to_prepare.set_tracing(true);

let prepared: PreparedStatement = session
    .prepare(to_prepare)
    .await?;

// Now there are tracing ids for each prepare request
let tracing_ids: &[Uuid] = &prepared.prepare_tracing_ids;

for id in tracing_ids {
    // Query tracing info from system_traces.sessions and system_traces.events
    let tracing_info: TracingInfo = session.get_tracing_info(id).await?;
    println!("tracing_info: {:#?}", tracing_info);
}
Ok(())
}
}