Skip to content

Commit 526907c

Browse files
chore: updates for enterprise (#1247)
Co-authored-by: Nikhil Sinha <[email protected]>
1 parent 0d33635 commit 526907c

File tree

16 files changed

+515
-124
lines changed

16 files changed

+515
-124
lines changed

src/cli.rs

+44-18
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,14 @@ pub struct Options {
303303
)]
304304
pub ingestor_endpoint: String,
305305

306+
#[arg(
307+
long,
308+
env = "P_INDEXER_ENDPOINT",
309+
default_value = "",
310+
help = "URL to connect to this specific indexer. Default is the address of the server"
311+
)]
312+
pub indexer_endpoint: String,
313+
306314
#[command(flatten)]
307315
pub oidc: Option<OidcConfig>,
308316

@@ -404,29 +412,47 @@ impl Options {
404412
}
405413

406414
/// TODO: refactor and document
407-
pub fn get_url(&self) -> Url {
408-
if self.ingestor_endpoint.is_empty() {
409-
return format!(
410-
"{}://{}",
411-
self.get_scheme(),
412-
self.address
413-
)
414-
.parse::<Url>() // if the value was improperly set, this will panic before hand
415-
.unwrap_or_else(|err| {
416-
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
417-
});
418-
}
419-
420-
let ingestor_endpoint = &self.ingestor_endpoint;
415+
pub fn get_url(&self, mode: Mode) -> Url {
416+
let (endpoint, env_var) = match mode {
417+
Mode::Ingest => {
418+
if self.ingestor_endpoint.is_empty() {
419+
return format!(
420+
"{}://{}",
421+
self.get_scheme(),
422+
self.address
423+
)
424+
.parse::<Url>() // if the value was improperly set, this will panic before hand
425+
.unwrap_or_else(|err| {
426+
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
427+
});
428+
}
429+
(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
430+
}
431+
Mode::Index => {
432+
if self.indexer_endpoint.is_empty() {
433+
return format!(
434+
"{}://{}",
435+
self.get_scheme(),
436+
self.address
437+
)
438+
.parse::<Url>() // if the value was improperly set, this will panic before hand
439+
.unwrap_or_else(|err| {
440+
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
441+
});
442+
}
443+
(&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
444+
}
445+
_ => panic!("Invalid mode"),
446+
};
421447

422-
if ingestor_endpoint.starts_with("http") {
423-
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
448+
if endpoint.starts_with("http") {
449+
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
424450
}
425451

426-
let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
452+
let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();
427453

428454
if addr_from_env.len() != 2 {
429-
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
455+
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
430456
}
431457

432458
let mut hostname = addr_from_env[0].to_string();

src/enterprise/utils.rs

+28-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{collections::HashMap, path::PathBuf, sync::Arc};
22

3+
use chrono::{TimeZone, Utc};
34
use datafusion::{common::Column, prelude::Expr};
45
use itertools::Itertools;
56
use relative_path::RelativePathBuf;
@@ -119,14 +120,35 @@ pub async fn fetch_parquet_file_paths(
119120

120121
selected_files
121122
.into_iter()
122-
.map(|file| {
123+
.filter_map(|file| {
123124
let date = file.file_path.split("/").collect_vec();
124125

125-
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
126-
127-
let date = RelativePathBuf::from_iter(date);
128-
129-
parquet_files.entry(date).or_default().push(file);
126+
let year = &date[1][5..9];
127+
let month = &date[1][10..12];
128+
let day = &date[1][13..15];
129+
let hour = &date[2][5..7];
130+
let min = &date[3][7..9];
131+
let file_date = Utc
132+
.with_ymd_and_hms(
133+
year.parse::<i32>().unwrap(),
134+
month.parse::<u32>().unwrap(),
135+
day.parse::<u32>().unwrap(),
136+
hour.parse::<u32>().unwrap(),
137+
min.parse::<u32>().unwrap(),
138+
0,
139+
)
140+
.unwrap();
141+
142+
if file_date < time_range.start {
143+
None
144+
} else {
145+
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
146+
147+
let date = RelativePathBuf::from_iter(date);
148+
149+
parquet_files.entry(date).or_default().push(file);
150+
Some("")
151+
}
130152
})
131153
.for_each(|_| {});
132154

src/handlers/http/cluster/mod.rs

+21-2
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,14 @@ use crate::HTTP_CLIENT;
5151
use super::base_path_without_preceding_slash;
5252
use super::ingest::PostError;
5353
use super::logstream::error::StreamError;
54-
use super::modal::IngestorMetadata;
54+
use super::modal::{IndexerMetadata, IngestorMetadata};
5555
use super::rbac::RBACError;
5656
use super::role::RoleError;
5757

5858
type IngestorMetadataArr = Vec<IngestorMetadata>;
5959

60+
type IndexerMetadataArr = Vec<IndexerMetadata>;
61+
6062
pub const INTERNAL_STREAM_NAME: &str = "pmeta";
6163

6264
const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
@@ -616,7 +618,6 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
616618
Ok(actix_web::HttpResponse::Ok().json(dresses))
617619
}
618620

619-
// update the .query.json file and return the new ingestorMetadataArr
620621
pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
621622
let store = PARSEABLE.storage.get_object_store();
622623

@@ -635,6 +636,24 @@ pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
635636
Ok(arr)
636637
}
637638

639+
pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
640+
let store = PARSEABLE.storage.get_object_store();
641+
642+
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
643+
let arr = store
644+
.get_objects(
645+
Some(&root_path),
646+
Box::new(|file_name| file_name.starts_with("indexer")),
647+
)
648+
.await?
649+
.iter()
650+
// this unwrap will most definateley shoot me in the foot later
651+
.map(|x| serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default())
652+
.collect_vec();
653+
654+
Ok(arr)
655+
}
656+
638657
pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, PostError> {
639658
let domain_name = to_url_string(ingestor.into_inner());
640659

src/handlers/http/middleware.rs

+5-15
Original file line numberDiff line numberDiff line change
@@ -359,22 +359,12 @@ where
359359
}
360360

361361
Mode::Index => {
362-
let accessable_endpoints = ["create", "delete"];
363-
let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
364-
if !cond {
365-
Box::pin(async {
366-
Err(actix_web::error::ErrorUnauthorized(
367-
"Only Index API can be accessed in Index Mode",
368-
))
369-
})
370-
} else {
371-
let fut = self.service.call(req);
362+
let fut = self.service.call(req);
372363

373-
Box::pin(async move {
374-
let res = fut.await?;
375-
Ok(res)
376-
})
377-
}
364+
Box::pin(async move {
365+
let res = fut.await?;
366+
Ok(res)
367+
})
378368
}
379369
}
380370
}

src/handlers/http/modal/ingest_server.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use relative_path::RelativePathBuf;
2828
use serde_json::Value;
2929
use tokio::sync::oneshot;
3030

31+
use crate::option::Mode;
3132
use crate::{
3233
analytics,
3334
handlers::{
@@ -108,7 +109,7 @@ impl ParseableServer for IngestServer {
108109
tokio::spawn(airplane::server());
109110

110111
// write the ingestor metadata to storage
111-
PARSEABLE.store_ingestor_metadata().await?;
112+
PARSEABLE.store_metadata(Mode::Ingest).await?;
112113

113114
// Ingestors shouldn't have to deal with OpenId auth flow
114115
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
@@ -251,7 +252,7 @@ impl IngestServer {
251252

252253
// check for querier state. Is it there, or was it there in the past
253254
// this should happen before the set the ingestor metadata
254-
async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
255+
pub async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
255256
// how do we check for querier state?
256257
// based on the work flow of the system, the querier will always need to start first
257258
// i.e the querier will create the `.parseable.json` file

0 commit comments

Comments
 (0)