Skip to content

Commit b541c98

Browse files
authored
fix: avoid system shutdown upon uniqueness constraint (#201)
1 parent f28918f commit b541c98

File tree

3 files changed

+73
-8
lines changed

3 files changed

+73
-8
lines changed

integrationos-domain/src/domain/error/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -1139,6 +1139,13 @@ impl IntegrationOSError {
11391139
StatusCode::from(self).as_u16()
11401140
}
11411141

1142+
pub fn is_unique_error(&self) -> bool {
1143+
match self {
1144+
IntegrationOSError::Internal(InternalError::UniqueFieldViolation { .. }) => true,
1145+
_ => false,
1146+
}
1147+
}
1148+
11421149
fn internal(internal: InternalError) -> Self {
11431150
IntegrationOSError::Internal(internal)
11441151
}

integrationos-emit/src/domain/config.rs

+26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::stream::EventStreamProvider;
22
use envconfig::Envconfig;
3+
use fluvio::dataplane::types::PartitionId;
34
use integrationos_domain::{
45
cache::CacheConfig,
56
{database::DatabaseConfig, environment::Environment},
@@ -51,6 +52,8 @@ pub struct EmitterConfig {
5152
default = "2thZ2UiOnsibmFtZSI6IlN0YXJ0dXBsa3NoamRma3NqZGhma3NqZGhma3NqZG5jhYtggfaP9ubmVjdGlvbnMiOjUwMDAwMCwibW9kdWxlcyI6NSwiZW5kcG9pbnRzIjo3b4e05e2-f050-401f-9822-44f43f71753c"
5253
)]
5354
pub jwt_secret: String,
55+
#[envconfig(from = "STATEFUL_SET_POD_NAME")]
56+
pub stateful_set_pod_name: Option<String>,
5457
#[envconfig(
5558
from = "EVENT_CALLBACK_URL",
5659
default = "http://localhost:3005/v1/event-callbacks"
@@ -64,6 +67,28 @@ pub struct EmitterConfig {
6467
pub db_config: DatabaseConfig,
6568
}
6669

70+
impl EmitterConfig {
71+
/// Returns the partition id to consume from, beware that this assumes several things:
72+
/// 1. The pod name is in the format of `topic-partition-id` (for example in a statefulset)
73+
/// 2. Each pod will now have a 1-1 mapping to a partition
74+
/// 3. It'll read the same partition for the DLQ and the main topic, which means that the DLQ
75+
/// and main topic will have the same amount of partitions.
76+
///
77+
/// ## Warning
78+
/// This is a very brittle assumption, and should be revisited if we ever have a more complex
79+
/// setup or until this gets resolved: https://github.com/infinyon/fluvio/issues/760
80+
pub fn partition(&self) -> Option<PartitionId> {
81+
let pod_name = self.stateful_set_pod_name.as_ref()?;
82+
83+
if let Some((_, partition_id)) = pod_name.rsplit_once('-') {
84+
let partition_id = PartitionId::from_str(partition_id).ok()?;
85+
Some(partition_id)
86+
} else {
87+
None
88+
}
89+
}
90+
}
91+
6792
impl Display for EmitterConfig {
6893
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
6994
writeln!(f, "SERVER_ADDRESS: {}", self.address)?;
@@ -96,6 +121,7 @@ impl Display for EmitterConfig {
96121
"PUSHER_SLEEP_DURATION_IN_MILLIS: {}",
97122
self.pusher_sleep_duration_millis
98123
)?;
124+
writeln!(f, "STATEFUL_SET_POD_NAME: {:?}", self.stateful_set_pod_name)?;
99125
writeln!(f, "PUSHER_MAX_CHUNK_SIZE: {}", self.pusher_max_chunk_size)?;
100126
writeln!(f, "JWT_SECRET: ****")?;
101127
writeln!(f, "EVENT_CALLBACK_URL: {}", self.event_callback_url)?;

integrationos-emit/src/stream/fluvio_driver.rs

+40-8
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,13 @@ impl FluvioDriverImpl {
110110
)
111111
})?;
112112

113-
let ext = ConsumerConfigExtBuilder::default()
113+
let mut ext = ConsumerConfigExtBuilder::default();
114+
115+
if let Some(partition) = config.partition() {
116+
ext = ext.partition(partition).to_owned();
117+
}
118+
119+
let ext = ext
114120
.topic(consumer_topic)
115121
.offset_start(offset)
116122
.offset_consumer(consumer_id)
@@ -142,7 +148,13 @@ impl FluvioDriverImpl {
142148

143149
let consumer_id = format!("{consumer_id}-dlq");
144150

145-
let ext = ConsumerConfigExtBuilder::default()
151+
let mut ext = ConsumerConfigExtBuilder::default();
152+
153+
if let Some(partition) = config.partition() {
154+
ext = ext.partition(partition).to_owned();
155+
}
156+
157+
let ext = ext
146158
.topic(&topic)
147159
.offset_start(Offset::beginning())
148160
.offset_consumer(consumer_id)
@@ -180,6 +192,20 @@ impl FluvioDriverImpl {
180192
let count = AtomicU64::new(0);
181193
let is_processing = AtomicBool::new(true);
182194

195+
if !consumer.ext.partition.is_empty() {
196+
tracing::info!(
197+
"Consuming events from topic {} partition {}",
198+
target.as_ref(),
199+
consumer
200+
.ext
201+
.partition
202+
.iter()
203+
.map(u32::to_string)
204+
.collect::<Vec<_>>()
205+
.join("-")
206+
);
207+
}
208+
183209
loop {
184210
is_processing.store(false, Ordering::SeqCst);
185211
tokio::select! {
@@ -333,17 +359,23 @@ impl EventStreamExt for FluvioDriverImpl {
333359
return Ok(());
334360
}
335361

336-
ctx.app_stores
362+
let insert_result = ctx
363+
.app_stores
337364
.deduplication
338365
.create_one(&Deduplication {
339366
entity_id: event.entity_id,
340367
metadata: event.metadata.clone(),
341368
})
342-
.await
343-
.map_err(|e| {
344-
tracing::error!("Could not create deduplication record: {e}");
345-
InternalError::unknown("Could not create deduplication record", None)
346-
})?;
369+
.await;
370+
371+
if let Err(e) = insert_result {
372+
tracing::error!("Could not create deduplication record: {e}");
373+
if e.is_unique_error() {
374+
return Ok(());
375+
} else {
376+
return Err(e);
377+
}
378+
}
347379

348380
match target {
349381
EventStreamTopic::Target => {

0 commit comments

Comments
 (0)