
Commit 031fe55

Merge remote-tracking branch 'origin/main' into to_json
2 parents: f8c0459 + 7d9b9ab


66 files changed: +2778 -1428 lines

CONTRIBUTING.md

+68
@@ -0,0 +1,68 @@
### Contributing

This document outlines how you can contribute to Parseable.

Thank you for considering contributing to Parseable. The goal of this document is to provide everything you need to start your contribution. We encourage all contributions, including but not limited to:

- Code patches, bug fixes.
- Tutorials or blog posts.
- Improving the documentation.
- Submitting [bug reports](https://github.com/parseablehq/parseable/issues/new).

### Prerequisites

- Your PR has an associated issue. Find an [existing issue](https://github.com/parseablehq/parseable/issues) or open a new issue.
- You've discussed the issue with the [Parseable community](https://logg.ing/community).
- You're familiar with the GitHub Pull Request (PR) workflow.
- You've read the [Parseable documentation](https://www.parseable.com/docs).

### Contribution workflow

- Fork the [Parseable repository](https://help.github.com/en/github/getting-started-with-github/fork-a-repo) in your own GitHub account.
- [Create a new branch](https://help.github.com/en/github/collaborating-with-issues-and-pull-requests/creating-and-deleting-branches-within-your-repository).
- Review the Development workflow section below, which describes the steps to maintain the repository.
- Make your changes on your branch.
- Submit the branch as a Pull Request pointing to the main branch of the Parseable repository. A maintainer should comment on and/or review your Pull Request within a few hours.
- You'll be asked to review and sign the [Parseable Contributor License Agreement (CLA)](https://github.com/parseablehq/.github/blob/main/CLA.md) on the GitHub PR. Please ensure that you review the document. Once you're ready, sign the CLA by adding the comment "I have read the CLA Document and I hereby sign the CLA" to your Pull Request.
- Please ensure that all commits in your Pull Request are made by the same GitHub user that created the PR and added the comment.

### Development workflow

We recommend Linux or macOS as the development platform for Parseable.

#### Setup and run Parseable

Parseable needs Rust 1.77.1 or above. Use the command below to build and run the Parseable binary in local mode:

```sh
cargo run --release local-store
```

We recommend the `--release` flag to test the full performance of Parseable.

#### Running Tests

```sh
cargo test
```

This command runs on each PR as a requirement for merging it.

If you get a "Too many open files" error, you might want to increase the open file limit using this command:

```sh
ulimit -Sn 3000
```

If you get an OpenSSL-related error while building Parseable, you might need to install the dependencies using these commands:

```sh
sudo apt install build-essential
sudo apt-get install libssl-dev
sudo apt-get install pkg-config
```

### Git Guidelines

- The PR title should be accurate and descriptive of the changes.
- Draft PRs are recommended when you want to show that you are working on something and make your work visible. Convert your PR to a draft if your changes are a work in progress; maintainers will review it once you mark it as ready for review.
- The branch related to the PR must be up-to-date with main before merging. We use Bors to enforce this requirement automatically, without the PR author having to rebase manually.

Cargo.lock

+2 -10
Generated file; diff not rendered.

Cargo.toml

-1
@@ -106,7 +106,6 @@ bytes = "1.4"
 clokwerk = "0.4"
 derive_more = { version = "1", features = ["full"] }
 itertools = "0.14"
-lazy_static = "1.4"
 once_cell = "1.20"
 rand = "0.8.5"
 regex = "1.7.3"
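Dropping `lazy_static` while keeping `once_cell` (still pinned above, and imported in `src/alerts/mod.rs` below) suggests the remaining lazy statics were migrated to `once_cell::sync::Lazy`. A minimal sketch of the equivalent pattern, with hypothetical names:

```rust
use std::collections::HashMap;

use once_cell::sync::Lazy;

// Hypothetical static: the once_cell equivalent of a
// `lazy_static! { static ref SEVERITY_LABELS: ... }` block.
// The closure runs exactly once, on first dereference.
static SEVERITY_LABELS: Lazy<HashMap<u32, &'static str>> = Lazy::new(|| {
    let mut m = HashMap::new();
    m.insert(0, "low");
    m.insert(1, "medium");
    m.insert(2, "high");
    m
});

fn main() {
    assert_eq!(SEVERITY_LABELS.get(&2), Some(&"high"));
}
```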

Cross.toml

+5
@@ -0,0 +1,5 @@
[target.aarch64-unknown-linux-gnu]
image = "ghcr.io/cross-rs/aarch64-unknown-linux-gnu@sha256:1e2a0291f92a4372cbc22d8994e735473045383f1ce7fa44a16c234ba00187f4"

[target.x86_64-unknown-linux-gnu]
image = "ghcr.io/cross-rs/x86_64-unknown-linux-gnu@sha256:bf05360bb9d6d4947eed60532ac7a0d7e8fae8f214e9abb801d5941c8fe4918d"

src/alerts/alerts_utils.rs

+3-1
@@ -31,6 +31,7 @@ use tracing::trace;
 
 use crate::{
     alerts::AggregateCondition,
+    parseable::PARSEABLE,
     query::{TableScanVisitor, QUERY_SESSION},
     rbac::{
         map::SessionKey,
@@ -137,8 +138,9 @@ async fn execute_base_query(
         AlertError::CustomError(format!("Table name not found in query- {}", original_query))
     })?;
 
+    let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
     query
-        .get_dataframe(stream_name)
+        .get_dataframe(time_partition.as_ref())
        .await
        .map_err(|err| AlertError::CustomError(err.to_string()))
 }
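Both alert paths in this commit now look up the stream's time partition and pass it to `get_dataframe` instead of the stream name. Assuming `get_time_partition()` returns an `Option<String>` (consistent with the `.as_ref()` calls above), `Option::as_ref` borrows the inner value rather than moving it, so the partition name stays available after the call. A standalone sketch:

```rust
// Hypothetical stand-in for stream.get_time_partition().
fn get_time_partition() -> Option<String> {
    Some("p_timestamp".to_string())
}

fn main() {
    let time_partition: Option<String> = get_time_partition();
    // Option<String> -> Option<&String>: borrow the inner value in place.
    let borrowed: Option<&String> = time_partition.as_ref();
    assert_eq!(borrowed.map(String::as_str), Some("p_timestamp"));
    // Not moved: the owned Option is still usable here.
    assert!(time_partition.is_some());
}
```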

src/alerts/mod.rs

+57-5
@@ -24,6 +24,7 @@ use datafusion::common::tree_node::TreeNode;
 use http::StatusCode;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
+use serde::Serialize;
 use serde_json::Error as SerdeError;
 use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Display};
@@ -36,7 +37,7 @@ use ulid::Ulid;
 pub mod alerts_utils;
 pub mod target;
 
-use crate::parseable::PARSEABLE;
+use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::map::SessionKey;
 use crate::storage;
@@ -513,17 +514,16 @@ impl AlertConfig {
 
         // for now proceed in a similar fashion as we do in query
         // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
-        let stream_name = if let Some(stream_name) = query.first_table_name() {
-            stream_name
-        } else {
+        let Some(stream_name) = query.first_table_name() else {
             return Err(AlertError::CustomError(format!(
                 "Table name not found in query- {}",
                 self.query
             )));
         };
 
+        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
         let base_df = query
-            .get_dataframe(stream_name)
+            .get_dataframe(time_partition.as_ref())
             .await
             .map_err(|err| AlertError::CustomError(err.to_string()))?;
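The hunk above also swaps an `if let`/`else` chain for a `let ... else` binding: the success arm binds `stream_name` in the enclosing scope, while the `else` arm must diverge, here by returning an error. A self-contained sketch of the idiom, with hypothetical stand-ins for `first_table_name` and the error type:

```rust
// Hypothetical stand-in for query.first_table_name().
fn first_table_name(tables: &[&str]) -> Option<String> {
    tables.first().map(|t| t.to_string())
}

fn run(tables: &[&str]) -> Result<(), String> {
    // let-else: bind on Some, diverge on None.
    let Some(stream_name) = first_table_name(tables) else {
        return Err("Table name not found in query".to_string());
    };
    println!("querying stream {stream_name}");
    Ok(())
}

fn main() {
    assert!(run(&[]).is_err());
    assert!(run(&["app_logs"]).is_ok());
}
```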

@@ -703,6 +703,8 @@ pub enum AlertError {
     CustomError(String),
     #[error("Invalid State Change: {0}")]
     InvalidStateChange(String),
+    #[error("{0}")]
+    StreamNotFound(#[from] StreamNotFound),
 }
 
 impl actix_web::ResponseError for AlertError {
@@ -716,6 +718,7 @@ impl actix_web::ResponseError for AlertError {
             Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             Self::CustomError(_) => StatusCode::BAD_REQUEST,
             Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
+            Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
         }
     }
 
@@ -873,3 +876,52 @@ impl Alerts {
         Ok(())
     }
 }
+
+#[derive(Debug, Serialize)]
+pub struct AlertsInfo {
+    total: u64,
+    silenced: u64,
+    resolved: u64,
+    triggered: u64,
+    low: u64,
+    medium: u64,
+    high: u64,
+}
+
+// TODO: add RBAC
+pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
+    let alerts = ALERTS.alerts.read().await;
+    let mut total = 0;
+    let mut silenced = 0;
+    let mut resolved = 0;
+    let mut triggered = 0;
+    let mut low = 0;
+    let mut medium = 0;
+    let mut high = 0;
+
+    for (_, alert) in alerts.iter() {
+        total += 1;
+        match alert.state {
+            AlertState::Silenced => silenced += 1,
+            AlertState::Resolved => resolved += 1,
+            AlertState::Triggered => triggered += 1,
+        }
+
+        match alert.severity {
+            Severity::Low => low += 1,
+            Severity::Medium => medium += 1,
+            Severity::High => high += 1,
+            _ => {}
+        }
+    }
+
+    Ok(AlertsInfo {
+        total,
+        silenced,
+        resolved,
+        triggered,
+        low,
+        medium,
+        high,
+    })
+}
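Because `AlertsInfo` derives `Serialize`, the counters can be handed back as JSON directly. A sketch of the payload shape using a local mirror of the struct (field names copied from the hunk above; the wiring into an HTTP handler is not shown in this commit):

```rust
use serde::Serialize;

// Local mirror of AlertsInfo, for illustration only; the real struct
// lives in src/alerts/mod.rs and is filled by get_alerts_info().
#[derive(Debug, Serialize)]
struct AlertsInfo {
    total: u64,
    silenced: u64,
    resolved: u64,
    triggered: u64,
    low: u64,
    medium: u64,
    high: u64,
}

fn main() -> Result<(), serde_json::Error> {
    let info = AlertsInfo {
        total: 4,
        silenced: 1,
        resolved: 2,
        triggered: 1,
        low: 2,
        medium: 1,
        high: 1,
    };
    // Prints: {"total":4,"silenced":1,"resolved":2,"triggered":1,"low":2,"medium":1,"high":1}
    println!("{}", serde_json::to_string(&info)?);
    Ok(())
}
```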

src/catalog/mod.rs

+7
@@ -340,6 +340,12 @@ pub async fn remove_manifest_from_snapshot(
             Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
         }
         Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
+        Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
+            std::io::Error::new(
+                std::io::ErrorKind::Unsupported,
+                "Can't remove manifest from within Index server",
+            ),
+        ))),
     }
 }
 
@@ -350,6 +356,7 @@ pub async fn get_first_event(
 ) -> Result<Option<String>, ObjectStorageError> {
     let mut first_event_at: String = String::default();
     match PARSEABLE.options.mode {
+        Mode::Index => unimplemented!(),
         Mode::All | Mode::Ingest => {
             // get current snapshot
             let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();

src/cli.rs

+3-2
@@ -260,11 +260,12 @@
         help = "Set a fixed memory limit for query in GiB"
     )]
     pub query_memory_pool_size: Option<usize>,
-
+    // reduced the max row group size from 1048576
+    // smaller row groups help in faster query performance in multi threaded query
     #[arg(
         long,
         env = "P_PARQUET_ROW_GROUP_SIZE",
-        default_value = "1048576",
+        default_value = "262144",
         help = "Number of rows in a row group"
     )]
     pub row_group_size: usize,
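The new comments carry the rationale: a Parquet row group is an independently decodable unit, so cutting the default from 1,048,576 rows to 262,144 gives a multi-threaded reader roughly four times as many units to parallelize over per file, at the cost of slightly more metadata. A sketch of where such a limit is applied when writing Parquet, assuming the Rust `parquet` crate's `WriterProperties` API (not necessarily how Parseable wires it internally):

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // Cap row groups at the new default of 262,144 rows; smaller groups
    // give a parallel reader more independently scannable chunks.
    let props = WriterProperties::builder()
        .set_max_row_group_size(262_144)
        .build();
    assert_eq!(props.max_row_group_size(), 262_144);
}
```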

src/connectors/kafka/processor.rs

+17-37
@@ -16,10 +16,9 @@
  *
  */
 
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
 
 use async_trait::async_trait;
-use chrono::Utc;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
@@ -58,37 +57,10 @@
         let stream = PARSEABLE.get_stream(stream_name)?;
         let schema = stream.get_schema_raw();
         let time_partition = stream.get_time_partition();
+        let custom_partition = stream.get_custom_partition();
         let static_schema_flag = stream.get_static_schema_flag();
         let schema_version = stream.get_schema_version();
 
-        let (json_vec, total_payload_size) = Self::json_vec(records);
-        let batch_json_event = json::Event {
-            data: Value::Array(json_vec),
-        };
-
-        let (rb, is_first) = batch_json_event.into_recordbatch(
-            &schema,
-            static_schema_flag,
-            time_partition.as_ref(),
-            schema_version,
-        )?;
-
-        let p_event = ParseableEvent {
-            rb,
-            stream_name: stream_name.to_string(),
-            origin_format: "json",
-            origin_size: total_payload_size,
-            is_first_event: is_first,
-            parsed_timestamp: Utc::now().naive_utc(),
-            time_partition: None,
-            custom_partition_values: HashMap::new(),
-            stream_type: StreamType::UserDefined,
-        };
-
-        Ok(p_event)
-    }
-
-    fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
         let mut json_vec = Vec::with_capacity(records.len());
         let mut total_payload_size = 0u64;
 
@@ -99,22 +71,30 @@
             }
         }
 
-        (json_vec, total_payload_size)
+        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
+            stream_name.to_string(),
+            total_payload_size,
+            &schema,
+            static_schema_flag,
+            custom_partition.as_ref(),
+            time_partition.as_ref(),
+            schema_version,
+            StreamType::UserDefined,
+        )?;
+
+        Ok(p_event)
     }
 }
 
 #[async_trait]
 impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
     async fn process(&self, records: Vec<ConsumerRecord>) -> anyhow::Result<()> {
         let len = records.len();
-        debug!("Processing {} records", len);
+        debug!("Processing {len} records");
 
-        self.build_event_from_chunk(&records)
-            .await?
-            .process()
-            .await?;
+        self.build_event_from_chunk(&records).await?.process()?;
 
-        debug!("Processed {} records", len);
+        debug!("Processed {len} records");
         Ok(())
     }
 }

src/enterprise/mod.rs

+1
@@ -0,0 +1 @@
pub mod utils;
