-
Notifications
You must be signed in to change notification settings - Fork 106
/
Copy pathasync_insert.rs
78 lines (67 loc) · 2.27 KB
/
async_insert.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use std::time::{Duration, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use clickhouse::sql::Identifier;
use clickhouse::{error::Result, Client, Row};
// This example demonstrates how to use asynchronous inserts, avoiding client side batching of the incoming data.
// Suitable for ClickHouse Cloud, too. See https://clickhouse.com/docs/en/optimize/asynchronous-inserts
#[derive(Debug, Serialize, Deserialize, Row)]
struct Event {
timestamp: u64,
message: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let table_name = "chrs_async_insert";
let client = Client::default()
.with_url("http://localhost:8123")
// https://clickhouse.com/docs/en/operations/settings/settings#async-insert
.with_option("async_insert", "1")
// https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert
.with_option("wait_for_async_insert", "0");
client
.query(
"
CREATE OR REPLACE TABLE ? (
timestamp DateTime64(9),
message String
)
ENGINE = MergeTree
ORDER BY timestamp
",
)
.bind(Identifier(table_name))
.execute()
.await?;
let mut insert = client.insert(table_name)?;
insert
.write(&Event {
timestamp: now(),
message: "one".into(),
})
.await?;
insert.end().await?;
loop {
let events = client
.query("SELECT ?fields FROM ?")
.bind(Identifier(table_name))
.fetch_all::<Event>()
.await?;
if !events.is_empty() {
println!("Async insert was flushed");
println!("{events:?}");
break;
}
// If you change the `wait_for_async_insert` setting to 1, this line will never be printed;
// however, without waiting, you will see it in the console output several times,
// as the data will remain in the server buffer for a bit before the flush happens
println!("Waiting for async insert flush...");
tokio::time::sleep(Duration::from_millis(10)).await
}
Ok(())
}
fn now() -> u64 {
UNIX_EPOCH
.elapsed()
.expect("invalid system time")
.as_nanos() as u64
}