37 releases
0.13.1 | Oct 21, 2024 |
---|---|
0.12.2 | Aug 20, 2024 |
0.12.0 | Jul 16, 2024 |
0.11.6 | Sep 27, 2023 |
0.1.0 | May 17, 2017 |
#20 in Database interfaces
40,822 downloads per month
Used in 14 crates
(11 directly)
175KB
4K
SLoC
clickhouse-rs
Official pure Rust typed client for ClickHouse DB.
- Uses
serde
for encoding/decoding rows. - Supports
serde
attributes:skip_serializing
,skip_deserializing
,rename
. - Uses
RowBinary
encoding over HTTP transport.- There are plans to switch to
Native
over TCP.
- There are plans to switch to
- Supports TLS (see
native-tls
andrustls-tls
features below). - Supports compression and decompression (LZ4 and LZ4HC).
- Provides API for selecting.
- Provides API for inserting.
- Provides API for infinite transactional (see below) inserting.
- Provides API for watching live views.
- Provides mocks for unit testing.
Note: ch2rs is useful to generate a row type from ClickHouse.
Usage
To use the crate, add this to your Cargo.toml
:
[dependencies]
clickhouse = "0.13.1"
[dev-dependencies]
clickhouse = { version = "0.13.1", features = ["test-util"] }
Note about ClickHouse prior to v22.6
CH server older than v22.6 (2022-06-16) handles RowBinary
incorrectly in some rare cases. Use 0.11 and enable wa-37420
feature to solve this problem. Don't use it for newer versions.
Create a client
use clickhouse::Client;
let client = Client::default()
.with_url("http://localhost:8123")
.with_user("name")
.with_password("123")
.with_database("test");
- Reuse created clients or clone them in order to reuse a connection pool.
Select rows
use serde::Deserialize;
use clickhouse::Row;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let mut cursor = client
.query("SELECT ?fields FROM some WHERE no BETWEEN ? AND ?")
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- Placeholder
?fields
is replaced withno, name
(fields ofRow
). - Placeholder
?
is replaced with values in followingbind()
calls. - Convenient
fetch_one::<Row>()
andfetch_all::<Row>()
can be used to get a first row or all rows correspondingly. sql::Identifier
can be used to bind table names.
Note that cursors can return an error even after producing some rows. To avoid this, use client.with_option("wait_end_of_query", "1")
in order to enable buffering on the server-side. More details. The buffer_size
option can be useful too.
Insert a batch
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
- If
end()
isn't called, theINSERT
is aborted. - Rows are being sent progressively to spread network load.
- ClickHouse inserts batches atomically only if all rows fit in the same partition and their number is less
max_insert_block_size
.
Infinite inserting
Requires the inserter
feature.
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
Inserter
ends an active insert incommit()
if thresholds (max_bytes
,max_rows
,period
) are reached.- The interval between ending active
INSERT
s can be biased by usingwith_period_bias
to avoid load spikes by parallel inserters. Inserter::time_left()
can be used to detect when the current period ends. CallInserter::commit()
again to check limits if your stream emits items rarely.- Time thresholds implemented by using quanta crate to speed the inserter up. Not used if
test-util
is enabled (thus, time can be managed bytokio::time::advance()
in custom tests). - All rows between
commit()
calls are inserted in the sameINSERT
statement. - Do not forget to flush if you want to terminate inserting:
inserter.end().await?;
Perform DDL
client.query("DROP TABLE IF EXISTS some").execute().await?;
Live views
Requires the watch
feature.
let mut cursor = client
.watch("SELECT max(no), argMax(name, no) FROM some")
.fetch::<Row<'_>>()?;
let (version, row) = cursor.next().await?.unwrap();
println!("live view updated: version={}, row={:?}", version, row);
// Use `only_events()` to iterate over versions only.
let mut cursor = client.watch("some_live_view").limit(20).only_events().fetch()?;
println!("live view updated: version={:?}", cursor.next().await?);
- Use carefully.
- This code uses or creates if not exists a temporary live view named
lv_{sha1(query)}
to reuse the same live view by parallel watchers. - You can specify a name instead of a query.
- This API uses
JSONEachRowWithProgress
under the hood because of the issue. - Only struct rows can be used. Avoid
fetch::<u64>()
and other without specified names.
See examples.
Feature Flags
lz4
(enabled by default) — enablesCompression::Lz4
. If enabled,Compression::Lz4
is used by default for all queries except forWATCH
.inserter
— enablesclient.inserter()
.test-util
— adds mocks. See the example. Use it only indev-dependencies
.watch
— enablesclient.watch
functionality. See the corresponding section for details.uuid
— addsserde::uuid
to work with uuid crate.time
— addsserde::time
to work with time crate.
TLS
By default, TLS is disabled and one or more following features must be enabled to use HTTPS urls:
native-tls
— uses native-tls, utilizing dynamic linking (e.g. against OpenSSL).rustls-tls
— enablesrustls-tls-aws-lc
andrustls-tls-webpki-roots
features.rustls-tls-aws-lc
— uses rustls with theaws-lc
cryptography implementation.rustls-tls-ring
— uses rustls with thering
cryptography implementation.rustls-tls-webpki-roots
— uses rustls with certificates provided by the webpki-roots crate.rustls-tls-native-roots
— uses rustls with certificates provided by the rustls-native-certs crate.
If multiple features are enabled, the following priority is applied:
native-tls
>rustls-tls-aws-lc
>rustls-tls-ring
rustls-tls-native-roots
>rustls-tls-webpki-roots
How to choose between all these features? Here are some considerations:
- A good starting point is
rustls-tls
, e.g. if you use ClickHouse Cloud. - To be more environment-agnostic, prefer
rustls-tls
overnative-tls
. - Enable
rustls-tls-native-roots
ornative-tls
if you want to use self-signed certificates.
Data Types
-
(U)Int(8|16|32|64|128)
maps to/from corresponding(u|i)(8|16|32|64|128)
types or newtypes around them. -
(U)Int256
aren't supported directly, but there is a workaround for it. -
Float(32|64)
maps to/from correspondingf(32|64)
or newtypes around them. -
Decimal(32|64|128)
maps to/from correspondingi(32|64|128)
or newtypes around them. It's more convenient to use fixnum or another implementation of signed fixed-point numbers. -
Boolean
maps to/frombool
or newtypes around it. -
String
maps to/from any string or bytes types, e.g.&str
,&[u8]
,String
,Vec<u8>
orSmartString
. Newtypes are also supported. To store bytes, consider using serde_bytes, because it's more efficient.Example
#[derive(Row, Debug, Serialize, Deserialize)] struct MyRow<'a> { str: &'a str, string: String, #[serde(with = "serde_bytes")] bytes: Vec<u8>, #[serde(with = "serde_bytes")] byte_slice: &'a [u8], }
-
FixedString(N)
is supported as an array of bytes, e.g.[u8; N]
.Example
#[derive(Row, Debug, Serialize, Deserialize)] struct MyRow { fixed_str: [u8; 16], // FixedString(16) }
-
Enum(8|16)
are supported using serde_repr.Example
use serde_repr::{Deserialize_repr, Serialize_repr}; #[derive(Row, Serialize, Deserialize)] struct MyRow { level: Level, } #[derive(Debug, Serialize_repr, Deserialize_repr)] #[repr(u8)] enum Level { Debug = 1, Info = 2, Warn = 3, Error = 4, }
-
UUID
maps to/fromuuid::Uuid
by usingserde::uuid
. Requires theuuid
feature.Example
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::uuid")] uuid: uuid::Uuid, }
-
IPv6
maps to/fromstd::net::Ipv6Addr
. -
IPv4
maps to/fromstd::net::Ipv4Addr
by usingserde::ipv4
.Example
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::ipv4")] ipv4: std::net::Ipv4Addr, }
-
Date
maps to/fromu16
or a newtype around it and represents a number of days elapsed since1970-01-01
. Also,time::Date
is supported by usingserde::time::date
, that requires thetime
feature.Example
#[derive(Row, Serialize, Deserialize)] struct MyRow { days: u16, #[serde(with = "clickhouse::serde::time::date")] date: Date, }
-
Date32
maps to/fromi32
or a newtype around it and represents a number of days elapsed since1970-01-01
. Also,time::Date
is supported by usingserde::time::date32
, that requires thetime
feature.Example
#[derive(Row, Serialize, Deserialize)] struct MyRow { days: i32, #[serde(with = "clickhouse::serde::time::date32")] date: Date, }
-
DateTime
maps to/fromu32
or a newtype around it and represents a number of seconds elapsed since UNIX epoch. Also,time::OffsetDateTime
is supported by usingserde::time::datetime
, that requires thetime
feature.Example
#[derive(Row, Serialize, Deserialize)] struct MyRow { ts: u32, #[serde(with = "clickhouse::serde::time::datetime")] dt: OffsetDateTime, }
-
DateTime64(_)
maps to/fromi32
or a newtype around it and represents a time elapsed since UNIX epoch. Also,time::OffsetDateTime
is supported by usingserde::time::datetime64::*
, that requires thetime
feature.Example
#[derive(Row, Serialize, Deserialize)] struct MyRow { ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)` #[serde(with = "clickhouse::serde::time::datetime64::secs")] dt64s: OffsetDateTime, // `DateTime64(0)` #[serde(with = "clickhouse::serde::time::datetime64::millis")] dt64ms: OffsetDateTime, // `DateTime64(3)` #[serde(with = "clickhouse::serde::time::datetime64::micros")] dt64us: OffsetDateTime, // `DateTime64(6)` #[serde(with = "clickhouse::serde::time::datetime64::nanos")] dt64ns: OffsetDateTime, // `DateTime64(9)` }
-
Tuple(A, B, ...)
maps to/from(A, B, ...)
or a newtype around it. -
Array(_)
maps to/from any slice, e.g.Vec<_>
,&[_]
. Newtypes are also supported. -
Map(K, V)
behaves likeArray((K, V))
. -
LowCardinality(_)
is supported seamlessly. -
Nullable(_)
maps to/fromOption<_>
. Forclickhouse::serde::*
helpers add::option
.Example
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::ipv4::option")] ipv4_opt: Option<Ipv4Addr>, }
-
Nested
is supported by providing multiple arrays with renaming.Example
// CREATE TABLE test(items Nested(name String, count UInt32)) #[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(rename = "items.name")] items_name: Vec<String>, #[serde(rename = "items.count")] items_count: Vec<u32>, }
-
Geo
types are supported.Point
behaves like a tuple(f64, f64)
, and the rest of the types are just slices of points.Example
type Point = (f64, f64); type Ring = Vec<Point>; type Polygon = Vec<Ring>; type MultiPolygon = Vec<Polygon>; type LineString = Vec<Point>; type MultiLineString = Vec<LineString>; #[derive(Row, Serialize, Deserialize)] struct MyRow { point: Point, ring: Ring, polygon: Polygon, multi_polygon: MultiPolygon, line_string: LineString, multi_line_string: MultiLineString, }
-
JSON
,Variant
,Dynamic
types are not supported for now.
See also the additional examples:
Mocking
The crate provides utils for mocking CH server and testing DDL, SELECT
, INSERT
and WATCH
queries.
The functionality can be enabled with the test-util
feature. Use it only in dev-dependencies.
See the example.
Dependencies
~7–29MB
~549K SLoC