Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0db29f1

Browse files
committed Mar 19, 2025
fix: don't accept unknown log format
1 parent 98fe200 commit 0db29f1

File tree

2 files changed

+17
-11
lines changed

2 files changed

+17
-11
lines changed
 

‎src/event/format/known_schema.rs

+11-8
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ use once_cell::sync::Lazy;
2222
use regex::Regex;
2323
use serde::{Deserialize, Deserializer};
2424
use serde_json::{Map, Value};
25-
use tracing::{error, warn};
25+
use tracing::error;
2626

2727
/// Predefined JSON with known textual logging formats
2828
const FORMATS_JSON: &str = include_str!("../../../resources/formats.json");
@@ -32,8 +32,12 @@ pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> =
3232
Lazy::new(|| EventProcessor::new(FORMATS_JSON));
3333

3434
#[derive(Debug, thiserror::Error)]
35-
#[error("Event is not in the expected text/JSON format for {0}")]
36-
pub struct Unacceptable(String);
35+
pub enum Error {
36+
#[error("Event is not in the expected text/JSON format for {0}")]
37+
Unacceptable(String),
38+
#[error("Unknown log format: {0}")]
39+
Unknown(String),
40+
}
3741

3842
/// Deserializes a string pattern into a compiled Regex
3943
/// NOTE: we only warn if the pattern doesn't compile
@@ -178,10 +182,9 @@ impl EventProcessor {
178182
json: &mut Value,
179183
log_source: &str,
180184
extract_log: Option<&str>,
181-
) -> Result<HashSet<String>, Unacceptable> {
185+
) -> Result<HashSet<String>, Error> {
182186
let Some(schema) = self.schema_definitions.get(log_source) else {
183-
warn!("Unknown log format: {log_source}");
184-
return Ok(HashSet::new());
187+
return Err(Error::Unknown(log_source.to_owned()));
185188
};
186189

187190
let mut fields = HashSet::new();
@@ -194,15 +197,15 @@ impl EventProcessor {
194197
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
195198
fields.extend(known_fields);
196199
} else {
197-
return Err(Unacceptable(log_source.to_owned()));
200+
return Err(Error::Unacceptable(log_source.to_owned()));
198201
}
199202
}
200203
}
201204
Value::Object(event) => {
202205
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
203206
return Ok(known_fields);
204207
} else {
205-
return Err(Unacceptable(log_source.to_owned()));
208+
return Err(Error::Unacceptable(log_source.to_owned()));
206209
}
207210
}
208211
_ => unreachable!("We don't accept events of the form: {json}"),

‎src/handlers/http/ingest.rs

+6-3
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ use serde_json::Value;
2828

2929
use crate::event;
3030
use crate::event::error::EventError;
31-
use crate::event::format::known_schema::{Unacceptable, KNOWN_SCHEMA_LIST};
31+
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
3232
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
3333
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3434
use crate::metadata::SchemaVersion;
@@ -49,7 +49,10 @@ use super::users::filters::FiltersError;
4949
// Handler for POST /api/v1/ingest
5050
// ingests events by extracting stream name from header
5151
// creates if stream does not exist
52-
pub async fn ingest(req: HttpRequest, Json(mut json): Json<Value>) -> Result<HttpResponse, PostError> {
52+
pub async fn ingest(
53+
req: HttpRequest,
54+
Json(mut json): Json<Value>,
55+
) -> Result<HttpResponse, PostError> {
5356
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
5457
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
5558
};
@@ -363,7 +366,7 @@ pub enum PostError {
363366
#[error("Missing field for time partition in json: {0}")]
364367
MissingTimePartition(String),
365368
#[error("{0}")]
366-
KnownFormat(#[from] Unacceptable),
369+
KnownFormat(#[from] known_schema::Error),
367370
}
368371

369372
impl actix_web::ResponseError for PostError {

0 commit comments

Comments
 (0)
Please sign in to comment.