diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 3232d54cb..8648cfdf0 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -16,15 +16,12 @@
  *
  */
 
-pub mod utils;
-
 use std::collections::HashSet;
 use std::time::Duration;
 
 use actix_web::http::header::{self, HeaderMap};
 use actix_web::web::Path;
 use actix_web::Responder;
-use bytes::Bytes;
 use chrono::Utc;
 use clokwerk::{AsyncScheduler, Interval};
 use http::{header as http_header, StatusCode};
@@ -42,6 +39,7 @@ use crate::metrics::prom_utils::Metrics;
 use crate::parseable::PARSEABLE;
 use crate::rbac::role::model::DefaultPrivilege;
 use crate::rbac::user::User;
+use crate::static_schema::StaticSchema;
 use crate::stats::Stats;
 use crate::storage::{
     ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY,
@@ -55,6 +53,8 @@ use super::modal::IngestorMetadata;
 use super::rbac::RBACError;
 use super::role::RoleError;
 
+pub mod utils;
+
 type IngestorMetadataArr = Vec<IngestorMetadata>;
 
 pub const INTERNAL_STREAM_NAME: &str = "pmeta";
@@ -64,7 +64,7 @@ const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1
 // forward the create/update stream request to all ingestors to keep them in sync
 pub async fn sync_streams_with_ingestors(
     headers: HeaderMap,
-    body: Bytes,
+    static_schema: Option<StaticSchema>,
     stream_name: &str,
 ) -> Result<(), StreamError> {
     let mut reqwest_headers = http_header::HeaderMap::new();
@@ -88,21 +88,23 @@ pub async fn sync_streams_with_ingestors(
             base_path_without_preceding_slash(),
             stream_name
         );
-        let res = HTTP_CLIENT
+        let mut req = HTTP_CLIENT
             .put(url)
             .headers(reqwest_headers.clone())
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .body(body.clone())
-            .send()
-            .await
-            .map_err(|err| {
-                error!(
-                    "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
-                    ingestor.domain_name, err
-                );
-                StreamError::Network(err)
-            })?;
+            .header(header::AUTHORIZATION, &ingestor.token);
+
+        if let Some(schema) = static_schema.as_ref() {
+            req = req.json(schema);
+        }
+
+        let res = req.send().await.inspect_err(|err| {
+            error!(
+                "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
+                ingestor.domain_name, err
+            );
+        })?;
+        // TODO: review the following code
         if !res.status().is_success() {
             error!(
                 "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index e6b9dff10..97680017c 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -16,9 +16,19 @@
  *
  */
 
-use self::error::StreamError;
-use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
-use super::query::update_schema_when_distributed;
+use std::fs;
+use std::sync::Arc;
+
+use actix_web::http::StatusCode;
+use actix_web::web::{Json, Path};
+use actix_web::{web, HttpRequest, Responder};
+use arrow_json::reader::infer_json_schema_from_iterator;
+use chrono::Utc;
+use error::StreamError;
+use itertools::Itertools;
+use serde_json::{json, Value};
+use tracing::warn;
+
 use crate::event::format::override_data_type;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
 use crate::metadata::SchemaVersion;
@@ -26,23 +36,15 @@ use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STO
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::rbac::role::Action;
 use crate::rbac::Users;
+use crate::static_schema::StaticSchema;
 use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
 use crate::storage::retention::Retention;
 use crate::storage::{StreamInfo, StreamType};
 use crate::utils::actix::extract_session_key_from_req;
 use crate::{stats, validator, LOCK_EXPECT};
-use actix_web::http::StatusCode;
-use actix_web::web::{Json, Path};
-use actix_web::{web, HttpRequest, Responder};
-use arrow_json::reader::infer_json_schema_from_iterator;
-use bytes::Bytes;
-use chrono::Utc;
-use itertools::Itertools;
-use serde_json::{json, Value};
-use std::fs;
-use std::sync::Arc;
-use tracing::warn;
+use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
+use super::query::update_schema_when_distributed;
 
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
@@ -144,12 +146,16 @@ pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder,
 pub async fn put_stream(
     req: HttpRequest,
     stream_name: Path<String>,
-    body: Bytes,
+    static_schema: Option<Json<StaticSchema>>,
 ) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
 
     PARSEABLE
-        .create_update_stream(req.headers(), &body, &stream_name)
+        .create_update_stream(
+            req.headers(),
+            static_schema.as_ref().map(|Json(schema)| schema),
+            &stream_name,
+        )
         .await?;
 
     Ok(("Log stream created", StatusCode::OK))
diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs
index 4872a6476..42b951f1a 100644
--- a/src/handlers/http/modal/ingest/ingestor_logstream.rs
+++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs
@@ -20,7 +20,6 @@ use actix_web::{
     web::{Json, Path},
     HttpRequest, Responder,
 };
-use bytes::Bytes;
 use http::StatusCode;
 use tracing::warn;
 
@@ -28,6 +27,7 @@ use crate::{
     catalog::remove_manifest_from_snapshot,
     handlers::http::logstream::error::StreamError,
     parseable::{StreamNotFound, PARSEABLE},
+    static_schema::StaticSchema,
     stats,
 };
 
@@ -80,11 +80,15 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder,
 pub async fn put_stream(
     req: HttpRequest,
     stream_name: Path<String>,
-    body: Bytes,
+    static_schema: Option<Json<StaticSchema>>,
 ) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
 
     PARSEABLE
-        .create_update_stream(req.headers(), &body, &stream_name)
+        .create_update_stream(
+            req.headers(),
+            static_schema.as_ref().map(|Json(s)| s),
+            &stream_name,
+        )
         .await?;
 
     Ok(("Log stream created", StatusCode::OK))
diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs
index 76d282f76..14ce8fa45 100644
--- a/src/handlers/http/modal/query/querier_logstream.rs
+++ b/src/handlers/http/modal/query/querier_logstream.rs
@@ -20,10 +20,9 @@ use core::str;
 use std::fs;
 
 use actix_web::{
-    web::{self, Path},
+    web::{Json, Path},
     HttpRequest, Responder,
 };
-use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
 use relative_path::RelativePathBuf;
@@ -44,6 +43,7 @@ use crate::{
     },
     hottier::HotTierManager,
     parseable::{StreamNotFound, PARSEABLE},
+    static_schema::StaticSchema,
     stats::{self, Stats},
     storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
 };
@@ -109,15 +109,16 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder,
 pub async fn put_stream(
     req: HttpRequest,
     stream_name: Path<String>,
-    body: Bytes,
+    static_schema: Option<Json<StaticSchema>>,
 ) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
 
     let _ = CREATE_STREAM_LOCK.lock().await;
+    let static_schema = static_schema.map(|Json(s)| s);
 
     let headers = PARSEABLE
-        .create_update_stream(req.headers(), &body, &stream_name)
+        .create_update_stream(req.headers(), static_schema.as_ref(), &stream_name)
         .await?;
 
-    sync_streams_with_ingestors(headers, body, &stream_name).await?;
+    sync_streams_with_ingestors(headers, static_schema, &stream_name).await?;
 
     Ok(("Log stream created", StatusCode::OK))
 }
@@ -188,7 +189,7 @@ pub async fn get_stats(
         };
 
         let stats = serde_json::to_value(total_stats)?;
 
-        return Ok((web::Json(stats), StatusCode::OK));
+        return Ok((Json(stats), StatusCode::OK));
     }
 }
