Skip to content

Commit 8d3c740

Browse files
committed
updates for enterprise
1 parent 4af9a5e commit 8d3c740

File tree

15 files changed

+454
-117
lines changed

15 files changed

+454
-117
lines changed

src/cli.rs

+44-18
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,14 @@ pub struct Options {
294294
)]
295295
pub ingestor_endpoint: String,
296296

297+
#[arg(
298+
long,
299+
env = "P_INDEXER_ENDPOINT",
300+
default_value = "",
301+
help = "URL to connect to this specific indexer. Default is the address of the server"
302+
)]
303+
pub indexer_endpoint: String,
304+
297305
#[command(flatten)]
298306
pub oidc: Option<OidcConfig>,
299307

@@ -395,29 +403,47 @@ impl Options {
395403
}
396404

397405
/// TODO: refactor and document
398-
pub fn get_url(&self) -> Url {
399-
if self.ingestor_endpoint.is_empty() {
400-
return format!(
401-
"{}://{}",
402-
self.get_scheme(),
403-
self.address
404-
)
405-
.parse::<Url>() // if the value was improperly set, this will panic before hand
406-
.unwrap_or_else(|err| {
407-
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
408-
});
409-
}
410-
411-
let ingestor_endpoint = &self.ingestor_endpoint;
406+
pub fn get_url(&self, mode: Mode) -> Url {
407+
let (endpoint, env_var) = match mode {
408+
Mode::Ingest => {
409+
if self.ingestor_endpoint.is_empty() {
410+
return format!(
411+
"{}://{}",
412+
self.get_scheme(),
413+
self.address
414+
)
415+
.parse::<Url>() // if the value was improperly set, this will panic before hand
416+
.unwrap_or_else(|err| {
417+
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
418+
});
419+
}
420+
(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
421+
}
422+
Mode::Index => {
423+
if self.indexer_endpoint.is_empty() {
424+
return format!(
425+
"{}://{}",
426+
self.get_scheme(),
427+
self.address
428+
)
429+
.parse::<Url>() // if the value was improperly set, this will panic before hand
430+
.unwrap_or_else(|err| {
431+
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
432+
});
433+
}
434+
(&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
435+
}
436+
_ => panic!("Invalid mode"),
437+
};
412438

413-
if ingestor_endpoint.starts_with("http") {
414-
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
439+
if endpoint.starts_with("http") {
440+
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
415441
}
416442

417-
let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
443+
let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();
418444

419445
if addr_from_env.len() != 2 {
420-
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
446+
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
421447
}
422448

423449
let mut hostname = addr_from_env[0].to_string();

src/enterprise/utils.rs

+28-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{collections::HashMap, path::PathBuf, sync::Arc};
22

3+
use chrono::{TimeZone, Utc};
34
use datafusion::{common::Column, prelude::Expr};
45
use itertools::Itertools;
56
use relative_path::RelativePathBuf;
@@ -119,14 +120,35 @@ pub async fn fetch_parquet_file_paths(
119120

120121
selected_files
121122
.into_iter()
122-
.map(|file| {
123+
.filter_map(|file| {
123124
let date = file.file_path.split("/").collect_vec();
124125

125-
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
126-
127-
let date = RelativePathBuf::from_iter(date);
128-
129-
parquet_files.entry(date).or_default().push(file);
126+
let year = &date[1][5..9];
127+
let month = &date[1][10..12];
128+
let day = &date[1][13..15];
129+
let hour = &date[2][5..7];
130+
let min = &date[3][7..9];
131+
let file_date = Utc
132+
.with_ymd_and_hms(
133+
year.parse::<i32>().unwrap(),
134+
month.parse::<u32>().unwrap(),
135+
day.parse::<u32>().unwrap(),
136+
hour.parse::<u32>().unwrap(),
137+
min.parse::<u32>().unwrap(),
138+
0,
139+
)
140+
.unwrap();
141+
142+
if file_date < time_range.start {
143+
None
144+
} else {
145+
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
146+
147+
let date = RelativePathBuf::from_iter(date);
148+
149+
parquet_files.entry(date).or_default().push(file);
150+
Some("")
151+
}
130152
})
131153
.for_each(|_| {});
132154

src/handlers/http/cluster/mod.rs

+22-1
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,14 @@ use crate::HTTP_CLIENT;
5151
use super::base_path_without_preceding_slash;
5252
use super::ingest::PostError;
5353
use super::logstream::error::StreamError;
54-
use super::modal::IngestorMetadata;
54+
use super::modal::{IndexerMetadata, IngestorMetadata};
5555
use super::rbac::RBACError;
5656
use super::role::RoleError;
5757

5858
type IngestorMetadataArr = Vec<IngestorMetadata>;
5959

60+
type IndexerMetadataArr = Vec<IndexerMetadata>;
61+
6062
pub const INTERNAL_STREAM_NAME: &str = "pmeta";
6163

6264
const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
@@ -635,6 +637,25 @@ pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
635637
Ok(arr)
636638
}
637639

640+
// Collect the metadata of every indexer known to object storage.
//
// Lists the objects under `PARSEABLE_ROOT_DIRECTORY` whose file name starts with
// "indexer", deserializes each payload as `IndexerMetadata`, and returns them as an
// `IndexerMetadataArr`. The visible code only reads through `get_objects`; it does
// not itself update any file.
//
// Errors: a failure of `get_objects` is propagated through `?`.
641+
pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
642+
let store = PARSEABLE.storage.get_object_store();
643+
644+
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
645+
let arr = store
646+
.get_objects(
647+
Some(&root_path),
648+
Box::new(|file_name| file_name.starts_with("indexer")),
649+
)
650+
.await?
651+
.iter()
652+
// NOTE: `unwrap_or_default` means a payload that fails to deserialize is silently
// replaced by `IndexerMetadata::default()` instead of surfacing an error; this will
// most definitely shoot me in the foot later
653+
.map(|x| serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default())
654+
.collect_vec();
655+
656+
Ok(arr)
657+
}
658+
638659
pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, PostError> {
639660
let domain_name = to_url_string(ingestor.into_inner());
640661

src/handlers/http/middleware.rs

+5-15
Original file line numberDiff line numberDiff line change
@@ -359,22 +359,12 @@ where
359359
}
360360

361361
Mode::Index => {
362-
let accessable_endpoints = ["create", "delete"];
363-
let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
364-
if !cond {
365-
Box::pin(async {
366-
Err(actix_web::error::ErrorUnauthorized(
367-
"Only Index API can be accessed in Index Mode",
368-
))
369-
})
370-
} else {
371-
let fut = self.service.call(req);
362+
let fut = self.service.call(req);
372363

373-
Box::pin(async move {
374-
let res = fut.await?;
375-
Ok(res)
376-
})
377-
}
364+
Box::pin(async move {
365+
let res = fut.await?;
366+
Ok(res)
367+
})
378368
}
379369
}
380370
}

src/handlers/http/modal/ingest_server.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use relative_path::RelativePathBuf;
2828
use serde_json::Value;
2929
use tokio::sync::oneshot;
3030

31+
use crate::option::Mode;
3132
use crate::{
3233
analytics,
3334
handlers::{
@@ -108,7 +109,7 @@ impl ParseableServer for IngestServer {
108109
tokio::spawn(airplane::server());
109110

110111
// write the ingestor metadata to storage
111-
PARSEABLE.store_ingestor_metadata().await?;
112+
PARSEABLE.store_metadata(Mode::Ingest).await?;
112113

113114
// Ingestors shouldn't have to deal with OpenId auth flow
114115
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
@@ -251,7 +252,7 @@ impl IngestServer {
251252

252253
// check for querier state. Is it there, or was it there in the past
253254
// this should happen before we set the ingestor metadata
254-
async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
255+
pub async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
255256
// how do we check for querier state?
256257
// based on the work flow of the system, the querier will always need to start first
257258
// i.e the querier will create the `.parseable.json` file

0 commit comments

Comments
 (0)