Skip to content

Commit 775d585

Browse files
committed
Namespace the metrics a little more to allow for nesting more queries
1 parent 9069294 commit 775d585

File tree

3 files changed

+50
-43
lines changed

3 files changed

+50
-43
lines changed

lambdas/query-metrics/manifest.yml

+9-10
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@
33
gauges:
44
# Each gauge should have a distinct name for managing inside of the lambda
55
important_metric:
6-
# Then define a metric name to export to cloudwatch
7-
metric: 'last_10_uniq'
8-
url: 's3://example-bucket/databases/ds-partitioned-delta-table/'
9-
# Currently only a query handler type of `count` is supported
10-
type: count
11-
# The example Datafusion SQL query below queries the source table, which is defined by
12-
# the URL above, to find all the distinct uuids in the last 10 minutes of the current
13-
# `ds` partition.
14-
query: |
15-
SELECT DISTINCT uuid AS total FROM source
6+
- metric: 'last_10_uniq' # Then define a metric name to export to cloudwatch
7+
url: 's3://example-bucket/databases/ds-partitioned-delta-table/'
8+
# Currently only a query handler type of `count` is supported
9+
type: count
10+
# The example Datafusion SQL query below queries the source table, which is defined by
11+
# the URL above, to find all the distinct uuids in the last 10 minutes of the current
12+
# `ds` partition.
13+
query: |
14+
SELECT DISTINCT uuid AS total FROM source
1615
WHERE ds = ARROW_CAST(CURRENT_DATE() , 'Utf8')
1716
AND created_at >= (ARROW_CAST(ARROW_CAST(NOW(), 'Timestamp(Second, None)'), 'Int64') - (60 * 10))

lambdas/query-metrics/src/config.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use url::Url;
99

1010
#[derive(Debug, Deserialize)]
1111
pub struct Configuration {
12-
pub gauges: HashMap<String, Gauge>,
12+
pub gauges: HashMap<String, Vec<Gauge>>,
1313
}
1414

