
Commit 1d81762

Merge remote-tracking branch 'origin/main' into regex
2 parents 5c4d580 + 77ae28e commit 1d81762

40 files changed: +1222, -227 lines

.github/workflows/build-push-edge-debug.yaml

-2 lines

@@ -45,5 +45,3 @@ jobs:
           push: true
           tags: parseable/parseable:edge-debug
           platforms: linux/amd64,linux/arm64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max

.github/workflows/build-push-edge.yaml

-2 lines

@@ -45,5 +45,3 @@ jobs:
           push: true
           tags: parseable/parseable:edge
           platforms: linux/amd64,linux/arm64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max

Cargo.lock

+3, -4 lines (generated file; diff not rendered)

Cargo.toml

+4, -5 lines

@@ -1,6 +1,6 @@
 [package]
 name = "parseable"
-version = "1.7.3"
+version = "1.7.5"
 authors = ["Parseable Team <[email protected]>"]
 edition = "2021"
 rust-version = "1.83.0"
@@ -128,9 +128,8 @@ sha1_smol = { version = "1.0", features = ["std"] }
 static-files = "0.2"
 ureq = "2.12"
 url = "2.5"
-vergen = { version = "9.0", features = ["build", "cargo", "rustc", "si"] }
 vergen-gitcl = { version = "1.0", features = ["build", "cargo", "rustc", "si"] }
-zip = { version = "2.2", default-features = false, features = ["deflate"] }
+zip = { version = "2.3", default-features = false, features = ["deflate"] }
 anyhow = "1.0"

 [dev-dependencies]
@@ -139,8 +138,8 @@ arrow = "54.0.0"
 temp-dir = "0.1.14"

 [package.metadata.parseable_ui]
-assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.18/build.zip"
-assets-sha1 = "4516db38c8e556707b29b33569f9b1e53d5165f2"
+assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.20/build.zip"
+assets-sha1 = "58eb74bdb6727b5df04d6f9f339cf370c0bf4eec"

 [features]
 debug = []

Dockerfile

+3, -10 lines

@@ -14,30 +14,23 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.

 # build stage
-FROM rust:1.84.0-bookworm AS builder
-
+FROM rust:1.84.0-bookworm AS builder

 LABEL org.opencontainers.image.title="Parseable"
 LABEL maintainer="Parseable Team <[email protected]>"
 LABEL org.opencontainers.image.vendor="Parseable Inc"
 LABEL org.opencontainers.image.licenses="AGPL-3.0"

 WORKDIR /parseable
-
-# Cache dependencies
-COPY Cargo.toml Cargo.lock build.rs ./
-RUN mkdir src && echo "fn main() {}" > src/main.rs && cargo build --release && rm -rf src
-
-# Build the actual binary
-COPY src ./src
+COPY . .
 RUN cargo build --release

 # final stage
 FROM gcr.io/distroless/cc-debian12:latest

 WORKDIR /parseable

-# Copy the static binary into the final image
+# Copy the static shell into base image.
 COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable

 CMD ["/usr/bin/parseable"]

src/alerts/mod.rs

+4, -1 lines

@@ -886,6 +886,7 @@ pub struct AlertsInfo {
     low: u64,
     medium: u64,
     high: u64,
+    critical: u64,
 }

 // TODO: add RBAC
@@ -898,6 +899,7 @@ pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
     let mut low = 0;
     let mut medium = 0;
    let mut high = 0;
+    let mut critical = 0;

    for (_, alert) in alerts.iter() {
        total += 1;
@@ -911,7 +913,7 @@ pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
            Severity::Low => low += 1,
            Severity::Medium => medium += 1,
            Severity::High => high += 1,
-            _ => {}
+            Severity::Critical => critical += 1,
        }
    }

@@ -923,5 +925,6 @@ pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
        low,
        medium,
        high,
+        critical,
    })
 }
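For context, this change swaps the old `_ => {}` catch-all for an explicit `Severity::Critical` arm so critical alerts are counted alongside the others. Below is a minimal sketch of that counting pattern with the surrounding alert machinery stripped away; the struct and function names are illustrative, only the `Severity` variant names come from the diff.

    #[derive(Clone, Copy)]
    enum Severity {
        Low,
        Medium,
        High,
        Critical,
    }

    #[derive(Default, Debug)]
    struct SeverityCounts {
        low: u64,
        medium: u64,
        high: u64,
        critical: u64,
    }

    // Count alerts per severity; the exhaustive match means a future variant
    // becomes a compile error instead of being silently dropped by `_ => {}`.
    fn count_by_severity(severities: &[Severity]) -> SeverityCounts {
        let mut counts = SeverityCounts::default();
        for severity in severities {
            match severity {
                Severity::Low => counts.low += 1,
                Severity::Medium => counts.medium += 1,
                Severity::High => counts.high += 1,
                Severity::Critical => counts.critical += 1,
            }
        }
        counts
    }

    fn main() {
        let alerts = [Severity::High, Severity::Critical, Severity::Critical];
        println!("{:?}", count_by_severity(&alerts));
    }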

src/cli.rs

+52, -18 lines

@@ -270,6 +270,14 @@ pub struct Options {
     )]
     pub row_group_size: usize,

+    #[arg(
+        long,
+        env = "P_EXECUTION_BATCH_SIZE",
+        default_value = "20000",
+        help = "batch size for query execution"
+    )]
+    pub execution_batch_size: usize,
+
     #[arg(
         long = "compression-algo",
         env = "P_PARQUET_COMPRESSION_ALGO",
@@ -295,6 +303,14 @@ pub struct Options {
     )]
     pub ingestor_endpoint: String,

+    #[arg(
+        long,
+        env = "P_INDEXER_ENDPOINT",
+        default_value = "",
+        help = "URL to connect to this specific indexer. Default is the address of the server"
+    )]
+    pub indexer_endpoint: String,
+
     #[command(flatten)]
     pub oidc: Option<OidcConfig>,

@@ -396,29 +412,47 @@ impl Options {
     }

     /// TODO: refactor and document
-    pub fn get_url(&self) -> Url {
-        if self.ingestor_endpoint.is_empty() {
-            return format!(
-                "{}://{}",
-                self.get_scheme(),
-                self.address
-            )
-            .parse::<Url>() // if the value was improperly set, this will panic before hand
-            .unwrap_or_else(|err| {
-                panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
-            });
-        }
-
-        let ingestor_endpoint = &self.ingestor_endpoint;
+    pub fn get_url(&self, mode: Mode) -> Url {
+        let (endpoint, env_var) = match mode {
+            Mode::Ingest => {
+                if self.ingestor_endpoint.is_empty() {
+                    return format!(
+                        "{}://{}",
+                        self.get_scheme(),
+                        self.address
+                    )
+                    .parse::<Url>() // if the value was improperly set, this will panic before hand
+                    .unwrap_or_else(|err| {
+                        panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
+                    });
+                }
+                (&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
+            }
+            Mode::Index => {
+                if self.indexer_endpoint.is_empty() {
+                    return format!(
+                        "{}://{}",
+                        self.get_scheme(),
+                        self.address
+                    )
+                    .parse::<Url>() // if the value was improperly set, this will panic before hand
+                    .unwrap_or_else(|err| {
+                        panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
+                    });
+                }
+                (&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
+            }
+            _ => panic!("Invalid mode"),
+        };

-        if ingestor_endpoint.starts_with("http") {
-            panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
+        if endpoint.starts_with("http") {
+            panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
         }

-        let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
+        let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();

         if addr_from_env.len() != 2 {
-            panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
+            panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
         }

         let mut hostname = addr_from_env[0].to_string();
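The reworked `get_url` selects an endpoint and the environment variable to name in error messages based on the server mode, then validates the `<host>:<port>` form. Below is a minimal standalone sketch of that selection and validation, assuming a simplified `Mode` enum and returning `Result` instead of panicking; the real method also falls back to `self.address` for empty endpoints and produces a `url::Url`.

    #[derive(Clone, Copy, Debug)]
    enum Mode {
        Ingest,
        Index,
        Query,
    }

    struct Options {
        ingestor_endpoint: String,
        indexer_endpoint: String,
    }

    impl Options {
        // Pick the endpoint and the env var to mention in error messages,
        // then check that it looks like `<host>:<port>` without a scheme.
        fn endpoint_for(&self, mode: Mode) -> Result<(String, u16), String> {
            let (endpoint, env_var) = match mode {
                Mode::Ingest => (&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT"),
                Mode::Index => (&self.indexer_endpoint, "P_INDEXER_ENDPOINT"),
                _ => return Err("endpoint lookup is only valid for ingest/index modes".into()),
            };

            if endpoint.starts_with("http") {
                return Err(format!("`{env_var}` must not include a scheme: `{endpoint}`"));
            }

            let parts: Vec<&str> = endpoint.split(':').collect();
            if parts.len() != 2 {
                return Err(format!("`{env_var}` must be `<host>:<port>`: `{endpoint}`"));
            }

            let port: u16 = parts[1]
                .parse()
                .map_err(|_| format!("`{env_var}` has a non-numeric port: `{endpoint}`"))?;
            Ok((parts[0].to_string(), port))
        }
    }

    fn main() {
        let opts = Options {
            ingestor_endpoint: "10.0.0.5:8000".to_string(),
            indexer_endpoint: String::new(),
        };
        println!("{:?}", opts.endpoint_for(Mode::Ingest)); // Ok(("10.0.0.5", 8000))
        println!("{:?}", opts.endpoint_for(Mode::Index));  // Err: not `<host>:<port>`
    }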

src/connectors/kafka/processor.rs

+7, -3 lines

@@ -16,20 +16,20 @@
 *
 */

-use std::sync::Arc;
-
 use async_trait::async_trait;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
+use std::collections::HashMap;
+use std::sync::Arc;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, error};

 use crate::{
     connectors::common::processor::Processor,
     event::{
         format::{json, EventFormat, LogSourceEntry},
-        Event as ParseableEvent,
+        Event as ParseableEvent, USER_AGENT_KEY,
     },
     parseable::PARSEABLE,
     storage::StreamType,
@@ -76,6 +76,9 @@ impl ParseableSinkProcessor {
             }
         }

+        let mut p_custom_fields = HashMap::new();
+        p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
+
         let p_event = json::Event::new(Value::Array(json_vec)).into_event(
             stream_name.to_string(),
             total_payload_size,
@@ -85,6 +88,7 @@ impl ParseableSinkProcessor {
             time_partition.as_ref(),
             schema_version,
             StreamType::UserDefined,
+            &p_custom_fields,
         )?;

         Ok(p_event)
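The Kafka processor now stamps every batch with a user-agent custom field before conversion into a Parseable event. A minimal sketch of building that map follows, assuming a placeholder value for `USER_AGENT_KEY`; the real constant is exported by the event module, not defined here.

    use std::collections::HashMap;

    // Placeholder for the constant exported by the event module; the actual
    // key string is defined there, not here.
    const USER_AGENT_KEY: &str = "user_agent";

    // Build the per-source custom fields that get attached to every event
    // coming from the Kafka connector.
    fn kafka_custom_fields() -> HashMap<String, String> {
        let mut p_custom_fields = HashMap::new();
        p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
        p_custom_fields
    }

    fn main() {
        let fields = kafka_custom_fields();
        // The map is passed by reference into event construction, e.g.
        // `.into_event(..., &fields)` in the real processor.
        assert_eq!(fields.get(USER_AGENT_KEY).map(String::as_str), Some("kafka"));
    }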

src/enterprise/utils.rs

+28, -6 lines

@@ -1,5 +1,6 @@
 use std::{collections::HashMap, path::PathBuf, sync::Arc};

+use chrono::{TimeZone, Utc};
 use datafusion::{common::Column, prelude::Expr};
 use itertools::Itertools;
 use relative_path::RelativePathBuf;
@@ -119,14 +120,35 @@ pub async fn fetch_parquet_file_paths(

     selected_files
         .into_iter()
-        .map(|file| {
+        .filter_map(|file| {
            let date = file.file_path.split("/").collect_vec();

-            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
-
-            let date = RelativePathBuf::from_iter(date);
-
-            parquet_files.entry(date).or_default().push(file);
+            let year = &date[1][5..9];
+            let month = &date[1][10..12];
+            let day = &date[1][13..15];
+            let hour = &date[2][5..7];
+            let min = &date[3][7..9];
+            let file_date = Utc
+                .with_ymd_and_hms(
+                    year.parse::<i32>().unwrap(),
+                    month.parse::<u32>().unwrap(),
+                    day.parse::<u32>().unwrap(),
+                    hour.parse::<u32>().unwrap(),
+                    min.parse::<u32>().unwrap(),
+                    0,
+                )
+                .unwrap();
+
+            if file_date < time_range.start {
+                None
+            } else {
+                let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
+
+                let date = RelativePathBuf::from_iter(date);
+
+                parquet_files.entry(date).or_default().push(file);
+                Some("")
+            }
        })
        .for_each(|_| {});

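The new `filter_map` body derives a UTC timestamp from the partition path segments and skips files that predate the query window. Below is a minimal sketch of that parsing, assuming paths shaped like `stream/date=YYYY-MM-DD/hour=HH/minute=MM/file.parquet` (the fixed slice offsets in the diff imply this layout) and returning `Option` instead of calling `unwrap`.

    use chrono::{DateTime, TimeZone, Utc};

    // Parse `stream/date=YYYY-MM-DD/hour=HH/minute=MM/file.parquet` into a UTC
    // timestamp, returning None if any segment is missing or malformed.
    fn partition_timestamp(file_path: &str) -> Option<DateTime<Utc>> {
        let parts: Vec<&str> = file_path.split('/').collect();

        let year: i32 = parts.get(1)?.get(5..9)?.parse().ok()?;
        let month: u32 = parts.get(1)?.get(10..12)?.parse().ok()?;
        let day: u32 = parts.get(1)?.get(13..15)?.parse().ok()?;
        let hour: u32 = parts.get(2)?.get(5..7)?.parse().ok()?;
        let minute: u32 = parts.get(3)?.get(7..9)?.parse().ok()?;

        Utc.with_ymd_and_hms(year, month, day, hour, minute, 0).single()
    }

    fn main() {
        let path = "app-logs/date=2024-03-01/hour=10/minute=25/data.parquet";
        let start = Utc.with_ymd_and_hms(2024, 3, 1, 0, 0, 0).single().unwrap();

        // Keep the file only if its partition timestamp is not older than `start`,
        // mirroring the `file_date < time_range.start` check in the diff.
        let keep = partition_timestamp(path).map(|ts| ts >= start).unwrap_or(false);
        println!("keep = {keep}");
    }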

src/event/format/json.rs

+2 lines

@@ -149,6 +149,7 @@ impl EventFormat for Event {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<super::Event, anyhow::Error> {
         let custom_partition_values = match custom_partitions.as_ref() {
             Some(custom_partition) => {
@@ -168,6 +169,7 @@ impl EventFormat for Event {
             static_schema_flag,
             time_partition,
             schema_version,
+            p_custom_fields,
        )?;

        Ok(super::Event {
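This change threads a `p_custom_fields` map through `into_event` so the caller's source metadata ends up on the constructed event. A minimal sketch of extending a trait method with such a parameter follows, using simplified stand-in types; the real trait is `EventFormat` and the real event carries many more fields.

    use std::collections::HashMap;

    struct Event {
        // Simplified stand-in for the real event; only the custom fields are shown.
        custom_fields: HashMap<String, String>,
    }

    trait Format {
        // Every format receives the caller-supplied custom fields and is
        // responsible for copying them onto the event it builds.
        fn into_event(self, p_custom_fields: &HashMap<String, String>) -> Event;
    }

    struct JsonPayload;

    impl Format for JsonPayload {
        fn into_event(self, p_custom_fields: &HashMap<String, String>) -> Event {
            Event {
                custom_fields: p_custom_fields.clone(),
            }
        }
    }

    fn main() {
        let mut fields = HashMap::new();
        fields.insert("user_agent".to_string(), "kafka".to_string());

        let event = JsonPayload.into_event(&fields);
        assert_eq!(event.custom_fields.get("user_agent").map(String::as_str), Some("kafka"));
    }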
