Skip to content

Commit 5690615

Browse files
authored
feat: cache calc_row_counts (#1107)
## What changes are included in this PR? Caches row counts when encountering columns multiple times.
1 parent ac756a4 commit 5690615

File tree

1 file changed

+25
-12
lines changed

1 file changed

+25
-12
lines changed

crates/iceberg/src/expr/visitors/page_index_evaluator.rs

+25-12
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ pub(crate) struct PageIndexEvaluator<'a> {
6464
row_group_metadata: &'a RowGroupMetaData,
6565
iceberg_field_id_to_parquet_column_index: &'a HashMap<i32, usize>,
6666
snapshot_schema: &'a Schema,
67+
row_count_cache: HashMap<usize, Vec<usize>>,
6768
}
6869

6970
impl<'a> PageIndexEvaluator<'a> {
@@ -80,6 +81,7 @@ impl<'a> PageIndexEvaluator<'a> {
8081
row_group_metadata,
8182
iceberg_field_id_to_parquet_column_index: field_id_map,
8283
snapshot_schema,
84+
row_count_cache: HashMap::new(),
8385
}
8486
}
8587

@@ -126,7 +128,7 @@ impl<'a> PageIndexEvaluator<'a> {
126128
}
127129

128130
fn calc_row_selection<F>(
129-
&self,
131+
&mut self,
130132
field_id: i32,
131133
predicate: F,
132134
missing_col_behavior: MissingColBehavior,
@@ -168,17 +170,28 @@ impl<'a> PageIndexEvaluator<'a> {
168170
return self.select_all_rows();
169171
};
170172

171-
let Some(offset_index) = self.offset_index.get(parquet_column_index) else {
172-
// if we have a column index, we should always have an offset index.
173-
return Err(Error::new(
174-
ErrorKind::Unexpected,
175-
format!("Missing offset index for field id {}", field_id),
176-
));
177-
};
173+
let row_counts = {
174+
// Caches row count calculations for columns that appear multiple times in
175+
// the predicate
176+
match self.row_count_cache.get(&parquet_column_index) {
177+
Some(count) => count.clone(),
178+
None => {
179+
let Some(offset_index) = self.offset_index.get(parquet_column_index) else {
180+
// if we have a column index, we should always have an offset index.
181+
return Err(Error::new(
182+
ErrorKind::Unexpected,
183+
format!("Missing offset index for field id {}", field_id),
184+
));
185+
};
186+
187+
let count = self.calc_row_counts(offset_index);
188+
self.row_count_cache
189+
.insert(parquet_column_index, count.clone());
178190

179-
// TODO: cache row_counts to avoid recalcing if the same column
180-
// appears multiple times in the filter predicate
181-
let row_counts = self.calc_row_counts(offset_index);
191+
count
192+
}
193+
}
194+
};
182195

183196
let Some(page_filter) = Self::apply_predicate_to_column_index(
184197
predicate,
@@ -205,7 +218,7 @@ impl<'a> PageIndexEvaluator<'a> {
205218
Ok(row_selectors.into())
206219
}
207220

208-
/// returns a list of row counts per page
221+
/// Returns a list of row counts per page
209222
fn calc_row_counts(&self, offset_index: &OffsetIndexMetaData) -> Vec<usize> {
210223
let mut remaining_rows = self.row_group_metadata.num_rows() as usize;
211224
let mut row_counts = Vec::with_capacity(self.offset_index.len());

0 commit comments

Comments
 (0)