Skip to content

Commit f0d56c7

Browse files
authored
feat: implement metrics for event emitter (#202)
1 parent b541c98 commit f0d56c7

File tree

33 files changed

+721
-289
lines changed

33 files changed

+721
-289
lines changed

.github/workflows/tests.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ jobs:
5757
- name: Install fluvio CLI
5858
run: curl -fsS https://hub.infinyon.cloud/install/install.sh | bash
5959
- name: Create fluvio topic
60-
run: ~/.fluvio/bin/fluvio profile add docker 127.0.0.1:9103 docker && ~/.fluvio/bin/fluvio topic create -p 2 events && ~/.fluvio/bin/fluvio topic create -p 2 dlq
60+
run: ~/.fluvio/bin/fluvio profile add docker 127.0.0.1:9103 docker && ~/.fluvio/bin/fluvio topic create events && ~/.fluvio/bin/fluvio topic create dlq
6161
- name: Install protoc
6262
run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
6363
- uses: dtolnay/rust-toolchain@stable

Cargo.lock

+69-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integrationos-api/src/logic/connection.rs

+2
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,8 @@ pub async fn create_connection(
321321
group,
322322
identity: Some(identity.to_owned()),
323323
name: payload.name,
324+
has_error: false,
325+
error: None,
324326
identity_type: payload.identity_type,
325327
platform: connection_config.platform.into(),
326328
environment: event_access.environment,

integrationos-api/src/logic/event_callback.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use axum::{
99
Json, Router,
1010
};
1111
use bson::doc;
12-
use integrationos_domain::{ApplicationError, Connection, Id, IntegrationOSError};
12+
use integrationos_domain::{
13+
emitted_events::ConnectionLostReason, ApplicationError, Connection, Id, IntegrationOSError,
14+
};
1315
use std::sync::Arc;
1416

1517
pub fn get_router() -> Router<Arc<AppState>> {
@@ -19,10 +21,10 @@ pub fn get_router() -> Router<Arc<AppState>> {
1921
)
2022
}
2123

22-
// TODO: Write tests for this endpoint
2324
async fn database_connection_lost_callback(
2425
State(state): State<Arc<AppState>>,
2526
Path(connection_id): Path<Id>,
27+
Json(reason): Json<ConnectionLostReason>,
2628
) -> Result<Json<Connection>, IntegrationOSError> {
2729
// Instead of direcly updating we're getting the record first so that we can
2830
// modify the active and deprecated fields from the record metadata
@@ -41,6 +43,7 @@ async fn database_connection_lost_callback(
4143
)),
4244
Some(mut conn) => {
4345
if conn.record_metadata.active {
46+
conn.mark_error(reason.reason.as_str());
4447
conn.record_metadata.mark_deprecated("system");
4548
conn.record_metadata.mark_inactive("system");
4649
conn.record_metadata.mark_updated("system");

integrationos-api/src/logic/oauth.rs

+2
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,8 @@ async fn oauth_handler(
255255
identity: Some(identity),
256256
identity_type: payload.identity_type,
257257
settings: conn_definition.settings,
258+
has_error: false,
259+
error: None,
258260
throughput: Throughput {
259261
key,
260262
limit: throughput,

integrationos-api/tests/checker.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::{
44
};
55

66
use serde::{de::DeserializeOwned, Serialize};
7-
use serde_json::Value;
87

98
pub enum CheckType {
109
Json,
@@ -87,10 +86,10 @@ impl JsonChecker for JsonCheckerImpl {
8786
file.read_to_string(&mut contents)
8887
.expect("Failed to read file contents");
8988

90-
let expected = serde_json::from_str::<Value>(&contents)
89+
let expected = serde_json::from_str::<T>(&contents)
9190
.expect("Failed to deserialize expect value");
9291

93-
let actual = serde_json::from_str::<Value>(&serialized)
92+
let actual = serde_json::from_str::<T>(&serialized)
9493
.expect("Failed to deserialize actual value");
9594

9695
expected == actual

integrationos-api/tests/context.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ impl SecretExt for MockSecretsClient {
104104
}
105105

106106
#[derive(Debug, Clone, Eq, PartialEq)]
107-
pub struct ApiResponse<T: DeserializeOwned = Value> {
107+
pub struct ApiResponse<Data: DeserializeOwned = Value> {
108108
pub code: StatusCode,
109-
pub data: T,
109+
pub data: Data,
110110
}
111111

112112
impl TestServer {
@@ -268,13 +268,13 @@ impl TestServer {
268268
}
269269
}
270270

271-
pub async fn send_request<T: Serialize, U: DeserializeOwned>(
271+
pub async fn send_request<Payload: Serialize, Response: DeserializeOwned>(
272272
&self,
273273
path: &str,
274274
method: http::Method,
275275
key: Option<&str>,
276-
payload: Option<&T>,
277-
) -> Result<ApiResponse<U>> {
276+
payload: Option<&Payload>,
277+
) -> Result<ApiResponse<Response>> {
278278
self.send_request_with_auth_headers(
279279
path,
280280
method,
@@ -288,14 +288,14 @@ impl TestServer {
288288
.await
289289
}
290290

291-
pub async fn send_request_with_headers<T: Serialize, U: DeserializeOwned>(
291+
pub async fn send_request_with_headers<Payload: Serialize, Response: DeserializeOwned>(
292292
&self,
293293
path: &str,
294294
method: http::Method,
295295
key: Option<&str>,
296-
payload: Option<&T>,
296+
payload: Option<&Payload>,
297297
headers: Option<BTreeMap<String, String>>,
298-
) -> Result<ApiResponse<U>> {
298+
) -> Result<ApiResponse<Response>> {
299299
let mut req = self
300300
.client
301301
.request(method, format!("http://localhost:{}/{path}", self.port));
@@ -319,14 +319,14 @@ impl TestServer {
319319
})
320320
}
321321

322-
async fn send_request_with_auth_headers<T: Serialize, U: DeserializeOwned>(
322+
async fn send_request_with_auth_headers<Payload: Serialize, Response: DeserializeOwned>(
323323
&self,
324324
path: &str,
325325
method: http::Method,
326326
key: Option<&str>,
327-
payload: Option<&T>,
327+
payload: Option<&Payload>,
328328
headers: Option<BTreeMap<String, String>>,
329-
) -> Result<ApiResponse<U>> {
329+
) -> Result<ApiResponse<Response>> {
330330
let headers = match headers {
331331
Some(h) => h
332332
.into_iter()

integrationos-api/tests/http/callback.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use crate::context::TestServer;
22
use http::{Method, StatusCode};
3-
use integrationos_domain::{environment::Environment, prefix::IdPrefix, Connection, Id};
3+
use integrationos_domain::{
4+
emitted_events::ConnectionLostReason, environment::Environment, prefix::IdPrefix, Connection,
5+
Id,
6+
};
47
use serde_json::Value;
58

69
#[tokio::test]
@@ -13,9 +16,12 @@ async fn test_database_connection_lost_callback() {
1316
let connection_id = connection.id.to_string();
1417

1518
let path = format!("v1/event-callbacks/database-connection-lost/{connection_id}");
19+
let reason = ConnectionLostReason {
20+
reason: "database-connection-lost".to_string(),
21+
};
1622

1723
let request = server
18-
.send_request::<Value, Connection>(&path, Method::POST, None, None)
24+
.send_request::<ConnectionLostReason, Connection>(&path, Method::POST, None, Some(&reason))
1925
.await
2026
.expect("Failed to send request");
2127

@@ -31,9 +37,12 @@ async fn test_database_connection_lost_callback_404() {
3137

3238
let connection_id = Id::now(IdPrefix::Connection).to_string();
3339
let path = format!("v1/event-callbacks/database-connection-lost/{connection_id}");
40+
let reason = ConnectionLostReason {
41+
reason: "database-connection-lost".to_string(),
42+
};
3443

3544
let request = server
36-
.send_request::<Value, Value>(&path, Method::POST, None, None)
45+
.send_request::<ConnectionLostReason, Value>(&path, Method::POST, None, Some(&reason))
3746
.await
3847
.expect("Failed to send request");
3948

integrationos-api/tests/standard/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ fn test_json_connection() {
196196
key: "throughput-key".to_string(),
197197
limit: 100,
198198
},
199+
has_error: false,
200+
error: None,
199201
ownership: Ownership {
200202
id: "owner-id".to_string().into(),
201203
client_id: "client-id".to_string(),

integrationos-database/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ tracing-subscriber.workspace = true
3333
tracing.workspace = true
3434

3535
[dev-dependencies]
36+
mockito.workspace = true
3637
testcontainers-modules = { workspace = true, features = ["postgres"] }

integrationos-database/src/main.rs

+6-15
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,14 @@ fn main() -> Result<()> {
2626
.block_on(async move {
2727
match config.database_connection_type {
2828
DatabaseConnectionType::PostgreSql => {
29-
match PostgresDatabaseConnection::init(&config).await {
30-
Ok(server) => {
31-
if let Err(e) = server.run().await {
32-
PostgresDatabaseConnection::kill(&config).await?;
33-
return Err(anyhow::anyhow!("Could not run server: {e}"));
34-
}
29+
let server = PostgresDatabaseConnection::init(&config).await?;
3530

36-
Ok(())
37-
}
38-
Err(e) => {
39-
tracing::error!("Could not initialize storage: {e}");
40-
41-
PostgresDatabaseConnection::kill(&config).await?;
42-
43-
Err(anyhow::anyhow!("Could not initialize storage: {e}"))
44-
}
31+
if let Err(e) = server.run().await {
32+
PostgresDatabaseConnection::kill(&config, e.to_string()).await?;
33+
return Err(e);
4534
}
35+
36+
Ok(())
4637
}
4738
}
4839
})

0 commit comments

Comments
 (0)