Skip to content

Commit 571c528

Browse files
Merge branch 'main' into schema-detect
2 parents 382897d + a114e14 commit 571c528

File tree

5 files changed

+118
-22
lines changed

5 files changed

+118
-22
lines changed

src/event/format/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,25 @@ type EventSchema = Vec<Arc<Field>>;
6060
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
6161
pub enum LogSource {
6262
// AWS Kinesis sends logs in the format of a json array
63+
#[serde(rename = "kinesis")]
6364
Kinesis,
6465
// OpenTelemetry sends logs according to the specification as explained here
6566
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1
67+
#[serde(rename = "otel-logs")]
6668
OtelLogs,
6769
// OpenTelemetry sends traces according to the specification as explained here
6870
// https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto
71+
#[serde(rename = "otel-metrics")]
6972
OtelMetrics,
7073
// OpenTelemetry sends traces according to the specification as explained here
7174
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
75+
#[serde(rename = "otel-traces")]
7276
OtelTraces,
7377
// Internal Stream format
78+
#[serde(rename = "pmeta")]
7479
Pmeta,
7580
#[default]
81+
#[serde(rename = "json")]
7682
// Json object or array
7783
Json,
7884
// Custom Log Sources e.g. "syslog"

src/handlers/http/query.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::query::error::ExecuteError;
4444
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
4545
use crate::query::{TableScanVisitor, QUERY_SESSION};
4646
use crate::rbac::Users;
47-
use crate::response::QueryResponse;
47+
use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
4848
use crate::storage::object_storage::commit_schema_to_storage;
4949
use crate::storage::ObjectStorageError;
5050
use crate::utils::actix::extract_session_key_from_req;
@@ -122,22 +122,26 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
122122
Value::Array(vec![json!({column_name: count})])
123123
};
124124

125+
let total_time = format!("{:?}", time.elapsed());
125126
let time = time.elapsed().as_secs_f64();
126127

127128
QUERY_EXECUTE_TIME
128129
.with_label_values(&[&table_name])
129130
.observe(time);
130131

131-
return Ok(HttpResponse::Ok().json(response));
132+
return Ok(HttpResponse::Ok()
133+
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
134+
.json(response));
132135
}
133136

134137
let (records, fields) = execute(query, &table_name).await?;
135-
138+
let total_time = format!("{:?}", time.elapsed());
136139
let response = QueryResponse {
137140
records,
138141
fields,
139142
fill_null: query_request.send_null,
140143
with_fields: query_request.fields,
144+
total_time,
141145
}
142146
.to_http()?;
143147

src/migration/mod.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ async fn migrate_stream_metadata(
246246
stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
247247
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
248248
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
249+
249250
storage
250251
.put_object(&path, to_bytes(&stream_metadata_value))
251252
.await?;
@@ -259,6 +260,7 @@ async fn migrate_stream_metadata(
259260
stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
260261
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
261262
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
263+
262264
storage
263265
.put_object(&path, to_bytes(&stream_metadata_value))
264266
.await?;
@@ -272,13 +274,15 @@ async fn migrate_stream_metadata(
272274
stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
273275
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
274276
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
277+
275278
storage
276279
.put_object(&path, to_bytes(&stream_metadata_value))
277280
.await?;
278281
}
279282
Some("v4") => {
280283
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
281284
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
285+
282286
storage
283287
.put_object(&path, to_bytes(&stream_metadata_value))
284288
.await?;
@@ -289,7 +293,13 @@ async fn migrate_stream_metadata(
289293
.put_object(&path, to_bytes(&stream_metadata_value))
290294
.await?;
291295
}
292-
_ => (),
296+
_ => {
297+
stream_metadata_value =
298+
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);
299+
storage
300+
.put_object(&path, to_bytes(&stream_metadata_value))
301+
.await?;
302+
}
293303
}
294304

295305
Ok(stream_metadata_value)

0 commit comments

Comments
 (0)