1515
impl Configuration {
@@ -51,10 +51,15 @@ mod tests {
5151
}
5252

5353
#[test]
54-
fn test_config_b64() {
55-
let b = b"LS0tCiMgVGhpcyBpcyBhbiBleGFtcGxlIG1hbmlmZXN0IGZpdWxlIHdoaWNoIGNvbmZpZ3VyZXMgdGhlIGxhbWJkYQpnYXVnZXM6CiAgIyBFYWNoIGdhdWdlIHNob3VsZCBoYXZlIGEgZGlzdGluY3QgbmFtZSBmb3IgbWFuYWdpbmcgaW5zaWRlIG9mIHRoZSBsYW1iZGEKICBpbXBvcnRhbnRfbWV0cmljOgogICAgIyBUaGVuIGRlZmluZSBhIG1ldHJpYyBuYW1lIHRvIGV4cG9ydCB0byBjbG91ZHdhdGNoCiAgICBtZXRyaWM6ICdsYXN0XzEwX3VuaXEnCiAgICB1cmw6ICdzMzovL2V4YW1wbGUtYnVja2V0L2RhdGFiYXNlcy9kcy1wYXJ0aXRpb25lZC1kZWx0YS10YWJsZS8nCiAgICAjIEN1cnJlbnRseSBvbmx5IGEgcXVlcnkgaGFuZGxlciB0eXBlIG9mIGBjb3VudGAgaXMgc3VwcG9ydGVkCiAgICB0eXBlOiBjb3VudAogICAgIyBUaGUgZXhhbXBsZSBEYXRhZnVzaW9uIFNRTCBxdWVyeSBiZWxvdyBxdWVyaWVzIHRoZSBzb3VyY2UgdGFibGUsIHdoaWNoIGlzIGRlZmluZWQgYnkKICAgICMgdGhlIFVSTCBhYm92ZSwgdG8gZmluZCBhbGwgdGhlIGRpc3RpbmN0IHV1aWRzIGluIHRoZSBsYXN0IDEwIG1pbnV0ZXMgb2YgdGhlIGN1cnJlbnQKICAgICMgYGRzYCBwYXJ0aXRpb24uCiAgICBxdWVyeTogfAogICAgICBTRUxFQ1QgRElTVElOQ1QgdXVpZCBBUyB0b3RhbCBGUk9NIHNvdXJjZQogICAgICAgIFdIRVJFIGRzID0gQVJST1dfQ0FTVChDVVJSRU5UX0RBVEUoKSAsICdVdGY4JykKICAgICAgICBBTkQgY3JlYXRlZF9hdCA+PSAoQVJST1dfQ0FTVChBUlJPV19DQVNUKE5PVygpLCAnVGltZXN0YW1wKFNlY29uZCwgTm9uZSknKSwgJ0ludDY0JykgLSAoNjAgKiAxMCkpCg==";
54+
fn test_config_b64() -> anyhow::Result<()> {
55+
use std::io::Read;
56+
let mut manifest = File::open("manifest.yml")?;
57+
let mut buf = vec![];
58+
let count = manifest.read_to_end(&mut buf)?;
59+
let b = BASE64_STANDARD.encode(buf);
5660
let conf: Configuration = Configuration::from_base64(&b).expect("Failed to deserialize");
5761

5862
assert_eq!(conf.gauges.len(), 1);
63+
Ok(())
5964
}
6065
}

lambdas/query-metrics/src/main.rs

+33-30
Original file line numberDiff line numberDiff line change
@@ -27,40 +27,43 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
2727
.expect("The `MANIFEST_B64` environment variable does not contain a valid manifest yml");
2828
debug!("Configuration loaded: {conf:?}");
2929

30-
for (name, gauge) in conf.gauges.iter() {
31-
debug!("Querying the {name} table");
32-
let ctx = SessionContext::new();
33-
let table = deltalake::open_table(&gauge.url)
34-
.await
35-
.expect("Failed to register table");
36-
ctx.register_table("source", Arc::new(table))
37-
.expect("Failed to register table with datafusion");
30+
for (name, gauges) in conf.gauges.iter() {
31+
for gauge in gauges.iter() {
32+
debug!("Querying the {name} table");
33+
let ctx = SessionContext::new();
34+
let table = deltalake::open_table(&gauge.url)
35+
.await
36+
.expect("Failed to register table");
37+
ctx.register_table("source", Arc::new(table))
38+
.expect("Failed to register table with datafusion");
3839

39-
debug!("Running query: {}", gauge.query);
40+
debug!("Running query: {}", gauge.query);
4041

41-
let df = ctx
42-
.sql(&gauge.query)
43-
.await
44-
.expect("Failed to execute query");
42+
let df = ctx
43+
.sql(&gauge.query)
44+
.await
45+
.expect("Failed to execute query");
4546

46-
match gauge.measurement_type {
47-
config::Measurement::Count => {
48-
let count = df.count().await.expect("Failed to collect batches");
49-
debug!("Found {count} distinct records");
47+
match gauge.measurement_type {
48+
config::Measurement::Count => {
49+
let count = df.count().await.expect("Failed to collect batches");
50+
debug!("Found {count} distinct records");
5051

51-
let datum = MetricDatum::builder()
52-
.metric_name(&gauge.name)
53-
.timestamp(DateTime::from(SystemTime::now()))
54-
.value(count as f64)
55-
.unit(StandardUnit::Count)
56-
.build();
57-
let res = cloudwatch
58-
.put_metric_data()
59-
.namespace("DataLake")
60-
.metric_data(datum)
61-
.send()
62-
.await?;
63-
debug!("Result of CloudWatch send: {res:?}");
52+
let datum = MetricDatum::builder()
53+
.metric_name(&gauge.name)
54+
.timestamp(DateTime::from(SystemTime::now()))
55+
.value(count as f64)
56+
.unit(StandardUnit::Count)
57+
.build();
58+
59+
let res = cloudwatch
60+
.put_metric_data()
61+
.namespace(format!("DataLake/{name}"))
62+
.metric_data(datum)
63+
.send()
64+
.await?;
65+
debug!("Result of CloudWatch send: {res:?}");
66+
}
6467
}
6568
}
6669
}

0 commit comments

Comments
 (0)