Skip to content

Commit 4144457

Browse files
authored
refactor: removing duplicated subscriber logic (#186)
1 parent 91cc1b8 commit 4144457

File tree

7 files changed

+65
-71
lines changed

7 files changed

+65
-71
lines changed

integrationos-api/src/main.rs

+9-19
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ use anyhow::Result;
22
use dotenvy::dotenv;
33
use envconfig::Envconfig;
44
use integrationos_api::{domain::config::ConnectionsConfig, server::Server};
5-
use integrationos_domain::telemetry::{
6-
get_subscriber, get_subscriber_with_trace, init_subscriber, OtelGuard,
7-
};
5+
use integrationos_domain::telemetry::{get_subscriber, init_subscriber, OtelGuard};
86
use tracing::info;
97

108
fn main() -> Result<()> {
@@ -20,22 +18,14 @@ fn main() -> Result<()> {
2018
.build()?;
2119

2220
runtime.block_on(async move {
23-
match config.otlp_endpoint {
24-
Some(ref otlp_url) => {
25-
let subscriber = get_subscriber_with_trace(
26-
"connections-api".into(),
27-
"info".into(),
28-
std::io::stdout,
29-
otlp_url.into(),
30-
);
31-
init_subscriber(subscriber)
32-
}
33-
None => {
34-
let subscriber =
35-
get_subscriber("connections-api".into(), "info".into(), std::io::stdout);
36-
init_subscriber(subscriber)
37-
}
38-
};
21+
let subscriber = get_subscriber(
22+
"connections-api".into(),
23+
"info".into(),
24+
std::io::stdout,
25+
config.otlp_endpoint.clone(),
26+
);
27+
28+
init_subscriber(subscriber);
3929

4030
info!("Starting API with config:\n{config}");
4131

integrationos-archiver/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async fn main() -> Result<Unit> {
3737
StorageProvider::GoogleCloud => GoogleCloudStorage::new(&config).await?,
3838
});
3939

40-
let subscriber = get_subscriber("archiver".into(), "info".into(), std::io::stdout);
40+
let subscriber = get_subscriber("archiver".into(), "info".into(), std::io::stdout, None);
4141
init_subscriber(subscriber);
4242

4343
tracing::info!("Starting archiver with config:\n{config}");

integrationos-database/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ fn main() -> Result<()> {
1414
dotenv().ok();
1515
let config = DatabaseConnectionConfig::init_from_env()?;
1616

17-
let subscriber = get_subscriber("storage".into(), "info".into(), std::io::stdout);
17+
let subscriber = get_subscriber("storage".into(), "info".into(), std::io::stdout, None);
1818
init_subscriber(subscriber);
1919

2020
info!("Starting Storage API with config:\n{config}");

integrationos-domain/src/service/telemetry/mod.rs

+46-48
Original file line numberDiff line numberDiff line change
@@ -22,53 +22,11 @@ where
2222
pub subscriber: T,
2323
}
2424

25-
pub fn get_subscriber_with_trace<Sink>(
26-
name: String,
27-
env_filter: String,
28-
sink: Sink,
29-
otlp_url: String,
30-
) -> Telemetry<impl SubscriberExt + Send + Sync + 'static>
31-
where
32-
Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static,
33-
{
34-
let formatting_layer: BunyanFormattingLayer<Sink> =
35-
BunyanFormattingLayer::new(name.clone(), sink);
36-
37-
let exporter = opentelemetry_otlp::new_exporter()
38-
.tonic()
39-
.with_endpoint(otlp_url.clone());
40-
41-
let provider = opentelemetry_otlp::new_pipeline()
42-
.tracing()
43-
.with_trace_config(
44-
Config::default().with_resource(opentelemetry_sdk::Resource::new(vec![
45-
opentelemetry::KeyValue::new("service.name", name),
46-
])),
47-
)
48-
.with_batch_config(BatchConfig::default())
49-
.with_exporter(exporter)
50-
.install_batch(Tokio)
51-
.expect("Failed to install tracing pipeline");
52-
53-
global::set_tracer_provider(provider.clone());
54-
let tracer = provider.tracer("tracing-otel-subscriber");
55-
56-
let filter_layer =
57-
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter));
58-
59-
Telemetry {
60-
subscriber: Registry::default()
61-
.with(filter_layer)
62-
.with(JsonStorageLayer)
63-
.with(OpenTelemetryLayer::new(tracer))
64-
.with(formatting_layer),
65-
}
66-
}
67-
6825
pub fn get_subscriber<Sink>(
6926
name: String,
7027
env_filter: String,
7128
sink: Sink,
29+
otlp_url: Option<String>,
7230
) -> Telemetry<impl SubscriberExt + Send + Sync + 'static>
7331
where
7432
Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static,
@@ -79,11 +37,51 @@ where
7937
let filter_layer =
8038
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter));
8139

