Skip to content

Commit 9862026

Browse files
authored
feat: Partition Binding and safe PartitionSpecBuilder (#491)
* Initial commit
* Fixes
* Replace UnboundPartitionSpec Builder
* Fix tests, allow year, month day partition
* Comments
* typos
* Fix UnboundBuild setting partition_id
* Add test for unbound spec without partition ids
* Fix into_unbound fn name
* Split bound & unbound Partition builder, change add_partition_fields
* Improve comment
* Fix fmt
* Review fixes
* Remove partition_names() HashSet creation
1 parent 4434909 commit 9862026

File tree

9 files changed

+945
-173
lines changed

9 files changed

+945
-173
lines changed

crates/catalog/memory/src/catalog.rs

+1-2
Original file line number | Diff line number | Diff line change
@@ -358,9 +358,8 @@ mod tests {
358358

359359
assert_eq!(metadata.current_schema().as_ref(), expected_schema);
360360

361-
let expected_partition_spec = PartitionSpec::builder()
361+
let expected_partition_spec = PartitionSpec::builder(expected_schema)
362362
.with_spec_id(0)
363-
.with_fields(vec![])
364363
.build()
365364
.unwrap();
366365

crates/catalog/rest/src/catalog.rs

+3-3
Original file line number | Diff line number | Diff line change
@@ -1467,13 +1467,13 @@ mod tests {
14671467
.properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
14681468
.partition_spec(
14691469
UnboundPartitionSpec::builder()
1470-
.with_fields(vec![UnboundPartitionField::builder()
1470+
.add_partition_fields(vec![UnboundPartitionField::builder()
14711471
.source_id(1)
14721472
.transform(Transform::Truncate(3))
14731473
.name("id".to_string())
14741474
.build()])
1475-
.build()
1476-
.unwrap(),
1475+
.unwrap()
1476+
.build(),
14771477
)
14781478
.sort_order(
14791479
SortOrder::builder()

crates/iceberg/src/catalog/mod.rs

+9-25
Original file line number | Diff line number | Diff line change
@@ -229,7 +229,7 @@ pub struct TableCreation {
229229
/// The schema of the table.
230230
pub schema: Schema,
231231
/// The partition spec of the table, could be None.
232-
#[builder(default, setter(strip_option))]
232+
#[builder(default, setter(strip_option, into))]
233233
pub partition_spec: Option<UnboundPartitionSpec>,
234234
/// The sort order of the table.
235235
#[builder(default, setter(strip_option))]
@@ -476,7 +476,7 @@ mod tests {
476476
use crate::spec::{
477477
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
478478
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
479-
TableMetadataBuilder, Transform, Type, UnboundPartitionField, UnboundPartitionSpec,
479+
TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
480480
};
481481
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};
482482

@@ -820,29 +820,13 @@ mod tests {
820820
"#,
821821
TableUpdate::AddSpec {
822822
spec: UnboundPartitionSpec::builder()
823-
.with_unbound_partition_field(
824-
UnboundPartitionField::builder()
825-
.source_id(4)
826-
.name("ts_day".to_string())
827-
.transform(Transform::Day)
828-
.build(),
829-
)
830-
.with_unbound_partition_field(
831-
UnboundPartitionField::builder()
832-
.source_id(1)
833-
.name("id_bucket".to_string())
834-
.transform(Transform::Bucket(16))
835-
.build(),
836-
)
837-
.with_unbound_partition_field(
838-
UnboundPartitionField::builder()
839-
.source_id(2)
840-
.name("id_truncate".to_string())
841-
.transform(Transform::Truncate(4))
842-
.build(),
843-
)
844-
.build()
845-
.unwrap(),
823+
.add_partition_field(4, "ts_day".to_string(), Transform::Day)
824+
.unwrap()
825+
.add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
826+
.unwrap()
827+
.add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
828+
.unwrap()
829+
.build(),
846830
},
847831
);
848832
}

crates/iceberg/src/expr/visitors/expression_evaluator.rs

+8-6
Original file line number | Diff line number | Diff line change
@@ -258,8 +258,9 @@ mod tests {
258258
UnaryExpression,
259259
};
260260
use crate::spec::{
261-
DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionField,
262-
PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type,
261+
DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionSpec,
262+
PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type,
263+
UnboundPartitionField,
263264
};
264265
use crate::Result;
265266

@@ -274,14 +275,15 @@ mod tests {
274275
))])
275276
.build()?;
276277

277-
let spec = PartitionSpec::builder()
278+
let spec = PartitionSpec::builder(&schema)
278279
.with_spec_id(1)
279-
.with_fields(vec![PartitionField::builder()
280+
.add_unbound_fields(vec![UnboundPartitionField::builder()
280281
.source_id(1)
281282
.name("a".to_string())
282-
.field_id(1)
283+
.partition_id(1)
283284
.transform(Transform::Identity)
284285
.build()])
286+
.unwrap()
285287
.build()
286288
.unwrap();
287289

@@ -298,7 +300,7 @@ mod tests {
298300
let partition_fields = partition_type.fields().to_owned();
299301

300302
let partition_schema = Schema::builder()
301-
.with_schema_id(partition_spec.spec_id)
303+
.with_schema_id(partition_spec.spec_id())
302304
.with_fields(partition_fields)
303305
.build()?;
304306

crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs

+6-5
Original file line number | Diff line number | Diff line change
@@ -495,8 +495,8 @@ mod test {
495495
UnaryExpression,
496496
};
497497
use crate::spec::{
498-
DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionField,
499-
PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type,
498+
DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionSpec,
499+
PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField,
500500
};
501501

502502
const INT_MIN_VALUE: i32 = 30;
@@ -1656,14 +1656,15 @@ mod test {
16561656
.unwrap();
16571657
let table_schema_ref = Arc::new(table_schema);
16581658

1659-
let partition_spec = PartitionSpec::builder()
1659+
let partition_spec = PartitionSpec::builder(&table_schema_ref)
16601660
.with_spec_id(1)
1661-
.with_fields(vec![PartitionField::builder()
1661+
.add_unbound_fields(vec![UnboundPartitionField::builder()
16621662
.source_id(1)
16631663
.name("a".to_string())
1664-
.field_id(1)
1664+
.partition_id(1)
16651665
.transform(Transform::Identity)
16661666
.build()])
1667+
.unwrap()
16671668
.build()
16681669
.unwrap();
16691670
let partition_spec_ref = Arc::new(partition_spec);

crates/iceberg/src/expr/visitors/inclusive_projection.rs

+59-51
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ impl InclusiveProjection {
4040
fn get_parts_for_field_id(&mut self, field_id: i32) -> &Vec<PartitionField> {
4141
if let std::collections::hash_map::Entry::Vacant(e) = self.cached_parts.entry(field_id) {
4242
let mut parts: Vec<PartitionField> = vec![];
43-
for partition_spec_field in &self.partition_spec.fields {
43+
for partition_spec_field in self.partition_spec.fields() {
4444
if partition_spec_field.source_id == field_id {
4545
parts.push(partition_spec_field.clone())
4646
}
@@ -236,6 +236,7 @@ mod tests {
236236
use crate::expr::{Bind, Predicate, Reference};
237237
use crate::spec::{
238238
Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type,
239+
UnboundPartitionField,
239240
};
240241

241242
fn build_test_schema() -> Schema {
@@ -265,9 +266,8 @@ mod tests {
265266
fn test_inclusive_projection_logic_ops() {
266267
let schema = build_test_schema();
267268

268-
let partition_spec = PartitionSpec::builder()
269+
let partition_spec = PartitionSpec::builder(&schema)
269270
.with_spec_id(1)
270-
.with_fields(vec![])
271271
.build()
272272
.unwrap();
273273

@@ -296,14 +296,17 @@ mod tests {
296296
fn test_inclusive_projection_identity_transform() {
297297
let schema = build_test_schema();
298298

299-
let partition_spec = PartitionSpec::builder()
299+
let partition_spec = PartitionSpec::builder(&schema)
300300
.with_spec_id(1)
301-
.with_fields(vec![PartitionField::builder()
302-
.source_id(1)
303-
.name("a".to_string())
304-
.field_id(1)
305-
.transform(Transform::Identity)
306-
.build()])
301+
.add_unbound_field(
302+
UnboundPartitionField::builder()
303+
.source_id(1)
304+
.name("a".to_string())
305+
.partition_id(1)
306+
.transform(Transform::Identity)
307+
.build(),
308+
)
309+
.unwrap()
307310
.build()
308311
.unwrap();
309312

@@ -330,30 +333,29 @@ mod tests {
330333
fn test_inclusive_projection_date_transforms() {
331334
let schema = build_test_schema();
332335

333-
let partition_spec = PartitionSpec::builder()
334-
.with_spec_id(1)
335-
.with_fields(vec![
336-
PartitionField::builder()
337-
.source_id(2)
338-
.name("year".to_string())
339-
.field_id(2)
340-
.transform(Transform::Year)
341-
.build(),
342-
PartitionField::builder()
343-
.source_id(2)
344-
.name("month".to_string())
345-
.field_id(2)
346-
.transform(Transform::Month)
347-
.build(),
348-
PartitionField::builder()
349-
.source_id(2)
350-
.name("day".to_string())
351-
.field_id(2)
352-
.transform(Transform::Day)
353-
.build(),
354-
])
355-
.build()
356-
.unwrap();
336+
let partition_spec = PartitionSpec {
337+
spec_id: 1,
338+
fields: vec![
339+
PartitionField {
340+
source_id: 2,
341+
name: "year".to_string(),
342+
field_id: 1000,
343+
transform: Transform::Year,
344+
},
345+
PartitionField {
346+
source_id: 2,
347+
name: "month".to_string(),
348+
field_id: 1001,
349+
transform: Transform::Month,
350+
},
351+
PartitionField {
352+
source_id: 2,
353+
name: "day".to_string(),
354+
field_id: 1002,
355+
transform: Transform::Day,
356+
},
357+
],
358+
};
357359

358360
let arc_schema = Arc::new(schema);
359361
let arc_partition_spec = Arc::new(partition_spec);
@@ -378,14 +380,17 @@ mod tests {
378380
fn test_inclusive_projection_truncate_transform() {
379381
let schema = build_test_schema();
380382

381-
let partition_spec = PartitionSpec::builder()
383+
let partition_spec = PartitionSpec::builder(&schema)
382384
.with_spec_id(1)
383-
.with_fields(vec![PartitionField::builder()
384-
.source_id(3)
385-
.name("name".to_string())
386-
.field_id(3)
387-
.transform(Transform::Truncate(4))
388-
.build()])
385+
.add_unbound_field(
386+
UnboundPartitionField::builder()
387+
.source_id(3)
388+
.name("name_truncate".to_string())
389+
.partition_id(3)
390+
.transform(Transform::Truncate(4))
391+
.build(),
392+
)
393+
.unwrap()
389394
.build()
390395
.unwrap();
391396

@@ -398,15 +403,15 @@ mod tests {
398403

399404
// applying InclusiveProjection to bound_predicate
400405
// should result in the 'name STARTS WITH "Testy McTest"'
401-
// predicate being transformed to 'name STARTS WITH "Test"',
406+
// predicate being transformed to 'name_truncate STARTS WITH "Test"',
402407
// since a `Truncate(4)` partition will map values of
403408
// name that start with "Testy McTest" into a partition
404409
// for values of name that start with the first four letters
405410
// of that, ie "Test".
406411
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
407412
let result = inclusive_projection.project(&bound_predicate).unwrap();
408413

409-
let expected = "name STARTS WITH \"Test\"".to_string();
414+
let expected = "name_truncate STARTS WITH \"Test\"".to_string();
410415

411416
assert_eq!(result.to_string(), expected)
412417
}
@@ -415,14 +420,17 @@ mod tests {
415420
fn test_inclusive_projection_bucket_transform() {
416421
let schema = build_test_schema();
417422

418-
let partition_spec = PartitionSpec::builder()
423+
let partition_spec = PartitionSpec::builder(&schema)
419424
.with_spec_id(1)
420-
.with_fields(vec![PartitionField::builder()
421-
.source_id(1)
422-
.name("a".to_string())
423-
.field_id(1)
424-
.transform(Transform::Bucket(7))
425-
.build()])
425+
.add_unbound_field(
426+
UnboundPartitionField::builder()
427+
.source_id(1)
428+
.name("a_bucket[7]".to_string())
429+
.partition_id(1)
430+
.transform(Transform::Bucket(7))
431+
.build(),
432+
)
433+
.unwrap()
426434
.build()
427435
.unwrap();
428436

@@ -440,7 +448,7 @@ mod tests {
440448
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
441449
let result = inclusive_projection.project(&bound_predicate).unwrap();
442450

443-
let expected = "a = 2".to_string();
451+
let expected = "a_bucket[7] = 2".to_string();
444452

445453
assert_eq!(result.to_string(), expected)
446454
}

crates/iceberg/src/spec/manifest.rs

+4-4
Original file line number | Diff line number | Diff line change
@@ -227,14 +227,14 @@ impl ManifestWriter {
227227
)?;
228228
avro_writer.add_user_metadata(
229229
"partition-spec".to_string(),
230-
to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| {
230+
to_vec(&manifest.metadata.partition_spec.fields()).map_err(|err| {
231231
Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
232232
.with_source(err)
233233
})?,
234234
)?;
235235
avro_writer.add_user_metadata(
236236
"partition-spec-id".to_string(),
237-
manifest.metadata.partition_spec.spec_id.to_string(),
237+
manifest.metadata.partition_spec.spec_id().to_string(),
238238
)?;
239239
avro_writer.add_user_metadata(
240240
"format-version".to_string(),
@@ -300,12 +300,12 @@ impl ManifestWriter {
300300
self.output.write(Bytes::from(content)).await?;
301301

302302
let partition_summary =
303-
self.get_field_summary_vec(&manifest.metadata.partition_spec.fields);
303+
self.get_field_summary_vec(manifest.metadata.partition_spec.fields());
304304

305305
Ok(ManifestFile {
306306
manifest_path: self.output.location().to_string(),
307307
manifest_length: length as i64,
308-
partition_spec_id: manifest.metadata.partition_spec.spec_id,
308+
partition_spec_id: manifest.metadata.partition_spec.spec_id(),
309309
content: manifest.metadata.content,
310310
// sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replaced with
311311
// real sequence number in `ManifestListWriter`.

0 commit comments

Comments
 (0)