Scylla Rust Driver
This book contains documentation for scylla-rust-driver - a driver for the Scylla database written in Rust. Although optimized for Scylla, the driver is also compatible with Apache Cassandra®.
Other documentation
Contents
- Quick start - Setting up a Rust project using scylla-rust-driver and running a few queries
- Connecting to the cluster - Configuring a connection to a Scylla cluster
- Making queries - Making different types of queries (simple, prepared, batch, paged)
- Data Types - How to use various column data types
- Load balancing - Load balancing configuration, local datacenters etc.
- Retry policy configuration - What to do when a query fails, query idempotence
- Driver metrics - Statistics about the driver - number of queries, latency etc.
- Logging - Viewing and integrating logs produced by the driver
- Query tracing - Tracing query execution
Quick Start
In this chapter we will set up a Rust project and run a few simple queries.
Topics Include:
Creating a project
To create a new project run:
cargo new myproject
In Cargo.toml add useful dependencies:
[dependencies]
scylla = "0.3.1"
tokio = { version = "1.1.0", features = ["full"] }
futures = "0.3.6"
uuid = "0.8.1"
bigdecimal = "0.2.0"
num-bigint = "0.3"
tracing = "0.1.25"
tracing-subscriber = "0.2.16"
In main.rs put:
use scylla::Session;

#[tokio::main]
async fn main() {
    println!("Hello scylla!");
}
Now running cargo run should print:
Hello scylla!
Running Scylla using Docker
To make queries we will need a running Scylla instance. The easiest way is to use a Docker image.
Please install Docker if it's not installed.
Running scylla
To start Scylla run:
# on Linux sudo might be required
docker run --rm -it -p 9042:9042 scylladb/scylla --smp 2
Docker will download the image, then after a minute or two there should be a message like:
Starting listening for CQL clients on 172.17.0.2:9042
This means that Scylla is ready to receive queries.
To stop this instance press Ctrl + C.
More information
More information about this image can be found on Docker Hub.
Connecting and running a simple query
Now everything is ready to use the driver. Here is a small example:
use scylla::{IntoTypedRows, Session, SessionBuilder};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create a new Session which connects to node at 127.0.0.1:9042
    // (or SCYLLA_URI if specified)
    let uri = std::env::var("SCYLLA_URI")
        .unwrap_or_else(|_| "127.0.0.1:9042".to_string());

    let session: Session = SessionBuilder::new()
        .known_node(uri)
        .build()
        .await?;

    // Create an example keyspace and table
    session
        .query(
            "CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = \
            {'class' : 'SimpleStrategy', 'replication_factor' : 1}",
            &[],
        )
        .await?;

    session
        .query(
            "CREATE TABLE IF NOT EXISTS ks.extab (a int primary key)",
            &[],
        )
        .await?;

    // Insert a value into the table
    let to_insert: i32 = 12345;
    session
        .query("INSERT INTO ks.extab (a) VALUES(?)", (to_insert,))
        .await?;

    // Query rows from the table and print them
    if let Some(rows) = session.query("SELECT a FROM ks.extab", &[]).await?.rows {
        // Parse each row as a tuple containing single i32
        for row in rows.into_typed::<(i32,)>() {
            let read_row: (i32,) = row?;
            println!("Read a value from row: {}", read_row.0);
        }
    }

    Ok(())
}
Connecting to the cluster
Scylla is a distributed database, which means that it operates on multiple nodes running independently.
When creating a Session you can specify a few known nodes to which the driver will try connecting:
use scylla::{Session, SessionBuilder};
use std::error::Error;
use std::time::Duration;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let uri = std::env::var("SCYLLA_URI")
        .unwrap_or_else(|_| "127.0.0.1:9042".to_string());

    let session: Session = SessionBuilder::new()
        .known_node(uri)
        .known_node("127.0.0.72:4321")
        .known_node("localhost:8000")
        .connection_timeout(Duration::from_secs(3))
        .known_node_addr(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            9000,
        ))
        .build()
        .await?;

    Ok(())
}
After successfully connecting to one of the specified nodes, the driver will fetch topology information about other nodes in this cluster and connect to them as well.
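The example above mixes known_node, which takes a string, with known_node_addr, which takes a SocketAddr. As a minimal sketch using only the standard library, the snippet below shows one way to turn an "ip:port" string into a SocketAddr before handing it to known_node_addr. The helper name parse_known_node is hypothetical and not part of the driver.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical helper (not part of the driver): parse an "ip:port" string
/// into a SocketAddr, falling back to the default address used in this book.
fn parse_known_node(uri: Option<&str>) -> Result<SocketAddr, AddrParseError> {
    uri.unwrap_or("127.0.0.1:9042").parse::<SocketAddr>()
}

fn main() {
    // Mirrors the SCYLLA_URI lookup from the example above.
    let uri = std::env::var("SCYLLA_URI").ok();
    match parse_known_node(uri.as_deref()) {
        Ok(addr) => println!("would connect to {}", addr),
        Err(e) => eprintln!("SCYLLA_URI is not an ip:port pair: {}", e),
    }
}
```

Note that parsing a SocketAddr does not resolve hostnames, so a value such as "localhost:8000" is rejected here and would have to be passed as a string to known_node instead.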
Compression
By default the driver does not use any compression on connections.
It's possible to specify a preferred compression algorithm.
The driver will try using it, but if the database doesn't support it, it will fall back to no compression.
Available compression algorithms:
- Snappy
- LZ4
An example enabling the Snappy compression algorithm:
use scylla::{Session, SessionBuilder};
use scylla::transport::Compression;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let uri = std::env::var("SCYLLA_URI")
        .unwrap_or_else(|_| "127.0.0.1:9042".to_string());

    let session: Session = SessionBuilder::new()
        .known_node(uri)
        .compression(Some(Compression::Snappy))
        .build()
        .await?;

    Ok(())
}
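The fallback rule described above (use the preferred algorithm if the database supports it, otherwise use no compression) can be modelled in a few lines. This is only an illustration of the rule: the Algo enum and the negotiate function are hypothetical stand-ins, and the text does not describe how the driver actually performs the negotiation.

```rust
/// Hypothetical stand-in for a compression choice (not the driver's type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Algo {
    Snappy,
    Lz4,
}

/// Keep the preferred algorithm only if the server advertises it;
/// otherwise fall back to no compression (None).
fn negotiate(preferred: Option<Algo>, server_supports: &[Algo]) -> Option<Algo> {
    preferred.filter(|p| server_supports.contains(p))
}

fn main() {
    // The client prefers Snappy but the server only offers LZ4,
    // so the connection ends up uncompressed.
    let chosen = negotiate(Some(Algo::Snappy), &[Algo::Lz4]);
    println!("{:?}", chosen);
}
```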
Authentication
The driver supports authentication by username and password.
To specify credentials use the user method in SessionBuilder:
use std::error::Error;

async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
    use scylla::{Session, SessionBuilder};

    let session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .user("myusername", "mypassword")
        .build()
        .await?;

    Ok(())
}
TLS
The driver uses the openssl crate for TLS functionality.
It was chosen because rustls doesn't support certificates for IP addresses (see issue), which is a common use case for Scylla.
Enabling feature
openssl is not a pure Rust library, so you need to enable a feature and install the proper package.
To enable TLS support, add the ssl feature in Cargo.toml:
scylla = { version = "0.3.1", features = ["ssl"] }
openssl = "0.10.32"
Then install the package with openssl:
- Debian/Ubuntu:
apt install libssl-dev pkg-config
- Fedora:
dnf install openssl-devel
- Arch:
pacman -S openssl pkg-config
Using TLS
To use TLS you will have to create an openssl SslContext and pass it to SessionBuilder.
For example, if the database certificate is in the file ca.crt:
use scylla::{Session, SessionBuilder};
use openssl::ssl::{SslContextBuilder, SslMethod, SslVerifyMode};
use std::path::PathBuf;
use std::error::Error;

async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
    let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
    context_builder.set_ca_file("ca.crt")?;
    context_builder.set_verify(SslVerifyMode::PEER);

    let session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9142") // The port is now 9142
        .ssl_context(Some(context_builder.build()))
        .build()
        .await?;

    Ok(())
}
See the full example for more details
Making queries
This driver supports all query types available in Scylla:
- Simple queries
  - Easy to use
  - Poor performance
  - Primitive load balancing
- Prepared queries
  - Need to be prepared before use
  - Fast
  - Properly load balanced
- Batch statements
  - Run multiple queries at once
  - Can be prepared for better performance and load balancing
- Paged queries
  - Allow reading the result in multiple pages when it doesn't fit in a single response
  - Can be prepared for better performance and load balancing
Additionally, there is special functionality to enable USE KEYSPACE queries:
USE keyspace
Queries are fully asynchronous - you can run as many of them in parallel as you wish.
Simple query
A simple query takes query text and values and simply executes them on a Session:
use scylla::Session;
use std::error::Error;

async fn simple_query_example(session: &Session) -> Result<(), Box<dyn Error>> {
    // Insert a value into the table
    let to_insert: i32 = 12345;
    session
        .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
        .await?;

    Ok(())
}
Warning
Don't use simple query to receive large amounts of data.
By default the query is unpaged and might cause heavy load on the cluster.
In such cases set a page size and use a paged query instead.
When page size is set, query will return only the first page of results.
First argument - the query
As the first argument Session::query takes anything implementing Into<Query>.
You can create a query manually to set custom options. For example, to change query consistency:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::query::Query;
    use scylla::statement::Consistency;

    // Create a Query manually to change the Consistency to ONE
    let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)");
    my_query.set_consistency(Consistency::One);

    // Insert a value into the table
    let to_insert: i32 = 12345;
    session.query(my_query, (to_insert,)).await?;

    Ok(())
}
See Query API documentation for more options
Second argument - the values
Query text is constant, but the values might change.
You can pass changing values to a query by specifying a list of variables as bound values.
Each ? in query text will be filled with the matching value.
The easiest way is to pass values using a tuple:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // Sending an integer and a string using a tuple
    session
        .query("INSERT INTO ks.tab (a, b, c) VALUES(?, ?, 'text2')", (2_i32, "Some text"))
        .await?;

    Ok(())
}
Here the first ? will be filled with 2 and the second with "Some text".
Never pass values by concatenating them into the query string; this could lead to SQL injection.
See Query values for more information about sending values in queries
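To make the injection warning above concrete, here is a small standalone sketch with no driver involved. Both function names are hypothetical. It contrasts building the statement text by concatenation, where the input becomes part of the statement, with keeping the text constant and carrying the value separately, which is what bound values do.

```rust
/// UNSAFE pattern: the user input is spliced into the statement text.
fn build_by_concatenation(name: &str) -> String {
    format!("SELECT * FROM ks.tab WHERE b = '{}'", name)
}

/// Safe pattern: the text stays constant and the value travels separately,
/// in the same shape as the (text, values) pair passed to a query call.
fn build_with_bound_value(name: &str) -> (&'static str, (String,)) {
    ("SELECT * FROM ks.tab WHERE b = ?", (name.to_owned(),))
}

fn main() {
    // An input containing a quote changes the structure of the statement.
    let input = "a' AND c = 'b";
    println!("{}", build_by_concatenation(input));

    // With a bound value the statement text is unaffected by the input.
    let (text, values) = build_with_bound_value(input);
    println!("{} with values {:?}", text, values);
}
```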
Query result
Session::query returns QueryResult with rows represented as Option<Vec<Row>>.
Each row can be parsed as a tuple of Rust types using into_typed:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::IntoTypedRows;

    // Query rows from the table and print them
    if let Some(rows) = session.query("SELECT a FROM ks.tab", &[]).await?.rows {
        // Parse each row as a tuple containing single i32
        for row in rows.into_typed::<(i32,)>() {
            let read_row: (i32,) = row?;
            println!("Read a value from row: {}", read_row.0);
        }
    }

    Ok(())
}
In cases where page size is set, simple query returns only a single page of results.
To receive all pages use a paged query instead.
See Query result for more information about handling query results
Performance
Simple queries should not be used in places where performance matters.
If performance matters, use a prepared query instead.
With a simple query the database has to parse the query text each time it's executed, which worsens performance.
Additionally, token and shard aware load balancing does not work with simple queries. They are sent to random nodes.
Query values
Query text is constant, but the values might change.
You can pass changing values to a query by specifying a list of variables as bound values.
Each ? in query text will be filled with the matching value.
Never pass values by concatenating them into the query string; this could lead to SQL injection.
Each list of values to send in a query must implement the trait ValueList.
By default this can be a slice &[], a tuple () (max 16 elements) of values to send, or a custom struct which derives from ValueList.
A few examples:
use scylla::{Session, ValueList};
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // Empty slice means that there are no values to send
    session.query("INSERT INTO ks.tab (a) VALUES(1)", &[]).await?;

    // Empty tuple/unit also means that there are no values to send
    session.query("INSERT INTO ks.tab (a) VALUES(1)", ()).await?;

    // Sending three integers using a slice:
    session
        .query("INSERT INTO ks.tab (a, b, c) VALUES(?, ?, ?)", [1_i32, 2, 3].as_ref())
        .await?;

    // Sending an integer and a string using a tuple
    session
        .query("INSERT INTO ks.tab (a, b) VALUES(?, ?)", (2_i32, "Some text"))
        .await?;

    // Sending an integer and a string using a named struct.
    // The values will be passed in the order from the struct definition
    #[derive(ValueList)]
    struct IntString {
        first_col: i32,
        second_col: String,
    }

    let int_string = IntString {
        first_col: 42_i32,
        second_col: "hello".to_owned(),
    };

    session
        .query("INSERT INTO ks.tab (a, b) VALUES(?, ?)", int_string)
        .await?;

    // Sending a single value as a tuple requires a trailing comma (Rust syntax):
    session.query("INSERT INTO ks.tab (a) VALUES(?)", (2_i32,)).await?;

    // Each value can also be sent using a reference:
    session
        .query("INSERT INTO ks.tab (a, b) VALUES(?, ?)", &(&2_i32, &"Some text"))
        .await?;

    Ok(())
}
NULL values
Null values can be sent using Option<> - sending a None will make the value NULL:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    let null_i32: Option<i32> = None;
    session
        .query("INSERT INTO ks.tab (a) VALUES(?)", (null_i32,))
        .await?;

    Ok(())
}
Unset values
When performing an insert with values which might be NULL, it's better to use Unset.
The database treats inserting NULL as a delete operation and will generate a tombstone.
Using Unset results in better performance:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::frame::value::{MaybeUnset, Unset};

    // Inserting a null results in suboptimal performance
    let null_i32: Option<i32> = None;
    session
        .query("INSERT INTO ks.tab (a) VALUES(?)", (null_i32,))
        .await?;

    // Using MaybeUnset enum is better
    let unset_i32: MaybeUnset<i32> = MaybeUnset::Unset;
    session
        .query("INSERT INTO ks.tab (a) VALUES(?)", (unset_i32,))
        .await?;

    // If we are sure that a value should be unset we can simply use Unset
    session
        .query("INSERT INTO ks.tab (a) VALUES(?)", (Unset,))
        .await?;

    Ok(())
}
See the issue for more information about Unset
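A common situation is having an Option in application code and wanting a missing value to be left untouched rather than written as NULL. The sketch below models that decision with a standalone enum. ColumnWrite, set_or_unset and creates_tombstone are hypothetical names used for illustration only; in real code the driver's MaybeUnset type shown above plays this role.

```rust
/// Hypothetical model of the three things a bound value can mean.
#[derive(Debug, PartialEq)]
enum ColumnWrite<T> {
    /// Write this value.
    Set(T),
    /// Write NULL: treated as a delete, so a tombstone is generated.
    Null,
    /// Leave the column untouched: no tombstone.
    Unset,
}

/// Map an optional input to a write that never produces a tombstone.
fn set_or_unset<T>(value: Option<T>) -> ColumnWrite<T> {
    match value {
        Some(v) => ColumnWrite::Set(v),
        None => ColumnWrite::Unset,
    }
}

/// Only an explicit NULL write generates a tombstone in this model.
fn creates_tombstone<T>(write: &ColumnWrite<T>) -> bool {
    matches!(write, ColumnWrite::Null)
}

fn main() {
    let missing: Option<i32> = None;
    let write = set_or_unset(missing);
    println!("{:?}, tombstone: {}", write, creates_tombstone(&write));
}
```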
Other data types
See Data Types for instructions on sending other data types
Query result
Session::query and Session::execute return a QueryResult with rows represented as Option<Vec<Row>>.
Basic representation
Row is a basic representation of a received row. It can be used by itself, but it's a bit awkward to use:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    if let Some(rows) = session.query("SELECT a from ks.tab", &[]).await?.rows {
        for row in rows {
            let int_value: i32 = row.columns[0].as_ref().unwrap().as_int().unwrap();
        }
    }

    Ok(())
}
Parsing using into_typed
The driver provides a way to parse a row as a tuple of Rust types:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::IntoTypedRows;

    // Parse row as a single column containing an int value
    if let Some(rows) = session.query("SELECT a from ks.tab", &[]).await?.rows {
        for row in rows {
            let (int_value,): (i32,) = row.into_typed::<(i32,)>()?;
        }
    }

    // rows.into_typed() converts a Vec of Rows to an iterator of parsing results
    if let Some(rows) = session.query("SELECT a from ks.tab", &[]).await?.rows {
        for row in rows.into_typed::<(i32,)>() {
            let (int_value,): (i32,) = row?;
        }
    }

    // Parse row as two columns containing an int and text columns
    if let Some(rows) = session.query("SELECT a, b from ks.tab", &[]).await?.rows {
        for row in rows.into_typed::<(i32, String)>() {
            let (int_value, text_value): (i32, String) = row?;
        }
    }

    Ok(())
}
NULL values
NULL values will return an error when parsed as a Rust type.
To properly handle NULL values, parse the column as an Option<>:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::IntoTypedRows;

    // Parse row as two columns containing an int and text which might be null
    if let Some(rows) = session.query("SELECT a, b from ks.tab", &[]).await?.rows {
        for row in rows.into_typed::<(i32, Option<String>)>() {
            let (int_value, str_or_null): (i32, Option<String>) = row?;
        }
    }

    Ok(())
}
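The difference between parsing a column as a plain type and as an Option<> can be pictured with a tiny standalone model. The Column alias and both functions are hypothetical and only mimic the behaviour described above; they are not the driver's parsing code.

```rust
/// Hypothetical model of one received int column: None stands for NULL.
type Column = Option<i32>;

/// Parsing into a plain type fails when the column is NULL.
fn parse_required(col: Column) -> Result<i32, String> {
    col.ok_or_else(|| "column is NULL".to_string())
}

/// Parsing into an Option accepts NULL and reports it as None.
fn parse_nullable(col: Column) -> Result<Option<i32>, String> {
    Ok(col)
}

fn main() {
    let null_column: Column = None;
    println!("{:?}", parse_required(null_column));
    println!("{:?}", parse_nullable(null_column));
}
```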
Parsing row as a custom struct
It is possible to receive row as a struct with fields matching the columns.
The struct must:
- have the same number of fields as the number of queried columns
- have field types matching the columns being received
- derive FromRow
Field names don't need to match column names.
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::IntoTypedRows;
    use scylla::macros::FromRow;
    use scylla::frame::response::cql_to_rust::FromRow;

    #[derive(FromRow)]
    struct MyRow {
        age: i32,
        name: Option<String>,
    }

    // Parse row as two columns containing an int and text which might be null
    if let Some(rows) = session.query("SELECT a, b from ks.tab", &[]).await?.rows {
        for row in rows.into_typed::<MyRow>() {
            let my_row: MyRow = row?;
        }
    }

    Ok(())
}
Other data types
For parsing other data types see Data Types
Prepared query
Prepared queries provide much better performance than simple queries, but they need to be prepared before use.
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::prepared_statement::PreparedStatement;

    // Prepare the query for later execution
    let prepared: PreparedStatement = session
        .prepare("INSERT INTO ks.tab (a) VALUES(?)")
        .await?;

    // Run the prepared query with some values, just like a simple query
    let to_insert: i32 = 12345;
    session.execute(&prepared, (to_insert,)).await?;

    Ok(())
}
Warning
For token/shard aware load balancing to work properly, all partition key values must be sent as bound values (see performance section)
Warning
Don't use execute to receive large amounts of data.
By default the query is unpaged and might cause heavy load on the cluster. In such cases set a page size and use a paged query instead.
When page size is set, execute will return only the first page of results.
Session::prepare
Session::prepare takes query text and prepares the query on all nodes and shards.
If at least one preparation succeeds, it returns success.
Session::execute
Session::execute takes a prepared query and bound values and runs the query.
Values are passed and results are returned in the same way as in a simple query.
Query options
To specify custom options, set them on the PreparedStatement before execution.
For example, to change the consistency:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::prepared_statement::PreparedStatement;
    use scylla::statement::Consistency;

    // Prepare the query for later execution
    let mut prepared: PreparedStatement = session
        .prepare("INSERT INTO ks.tab (a) VALUES(?)")
        .await?;

    // Set prepared query consistency to One
    // This is the consistency with which this query will be executed
    prepared.set_consistency(Consistency::One);

    // Run the prepared query with some values, just like a simple query
    let to_insert: i32 = 12345;
    session.execute(&prepared, (to_insert,)).await?;

    Ok(())
}
See PreparedStatement API documentation for more options.
Note
Prepared statements can be created from Query structs and will inherit the custom options that the Query was created with. This is especially useful when using CachingSession::execute, for example.
Performance
Prepared queries have good performance, much better than simple queries. By default they use shard/token aware load balancing.
Always pass partition key values as bound values. Otherwise the driver can't hash them to compute the partition key, and the query will be sent to the wrong node, which worsens performance.
Let's say we have a table like this:
TABLE ks.prepare_table (
a int,
b int,
c int,
PRIMARY KEY (a, b)
)
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::prepared_statement::PreparedStatement;

    // WRONG - partition key value is passed in query string
    // Load balancing will compute the wrong partition key
    let wrong_prepared: PreparedStatement = session
        .prepare("INSERT INTO ks.prepare_table (a, b, c) VALUES(12345, ?, 16)")
        .await?;

    session.execute(&wrong_prepared, (54321,)).await?;

    // GOOD - partition key values are sent as bound values
    // Other values can be sent any way you like, it doesn't matter
    let good_prepared: PreparedStatement = session
        .prepare("INSERT INTO ks.prepare_table (a, b, c) VALUES(?, ?, 16)")
        .await?;

    session.execute(&good_prepared, (12345, 54321)).await?;

    Ok(())
}
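Why the bound value matters can be seen in a toy model of token aware routing. This is a sketch under loud assumptions: owner_node and route are hypothetical names, and DefaultHasher is used only to keep the example dependency-free. It is not the partitioner Scylla uses, and the real driver also takes shards and replicas into account.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy stand-in for token computation: hash the partition key and map it
/// onto one of `node_count` nodes. NOT the real Scylla partitioner.
fn owner_node(partition_key: i32, node_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    partition_key.hash(&mut hasher);
    hasher.finish() % node_count
}

/// A router can compute the owner only when it sees the partition key
/// as a bound value; a key embedded in the query text is invisible to it.
fn route(bound_partition_key: Option<i32>, node_count: u64) -> Option<u64> {
    bound_partition_key.map(|key| owner_node(key, node_count))
}

fn main() {
    // Key sent as a bound value: an owner can be computed.
    println!("{:?}", route(Some(12345), 3));
    // Key written into the query string: there is nothing to hash.
    println!("{:?}", route(None, 3));
}
```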
Batch statement
A batch statement allows running many queries at once.
These queries can be simple queries or prepared queries.
Only queries like INSERT or UPDATE can be in a batch; a batch doesn't return any rows.
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::batch::Batch;
    use scylla::query::Query;
    use scylla::prepared_statement::PreparedStatement;

    // Create a batch statement
    let mut batch: Batch = Default::default();

    // Add a simple query to the batch using its text
    batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(?, ?)");

    // Add a simple query created manually to the batch
    let simple: Query = Query::new("INSERT INTO ks.tab (a, b) VALUES(3, 4)");
    batch.append_statement(simple);

    // Add a prepared query to the batch
    let prepared: PreparedStatement = session
        .prepare("INSERT INTO ks.tab (a, b) VALUES(?, 6)")
        .await?;
    batch.append_statement(prepared);

    // Specify bound values to use with each query
    let batch_values = ((1_i32, 2_i32), (), (5_i32,));

    // Run the batch, doesn't return any rows
    session.batch(&batch, batch_values).await?;

    Ok(())
}
Batch options
You can set various options by operating on the Batch object.
For example, to change consistency:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::batch::Batch;
    use scylla::statement::Consistency;

    // Create a batch
    let mut batch: Batch = Default::default();
    batch.append_statement("INSERT INTO ks.tab(a) VALUES(16)");

    // Set batch consistency to One
    batch.set_consistency(Consistency::One);

    // Run the batch
    session.batch(&batch, ((), )).await?;

    Ok(())
}
See Batch API documentation for more options
Batch values
Batch takes a tuple of values specified just like in simple or prepared queries.
Length of batch values must be equal to the number of statements in a batch.
Each query must have its values specified, even if they are empty.
Values passed to Session::batch must implement the trait BatchValues.
By default this includes tuples () and slices &[] of tuples and slices which implement ValueList.
Example:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::batch::Batch;

    let mut batch: Batch = Default::default();

    // A query with two bound values
    batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(?, ?)");

    // A query with one bound value
    batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(3, ?)");

    // A query with no bound values
    batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(5, 6)");

    // Batch values is a tuple of 3 tuples containing values for each query
    let batch_values = ((1_i32, 2_i32), // Tuple with two values for the first query
                        (4_i32,),       // Tuple with one value for the second query
                        ());            // Empty tuple/unit for the third query

    // Run the batch
    session.batch(&batch, batch_values).await?;

    Ok(())
}
For more information about sending values in a query see Query values
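The two rules for batch values (one value list per statement, even if empty, and each list matching its statement) can be checked before a batch is sent. The sketch below is a hypothetical pre-flight check written with the standard library only. marker_count and check_batch are not driver functions, and counting ? characters is a naive approximation that ignores question marks inside string literals.

```rust
/// Count `?` markers in a statement (naive: ignores quoted text).
fn marker_count(statement: &str) -> usize {
    statement.matches('?').count()
}

/// Check that there is one value list per statement and that each list
/// holds as many values as its statement has markers.
fn check_batch(statements: &[&str], value_counts: &[usize]) -> Result<(), String> {
    if statements.len() != value_counts.len() {
        return Err(format!(
            "{} statements but {} value lists",
            statements.len(),
            value_counts.len()
        ));
    }
    for (i, (stmt, &given)) in statements.iter().zip(value_counts).enumerate() {
        let expected = marker_count(stmt);
        if expected != given {
            return Err(format!(
                "statement {} expects {} values, got {}",
                i, expected, given
            ));
        }
    }
    Ok(())
}

fn main() {
    let statements = [
        "INSERT INTO ks.tab(a, b) VALUES(?, ?)",
        "INSERT INTO ks.tab(a, b) VALUES(3, ?)",
        "INSERT INTO ks.tab(a, b) VALUES(5, 6)",
    ];
    // Matches the example above: two values, one value, no values.
    println!("{:?}", check_batch(&statements, &[2, 1, 0]));
    // Forgetting the empty value list for the third statement is an error.
    println!("{:?}", check_batch(&statements, &[2, 1]));
}
```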
Performance
Batch statements do not use token/shard aware load balancing; batches are sent to a random node.
Use prepared queries for best performance.
Paged query
Sometimes query results might not fit in a single page. Paged queries allow receiving the whole result page by page.
Session::query_iter and Session::execute_iter take a simple query or a prepared query and return an async iterator over result Rows.
Examples
Use query_iter to perform a simple query with paging:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::IntoTypedRows;
    use futures::stream::StreamExt;

    let mut rows_stream = session
        .query_iter("SELECT a, b FROM ks.t", &[])
        .await?
        .into_typed::<(i32, i32)>();

    while let Some(next_row_res) = rows_stream.next().await {
        let (a, b): (i32, i32) = next_row_res?;
        println!("a, b: {}, {}", a, b);
    }

    Ok(())
}
Use execute_iter to perform a prepared query with paging:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::IntoTypedRows;
    use scylla::prepared_statement::PreparedStatement;
    use futures::stream::StreamExt;

    let prepared: PreparedStatement = session
        .prepare("SELECT a, b FROM ks.t")
        .await?;

    let mut rows_stream = session
        .execute_iter(prepared, &[])
        .await?
        .into_typed::<(i32, i32)>();

    while let Some(next_row_res) = rows_stream.next().await {
        let (a, b): (i32, i32) = next_row_res?;
        println!("a, b: {}, {}", a, b);
    }

    Ok(())
}
Query values can be passed to query_iter and execute_iter just like in a simple query.
Configuring page size
It's possible to configure the size of a single page.
On a Query:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::query::Query;

    let mut query: Query = Query::new("SELECT a, b FROM ks.t");
    query.set_page_size(16);

    let _ = session.query_iter(query, &[]).await?; // ...

    Ok(())
}
On a PreparedStatement:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::prepared_statement::PreparedStatement;

    let mut prepared: PreparedStatement = session
        .prepare("SELECT a, b FROM ks.t")
        .await?;

    prepared.set_page_size(16);

    let _ = session.execute_iter(prepared, &[]).await?; // ...

    Ok(())
}
Passing the paging state manually
It's possible to fetch a single page from the table, extract the paging state from the result and manually pass it to the next query. That way, the next query will start fetching the results from where the previous one left off.
On a Query:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::query::Query;

    let paged_query = Query::new("SELECT a, b, c FROM ks.t").with_page_size(6);
    let res1 = session.query(paged_query.clone(), &[]).await?;
    let res2 = session
        .query_paged(paged_query.clone(), &[], res1.paging_state)
        .await?;

    Ok(())
}
On a PreparedStatement:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::query::Query;

    let paged_prepared = session
        .prepare(Query::new("SELECT a, b, c FROM ks.t").with_page_size(7))
        .await?;
    let res1 = session.execute(&paged_prepared, &[]).await?;
    let res2 = session
        .execute_paged(&paged_prepared, &[], res1.paging_state)
        .await?;

    Ok(())
}
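The examples above fetch two pages. To read a whole result this way, the returned paging state has to be passed back in until none is returned. The loop can be sketched against an in-memory table as follows. fetch_page and fetch_all are hypothetical names, and the paging state is modelled as a plain offset, whereas the real paging state is an opaque value produced by the database.

```rust
/// Hypothetical in-memory "table" served one page at a time.
/// Returns the page and the paging state to pass to the next call
/// (None once the result is exhausted). Assumes page_size > 0.
fn fetch_page(
    rows: &[i32],
    page_size: usize,
    paging_state: Option<usize>,
) -> (Vec<i32>, Option<usize>) {
    let start = paging_state.unwrap_or(0);
    let end = (start + page_size).min(rows.len());
    let next = if end < rows.len() { Some(end) } else { None };
    (rows[start..end].to_vec(), next)
}

/// Keep passing the returned state back in until it comes back as None.
fn fetch_all(rows: &[i32], page_size: usize) -> Vec<Vec<i32>> {
    let mut pages = Vec::new();
    let mut state = None;
    loop {
        let (page, next) = fetch_page(rows, page_size, state);
        pages.push(page);
        match next {
            Some(_) => state = next,
            None => break,
        }
    }
    pages
}

fn main() {
    // Five rows read with a page size of two arrive as three pages.
    for page in fetch_all(&[1, 2, 3, 4, 5], 2) {
        println!("{:?}", page);
    }
}
```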
Performance
Performance is the same as in non-paged variants.
For the best performance use prepared queries.
Lightweight transaction (LWT) query
A lightweight transaction query can be expressed just like any other query, via Session, with the notable difference of having an additional consistency level parameter - the serial_consistency_level.
Format of the query
A lightweight transaction query is not a separate type - it can be expressed just like any other query: via SimpleQuery, PreparedStatement, batches, and so on. The difference lies in the query string itself - when it contains a condition (e.g. IF NOT EXISTS), it becomes a lightweight transaction. It's important to remember that the CQL specification requires a separate, additional consistency level to be defined for LWT queries - serial_consistency_level. The serial consistency level can only be set to two values: SerialConsistency::Serial or SerialConsistency::LocalSerial. The "local" variant makes the transaction consistent only within the same datacenter. For convenience, Scylla Rust Driver sets the default serial consistency level to LocalSerial, as it's more commonly used. For cross-datacenter consistency, please remember to always override the default with SerialConsistency::Serial.
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::query::Query;
    use scylla::statement::{Consistency, SerialConsistency};

    // Create a Query manually to change the Consistency to ONE
    let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?) IF NOT EXISTS".to_string());
    my_query.set_consistency(Consistency::One);
    // Use cross-datacenter serial consistency
    my_query.set_serial_consistency(Some(SerialConsistency::Serial));

    // Insert a value into the table
    let to_insert: i32 = 12345;
    session.query(my_query, (to_insert,)).await?;

    Ok(())
}
The rest of the API remains identical for LWT and non-LWT queries.
See Query API documentation for more options
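What a condition such as IF NOT EXISTS means for a single write can be pictured with a toy in-memory model. This is only a sketch of the conditional semantics: insert_if_not_exists is a hypothetical function operating on a local HashMap, and it says nothing about how the database coordinates replicas to honour the serial consistency level.

```rust
use std::collections::HashMap;

/// Toy model of `INSERT ... IF NOT EXISTS`: the write is applied only
/// when the key is absent. Returns whether it was applied.
fn insert_if_not_exists(table: &mut HashMap<i32, String>, key: i32, value: &str) -> bool {
    if table.contains_key(&key) {
        false
    } else {
        table.insert(key, value.to_owned());
        true
    }
}

fn main() {
    let mut table = HashMap::new();
    // The first conditional insert is applied.
    println!("{}", insert_if_not_exists(&mut table, 12345, "first"));
    // The second one finds an existing row and leaves it unchanged.
    println!("{}", insert_if_not_exists(&mut table, 12345, "second"));
    println!("{:?}", table.get(&12345));
}
```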
USE keyspace
Using a keyspace allows omitting the keyspace name in queries.
For example, in cqlsh one could write:
cqlsh> SELECT * FROM my_keyspace.table;

 a     | b     |
-------+-------+
 12345 | 54321 |

(1 rows)

cqlsh> USE my_keyspace;
cqlsh:my_keyspace> SELECT * FROM table;

 a     | b     |
-------+-------+
 12345 | 54321 |

(1 rows)
Tables from other keyspaces can still easily be accessed by using their keyspace names.
cqlsh:my_keyspace> SELECT * FROM other_keyspace.other_table;
In the driver this can be achieved using Session::use_keyspace:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    session
        .query("INSERT INTO my_keyspace.tab (a) VALUES ('test1')", &[])
        .await?;

    session.use_keyspace("my_keyspace", false).await?;

    // Now we can omit keyspace name in the query
    session
        .query("INSERT INTO tab (a) VALUES ('test2')", &[])
        .await?;

    Ok(())
}
The first argument is the keyspace name.
The second argument states whether this name is case sensitive.
It is also possible to send a raw USE keyspace query using Session::query instead of Session::use_keyspace, such as:
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    session.query("USE my_keyspace", &[]).await?;

    Ok(())
}
This method has a slightly worse latency than Session::use_keyspace - there are two roundtrips needed instead of one.
Therefore, Session::use_keyspace is the preferred method for setting keyspaces.
Multiple use queries at once
Don't run multiple use_keyspace queries concurrently.
The session could end up with half of its connections using one keyspace and the other half using the other.
Case sensitivity
In CQL a keyspace name can be case insensitive (written without ") or case sensitive (wrapped in ").
If the second argument to use_keyspace is set to true, the keyspace name will be wrapped in ".
It is best to avoid the problem altogether and just not create two keyspaces with the same name but different cases.
Let's see what happens when there are two keyspaces with the same name but different cases, my_keyspace and MY_KEYSPACE:
extern crate scylla;
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // lowercase name without case sensitivity will use my_keyspace
    session.use_keyspace("my_keyspace", false).await?;

    // lowercase name with case sensitivity will use my_keyspace
    session.use_keyspace("my_keyspace", true).await?;

    // uppercase name without case sensitivity will use my_keyspace
    session.use_keyspace("MY_KEYSPACE", false).await?;

    // uppercase name with case sensitivity will use MY_KEYSPACE
    session.use_keyspace("MY_KEYSPACE", true).await?;

    Ok(())
}
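The quoting rule can be illustrated with a small standalone sketch. The helper use_keyspace_statement below is hypothetical and is not part of the driver; it only shows how the second argument would change the CQL text, assuming a case sensitive name is simply wrapped in double quotes.

```rust
/// Hypothetical helper (not a driver API): builds the text of a USE
/// statement, wrapping the name in double quotes when it is case sensitive.
fn use_keyspace_statement(keyspace_name: &str, case_sensitive: bool) -> String {
    if case_sensitive {
        // Quoted identifiers keep their exact case in CQL.
        format!("USE \"{}\"", keyspace_name)
    } else {
        // Unquoted identifiers are treated as case insensitive by the database.
        format!("USE {}", keyspace_name)
    }
}
```

With this sketch, an uppercase name passed with false yields an unquoted statement, which is why it resolves to my_keyspace in the example above.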
Schema agreement
Sometimes, after queries are performed, some nodes have not been updated yet, so we need a mechanism that checks whether every node has agreed on the schema version.
There are four methods in Session that assist us.
Each method returns a QueryError if something goes wrong, but this should only happen when there is a database or connection malfunction.
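As a rough illustration of what "agreement" means here, the sketch below treats the schema as agreed when every node reports the same version. The function name and the use of plain strings are assumptions made for this example; the driver itself works with Uuid values and its internal check may differ.

```rust
use std::collections::HashSet;

/// Returns true when every reported schema version is identical.
/// `versions` stands in for the schema version reported by each node.
fn schema_in_agreement(versions: &[&str]) -> bool {
    let distinct: HashSet<&str> = versions.iter().copied().collect();
    // An empty list is treated as "no agreement" in this sketch.
    distinct.len() == 1
}
```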
Checking schema version
Session::fetch_schema_version returns the Uuid of the local node's schema version.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { println!("Local schema version is: {}", session.fetch_schema_version().await?); Ok(()) } }
Awaiting schema agreement
Session::await_schema_agreement returns a Future that can be awaited; it does not complete as long as the schema is not in agreement.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { session.await_schema_agreement().await?; Ok(()) } }
Awaiting with timeout
We can also set a timeout with Session::await_timed_schema_agreement.
It takes one argument, a std::time::Duration value that tells how long the driver should wait for schema agreement. If the timeout is reached, the return value is false; otherwise it is true.
extern crate scylla;
use scylla::Session;
use std::error::Error;
use std::time::Duration;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // wait for 5 seconds
    if session.await_timed_schema_agreement(Duration::from_secs(5)).await? {
        println!("SCHEMA AGREED");
    } else {
        println!("SCHEMA IS NOT IN AGREEMENT - TIMED OUT");
    }

    Ok(())
}
Checking for schema interval
If the schema is not agreed, the driver sleeps for a duration before checking it again. The default value is 200 milliseconds, but it can be changed with SessionBuilder::schema_agreement_interval.
#![allow(unused)] fn main() { extern crate scylla; use scylla::SessionBuilder; use std::error::Error; use std::time::Duration; async fn check_only_compiles() -> Result<(), Box<dyn Error>> { SessionBuilder::new() .known_node("127.0.0.1:9042") .schema_agreement_interval(Duration::from_secs(1)) .build() .await?; Ok(()) } }
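How the interval and the timeout interact can be sketched with a plain polling loop. This is a simplified, blocking illustration using only the standard library; wait_for_agreement and its check closure are hypothetical names, and the driver's own implementation is asynchronous and may differ in detail.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical, blocking stand-in for a timed wait: calls `check` until it
/// returns true or `timeout` elapses, sleeping `interval` between attempts.
/// Returns true on agreement and false on timeout.
fn wait_for_agreement<F>(mut check: F, interval: Duration, timeout: Duration) -> bool
where
    F: FnMut() -> bool,
{
    let deadline = Instant::now() + timeout;
    loop {
        if check() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Wait before asking again, like the schema agreement interval.
        sleep(interval);
    }
}
```

A shorter interval notices agreement sooner at the cost of more checks; a longer one does the opposite.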
Checking if schema is in agreement now
To check whether the schema is in agreement right now, without retrying after a failure, use the Session::check_schema_agreement function.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { if session.check_schema_agreement().await? { println!("SCHEMA AGREED"); } else { println!("SCHEMA IS NOT IN AGREEMENT"); } Ok(()) } }
Data Types
The driver maps database data types to matching Rust types to achieve seamless sending and receiving of CQL values.
See the following chapters for examples on how to send and receive each data type.
See Query values for more information about sending values in queries.
See Query result for more information about reading values from queries.
Database types and their Rust equivalents:
- Boolean <----> bool
- Tinyint <----> i8
- Smallint <----> i16
- Int <----> i32
- Bigint <----> i64
- Float <----> f32
- Double <----> f64
- Ascii, Text, Varchar <----> &str, String
- Counter <----> value::Counter
- Blob <----> Vec<u8>
- Inet <----> std::net::IpAddr
- Uuid, Timeuuid <----> uuid::Uuid
- Date <----> chrono::NaiveDate, u32
- Time <----> chrono::Duration
- Timestamp <----> chrono::Duration
- Decimal <----> bigdecimal::BigDecimal
- Varint <----> num_bigint::BigInt
- List <----> Vec<T>
- Set <----> Vec<T>
- Map <----> std::collections::HashMap<K, V>
- Tuple <----> Rust tuples
- UDT (User defined type) <----> Custom user structs with macros
Bool, Tinyint, Smallint, Int, Bigint, Float, Double
Bool
Bool is represented as Rust bool.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert a bool into the table let to_insert: bool = true; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read a bool from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(bool,)>() { let (bool_value,): (bool,) = row?; } } Ok(()) } }
Tinyint
Tinyint is represented as Rust i8.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert a tinyint into the table let to_insert: i8 = 123; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read a tinyint from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(i8,)>() { let (tinyint_value,): (i8,) = row?; } } Ok(()) } }
Smallint
Smallint is represented as Rust i16.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert a smallint into the table let to_insert: i16 = 12345; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read a smallint from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(i16,)>() { let (smallint_value,): (i16,) = row?; } } Ok(()) } }
Int
Int is represented as Rust i32.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert an int into the table let to_insert: i32 = 12345; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read an int from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(i32,)>() { let (int_value,): (i32,) = row?; } } Ok(()) } }
Bigint
Bigint is represented as Rust i64.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert a bigint into the table let to_insert: i64 = 12345; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read a bigint from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(i64,)>() { let (bigint_value,): (i64,) = row?; } } Ok(()) } }
Float
Float is represented as Rust f32.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert a float into the table let to_insert: f32 = 123.0; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read a float from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(f32,)>() { let (float_value,): (f32,) = row?; } } Ok(()) } }
Double
Double is represented as Rust f64.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert a double into the table let to_insert: f64 = 12345.0; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read a double from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(f64,)>() { let (double_value,): (f64,) = row?; } } Ok(()) } }
Ascii, Text, Varchar
Ascii, Text and Varchar are represented as &str and String.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert some text into the table as a &str let to_insert_str: &str = "abcdef"; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert_str,)) .await?; // Insert some text into the table as a String let to_insert_string: String = "abcdef".to_string(); session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert_string,)) .await?; // Read ascii/text/varchar from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(String,)>() { let (text_value,): (String,) = row?; } } Ok(()) } }
Counter
Counter is represented as struct Counter(pub i64).
Counter can't be inserted; it can only be read or updated.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; use scylla::frame::value::Counter; // Read counter from the table if let Some(rows) = session.query("SELECT c FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(Counter,)>() { let (counter_value,): (Counter,) = row?; let counter_int_value: i64 = counter_value.0; } } Ok(()) } }
Blob
Blob is represented as Vec<u8>.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert some blob into the table as a Vec<u8> // We can insert it by reference to not move the whole blob let to_insert: Vec<u8> = vec![1, 2, 3, 4, 5]; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (&to_insert,)) .await?; // Read blobs from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(Vec<u8>,)>() { let (blob_value,): (Vec<u8>,) = row?; } } Ok(()) } }
Inet
Inet is represented as std::net::IpAddr.
extern crate scylla;
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::IntoTypedRows;
    use std::net::{IpAddr, Ipv4Addr};

    // Insert some ip address into the table
    let to_insert: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
    session
        .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,))
        .await?;

    // Read inet from the table
    if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows {
        for row in rows.into_typed::<(IpAddr,)>() {
            let (inet_value,): (IpAddr,) = row?;
        }
    }

    Ok(())
}
Uuid, Timeuuid
Uuid and Timeuuid are represented as uuid::Uuid.
#![allow(unused)] fn main() { extern crate scylla; extern crate uuid; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; use uuid::Uuid; // Insert some uuid/timeuuid into the table let to_insert: Uuid = Uuid::parse_str("8e14e760-7fa8-11eb-bc66-000000000001")?; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read uuid/timeuuid from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(Uuid,)>() { let (uuid_value,): (Uuid,) = row?; } } Ok(()) } }
Date
For most use cases Date can be represented as chrono::NaiveDate.
NaiveDate supports dates from -262145-1-1 to 262143-12-31.
For dates outside of this range you can use the raw u32 representation.
Using chrono::NaiveDate:
#![allow(unused)] fn main() { extern crate scylla; extern crate chrono; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; use chrono::naive::NaiveDate; // Insert some date into the table let to_insert: NaiveDate = NaiveDate::from_ymd(2021, 3, 24); session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read NaiveDate from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(NaiveDate,)>() { let (date_value,): (NaiveDate,) = row?; } } Ok(()) } }
Using raw u32 representation
Internally Date is represented as the number of days since -5877641-06-23, i.e. 2^31 days before the Unix epoch.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::frame::value::Date; use scylla::frame::response::result::CqlValue; // Insert date using raw u32 representation let to_insert: Date = Date(2_u32.pow(31)); // 1970-01-01 session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read raw Date from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows { let date_value: u32 = match row.columns[0] { Some(CqlValue::Date(date_value)) => date_value, _ => panic!("Should be a date!") }; } } Ok(()) } }
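The arithmetic behind the raw representation is a fixed offset of 2^31 days. The two helper functions below are hypothetical and use only the standard library; they sketch the conversion between "days relative to 1970-01-01" and the raw u32 value.

```rust
/// Offset between the raw representation and the Unix epoch: 2^31 days.
const EPOCH_OFFSET_DAYS: i64 = 1 << 31;

/// Converts days relative to 1970-01-01 (negative = before) into the raw u32.
fn raw_date_from_epoch_days(days: i32) -> u32 {
    // The i32 range shifted by 2^31 always fits in u32, so nothing is lost.
    (i64::from(days) + EPOCH_OFFSET_DAYS) as u32
}

/// Converts the raw u32 back into days relative to 1970-01-01.
fn epoch_days_from_raw_date(raw: u32) -> i32 {
    (i64::from(raw) - EPOCH_OFFSET_DAYS) as i32
}
```

For example, raw_date_from_epoch_days(0) gives 2^31, the same value the example above passes to Date for 1970-01-01.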
Time
Time is represented as chrono::Duration.
Internally Time is represented as the number of nanoseconds since midnight.
It can't be negative or exceed 86399999999999 (one nanosecond short of 24 hours).
When sent in a query it needs to be wrapped in value::Time to differentiate it from Timestamp.
#![allow(unused)] fn main() { extern crate scylla; extern crate chrono; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; use scylla::frame::value::Time; use chrono::Duration; // Insert some time into the table let to_insert: Duration = Duration::seconds(64); session .query("INSERT INTO keyspace.table (a) VALUES(?)", (Time(to_insert),)) .await?; // Read time from the table, no need for a wrapper here if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(Duration,)>() { let (time_value,): (Duration,) = row?; } } Ok(()) } }
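The valid range can be checked with a few lines of arithmetic. The helper below is a hypothetical, standard-library-only sketch that converts a wall-clock time into nanoseconds since midnight and rejects values outside the range described above; it is not how the driver validates values.

```rust
/// Largest valid value: 23:59:59.999999999 expressed in nanoseconds.
const MAX_TIME_NANOS: i64 = 86_399_999_999_999;

/// Converts a wall-clock time into nanoseconds since midnight,
/// returning None when the result falls outside the valid range.
fn time_nanos(hours: i64, minutes: i64, seconds: i64, nanos: i64) -> Option<i64> {
    let total = ((hours * 60 + minutes) * 60 + seconds) * 1_000_000_000 + nanos;
    if (0..=MAX_TIME_NANOS).contains(&total) {
        Some(total)
    } else {
        None
    }
}
```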
Timestamp
Timestamp is represented as chrono::Duration.
Internally Timestamp is represented as an i64 describing the number of milliseconds since the Unix epoch.
The driver converts this to chrono::Duration.
When sent in a query it needs to be wrapped in value::Timestamp to differentiate it from Time.
#![allow(unused)] fn main() { extern crate scylla; extern crate chrono; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; use scylla::frame::value::Timestamp; use chrono::Duration; // Insert some timestamp into the table let to_insert: Duration = Duration::seconds(64); session .query("INSERT INTO keyspace.table (a) VALUES(?)", (Timestamp(to_insert),)) .await?; // Read timestamp from the table, no need for a wrapper here if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(Duration,)>() { let (timestamp_value,): (Duration,) = row?; } } Ok(()) } }
Decimal
Decimal is represented as bigdecimal::BigDecimal.
#![allow(unused)] fn main() { extern crate scylla; extern crate bigdecimal; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; use bigdecimal::BigDecimal; use std::str::FromStr; // Insert a decimal into the table let to_insert: BigDecimal = BigDecimal::from_str("12345.0")?; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read a decimal from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(BigDecimal,)>() { let (decimal_value,): (BigDecimal,) = row?; } } Ok(()) } }
Varint
Varint is represented as num_bigint::BigInt.
#![allow(unused)] fn main() { extern crate scylla; extern crate num_bigint; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; use num_bigint::BigInt; use std::str::FromStr; // Insert a varint into the table let to_insert: BigInt = BigInt::from_str("12345")?; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read a varint from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(BigInt,)>() { let (varint_value,): (BigInt,) = row?; } } Ok(()) } }
List, Set, Map
List
List is represented as Vec<T>.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert a list of ints into the table let to_insert: Vec<i32> = vec![1, 2, 3, 4, 5]; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (&to_insert,)) .await?; // Read a list of ints from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(Vec<i32>,)>() { let (list_value,): (Vec<i32>,) = row?; } } Ok(()) } }
Set
Set is represented as Vec<T>.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert a set of ints into the table let to_insert: Vec<i32> = vec![1, 2, 3, 4, 5]; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (&to_insert,)) .await?; // Read a set of ints from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(Vec<i32>,)>() { let (set_value,): (Vec<i32>,) = row?; } } Ok(()) } }
Map
Map is represented as std::collections::HashMap<K, V>.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; use std::collections::HashMap; // Insert a map of text and int into the table let mut to_insert: HashMap<String, i32> = HashMap::new(); to_insert.insert("abcd".to_string(), 16); session .query("INSERT INTO keyspace.table (a) VALUES(?)", (&to_insert,)) .await?; // Read a map from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(HashMap<String, i32>,)>() { let (map_value,): (HashMap<String, i32>,) = row?; } } Ok(()) } }
Tuple
Tuple is represented as Rust tuples of at most 16 elements.
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; // Insert a tuple of int and string into the table let to_insert: (i32, String) = (1, "abc".to_string()); session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read a tuple of int and string from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<((i32, String),)>() { let (tuple_value,): ((i32, String),) = row?; let int_value: i32 = tuple_value.0; let string_value: String = tuple_value.1; } } Ok(()) } }
User defined types
Scylla allows users to define their own data types with named fields.
The driver supports this by letting you create a custom struct with derived functionality.
For example, let's say my_type was created using this query:
CREATE TYPE ks.my_type (int_val int, text_val text)
To use this type in the driver, create a matching struct and derive IntoUserType and FromUserType:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::IntoTypedRows; use scylla::macros::{FromUserType, IntoUserType}; use scylla::cql_to_rust::FromCqlVal; // Define custom struct that matches User Defined Type created earlier // wrapping field in Option will gracefully handle null field values #[derive(Debug, IntoUserType, FromUserType)] struct MyType { int_val: i32, text_val: Option<String>, } // Now it can be sent and received like any other value // Insert my_type into the table let to_insert = MyType { int_val: 17, text_val: Some("Some string".to_string()), }; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; // Read MyType from the table if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { for row in rows.into_typed::<(MyType,)>() { let (my_type_value,): (MyType,) = row?; } } Ok(()) } }
Load balancing
There are multiple load balancing strategies that the driver can use.
Load balancing can be configured for the whole Session during creation.
Basic load balancing strategies:
- RoundRobinPolicy - uses all known nodes one after another
- DcAwareRoundRobinPolicy - uses all known nodes from the local datacenter one after another
Each of these basic load balancing strategies can be wrapped in TokenAwarePolicy to enable token awareness.
Note
Only prepared queries use token aware load balancing.
All queries are shard aware; there is no way to turn off shard awareness.
If a token is available the query is sent to the correct shard, otherwise to a random one.
So, the available load balancing policies are:
- Round robin
- DC Aware Round robin
- Token aware Round robin
- Token aware DC Aware Round robin
By default the driver uses Token aware Round robin.
Round robin
The simplest load balancing policy available.
Takes all nodes in the cluster and uses them one after another.
For example, if there are nodes A, B, C in the cluster, this policy will use A, B, C, A, B, ...
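The rotation can be sketched with an atomic counter over a fixed node list. The RoundRobin type below is a standalone illustration written for this page, not the driver's RoundRobinPolicy, and it ignores node state and cluster changes.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Minimal round-robin picker over a fixed list of node names.
struct RoundRobin {
    nodes: Vec<String>,
    next: AtomicUsize,
}

impl RoundRobin {
    fn new(nodes: &[&str]) -> Self {
        RoundRobin {
            nodes: nodes.iter().map(|n| n.to_string()).collect(),
            next: AtomicUsize::new(0),
        }
    }

    /// Returns the next node, wrapping around at the end of the list.
    fn pick(&self) -> Option<&str> {
        if self.nodes.is_empty() {
            return None;
        }
        let index = self.next.fetch_add(1, Ordering::Relaxed) % self.nodes.len();
        Some(self.nodes[index].as_str())
    }
}
```

Using an atomic counter lets several tasks share one picker without a lock.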
Example
To use this policy in Session:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles() -> Result<(), Box<dyn Error>> { use scylla::{Session, SessionBuilder}; use scylla::transport::load_balancing::RoundRobinPolicy; use std::sync::Arc; let session: Session = SessionBuilder::new() .known_node("127.0.0.1:9042") .load_balancing(Arc::new(RoundRobinPolicy::new())) .build() .await?; Ok(()) } }
DC Aware Round robin
This is a more sophisticated version of Round robin policy.
It takes all nodes in the local datacenter and uses them one after another.
If no nodes from the local datacenter are available it will fall back to other nodes.
For example, if there are two datacenters:
- us_east with nodes: A, B, C
- us_west with nodes: D, E, F
this policy, when set to us_east, will only use A, B, C, A, B, ...
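The selection and fallback described above can be sketched as a filter over the known nodes. The Node struct, its is_up flag and the candidates function are hypothetical names chosen for this example; the driver's DcAwareRoundRobinPolicy tracks node state internally and may behave differently in detail.

```rust
/// A node together with the datacenter it belongs to.
struct Node {
    name: &'static str,
    datacenter: &'static str,
    is_up: bool,
}

/// Returns the nodes a DC-aware policy would rotate over in this sketch:
/// live nodes from `local_dc`, or every live node when none is available.
fn candidates(nodes: &[Node], local_dc: &str) -> Vec<&'static str> {
    let local: Vec<&'static str> = nodes
        .iter()
        .filter(|n| n.is_up && n.datacenter == local_dc)
        .map(|n| n.name)
        .collect();
    if !local.is_empty() {
        return local;
    }
    // Fallback: no local node is available, so use the remaining live nodes.
    nodes.iter().filter(|n| n.is_up).map(|n| n.name).collect()
}
```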
Example
To use this policy in Session:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles() -> Result<(), Box<dyn Error>> { use scylla::{Session, SessionBuilder}; use scylla::transport::load_balancing::DcAwareRoundRobinPolicy; use std::sync::Arc; let local_dc_name: String = "us_east".to_string(); let session: Session = SessionBuilder::new() .known_node("127.0.0.1:9042") .load_balancing(Arc::new(DcAwareRoundRobinPolicy::new(local_dc_name))) .build() .await?; Ok(()) } }
Token aware Round robin
This policy will try to calculate a token to find replica nodes in which queried data is stored.
After finding the replicas it performs a round robin on them.
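To give an intuition for the replica lookup, the sketch below models the cluster as a simple token ring. It assumes that each node owns one token and that replicas are the next distinct nodes clockwise on the ring; real clusters compute tokens from the partition key and place replicas according to the keyspace's replication settings, so replicas_for_token is only an illustration with hypothetical names.

```rust
use std::collections::BTreeMap;

/// Hypothetical token ring: maps each node's token to the node's name.
/// Returns up to `replication_factor` distinct nodes, walking the ring
/// clockwise from the first node whose token is >= the query's token.
fn replicas_for_token(
    ring: &BTreeMap<i64, &'static str>,
    token: i64,
    replication_factor: usize,
) -> Vec<&'static str> {
    let mut replicas: Vec<&'static str> = Vec::new();
    // Start at the owner of the token, then wrap around to the ring's start.
    for (_, node) in ring.range(token..).chain(ring.range(..token)) {
        if replicas.len() >= replication_factor {
            break;
        }
        if !replicas.contains(node) {
            replicas.push(*node);
        }
    }
    replicas
}
```

A token aware policy would then rotate over the returned replicas instead of over all nodes.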
Example
To use this policy in Session:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles() -> Result<(), Box<dyn Error>> { use scylla::{Session, SessionBuilder}; use scylla::transport::load_balancing::{RoundRobinPolicy, TokenAwarePolicy}; use std::sync::Arc; let robin = Box::new(RoundRobinPolicy::new()); let policy = Arc::new(TokenAwarePolicy::new(robin)); let session: Session = SessionBuilder::new() .known_node("127.0.0.1:9042") .load_balancing(policy) .build() .await?; Ok(()) } }
Token aware DC Aware Round robin
This policy will try to calculate a token to find replica nodes in which queried data is stored.
After finding the replicas it chooses the ones from the local datacenter and performs a round robin on them.
Example
To use this policy in Session:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles() -> Result<(), Box<dyn Error>> { use scylla::{Session, SessionBuilder}; use scylla::transport::load_balancing::{DcAwareRoundRobinPolicy, TokenAwarePolicy}; use std::sync::Arc; let local_dc: String = "us_east".to_string(); let dc_robin = Box::new(DcAwareRoundRobinPolicy::new(local_dc)); let policy = Arc::new(TokenAwarePolicy::new(dc_robin)); let session: Session = SessionBuilder::new() .known_node("127.0.0.1:9042") .load_balancing(policy) .build() .await?; Ok(()) } }
Retry policy configuration
After a query fails, the driver might decide to retry it based on its Retry Policy and the query itself.
Retry policy can be configured for Session or just for a single query.
Retry policies
By default there are two retry policies:
- Fallthrough Retry Policy - never retries, returns all errors straight to the user
- Default Retry Policy - used by default, might retry if there is a high chance of success
It's possible to implement a custom Retry Policy by implementing the traits RetryPolicy and RetrySession.
Query idempotence
A query is idempotent if it can be applied multiple times without changing the result of the initial application.
Specifying that a query is idempotent increases the chances that it will be retried in case of failure. Idempotent queries can be retried in situations where retrying non-idempotent queries would be dangerous.
Idempotence has to be specified manually; the driver is not able to figure it out by itself.
extern crate scylla;
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    use scylla::query::Query;
    use scylla::prepared_statement::PreparedStatement;

    // Specify that a Query is idempotent
    let mut my_query: Query = Query::new("SELECT a FROM ks.tab");
    my_query.set_is_idempotent(true);

    // Specify that a PreparedStatement is idempotent
    let mut prepared: PreparedStatement = session
        .prepare("SELECT a FROM ks.tab")
        .await?;
    prepared.set_is_idempotent(true);

    Ok(())
}
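Why the flag matters can be shown with a deliberately simplified decision function. The Failure enum and may_retry below are invented for this illustration and do not mirror the driver's retry policies; they only capture the idea that a retry is risky when it is unknown whether the first attempt was applied.

```rust
/// Hypothetical failure categories, invented for this illustration.
enum Failure {
    /// The request certainly never reached the database.
    NotSent,
    /// The request may or may not have been applied.
    OutcomeUnknown,
}

/// Decides whether a retry is safe in this simplified model.
fn may_retry(failure: Failure, is_idempotent: bool) -> bool {
    match failure {
        // Nothing was applied, so running the query again is always safe.
        Failure::NotSent => true,
        // A second application is only harmless for idempotent queries.
        Failure::OutcomeUnknown => is_idempotent,
    }
}
```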
Fallthrough retry policy
The FallthroughRetryPolicy never retries and returns errors straight to the user. Useful for debugging.
Examples
To use in Session:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles() -> Result<(), Box<dyn Error>> { use scylla::{Session, SessionBuilder}; use scylla::transport::retry_policy::FallthroughRetryPolicy; let session: Session = SessionBuilder::new() .known_node("127.0.0.1:9042") .retry_policy(Box::new(FallthroughRetryPolicy::new())) .build() .await?; Ok(()) } }
To use in a simple query:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::query::Query; use scylla::transport::retry_policy::FallthroughRetryPolicy; // Create a Query manually and set the retry policy let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)"); my_query.set_retry_policy(Box::new(FallthroughRetryPolicy::new())); // Run the query using this retry policy let to_insert: i32 = 12345; session.query(my_query, (to_insert,)).await?; Ok(()) } }
To use in a prepared query:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::prepared_statement::PreparedStatement; use scylla::transport::retry_policy::FallthroughRetryPolicy; // Create PreparedStatement manually and set the retry policy let mut prepared: PreparedStatement = session .prepare("INSERT INTO ks.tab (a) VALUES(?)") .await?; prepared.set_retry_policy(Box::new(FallthroughRetryPolicy::new())); // Run the query using this retry policy let to_insert: i32 = 12345; session.execute(&prepared, (to_insert,)).await?; Ok(()) } }
Default retry policy
This is the retry policy used by default. It retries when there is a high chance that it might help.
This policy is based on the one in DataStax Java Driver.
The behaviour is the same.
Examples
To use in Session:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles() -> Result<(), Box<dyn Error>> { use scylla::{Session, SessionBuilder}; use scylla::transport::retry_policy::DefaultRetryPolicy; let session: Session = SessionBuilder::new() .known_node("127.0.0.1:9042") .retry_policy(Box::new(DefaultRetryPolicy::new())) .build() .await?; Ok(()) } }
To use in a simple query:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::query::Query; use scylla::transport::retry_policy::DefaultRetryPolicy; // Create a Query manually and set the retry policy let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)"); my_query.set_retry_policy(Box::new(DefaultRetryPolicy::new())); // Run the query using this retry policy let to_insert: i32 = 12345; session.query(my_query, (to_insert,)).await?; Ok(()) } }
To use in a prepared query:
#![allow(unused)] fn main() { extern crate scylla; use scylla::Session; use std::error::Error; async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> { use scylla::prepared_statement::PreparedStatement; use scylla::transport::retry_policy::DefaultRetryPolicy; // Create PreparedStatement manually and set the retry policy let mut prepared: PreparedStatement = session .prepare("INSERT INTO ks.tab (a) VALUES(?)") .await?; prepared.set_retry_policy(Box::new(DefaultRetryPolicy::new())); // Run the query using this retry policy let to_insert: i32 = 12345; session.execute(&prepared, (to_insert,)).await?; Ok(()) } }
Speculative execution
Speculative query execution is an optimization technique where a driver pre-emptively starts a second execution of a query against another node, before the first node has replied.
There are multiple speculative execution strategies that the driver can use.
Speculative execution can be configured for the whole Session during its creation.
Available speculative execution strategies:
- Simple speculative execution
- Percentile speculative execution
Speculative execution is not enabled by default, and currently only non-iter session methods use it.
Simple speculative execution
The simplest speculative execution policy available. It starts another execution of a query after a constant delay of retry_interval and performs at most max_retry_count speculative query executions (not counting the first, non-speculative one).
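To make the timing concrete, here is a small sketch that lists when each execution would begin under such a constant-delay policy, assuming no reply arrives in the meantime. `execution_start_offsets` is a hypothetical helper written for this illustration, not a driver API, and the driver's actual scheduling may differ slightly.

```rust
use std::time::Duration;

/// Offsets (from the start of the query) at which executions begin under a
/// constant-delay policy, assuming no reply arrives in the meantime.
/// Index 0 is the first, non-speculative execution; the remaining
/// `max_retry_count` entries are the speculative ones.
fn execution_start_offsets(max_retry_count: usize, retry_interval: Duration) -> Vec<Duration> {
    (0..=max_retry_count)
        .map(|n| retry_interval * n as u32)
        .collect()
}
```

With max_retry_count set to 3 and retry_interval set to 100 ms, this yields offsets of 0, 100, 200 and 300 ms: one regular execution followed by at most three speculative ones.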
Example
To use this policy in Session:
use scylla::speculative_execution::SimpleSpeculativeExecutionPolicy;
use scylla::{Session, SessionBuilder};
use std::error::Error;
use std::{sync::Arc, time::Duration};

async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
    let policy = SimpleSpeculativeExecutionPolicy {
        max_retry_count: 3,
        retry_interval: Duration::from_millis(100),
    };

    let session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .speculative_execution(Arc::new(policy))
        .build()
        .await?;

    Ok(())
}
Percentile speculative execution
This policy has access to the Metrics shared with the session, and triggers speculative execution when the request to the current host takes longer than the latency at a given percentile.
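The decision can be sketched as follows, assuming latencies are kept as a plain list of millisecond samples and the percentile is computed with the nearest-rank method. Both function names are hypothetical and the driver's own bookkeeping may well differ; the sketch only shows what "above a given percentile" means.

```rust
/// Nearest-rank percentile of the recorded latencies, in milliseconds.
/// Returns `None` when there are no samples or `percentile` is outside (0, 100].
fn latency_percentile_ms(samples: &[u64], percentile: f64) -> Option<u64> {
    if samples.is_empty() || !(percentile > 0.0 && percentile <= 100.0) {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    // Nearest rank: the smallest sample with at least `percentile`% of all
    // samples at or below it.
    let rank = ((percentile / 100.0) * sorted.len() as f64).ceil() as usize;
    Some(sorted[rank.max(1) - 1])
}

/// Decides whether a request that has been running for `elapsed_ms` is slow
/// enough to justify a speculative execution. Without samples, never speculate.
fn should_speculate(samples: &[u64], percentile: f64, elapsed_ms: u64) -> bool {
    match latency_percentile_ms(samples, percentile) {
        Some(threshold) => elapsed_ms > threshold,
        None => false,
    }
}
```

A high percentile such as 99.0 means that only requests slower than almost all previously observed ones trigger a speculative execution.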
Example
To use this policy in Session:
use scylla::speculative_execution::PercentileSpeculativeExecutionPolicy;
use scylla::{Session, SessionBuilder};
use std::error::Error;
use std::sync::Arc;

async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
    let policy = PercentileSpeculativeExecutionPolicy {
        max_retry_count: 3,
        percentile: 99.0,
    };

    let session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .speculative_execution(Arc::new(policy))
        .build()
        .await?;

    Ok(())
}
Driver metrics
During operation the driver collects various metrics.
They can be accessed at any moment using Session::get_metrics().
Collected metrics:
- Query latencies
- Total number of nonpaged queries
- Number of errors during nonpaged queries
- Total number of paged queries
- Number of errors during paged queries
- Number of retries
Example
use scylla::Session;
use std::error::Error;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    let metrics = session.get_metrics();

    println!("Queries requested: {}", metrics.get_queries_num());
    println!("Iter queries requested: {}", metrics.get_queries_iter_num());
    println!("Errors occurred: {}", metrics.get_errors_num());
    println!("Iter errors occurred: {}", metrics.get_errors_iter_num());
    println!("Average latency: {}", metrics.get_latency_avg_ms().unwrap());
    println!(
        "99.9 latency percentile: {}",
        metrics.get_latency_percentile_ms(99.9).unwrap()
    );

    Ok(())
}
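The raw counters are often more useful when combined. As a small sketch, assuming the query and error counters are available as plain u64 values (for instance the ones printed above), an error ratio could be derived like this; `error_ratio` is a hypothetical helper, not something the driver provides.

```rust
/// Fraction of requests that ended in an error,
/// or `None` if nothing has been requested yet.
fn error_ratio(queries_num: u64, errors_num: u64) -> Option<f64> {
    if queries_num == 0 {
        None
    } else {
        Some(errors_num as f64 / queries_num as f64)
    }
}
```

Returning None for an empty counter avoids a division by zero right after the session has been created.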
Logging
The driver uses the tracing crate for all logs.
To view the logs you have to create a tracing subscriber to which all logs will be written.
To just print the logs you can use the default subscriber:
use std::error::Error;
use scylla::{Session, SessionBuilder};
use tracing::info;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Install global collector configured based on RUST_LOG env var
    // This collector will receive logs from the driver
    tracing_subscriber::fmt::init();

    let uri = std::env::var("SCYLLA_URI")
        .unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    info!("Connecting to {}", uri);

    let session: Session = SessionBuilder::new().known_node(uri).build().await?;
    session
        .query(
            "CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = \
            {'class' : 'SimpleStrategy', 'replication_factor' : 1}",
            &[],
        )
        .await?;

    // This query should generate a warning message
    session.query("USE ks", &[]).await?;

    Ok(())
}
To start this example execute:
RUST_LOG=info cargo run
The full example is available in the examples folder.
Query tracing
Each query's execution can be traced to see to which nodes it was sent, what operations were performed etc.
First, tracing has to be enabled for a query. Its result will then contain a tracing id, which can be used to query tracing information.
Queries that support tracing:
Session::query()
Session::query_iter()
Session::execute()
Session::execute_iter()
Session::batch()
Session::prepare()
After obtaining the tracing id you can use Session::get_tracing_info() to query tracing information.
TracingInfo contains values that are the same in Scylla and Cassandra®.
If TracingInfo does not contain some needed value, it's possible to query it manually from the tables system_traces.sessions and system_traces.events.
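A manual lookup could start from CQL statements like the ones below. This is a sketch: the constant and function names are made up for illustration, and it assumes both tables are keyed by a session_id column that holds the tracing id, which is then supplied as the bound value, for example through Session::query.

```rust
/// Reads the row describing one traced request.
/// The tracing id is bound to the `?` marker.
const TRACING_SESSION_QUERY: &str =
    "SELECT * FROM system_traces.sessions WHERE session_id = ?";

/// Reads every event recorded for one traced request.
/// The tracing id is bound to the `?` marker.
const TRACING_EVENTS_QUERY: &str =
    "SELECT * FROM system_traces.events WHERE session_id = ?";

/// Both statements, in the order they would typically be run.
fn manual_tracing_queries() -> [&'static str; 2] {
    [TRACING_SESSION_QUERY, TRACING_EVENTS_QUERY]
}
```

Selecting all columns keeps the sketch independent of which extra columns a particular Scylla or Cassandra version stores in these tables.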
Tracing a simple/prepared query
Both simple query and prepared query return a QueryResult which contains a tracing_id if tracing was enabled.
Tracing a simple query
use scylla::query::Query;
use scylla::tracing::TracingInfo;
use scylla::QueryResult;
use scylla::Session;
use std::error::Error;
use uuid::Uuid;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // Create a Query manually and enable tracing
    let mut query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(4)");
    query.set_tracing(true);

    let res: QueryResult = session.query(query, &[]).await?;
    let tracing_id: Option<Uuid> = res.tracing_id;

    if let Some(id) = tracing_id {
        // Query tracing info from system_traces.sessions and system_traces.events
        let tracing_info: TracingInfo = session.get_tracing_info(&id).await?;
        println!("tracing_info: {:#?}", tracing_info);
    }

    Ok(())
}
Tracing a prepared query
use scylla::prepared_statement::PreparedStatement;
use scylla::tracing::TracingInfo;
use scylla::QueryResult;
use scylla::Session;
use std::error::Error;
use uuid::Uuid;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // Prepare the query
    let mut prepared: PreparedStatement = session
        .prepare("SELECT a FROM ks.tab")
        .await?;

    // Enable tracing for the prepared query
    prepared.set_tracing(true);

    let res: QueryResult = session.execute(&prepared, &[]).await?;
    let tracing_id: Option<Uuid> = res.tracing_id;

    if let Some(id) = tracing_id {
        // Query tracing info from system_traces.sessions and system_traces.events
        let tracing_info: TracingInfo = session.get_tracing_info(&id).await?;
        println!("tracing_info: {:#?}", tracing_info);
    }

    Ok(())
}
Tracing a batch query
Session::batch returns a BatchResult which contains a tracing_id if tracing was enabled.
use scylla::batch::Batch;
use scylla::tracing::TracingInfo;
use scylla::BatchResult;
use scylla::Session;
use std::error::Error;
use uuid::Uuid;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // Create a batch statement
    let mut batch: Batch = Default::default();
    batch.append_statement("INSERT INTO ks.tab (a) VALUES(4)");

    // Enable tracing
    batch.set_tracing(true);

    let res: BatchResult = session.batch(&batch, ((),)).await?;
    let tracing_id: Option<Uuid> = res.tracing_id;

    if let Some(id) = tracing_id {
        // Query tracing info from system_traces.sessions and system_traces.events
        let tracing_info: TracingInfo = session.get_tracing_info(&id).await?;
        println!("tracing_info: {:#?}", tracing_info);
    }

    Ok(())
}
Tracing a paged query
A paged query performs multiple simple/prepared queries to query subsequent pages.
If tracing is enabled the row iterator will contain a list of tracing ids for all performed queries.
Tracing Session::query_iter
use futures::StreamExt;
use scylla::query::Query;
use scylla::tracing::TracingInfo;
use scylla::transport::iterator::RowIterator;
use scylla::Session;
use std::error::Error;
use uuid::Uuid;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // Create a Query manually and enable tracing
    // (a SELECT, since paging only applies to queries that return rows)
    let mut query: Query = Query::new("SELECT a FROM ks.tab");
    query.set_tracing(true);

    // Create a paged query iterator and fetch pages
    let mut row_iterator: RowIterator = session.query_iter(query, &[]).await?;
    while let Some(_row) = row_iterator.next().await {
        // Receive rows
    }

    // Now there are tracing ids for each performed query
    let tracing_ids: &[Uuid] = row_iterator.get_tracing_ids();

    for id in tracing_ids {
        // Query tracing info from system_traces.sessions and system_traces.events
        let tracing_info: TracingInfo = session.get_tracing_info(id).await?;
        println!("tracing_info: {:#?}", tracing_info);
    }

    Ok(())
}
Tracing Session::execute_iter
use futures::StreamExt;
use scylla::prepared_statement::PreparedStatement;
use scylla::tracing::TracingInfo;
use scylla::transport::iterator::RowIterator;
use scylla::Session;
use std::error::Error;
use uuid::Uuid;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // Prepare the query
    let mut prepared: PreparedStatement = session
        .prepare("SELECT a FROM ks.tab")
        .await?;

    // Enable tracing for the prepared query
    prepared.set_tracing(true);

    // Create a paged query iterator and fetch pages
    let mut row_iterator: RowIterator = session.execute_iter(prepared, &[]).await?;
    while let Some(_row) = row_iterator.next().await {
        // Receive rows
    }

    // Now there are tracing ids for each performed query
    let tracing_ids: &[Uuid] = row_iterator.get_tracing_ids();

    for id in tracing_ids {
        // Query tracing info from system_traces.sessions and system_traces.events
        let tracing_info: TracingInfo = session.get_tracing_info(id).await?;
        println!("tracing_info: {:#?}", tracing_info);
    }

    Ok(())
}
Tracing Session::prepare
Session::prepare prepares a query on all connections. If tracing is enabled for the Query to prepare, the resulting PreparedStatement will contain prepare_tracing_ids, a list of the tracing ids of the prepare requests sent on all connections.
use scylla::prepared_statement::PreparedStatement;
use scylla::query::Query;
use scylla::tracing::TracingInfo;
use scylla::Session;
use std::error::Error;
use uuid::Uuid;

async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    // Prepare the query with tracing enabled
    let mut to_prepare: Query = Query::new("SELECT a FROM ks.tab");
    to_prepare.set_tracing(true);

    let prepared: PreparedStatement = session
        .prepare(to_prepare)
        .await?;

    // Now there are tracing ids for each prepare request
    let tracing_ids: &[Uuid] = &prepared.prepare_tracing_ids;

    for id in tracing_ids {
        // Query tracing info from system_traces.sessions and system_traces.events
        let tracing_info: TracingInfo = session.get_tracing_info(id).await?;
        println!("tracing_info: {:#?}", tracing_info);
    }

    Ok(())
}