82-
Telemetry {
83-
subscriber: Registry::default()
84-
.with(filter_layer)
85-
.with(JsonStorageLayer)
86-
.with(formatting_layer),
40+
match otlp_url {
41+
Some(otlp_url) => {
42+
let exporter = opentelemetry_otlp::new_exporter()
43+
.tonic()
44+
.with_endpoint(otlp_url.clone());
45+
46+
let provider = opentelemetry_otlp::new_pipeline()
47+
.tracing()
48+
.with_trace_config(Config::default().with_resource(
49+
opentelemetry_sdk::Resource::new(vec![opentelemetry::KeyValue::new(
50+
"service.name",
51+
name,
52+
)]),
53+
))
54+
.with_batch_config(BatchConfig::default())
55+
.with_exporter(exporter)
56+
.install_batch(Tokio)
57+
.expect("Failed to install tracing pipeline");
58+
59+
global::set_tracer_provider(provider.clone());
60+
let tracer = provider.tracer("tracing-otel-subscriber");
61+
62+
let telemetry: Telemetry<Box<dyn SubscriberExt + Send + Sync>> = Telemetry {
63+
subscriber: Box::new(
64+
Registry::default()
65+
.with(OpenTelemetryLayer::new(tracer))
66+
.with(filter_layer)
67+
.with(JsonStorageLayer)
68+
.with(formatting_layer),
69+
),
70+
};
71+
72+
telemetry
73+
}
74+
None => {
75+
let telemetry: Telemetry<Box<dyn SubscriberExt + Send + Sync>> = Telemetry {
76+
subscriber: Box::new(
77+
Registry::default()
78+
.with(filter_layer)
79+
.with(JsonStorageLayer)
80+
.with(formatting_layer),
81+
),
82+
};
83+
telemetry
84+
}
8785
}
8886
}
8987

integrationos-event/src/main.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@ use tracing::{error, info, warn};
2626
async fn main() -> Result<()> {
2727
dotenv().ok();
2828

29-
let suscriber = get_subscriber("integrationos-event".into(), "info".into(), std::io::stdout);
29+
let suscriber = get_subscriber(
30+
"integrationos-event".into(),
31+
"info".into(),
32+
std::io::stdout,
33+
None,
34+
);
3035
init_subscriber(suscriber);
3136

3237
let config = EventCoreConfig::init_from_env()?;

integrationos-gateway/src/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ async fn main() -> Result<()> {
1515
"integrationos-gateway".into(),
1616
"info".into(),
1717
std::io::stdout,
18+
None,
1819
);
1920
init_subscriber(suscriber);
2021

integrationos-watchdog/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use tracing::info;
1717
async fn main() -> Result<()> {
1818
dotenv().ok();
1919

20-
let suscriber = get_subscriber("watchdog".into(), "info".into(), std::io::stdout);
20+
let suscriber = get_subscriber("watchdog".into(), "info".into(), std::io::stdout, None);
2121
init_subscriber(suscriber);
2222

2323
let watchdog_config = WatchdogConfig::init_from_env().context("Could not load config")?;

0 commit comments

Comments
 (0)