Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: serde StaticSchema #1126

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@
*
*/

pub mod utils;

use std::collections::HashSet;
use std::time::Duration;

use actix_web::http::header::{self, HeaderMap};
use actix_web::web::Path;
use actix_web::Responder;
use bytes::Bytes;
use chrono::Utc;
use clokwerk::{AsyncScheduler, Interval};
use http::{header as http_header, StatusCode};
Expand All @@ -42,6 +39,7 @@ use crate::metrics::prom_utils::Metrics;
use crate::parseable::PARSEABLE;
use crate::rbac::role::model::DefaultPrivilege;
use crate::rbac::user::User;
use crate::static_schema::StaticSchema;
use crate::stats::Stats;
use crate::storage::{
ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY,
Expand All @@ -55,6 +53,8 @@ use super::modal::IngestorMetadata;
use super::rbac::RBACError;
use super::role::RoleError;

pub mod utils;

type IngestorMetadataArr = Vec<IngestorMetadata>;

pub const INTERNAL_STREAM_NAME: &str = "pmeta";
Expand All @@ -64,7 +64,7 @@ const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1
// forward the create/update stream request to all ingestors to keep them in sync
pub async fn sync_streams_with_ingestors(
headers: HeaderMap,
body: Bytes,
static_schema: Option<StaticSchema>,
stream_name: &str,
) -> Result<(), StreamError> {
let mut reqwest_headers = http_header::HeaderMap::new();
Expand All @@ -88,21 +88,23 @@ pub async fn sync_streams_with_ingestors(
base_path_without_preceding_slash(),
stream_name
);
let res = HTTP_CLIENT
let mut req = HTTP_CLIENT
.put(url)
.headers(reqwest_headers.clone())
.header(header::AUTHORIZATION, &ingestor.token)
.body(body.clone())
.send()
.await
.map_err(|err| {
error!(
"Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
ingestor.domain_name, err
);
StreamError::Network(err)
})?;
.header(header::AUTHORIZATION, &ingestor.token);

if let Some(schema) = static_schema.as_ref() {
req = req.json(schema);
}

let res = req.send().await.inspect_err(|err| {
error!(
"Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
ingestor.domain_name, err
);
})?;

// TODO: review the following code
if !res.status().is_success() {
error!(
"failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
Expand Down
38 changes: 22 additions & 16 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,35 @@
*
*/

use self::error::StreamError;
use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
use super::query::update_schema_when_distributed;
use std::fs;
use std::sync::Arc;

use actix_web::http::StatusCode;
use actix_web::web::{Json, Path};
use actix_web::{web, HttpRequest, Responder};
use arrow_json::reader::infer_json_schema_from_iterator;
use chrono::Utc;
use error::StreamError;
use itertools::Itertools;
use serde_json::{json, Value};
use tracing::warn;

use crate::event::format::override_data_type;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::metadata::SchemaVersion;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::rbac::role::Action;
use crate::rbac::Users;
use crate::static_schema::StaticSchema;
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
use crate::storage::retention::Retention;
use crate::storage::{StreamInfo, StreamType};
use crate::utils::actix::extract_session_key_from_req;
use crate::{stats, validator, LOCK_EXPECT};

use actix_web::http::StatusCode;
use actix_web::web::{Json, Path};
use actix_web::{web, HttpRequest, Responder};
use arrow_json::reader::infer_json_schema_from_iterator;
use bytes::Bytes;
use chrono::Utc;
use itertools::Itertools;
use serde_json::{json, Value};
use std::fs;
use std::sync::Arc;
use tracing::warn;
use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
use super::query::update_schema_when_distributed;

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
Expand Down Expand Up @@ -144,12 +146,16 @@ pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, Str
pub async fn put_stream(
req: HttpRequest,
stream_name: Path<String>,
body: Bytes,
static_schema: Option<Json<StaticSchema>>,
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

PARSEABLE
.create_update_stream(req.headers(), &body, &stream_name)
.create_update_stream(
req.headers(),
static_schema.as_ref().map(|Json(schema)| schema),
&stream_name,
)
.await?;

Ok(("Log stream created", StatusCode::OK))
Expand Down
10 changes: 7 additions & 3 deletions src/handlers/http/modal/ingest/ingestor_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ use actix_web::{
web::{Json, Path},
HttpRequest, Responder,
};
use bytes::Bytes;
use http::StatusCode;
use tracing::warn;

use crate::{
catalog::remove_manifest_from_snapshot,
handlers::http::logstream::error::StreamError,
parseable::{StreamNotFound, PARSEABLE},
static_schema::StaticSchema,
stats,
};

Expand Down Expand Up @@ -80,11 +80,15 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
pub async fn put_stream(
req: HttpRequest,
stream_name: Path<String>,
body: Bytes,
static_schema: Option<Json<StaticSchema>>,
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
PARSEABLE
.create_update_stream(req.headers(), &body, &stream_name)
.create_update_stream(
req.headers(),
static_schema.as_ref().map(|Json(s)| s),
&stream_name,
)
.await?;

Ok(("Log stream created", StatusCode::OK))
Expand Down
15 changes: 8 additions & 7 deletions src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ use core::str;
use std::fs;

use actix_web::{
web::{self, Path},
web::{Json, Path},
HttpRequest, Responder,
};
use bytes::Bytes;
use chrono::Utc;
use http::StatusCode;
use relative_path::RelativePathBuf;
Expand All @@ -44,6 +43,7 @@ use crate::{
},
hottier::HotTierManager,
parseable::{StreamNotFound, PARSEABLE},
static_schema::StaticSchema,
stats::{self, Stats},
storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
};
Expand Down Expand Up @@ -109,15 +109,16 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
pub async fn put_stream(
req: HttpRequest,
stream_name: Path<String>,
body: Bytes,
static_schema: Option<Json<StaticSchema>>,
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
let _ = CREATE_STREAM_LOCK.lock().await;
let static_schema = static_schema.map(|Json(s)| s);
let headers = PARSEABLE
.create_update_stream(req.headers(), &body, &stream_name)
.create_update_stream(req.headers(), static_schema.as_ref(), &stream_name)
.await?;

sync_streams_with_ingestors(headers, body, &stream_name).await?;
sync_streams_with_ingestors(headers, static_schema, &stream_name).await?;

Ok(("Log stream created", StatusCode::OK))
}
Expand Down Expand Up @@ -188,7 +189,7 @@ pub async fn get_stats(
};
let stats = serde_json::to_value(total_stats)?;

return Ok((web::Json(stats), StatusCode::OK));
return Ok((Json(stats), StatusCode::OK));
}
}

Expand Down Expand Up @@ -235,5 +236,5 @@ pub async fn get_stats(

let stats = serde_json::to_value(stats)?;

Ok((web::Json(stats), StatusCode::OK))
Ok((Json(stats), StatusCode::OK))
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
ClientBuilder::new()
.connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup
.timeout(Duration::from_secs(10)) // set a timeout of 10s for each request
.timeout(Duration::from_secs(30)) // set a timeout of 30s for each request
.pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection
.pool_max_idle_per_host(32) // max 32 idle connections per host
.gzip(true) // gzip compress for all requests
Expand Down
39 changes: 19 additions & 20 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ impl Parseable {
HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
);
header_map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
sync_streams_with_ingestors(header_map, None, INTERNAL_STREAM_NAME).await?;

Ok(())
}
Expand Down Expand Up @@ -409,7 +409,7 @@ impl Parseable {
pub async fn create_update_stream(
&self,
headers: &HeaderMap,
body: &Bytes,
static_schema: Option<&StaticSchema>,
stream_name: &str,
) -> Result<HeaderMap, StreamError> {
let PutStreamHeaders {
Expand Down Expand Up @@ -469,7 +469,7 @@ impl Parseable {
}

let schema = validate_static_schema(
body,
static_schema,
stream_name,
&time_partition,
custom_partition.as_ref(),
Expand Down Expand Up @@ -768,7 +768,7 @@ impl Parseable {
}

pub fn validate_static_schema(
body: &Bytes,
static_schema: Option<&StaticSchema>,
stream_name: &str,
time_partition: &str,
custom_partition: Option<&String>,
Expand All @@ -778,22 +778,21 @@ pub fn validate_static_schema(
return Ok(Arc::new(Schema::empty()));
}

if body.is_empty() {
return Err(CreateStreamError::Custom {
msg: format!(
"Please provide schema in the request body for static schema logstream {stream_name}"
),
status: StatusCode::BAD_REQUEST,
});
}

let static_schema: StaticSchema = serde_json::from_slice(body)?;
let parsed_schema =
convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
.map_err(|_| CreateStreamError::Custom {
msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
status: StatusCode::BAD_REQUEST,
})?;
let static_schema = static_schema.ok_or_else(|| CreateStreamError::Custom {
msg: format!(
"Please provide schema in the request body for static schema logstream {stream_name}"
),
status: StatusCode::BAD_REQUEST,
})?;
let parsed_schema = convert_static_schema_to_arrow_schema(
static_schema.clone(),
time_partition,
custom_partition,
)
.map_err(|_| CreateStreamError::Custom {
msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
status: StatusCode::BAD_REQUEST,
})?;

Ok(parsed_schema)
}
Expand Down
Loading