@@ -235,5 +236,5 @@ pub async fn get_stats(
 
     let stats = serde_json::to_value(stats)?;
 
-    Ok((web::Json(stats), StatusCode::OK))
+    Ok((Json(stats), StatusCode::OK))
 }
diff --git a/src/lib.rs b/src/lib.rs
index 94b81639d..ebe967260 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -75,7 +75,7 @@ pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
 static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
     ClientBuilder::new()
         .connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup
-        .timeout(Duration::from_secs(10)) // set a timeout of 10s for each request
+        .timeout(Duration::from_secs(30)) // set a timeout of 30s for each request
         .pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection
         .pool_max_idle_per_host(32) // max 32 idle connections per host
         .gzip(true) // gzip compress for all requests
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 6e4f55e94..a24f510ff 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -364,7 +364,7 @@ impl Parseable {
             HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
         );
         header_map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
-        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
+        sync_streams_with_ingestors(header_map, None, INTERNAL_STREAM_NAME).await?;
 
         Ok(())
     }
@@ -409,7 +409,7 @@ impl Parseable {
     pub async fn create_update_stream(
         &self,
         headers: &HeaderMap,
-        body: &Bytes,
+        static_schema: Option<&StaticSchema>,
         stream_name: &str,
     ) -> Result<HeaderMap, StreamError> {
         let PutStreamHeaders {
@@ -469,7 +469,7 @@ impl Parseable {
         }
 
         let schema = validate_static_schema(
-            body,
+            static_schema,
             stream_name,
             &time_partition,
             custom_partition.as_ref(),
@@ -768,7 +768,7 @@ impl Parseable {
 }
 
 pub fn validate_static_schema(
-    body: &Bytes,
+    static_schema: Option<&StaticSchema>,
     stream_name: &str,
     time_partition: &str,
     custom_partition: Option<&String>,
@@ -778,22 +778,21 @@ pub fn validate_static_schema(
         return Ok(Arc::new(Schema::empty()));
     }
 
-    if body.is_empty() {
-        return Err(CreateStreamError::Custom {
-            msg: format!(
-                "Please provide schema in the request body for static schema logstream {stream_name}"
-            ),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }
-
-    let static_schema: StaticSchema = serde_json::from_slice(body)?;
-    let parsed_schema =
-        convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
-            .map_err(|_| CreateStreamError::Custom {
-                msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
-                status: StatusCode::BAD_REQUEST,
-            })?;
+    let static_schema = static_schema.ok_or_else(|| CreateStreamError::Custom {
+        msg: format!(
+            "Please provide schema in the request body for static schema logstream {stream_name}"
+        ),
+        status: StatusCode::BAD_REQUEST,
+    })?;
+    let parsed_schema = convert_static_schema_to_arrow_schema(
+        static_schema.clone(),
+        time_partition,
+        custom_partition,
+    )
+    .map_err(|_| CreateStreamError::Custom {
+        msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
+        status: StatusCode::BAD_REQUEST,
+    })?;
 
     Ok(parsed_schema)
 }