Skip to content

Commit f8c0459

Browse files
authored
Merge branch 'main' into to_json
2 parents 00d790a + 523ecc7 commit f8c0459

File tree

14 files changed

+245
-280
lines changed

14 files changed

+245
-280
lines changed

src/banner.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) {
104104
Staging Path: \"{}\"",
105105
"Storage:".to_string().bold(),
106106
config.get_storage_mode_string(),
107-
config.staging_dir().to_string_lossy(),
107+
config.options.staging_dir().to_string_lossy(),
108108
);
109109

110110
if let Some(path) = &config.options.hot_tier_storage_path {

src/cli.rs

+71-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use clap::Parser;
20-
use std::path::PathBuf;
20+
use std::{env, fs, path::PathBuf};
2121

2222
use url::Url;
2323

@@ -385,4 +385,74 @@ impl Options {
385385
pub fn is_default_creds(&self) -> bool {
386386
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
387387
}
388+
389+
/// Path to staging directory, ensures that it exists or panics
390+
pub fn staging_dir(&self) -> &PathBuf {
391+
fs::create_dir_all(&self.local_staging_path)
392+
.expect("Should be able to create dir if doesn't exist");
393+
394+
&self.local_staging_path
395+
}
396+
397+
/// TODO: refactor and document
398+
pub fn get_url(&self) -> Url {
399+
if self.ingestor_endpoint.is_empty() {
400+
return format!(
401+
"{}://{}",
402+
self.get_scheme(),
403+
self.address
404+
)
405+
.parse::<Url>() // if the value was improperly set, this will panic before hand
406+
.unwrap_or_else(|err| {
407+
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
408+
});
409+
}
410+
411+
let ingestor_endpoint = &self.ingestor_endpoint;
412+
413+
if ingestor_endpoint.starts_with("http") {
414+
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
415+
}
416+
417+
let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
418+
419+
if addr_from_env.len() != 2 {
420+
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
421+
}
422+
423+
let mut hostname = addr_from_env[0].to_string();
424+
let mut port = addr_from_env[1].to_string();
425+
426+
// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
427+
// fetch the value from the specified env vars
428+
if hostname.starts_with('$') {
429+
let var_hostname = hostname[1..].to_string();
430+
hostname = env::var(&var_hostname).unwrap_or_default();
431+
432+
if hostname.is_empty() {
433+
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
434+
}
435+
if hostname.starts_with("http") {
436+
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
437+
} else {
438+
hostname = format!("{}://{}", self.get_scheme(), hostname);
439+
}
440+
}
441+
442+
if port.starts_with('$') {
443+
let var_port = port[1..].to_string();
444+
port = env::var(&var_port).unwrap_or_default();
445+
446+
if port.is_empty() {
447+
panic!(
448+
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
449+
var_port
450+
);
451+
}
452+
}
453+
454+
format!("{}://{}:{}", self.get_scheme(), hostname, port)
455+
.parse::<Url>()
456+
.expect("Valid URL")
457+
}
388458
}

src/handlers/http/about.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub async fn about() -> Json<Value> {
6666
let staging = if PARSEABLE.options.mode == Mode::Query {
6767
"".to_string()
6868
} else {
69-
PARSEABLE.staging_dir().display().to_string()
69+
PARSEABLE.options.staging_dir().display().to_string()
7070
};
7171
let grpc_port = PARSEABLE.options.grpc_port;
7272

src/handlers/http/logstream.rs

+30-90
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::event::format::override_data_type;
2323
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
2424
use crate::metadata::SchemaVersion;
2525
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
26-
use crate::option::Mode;
2726
use crate::parseable::{StreamNotFound, PARSEABLE};
2827
use crate::rbac::role::Action;
2928
use crate::rbac::Users;
@@ -47,7 +46,8 @@ use tracing::warn;
4746

4847
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
4948
let stream_name = stream_name.into_inner();
50-
if !PARSEABLE.streams.contains(&stream_name) {
49+
// Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
50+
if PARSEABLE.check_or_load_stream(&stream_name).await {
5151
return Err(StreamNotFound(stream_name).into());
5252
}
5353

@@ -120,15 +120,11 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, St
120120
Ok((web::Json(schema), StatusCode::OK))
121121
}
122122

123-
pub async fn schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
123+
pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
124124
let stream_name = stream_name.into_inner();
125125

126126
// Ensure parseable is aware of stream in distributed mode
127-
if PARSEABLE.options.mode == Mode::Query
128-
&& PARSEABLE
129-
.create_stream_and_schema_from_storage(&stream_name)
130-
.await?
131-
{
127+
if PARSEABLE.check_or_load_stream(&stream_name).await {
132128
return Err(StreamNotFound(stream_name.clone()).into());
133129
}
134130

@@ -164,14 +160,8 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
164160
// For query mode, if the stream not found in memory map,
165161
//check if it exists in the storage
166162
//create stream and schema from storage
167-
if PARSEABLE.options.mode == Mode::Query {
168-
match PARSEABLE
169-
.create_stream_and_schema_from_storage(&stream_name)
170-
.await
171-
{
172-
Ok(true) => {}
173-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
174-
}
163+
if PARSEABLE.check_or_load_stream(&stream_name).await {
164+
return Err(StreamNotFound(stream_name.clone()).into());
175165
}
176166

177167
let retention = PARSEABLE
@@ -183,36 +173,24 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
183173

184174
pub async fn put_retention(
185175
stream_name: Path<String>,
186-
Json(json): Json<Value>,
176+
Json(retention): Json<Retention>,
187177
) -> Result<impl Responder, StreamError> {
188178
let stream_name = stream_name.into_inner();
189179

190180
// For query mode, if the stream not found in memory map,
191181
//check if it exists in the storage
192182
//create stream and schema from storage
193-
if PARSEABLE.options.mode == Mode::Query {
194-
match PARSEABLE
195-
.create_stream_and_schema_from_storage(&stream_name)
196-
.await
197-
{
198-
Ok(true) => {}
199-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
200-
}
183+
if PARSEABLE.check_or_load_stream(&stream_name).await {
184+
return Err(StreamNotFound(stream_name).into());
201185
}
202-
let stream = PARSEABLE.get_stream(&stream_name)?;
203-
204-
let retention: Retention = match serde_json::from_value(json) {
205-
Ok(retention) => retention,
206-
Err(err) => return Err(StreamError::InvalidRetentionConfig(err)),
207-
};
208186

209187
PARSEABLE
210188
.storage
211189
.get_object_store()
212190
.put_retention(&stream_name, &retention)
213191
.await?;
214192

215-
stream.set_retention(retention);
193+
PARSEABLE.get_stream(&stream_name)?.set_retention(retention);
216194

217195
Ok((
218196
format!("set retention configuration for log stream {stream_name}"),
@@ -250,21 +228,11 @@ pub async fn get_stats(
250228
) -> Result<impl Responder, StreamError> {
251229
let stream_name = stream_name.into_inner();
252230

253-
if !PARSEABLE.streams.contains(&stream_name) {
254-
// For query mode, if the stream not found in memory map,
255-
//check if it exists in the storage
256-
//create stream and schema from storage
257-
if PARSEABLE.options.mode == Mode::Query {
258-
match PARSEABLE
259-
.create_stream_and_schema_from_storage(&stream_name)
260-
.await
261-
{
262-
Ok(true) => {}
263-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
264-
}
265-
} else {
266-
return Err(StreamNotFound(stream_name).into());
267-
}
231+
// For query mode, if the stream not found in memory map,
232+
//check if it exists in the storage
233+
//create stream and schema from storage
234+
if PARSEABLE.check_or_load_stream(&stream_name).await {
235+
return Err(StreamNotFound(stream_name.clone()).into());
268236
}
269237

270238
let query_string = req.query_string();
@@ -356,19 +324,13 @@ pub async fn get_stats(
356324

357325
pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
358326
let stream_name = stream_name.into_inner();
359-
if !PARSEABLE.streams.contains(&stream_name) {
360-
if PARSEABLE.options.mode == Mode::Query {
361-
match PARSEABLE
362-
.create_stream_and_schema_from_storage(&stream_name)
363-
.await
364-
{
365-
Ok(true) => {}
366-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
367-
}
368-
} else {
369-
return Err(StreamNotFound(stream_name).into());
370-
}
327+
// For query mode, if the stream not found in memory map,
328+
//check if it exists in the storage
329+
//create stream and schema from storage
330+
if PARSEABLE.check_or_load_stream(&stream_name).await {
331+
return Err(StreamNotFound(stream_name.clone()).into());
371332
}
333+
372334
let storage = PARSEABLE.storage.get_object_store();
373335
// if first_event_at is not found in memory map, check if it exists in the storage
374336
// if it exists in the storage, update the first_event_at in memory map
@@ -417,14 +379,8 @@ pub async fn put_stream_hot_tier(
417379
// For query mode, if the stream not found in memory map,
418380
//check if it exists in the storage
419381
//create stream and schema from storage
420-
if PARSEABLE.options.mode == Mode::Query {
421-
match PARSEABLE
422-
.create_stream_and_schema_from_storage(&stream_name)
423-
.await
424-
{
425-
Ok(true) => {}
426-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
427-
}
382+
if PARSEABLE.check_or_load_stream(&stream_name).await {
383+
return Err(StreamNotFound(stream_name).into());
428384
}
429385

430386
let stream = PARSEABLE.get_stream(&stream_name)?;
@@ -467,21 +423,11 @@ pub async fn put_stream_hot_tier(
467423
pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
468424
let stream_name = stream_name.into_inner();
469425

470-
if !PARSEABLE.streams.contains(&stream_name) {
471-
// For query mode, if the stream not found in memory map,
472-
//check if it exists in the storage
473-
//create stream and schema from storage
474-
if PARSEABLE.options.mode == Mode::Query {
475-
match PARSEABLE
476-
.create_stream_and_schema_from_storage(&stream_name)
477-
.await
478-
{
479-
Ok(true) => {}
480-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
481-
}
482-
} else {
483-
return Err(StreamNotFound(stream_name).into());
484-
}
426+
// For query mode, if the stream not found in memory map,
427+
//check if it exists in the storage
428+
//create stream and schema from storage
429+
if PARSEABLE.check_or_load_stream(&stream_name).await {
430+
return Err(StreamNotFound(stream_name.clone()).into());
485431
}
486432

487433
let Some(hot_tier_manager) = HotTierManager::global() else {
@@ -500,14 +446,8 @@ pub async fn delete_stream_hot_tier(
500446
// For query mode, if the stream not found in memory map,
501447
//check if it exists in the storage
502448
//create stream and schema from storage
503-
if PARSEABLE.options.mode == Mode::Query {
504-
match PARSEABLE
505-
.create_stream_and_schema_from_storage(&stream_name)
506-
.await
507-
{
508-
Ok(true) => {}
509-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
510-
}
449+
if PARSEABLE.check_or_load_stream(&stream_name).await {
450+
return Err(StreamNotFound(stream_name).into());
511451
}
512452

513453
if PARSEABLE.get_stream(&stream_name)?.get_stream_type() == StreamType::Internal {

0 commit comments

Comments
 (0)