@@ -33,6 +33,7 @@ use arrow_schema::{Field, Fields, Schema};
33
33
use chrono:: { NaiveDateTime , Timelike } ;
34
34
use derive_more:: { Deref , DerefMut } ;
35
35
use itertools:: Itertools ;
36
+ use once_cell:: sync:: Lazy ;
36
37
use parquet:: {
37
38
arrow:: ArrowWriter ,
38
39
basic:: Encoding ,
@@ -41,6 +42,7 @@ use parquet::{
41
42
schema:: types:: ColumnPath ,
42
43
} ;
43
44
use rand:: distributions:: DistString ;
45
+ use regex:: Regex ;
44
46
use relative_path:: RelativePathBuf ;
45
47
use tokio:: task:: JoinSet ;
46
48
use tracing:: { error, info, trace, warn} ;
@@ -68,6 +70,41 @@ use super::{
68
70
LogStream , ARROW_FILE_EXTENSION ,
69
71
} ;
70
72
73
+ /// Regex pattern for parsing arrow file names.
74
+ ///
75
+ /// # Format
76
+ /// The expected format is: `<schema_key>.<front_part>.data.arrows`
77
+ /// where:
78
+ /// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value
79
+ /// pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2")
80
+ /// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition
81
+ /// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76")
82
+ ///
83
+ /// # Limitations
84
+ /// - Partition keys and values must only contain alphanumeric characters
85
+ /// - Special characters in partition values will cause the pattern to fail in capturing
86
+ ///
87
+ /// # Examples
88
+ /// Valid: "key1=value1,key2=value2"
89
+ /// Invalid: "key1=special!value,key2=special#value"
90
+ static ARROWS_NAME_STRUCTURE : Lazy < Regex > = Lazy :: new ( || {
91
+ Regex :: new ( r"^[a-zA-Z0-9&=]+\.(?P<front>\S+)\.data\.arrows$" ) . expect ( "Validated regex" )
92
+ } ) ;
93
+
94
+ /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
95
+ fn arrow_path_to_parquet ( path : & Path , random_string : & str ) -> Option < PathBuf > {
96
+ let filename = path. file_name ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
97
+ let filename = ARROWS_NAME_STRUCTURE
98
+ . captures ( filename)
99
+ . and_then ( |c| c. get ( 1 ) ) ?
100
+ . as_str ( ) ;
101
+ let filename_with_random_number = format ! ( "{filename}.data.{random_string}.parquet" ) ;
102
+ let mut parquet_path = path. to_owned ( ) ;
103
+ parquet_path. set_file_name ( filename_with_random_number) ;
104
+
105
+ Some ( parquet_path)
106
+ }
107
+
71
108
#[ derive( Debug , thiserror:: Error ) ]
72
109
#[ error( "Stream not found: {0}" ) ]
73
110
pub struct StreamNotFound ( pub String ) ;
@@ -187,7 +224,11 @@ impl Stream {
187
224
let paths = dir
188
225
. flatten ( )
189
226
. map ( |file| file. path ( ) )
190
- . filter ( |file| file. extension ( ) . is_some_and ( |ext| ext. eq ( "arrows" ) ) )
227
+ . filter ( |path| {
228
+ path. file_name ( )
229
+ . and_then ( |f| f. to_str ( ) )
230
+ . is_some_and ( |file_name| ARROWS_NAME_STRUCTURE . is_match ( file_name) )
231
+ } )
191
232
. sorted_by_key ( |f| f. metadata ( ) . unwrap ( ) . modified ( ) . unwrap ( ) )
192
233
. collect ( ) ;
193
234
@@ -230,12 +271,13 @@ impl Stream {
230
271
& arrow_file_path, self . stream_name
231
272
) ;
232
273
remove_file ( & arrow_file_path) . unwrap ( ) ;
233
- } else {
234
- let key = Self :: arrow_path_to_parquet ( & arrow_file_path, & random_string) ;
274
+ } else if let Some ( key) = arrow_path_to_parquet ( & arrow_file_path, & random_string) {
235
275
grouped_arrow_file
236
276
. entry ( key)
237
277
. or_default ( )
238
278
. push ( arrow_file_path) ;
279
+ } else {
280
+ warn ! ( "Unexpected arrows file: {}" , arrow_file_path. display( ) ) ;
239
281
}
240
282
}
241
283
grouped_arrow_file
@@ -294,17 +336,6 @@ impl Stream {
294
336
}
295
337
}
296
338
297
/// Derives the parquet file path for an arrows file path.
///
/// Everything up to and including the first `.` of the file stem is dropped, `random_string` is
/// appended as an extra dot-separated segment, and the extension becomes `parquet`. The parent
/// directory of `path` is kept.
///
/// # Panics
/// - if `path` has no file stem or the stem is not valid UTF-8,
/// - if the stem contains no `.`,
/// - if what follows the first `.` of the stem does not itself contain a `.`.
fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf {
    let stem = path.file_stem().unwrap().to_str().unwrap();
    // Discard the leading segment; only the remainder is carried over into the new name.
    let remainder = stem.split_once('.').unwrap().1;
    assert!(remainder.contains('.'), "contains the delim `.`");

    // Name the file with a placeholder `.arrows` extension first, then swap it for `.parquet`.
    let mut parquet_path = path.with_file_name(format!("{remainder}.{random_string}.arrows"));
    parquet_path.set_extension("parquet");
    parquet_path
}
307
-
308
339
/// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
309
340
pub fn prepare_parquet ( & self , shutdown_signal : bool ) -> Result < ( ) , StagingError > {
310
341
info ! (
0 commit comments