
Commit 2477b20

chore: remove dead_code (#1132)
1 parent 750140b · commit 2477b20

12 files changed: +7 -216 lines
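Every hunk below removes the same kind of thing: items that only compiled warning-free because they carried #[allow(dead_code)]. As a reminder of what that attribute suppresses, a minimal illustration (hypothetical names, not from this repo):

    fn unused_helper() {} // rustc: warning: function `unused_helper` is never used

    #[allow(dead_code)]
    fn parked_helper() {} // compiles silently, but the dead weight still ships

Deleting the item, as this commit does, removes both the warning and the weight.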

src/event/writer/file_writer.rs (+2 -10)

@@ -29,8 +29,6 @@ use crate::storage::staging::StorageDir;
 use chrono::NaiveDateTime;

 pub struct ArrowWriter {
-    #[allow(dead_code)]
-    pub file_path: PathBuf,
     pub writer: StreamWriter<File>,
 }

@@ -54,20 +52,14 @@ impl FileWriter {
             // entry is not present thus we create it
             None => {
                 // this requires mutable borrow of the map so we drop this read lock and wait for write lock
-                let (path, writer) = init_new_stream_writer_file(
+                let (_, writer) = init_new_stream_writer_file(
                     stream_name,
                     schema_key,
                     record,
                     parsed_timestamp,
                     custom_partition_values,
                 )?;
-                self.insert(
-                    schema_key.to_owned(),
-                    ArrowWriter {
-                        file_path: path,
-                        writer,
-                    },
-                );
+                self.insert(schema_key.to_owned(), ArrowWriter { writer });
             }
         };

src/handlers/http/cluster/utils.rs (+1 -67)

@@ -16,17 +16,10 @@
  *
  */

-use crate::{
-    handlers::http::{
-        base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata,
-    },
-    HTTP_CLIENT,
-};
+use crate::{handlers::http::base_path_without_preceding_slash, HTTP_CLIENT};
 use actix_web::http::header;
 use chrono::{DateTime, Utc};
-use http::StatusCode;
 use itertools::Itertools;
-use reqwest::Response;
 use serde::{Deserialize, Serialize};
 use tracing::error;
 use url::Url;

@@ -247,65 +240,6 @@ pub async fn check_liveness(domain_name: &str) -> bool {
     req.is_ok()
 }

-/// send a request to the ingestor to fetch its stats
-/// dead for now
-#[allow(dead_code)]
-pub async fn send_stats_request(
-    url: &str,
-    ingestor: IngestorMetadata,
-) -> Result<Option<Response>, StreamError> {
-    if !check_liveness(&ingestor.domain_name).await {
-        return Ok(None);
-    }
-
-    let res = HTTP_CLIENT
-        .get(url)
-        .header(header::CONTENT_TYPE, "application/json")
-        .header(header::AUTHORIZATION, ingestor.token)
-        .send()
-        .await
-        .map_err(|err| {
-            error!(
-                "Fatal: failed to fetch stats from ingestor: {}\n Error: {:?}",
-                ingestor.domain_name, err
-            );
-
-            StreamError::Network(err)
-        })?;
-
-    if !res.status().is_success() {
-        error!(
-            "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}",
-            ingestor.domain_name, res
-        );
-        return Err(StreamError::Custom {
-            msg: format!(
-                "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}",
-                ingestor.domain_name,
-                res.text().await.unwrap_or_default()
-            ),
-            status: StatusCode::INTERNAL_SERVER_ERROR,
-        });
-    }
-
-    Ok(Some(res))
-}
-
-/// domain_name needs to be http://ip:port
-/// dead code for now
-#[allow(dead_code)]
-pub fn ingestor_meta_filename(domain_name: &str) -> String {
-    if domain_name.starts_with("http://") | domain_name.starts_with("https://") {
-        let url = Url::parse(domain_name).unwrap();
-        return format!(
-            "ingestor.{}.{}.json",
-            url.host_str().unwrap(),
-            url.port().unwrap()
-        );
-    }
-    format!("ingestor.{}.json", domain_name)
-}
-
 pub fn to_url_string(str: String) -> String {
     // if the str is already a url i am guessing that it will end in '/'
     if str.starts_with("http://") || str.starts_with("https://") {
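For reference, the deleted ingestor_meta_filename encoded this mapping from an ingestor domain to its metadata filename (reconstructed from the removed body; the addresses are made up):

    // "http://10.0.0.1:8000"  -> "ingestor.10.0.0.1.8000.json"
    // anything without a scheme -> "ingestor.<domain_name>.json"
    assert_eq!(
        ingestor_meta_filename("http://10.0.0.1:8000"),
        "ingestor.10.0.0.1.8000.json"
    );
    assert_eq!(ingestor_meta_filename("node-1"), "ingestor.node-1.json");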

src/handlers/http/logstream.rs (-12)

@@ -465,18 +465,6 @@ pub async fn get_stats(
     Ok((web::Json(stats), StatusCode::OK))
 }

-// Check if the first_event_at is empty
-#[allow(dead_code)]
-pub fn first_event_at_empty(stream_name: &str) -> bool {
-    let hash_map = STREAM_INFO.read().unwrap();
-    if let Some(stream_info) = hash_map.get(stream_name) {
-        if let Some(first_event_at) = &stream_info.first_event_at {
-            return first_event_at.is_empty();
-        }
-    }
-    true
-}
-
 fn remove_id_from_alerts(value: &mut Value) {
     if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
         alerts
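Should this check ever be needed again, the deleted nested ifs reduce to a single Option combinator. A sketch with the signature simplified to take the looked-up field directly (the original also returned true for unknown streams):

    // true when first_event_at is absent or the empty string.
    fn first_event_at_empty(first_event_at: Option<&String>) -> bool {
        first_event_at.map_or(true, |s| s.is_empty())
    }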

src/hottier.rs (-18)

@@ -397,24 +397,6 @@ impl HotTierManager {
         Ok(file_processed)
     }

-    #[allow(dead_code)]
-    ///delete the files for the date range given from the hot tier directory for the stream
-    /// update the used and available size in the hot tier metadata
-    pub async fn delete_files_from_hot_tier(
-        &self,
-        stream: &str,
-        dates: &[NaiveDate],
-    ) -> Result<(), HotTierError> {
-        for date in dates.iter() {
-            let path = self.hot_tier_path.join(format!("{}/date={}", stream, date));
-            if path.exists() {
-                fs::remove_dir_all(path.clone()).await?;
-            }
-        }
-
-        Ok(())
-    }
-
     ///fetch the list of dates available in the hot tier directory for the stream and sort them
     pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result<Vec<NaiveDate>, HotTierError> {
         let mut date_list = Vec::new();

src/kafka.rs (-1)

@@ -55,7 +55,6 @@ pub enum SslProtocol {
     SaslSsl,
 }

-#[allow(dead_code)]
 #[derive(Debug, thiserror::Error)]
 pub enum KafkaError {
     #[error("Please set env var {0} (To use Kafka integration env vars P_KAFKA_TOPICS, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")]

src/metrics/mod.rs (-7)

@@ -320,11 +320,4 @@ pub mod error {
             .body(self.to_string())
         }
     }
-
-    #[allow(dead_code)]
-    fn construct_custom_error() {
-        let error =
-            MetricsError::Custom("Some error".to_string(), StatusCode::INTERNAL_SERVER_ERROR);
-        println!("{:?}", error);
-    }
 }

src/query/mod.rs (+2 -59)

@@ -20,8 +20,8 @@ mod filter_optimizer;
 mod listing_table_builder;
 pub mod stream_schema_provider;

+use chrono::NaiveDateTime;
 use chrono::{DateTime, Duration, Utc};
-use chrono::{NaiveDateTime, TimeZone};
 use datafusion::arrow::record_batch::RecordBatch;

 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};

@@ -38,9 +38,7 @@ use once_cell::sync::Lazy;
 use relative_path::RelativePathBuf;
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
-use std::collections::HashMap;
 use std::ops::Bound;
-use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use stream_schema_provider::collect_manifest_files;
 use sysinfo::System;

@@ -56,7 +54,7 @@ use crate::event;
 use crate::handlers::http::query::QueryError;
 use crate::metadata::STREAM_INFO;
 use crate::option::{Mode, CONFIG};
-use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, StorageDir, STREAM_ROOT_DIRECTORY};
+use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, STREAM_ROOT_DIRECTORY};
 use crate::utils::time::TimeRange;
 pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(CONFIG.storage()));

@@ -396,10 +394,6 @@ impl TableScanVisitor {
     pub fn into_inner(self) -> Vec<String> {
         self.tables
     }
-    #[allow(dead_code)]
-    pub fn top(&self) -> Option<&str> {
-        self.tables.first().map(|s| s.as_ref())
-    }
 }

 impl TreeNodeVisitor<'_> for TableScanVisitor {

@@ -563,47 +557,6 @@ fn table_contains_any_time_filters(
     })
 }

-#[allow(dead_code)]
-fn get_staging_prefixes(
-    stream_name: &str,
-    start: DateTime<Utc>,
-    end: DateTime<Utc>,
-) -> HashMap<PathBuf, Vec<PathBuf>> {
-    let dir = StorageDir::new(stream_name);
-    let mut files = dir.arrow_files_grouped_by_time();
-    files.retain(|k, _| path_intersects_query(k, start, end));
-    files
-}
-
-fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
-    let time = time_from_path(path);
-    starttime <= time && time <= endtime
-}
-
-fn time_from_path(path: &Path) -> DateTime<Utc> {
-    let prefix = path
-        .file_name()
-        .expect("all given path are file")
-        .to_str()
-        .expect("filename is valid");
-
-    // Next three in order will be date, hour and minute
-    let mut components = prefix.splitn(3, '.');
-
-    let date = components.next().expect("date=xxxx-xx-xx");
-    let hour = components.next().expect("hour=xx");
-    let minute = components.next().expect("minute=xx");
-
-    let year = date[5..9].parse().unwrap();
-    let month = date[10..12].parse().unwrap();
-    let day = date[13..15].parse().unwrap();
-    let hour = hour[5..7].parse().unwrap();
-    let minute = minute[7..9].parse().unwrap();
-
-    Utc.with_ymd_and_hms(year, month, day, hour, minute, 0)
-        .unwrap()
-}
-
 /// unused for now might need it later
 #[allow(unused)]
 pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {

@@ -671,16 +624,6 @@ mod tests {

     use crate::query::flatten_objects_for_count;

-    use super::time_from_path;
-    use std::path::PathBuf;
-
-    #[test]
-    fn test_time_from_parquet_path() {
-        let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet");
-        let time = time_from_path(path.as_path());
-        assert_eq!(time.timestamp(), 1640995200);
-    }
-
     #[test]
     fn test_flat_simple() {
         let val = vec![
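The deleted time_from_path leaned on fixed byte offsets (date[5..9] and friends), which is what its removed unit test pinned down. Should it ever be resurrected, a sketch using chrono's format parser instead, assuming the same date=YYYY-MM-DD.hour=HH.minute=MM.* filename layout from the removed test:

    use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};

    // Sketch: parse "date=2022-01-01.hour=00.minute=00.host.data.parquet"
    // without hard-coded slice indices; returns None instead of panicking.
    fn time_from_filename(name: &str) -> Option<DateTime<Utc>> {
        // Keep the first three dot-separated components: date, hour, minute.
        let parts: Vec<&str> = name.splitn(4, '.').take(3).collect();
        if parts.len() < 3 {
            return None;
        }
        let joined = parts.join("."); // "date=2022-01-01.hour=00.minute=00"
        NaiveDateTime::parse_from_str(&joined, "date=%Y-%m-%d.hour=%H.minute=%M")
            .ok()
            .map(|naive| Utc.from_utc_datetime(&naive))
    }

On the filename from the removed test this yields the same epoch (1640995200) the deleted assertion expected.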

src/response.rs (+1 -13)

@@ -16,18 +16,11 @@
  *
  */

-use crate::{
-    handlers::http::query::QueryError,
-    utils::arrow::{
-        flight::{into_flight_data, DoGetStream},
-        record_batches_to_json,
-    },
-};
+use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
 use actix_web::HttpResponse;
 use datafusion::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use serde_json::{json, Value};
-use tonic::{Response, Status};
 use tracing::info;

 pub struct QueryResponse {

@@ -65,9 +58,4 @@ impl QueryResponse {

         Ok(HttpResponse::Ok().json(response))
     }
-
-    #[allow(dead_code)]
-    pub fn into_flight(self) -> Result<Response<DoGetStream>, Status> {
-        into_flight_data(self.records)
-    }
 }

src/storage/mod.rs (-4)

@@ -258,8 +258,4 @@ pub enum ObjectStorageError {
     PathError(relative_path::FromPathError),
     #[error("Error: {0}")]
     MetadataError(#[from] MetadataError),
-
-    #[allow(dead_code)]
-    #[error("Authentication Error: {0}")]
-    AuthenticationError(Box<dyn std::error::Error + Send + Sync + 'static>),
 }

src/storage/s3.rs (+1 -2)

@@ -50,9 +50,8 @@ use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
 use std::collections::HashMap;

-#[allow(dead_code)]
 // in bytes
-const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
+// const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
 const CONNECT_TIMEOUT_SECS: u64 = 5;
 const REQUEST_TIMEOUT_SECS: u64 = 300;
 const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
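(For reference: 1024 × 1024 × 100 = 104,857,600 bytes, so the now commented-out constant pinned the multipart upload size at 100 MiB.)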

src/storage/staging.rs (-16)

@@ -136,22 +136,6 @@ impl StorageDir {
         paths
     }

-    #[allow(dead_code)]
-    pub fn arrow_files_grouped_by_time(&self) -> HashMap<PathBuf, Vec<PathBuf>> {
-        // hashmap <time, vec[paths]>
-        let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
-        let arrow_files = self.arrow_files();
-        for arrow_file_path in arrow_files {
-            let key = Self::arrow_path_to_parquet(&arrow_file_path, String::default());
-            grouped_arrow_file
-                .entry(key)
-                .or_default()
-                .push(arrow_file_path);
-        }
-
-        grouped_arrow_file
-    }
-
     pub fn arrow_files_grouped_exclude_time(
         &self,
         exclude: NaiveDateTime,
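The removed helper was a textbook use of HashMap's entry API, bucketing each staged .arrows file under the parquet path it would eventually be merged into. The core pattern it relied on, isolated as a generic sketch (names are hypothetical):

    use std::collections::HashMap;
    use std::path::PathBuf;

    // Bucket values under a derived key, creating each Vec on first use.
    fn group_by<F>(files: Vec<PathBuf>, key_of: F) -> HashMap<PathBuf, Vec<PathBuf>>
    where
        F: Fn(&PathBuf) -> PathBuf,
    {
        let mut grouped: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
        for file in files {
            grouped.entry(key_of(&file)).or_default().push(file);
        }
        grouped
    }

entry().or_default().push() avoids the contains_key-then-insert dance and hashes each key only once.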

src/utils/mod.rs (-7)

@@ -37,12 +37,6 @@ use std::collections::HashMap;
 use std::env;
 use tracing::debug;
 use url::Url;
-#[allow(dead_code)]
-pub fn hostname() -> Option<String> {
-    hostname::get()
-        .ok()
-        .and_then(|hostname| hostname.into_string().ok())
-}

 pub fn hostname_unchecked() -> String {
     hostname::get().unwrap().into_string().unwrap()

@@ -95,7 +89,6 @@ pub struct TimePeriod {
     data_granularity: u32,
 }

-#[allow(dead_code)]
 impl TimePeriod {
     pub fn new(start: DateTime<Utc>, end: DateTime<Utc>, data_granularity: u32) -> Self {
         Self {
