Skip to content

Commit 1cd0da6

Browse files
refactor: Parseable as the main interface (#1143)
Refactors code to share a Parseable object that is the interface to perform all operations in parseable, instead of having distinct interfaces such as CONFIG, StreamInfo, etc. Co-authored-by: Nikhil Sinha <[email protected]> Signed-off-by: Devdutt Shenoi <[email protected]>
1 parent 70ca2f0 commit 1cd0da6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+2309
-2373
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ key.pem
1010
helm-releases/.DS_Store
1111
.DS_Store
1212
env-file
13-
parseable
13+
parseable/*
1414
parseable_*
1515
parseable-env-secret
1616
cache

src/about.rs

+9-8
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717
*
1818
*/
1919

20-
use crate::analytics;
21-
use crate::option::Config;
22-
use crate::storage::StorageMetadata;
23-
use crate::utils::update::{self, LatestRelease};
2420
use chrono::Duration;
2521
use chrono_humanize::{Accuracy, Tense};
2622
use crossterm::style::Stylize;
@@ -30,6 +26,11 @@ use std::path::Path;
3026
use sysinfo::System;
3127
use ulid::Ulid;
3228

29+
use crate::analytics;
30+
use crate::cli::Options;
31+
use crate::storage::StorageMetadata;
32+
use crate::utils::update::{self, LatestRelease};
33+
3334
// Expose some static variables for internal usage
3435
pub static LATEST_RELEASE: OnceCell<Option<LatestRelease>> = OnceCell::new();
3536

@@ -99,7 +100,7 @@ impl ParseableVersion {
99100

100101
pub fn print_about(
101102
current_version: semver::Version,
102-
latest_release: Option<update::LatestRelease>,
103+
latest_release: Option<LatestRelease>,
103104
commit_hash: String,
104105
) {
105106
eprint!(
@@ -123,7 +124,7 @@ pub fn print_about(
123124
);
124125
}
125126

126-
fn print_latest_release(latest_release: update::LatestRelease) {
127+
fn print_latest_release(latest_release: LatestRelease) {
127128
let time_since_latest_release = chrono::Utc::now() - latest_release.date;
128129
let time_since_latest_release = humanize_time(time_since_latest_release);
129130
let fmt_latest_version = format!(
@@ -133,10 +134,10 @@ fn print_latest_release(latest_release: update::LatestRelease) {
133134
eprint!("{}", fmt_latest_version.red());
134135
}
135136

136-
pub async fn print(config: &Config, meta: &StorageMetadata) {
137+
pub async fn print(options: &Options, meta: &StorageMetadata) {
137138
// print current version
138139
let current = current();
139-
let latest_release = if config.options.check_update {
140+
let latest_release = if options.check_update {
140141
update::get_latest(&meta.deployment_id).await.ok()
141142
} else {
142143
None

src/alerts/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use ulid::Ulid;
3636
pub mod alerts_utils;
3737
pub mod target;
3838

39-
use crate::option::CONFIG;
39+
use crate::parseable::PARSEABLE;
4040
use crate::query::{TableScanVisitor, QUERY_SESSION};
4141
use crate::rbac::map::SessionKey;
4242
use crate::storage;
@@ -650,8 +650,8 @@ impl AlertConfig {
650650
fn get_context(&self) -> Context {
651651
let deployment_instance = format!(
652652
"{}://{}",
653-
CONFIG.options.get_scheme(),
654-
CONFIG.options.address
653+
PARSEABLE.options.get_scheme(),
654+
PARSEABLE.options.address
655655
);
656656
let deployment_id = storage::StorageMetadata::global().deployment_id;
657657
let deployment_mode = storage::StorageMetadata::global().mode.to_string();
@@ -730,7 +730,7 @@ impl Alerts {
730730
/// Loads alerts from disk, blocks
731731
pub async fn load(&self) -> Result<(), AlertError> {
732732
let mut map = self.alerts.write().await;
733-
let store = CONFIG.storage().get_object_store();
733+
let store = PARSEABLE.storage.get_object_store();
734734

735735
for alert in store.get_alerts().await.unwrap_or_default() {
736736
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
@@ -792,7 +792,7 @@ impl Alerts {
792792
new_state: AlertState,
793793
trigger_notif: Option<String>,
794794
) -> Result<(), AlertError> {
795-
let store = CONFIG.storage().get_object_store();
795+
let store = PARSEABLE.storage.get_object_store();
796796

797797
// read and modify alert
798798
let mut alert = self.get_alert_by_id(alert_id).await?;

src/analytics.rs

+20-15
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,6 @@
1616
*
1717
*
1818
*/
19-
20-
use crate::about::{current, platform};
21-
use crate::handlers::http::cluster::utils::check_liveness;
22-
use crate::handlers::http::{base_path_without_preceding_slash, cluster};
23-
use crate::handlers::STREAM_NAME_HEADER_KEY;
24-
use crate::option::{Mode, CONFIG};
25-
use crate::{metadata, stats};
26-
use crate::{storage, HTTP_CLIENT};
27-
28-
use crate::stats::Stats;
2919
use actix_web::{web, HttpRequest, Responder};
3020
use chrono::{DateTime, Utc};
3121
use clokwerk::{AsyncScheduler, Interval};
@@ -40,6 +30,21 @@ use sysinfo::System;
4030
use tracing::{error, info};
4131
use ulid::Ulid;
4232

33+
use crate::{
34+
about::{current, platform},
35+
handlers::{
36+
http::{
37+
base_path_without_preceding_slash,
38+
cluster::{self, utils::check_liveness},
39+
},
40+
STREAM_NAME_HEADER_KEY,
41+
},
42+
option::Mode,
43+
parseable::PARSEABLE,
44+
stats::{self, Stats},
45+
storage, HTTP_CLIENT,
46+
};
47+
4348
const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
4449
const ANALYTICS_SEND_INTERVAL_SECONDS: Interval = clokwerk::Interval::Hours(1);
4550

@@ -111,8 +116,8 @@ impl Report {
111116
cpu_count,
112117
memory_total_bytes: mem_total,
113118
platform: platform().to_string(),
114-
storage_mode: CONFIG.get_storage_mode_string().to_string(),
115-
server_mode: CONFIG.options.mode,
119+
storage_mode: PARSEABLE.get_storage_mode_string().to_string(),
120+
server_mode: PARSEABLE.options.mode,
116121
version: current().released_version.to_string(),
117122
commit_hash: current().commit_hash,
118123
active_ingestors: ingestor_metrics.0,
@@ -148,7 +153,7 @@ pub async fn get_analytics(_: HttpRequest) -> impl Responder {
148153
}
149154

150155
fn total_streams() -> usize {
151-
metadata::STREAM_INFO.list_streams().len()
156+
PARSEABLE.streams.len()
152157
}
153158

154159
fn total_event_stats() -> (Stats, Stats, Stats) {
@@ -164,7 +169,7 @@ fn total_event_stats() -> (Stats, Stats, Stats) {
164169
let mut deleted_parquet_bytes: u64 = 0;
165170
let mut deleted_json_bytes: u64 = 0;
166171

167-
for stream in metadata::STREAM_INFO.list_streams() {
172+
for stream in PARSEABLE.streams.list() {
168173
let Some(stats) = stats::get_current_stats(&stream, "json") else {
169174
continue;
170175
};
@@ -219,7 +224,7 @@ async fn fetch_ingestors_metrics(
219224
let mut vec = vec![];
220225
let mut active_ingestors = 0u64;
221226
let mut offline_ingestors = 0u64;
222-
if CONFIG.options.mode == Mode::Query {
227+
if PARSEABLE.options.mode == Mode::Query {
223228
// send analytics for ingest servers
224229

225230
// ingestor infos should be valid here, if not some thing is wrong

src/audit.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ use std::{
2121
fmt::{Debug, Display},
2222
};
2323

24-
use crate::{about::current, storage::StorageMetadata, HTTP_CLIENT};
24+
use crate::{about::current, parseable::PARSEABLE, storage::StorageMetadata, HTTP_CLIENT};
2525

26-
use super::option::CONFIG;
2726
use chrono::{DateTime, Utc};
2827
use once_cell::sync::Lazy;
2928
use serde::Serialize;
@@ -47,7 +46,12 @@ impl AuditLogger {
4746
// Try to construct the log endpoint URL by joining the base URL
4847
// with the ingest path, This can fail if the URL is not valid,
4948
// when the base URL is not set or the ingest path is not valid
50-
let log_endpoint = match CONFIG.options.audit_logger.as_ref()?.join("/api/v1/ingest") {
49+
let log_endpoint = match PARSEABLE
50+
.options
51+
.audit_logger
52+
.as_ref()?
53+
.join("/api/v1/ingest")
54+
{
5155
Ok(url) => url,
5256
Err(err) => {
5357
eprintln!("Couldn't setup audit logger: {err}");
@@ -66,8 +70,8 @@ impl AuditLogger {
6670
.header("x-p-stream", "audit_log");
6771

6872
// Use basic auth if credentials are configured
69-
if let Some(username) = CONFIG.options.audit_username.as_ref() {
70-
req = req.basic_auth(username, CONFIG.options.audit_password.as_ref())
73+
if let Some(username) = PARSEABLE.options.audit_username.as_ref() {
74+
req = req.basic_auth(username, PARSEABLE.options.audit_password.as_ref())
7175
}
7276

7377
match req.send().await {

src/banner.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ use crossterm::style::Stylize;
2121

2222
use crate::about;
2323
use crate::utils::uid::Uid;
24-
use crate::{option::Config, storage::StorageMetadata};
24+
use crate::{parseable::Parseable, storage::StorageMetadata};
2525

26-
pub async fn print(config: &Config, meta: &StorageMetadata) {
26+
pub async fn print(config: &Parseable, meta: &StorageMetadata) {
2727
print_ascii_art();
2828
let scheme = config.options.get_scheme();
2929
status_info(config, &scheme, meta.deployment_id);
3030
storage_info(config).await;
31-
about::print(config, meta).await;
31+
about::print(&config.options, meta).await;
3232
println!();
3333
}
3434

@@ -46,7 +46,7 @@ fn print_ascii_art() {
4646
eprint!("{ascii_name}");
4747
}
4848

49-
fn status_info(config: &Config, scheme: &str, id: Uid) {
49+
fn status_info(config: &Parseable, scheme: &str, id: Uid) {
5050
let address = format!(
5151
"\"{}://{}\" ({}), \":{}\" (livetail), \":{}\" (flight protocol)",
5252
scheme,
@@ -59,7 +59,7 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
5959
let mut credentials =
6060
String::from("\"As set in P_USERNAME and P_PASSWORD environment variables\"");
6161

62-
if config.is_default_creds() {
62+
if config.options.is_default_creds() {
6363
credentials = "\"Using default creds admin, admin. Please set credentials with P_USERNAME and P_PASSWORD.\"".red().to_string();
6464
}
6565

@@ -93,7 +93,7 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
9393
/// - Mode (`Local drive`, `S3 bucket`)
9494
/// - Staging (temporary landing point for incoming events)
9595
/// - Store (path where the data is stored and its latency)
96-
async fn storage_info(config: &Config) {
96+
async fn storage_info(config: &Parseable) {
9797
let storage = config.storage();
9898
let latency = storage.get_object_store().get_latency().await;
9999

src/catalog/mod.rs

+31-30
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,31 @@
1818

1919
use std::{io::ErrorKind, sync::Arc};
2020

21-
use self::{column::Column, snapshot::ManifestItem};
22-
use crate::handlers;
23-
use crate::handlers::http::base_path_without_preceding_slash;
24-
use crate::metadata::STREAM_INFO;
25-
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
26-
use crate::option::{Mode, CONFIG};
27-
use crate::stats::{
28-
event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
29-
};
30-
use crate::{
31-
catalog::manifest::Manifest,
32-
event::DEFAULT_TIMESTAMP_KEY,
33-
query::PartialTimeFilter,
34-
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
35-
};
3621
use chrono::{DateTime, Local, NaiveTime, Utc};
22+
use column::Column;
23+
use manifest::Manifest;
3724
use relative_path::RelativePathBuf;
25+
use snapshot::ManifestItem;
3826
use std::io::Error as IOError;
3927
use tracing::{error, info};
28+
29+
use crate::{
30+
event::DEFAULT_TIMESTAMP_KEY,
31+
handlers::{self, http::base_path_without_preceding_slash},
32+
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
33+
option::Mode,
34+
parseable::PARSEABLE,
35+
query::PartialTimeFilter,
36+
stats::{event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats},
37+
storage::{
38+
object_storage::manifest_path, ObjectStorage, ObjectStorageError, ObjectStoreFormat,
39+
},
40+
};
41+
pub use manifest::create_from_parquet_file;
42+
4043
pub mod column;
4144
pub mod manifest;
4245
pub mod snapshot;
43-
use crate::storage::ObjectStoreFormat;
44-
pub use manifest::create_from_parquet_file;
4546
pub trait Snapshot {
4647
fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec<ManifestItem>;
4748
}
@@ -263,7 +264,7 @@ async fn create_manifest(
263264
files: vec![change],
264265
..Manifest::default()
265266
};
266-
let mut first_event_at = STREAM_INFO.get_first_event(stream_name)?;
267+
let mut first_event_at = PARSEABLE.get_stream(stream_name)?.get_first_event();
267268
if first_event_at.is_none() {
268269
if let Some(first_event) = manifest.files.first() {
269270
let time_partition = &meta.time_partition;
@@ -279,13 +280,11 @@ async fn create_manifest(
279280
}
280281
};
281282
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
282-
if let Err(err) =
283-
STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap())
284-
{
285-
error!(
286-
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
287-
stream_name
288-
);
283+
match PARSEABLE.get_stream(stream_name) {
284+
Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()),
285+
Err(err) => error!(
286+
"Failed to update first_event_at in streaminfo for stream {stream_name:?}, error = {err:?}"
287+
),
289288
}
290289
}
291290
}
@@ -332,11 +331,11 @@ pub async fn remove_manifest_from_snapshot(
332331
let manifests = &mut meta.snapshot.manifest_list;
333332
// Filter out items whose manifest_path contains any of the dates_to_delete
334333
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
335-
STREAM_INFO.reset_first_event_at(stream_name)?;
334+
PARSEABLE.get_stream(stream_name)?.reset_first_event_at();
336335
meta.first_event_at = None;
337336
storage.put_snapshot(stream_name, meta.snapshot).await?;
338337
}
339-
match CONFIG.options.mode {
338+
match PARSEABLE.options.mode {
340339
Mode::All | Mode::Ingest => {
341340
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
342341
}
@@ -350,10 +349,10 @@ pub async fn get_first_event(
350349
dates: Vec<String>,
351350
) -> Result<Option<String>, ObjectStorageError> {
352351
let mut first_event_at: String = String::default();
353-
match CONFIG.options.mode {
352+
match PARSEABLE.options.mode {
354353
Mode::All | Mode::Ingest => {
355354
// get current snapshot
356-
let stream_first_event = STREAM_INFO.get_first_event(stream_name)?;
355+
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
357356
if stream_first_event.is_some() {
358357
first_event_at = stream_first_event.unwrap();
359358
} else {
@@ -393,7 +392,9 @@ pub async fn get_first_event(
393392
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
394393
meta.first_event_at = Some(first_event_at.clone());
395394
storage.put_stream_manifest(stream_name, &meta).await?;
396-
STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?;
395+
PARSEABLE
396+
.get_stream(stream_name)?
397+
.set_first_event_at(&first_event_at);
397398
}
398399
}
399400
}

src/cli.rs

+4
Original file line numberDiff line numberDiff line change
@@ -381,4 +381,8 @@ impl Options {
381381
origin,
382382
})
383383
}
384+
385+
pub fn is_default_creds(&self) -> bool {
386+
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
387+
}
384388
}

0 commit comments

Comments
 (0)