@@ -6,7 +6,6 @@ use crate::physical_planner::joins::utils::{
6
6
use crate :: session_context:: Algorithm ;
7
7
use ahash:: RandomState ;
8
8
use bio:: data_structures:: interval_tree as rust_bio;
9
- use coitrees:: { COITree , Interval } ;
10
9
use datafusion:: arrow:: array:: { Array , AsArray , PrimitiveArray , PrimitiveBuilder , RecordBatch } ;
11
10
use datafusion:: arrow:: compute;
12
11
use datafusion:: arrow:: datatypes:: { DataType , Schema , SchemaRef , UInt32Type } ;
@@ -40,6 +39,9 @@ use std::mem::size_of;
40
39
use std:: sync:: Arc ;
41
40
use std:: task:: Poll ;
42
41
42
+ // Max number of records on the left side is 18,446,744,073,709,551,615 (usize::MAX on 64-bit targets).
43
+ // We could switch Position to u32 (u32::MAX is 4,294,967,295),
44
+ // which consumes ~30% less memory when building COITrees but limits the number of elements.
43
45
// Index of a left-side row: `update_hashmap` stores `i + offset` in this type, where
// `offset` is advanced by `batch.num_rows()` after each collected left batch.
type Position = usize ;
44
46
45
47
#[ derive( Debug ) ]
@@ -610,7 +612,7 @@ async fn collect_left_input(
610
612
acc. 0 . push ( batch) ;
611
613
Ok ( acc)
612
614
} )
613
- . await ?; // 11.5mb 3250 al 768 tal
615
+ . await ?;
614
616
615
617
// Estimation of memory size, required for hashtable, prior to allocation.
616
618
// Final result can be verified using `RawTable.allocation_info()`
@@ -641,7 +643,7 @@ async fn collect_left_input(
641
643
// build a left hash map
642
644
hashes_buffer. clear ( ) ;
643
645
hashes_buffer. resize ( batch. num_rows ( ) , 0 ) ;
644
- update_hashmap ( // 760 al, 370 tal -> 462 al, 130 al
646
+ update_hashmap (
645
647
& on_left,
646
648
& left_interval,
647
649
batch,
@@ -653,9 +655,9 @@ async fn collect_left_input(
653
655
offset += batch. num_rows ( ) ;
654
656
}
655
657
656
- let hashmap = IntervalJoinAlgorithm :: new ( & algorithm, hashmap) ; // 14mb, 145 al
658
+ let hashmap = IntervalJoinAlgorithm :: new ( & algorithm, hashmap) ;
657
659
658
- let single_batch = compute:: concat_batches ( & schema, & batches) ?; // 10.7 mb, 356 al
660
+ let single_batch = compute:: concat_batches ( & schema, & batches) ?;
659
661
let data = JoinLeftData :: new ( hashmap, single_batch, reservation) ;
660
662
661
663
Ok ( data)
@@ -703,7 +705,15 @@ enum IntervalJoinAlgorithm {
703
705
ArrayIntervalTree ( FnvHashMap < u64 , rust_bio:: ArrayBackedIntervalTree < i32 , Position > > ) ,
704
706
AIList ( FnvHashMap < u64 , scailist:: ScAIList < Position > > ) ,
705
707
Lapper ( FnvHashMap < u64 , rust_lapper:: Lapper < u32 , Position > > ) ,
706
- CoitresNearest ( FnvHashMap < u64 , ( COITree < Position , u32 > , Vec < Interval < Position > > ) > ) ,
708
+ CoitresNearest (
709
+ FnvHashMap <
710
+ u64 ,
711
+ (
712
+ coitrees:: COITree < Position , u32 > ,
713
+ Vec < coitrees:: Interval < Position > > ,
714
+ ) ,
715
+ > ,
716
+ ) ,
707
717
}
708
718
709
719
impl Debug for IntervalJoinAlgorithm {
@@ -868,7 +878,12 @@ impl IntervalJoinAlgorithm {
868
878
* node. metadata
869
879
}
870
880
871
- fn nearest ( & self , start : i32 , end : i32 , ranges2 : & [ Interval < Position > ] ) -> Option < Position > {
881
+ fn nearest (
882
+ & self ,
883
+ start : i32 ,
884
+ end : i32 ,
885
+ ranges2 : & [ coitrees:: Interval < Position > ] ,
886
+ ) -> Option < Position > {
872
887
if ranges2. is_empty ( ) {
873
888
return None ;
874
889
}
@@ -995,14 +1010,13 @@ fn update_hashmap(
995
1010
let start = evaluate_as_i32 ( left_interval. start ( ) , batch) ?;
996
1011
let end = evaluate_as_i32 ( left_interval. end ( ) , batch) ?;
997
1012
998
- hash_values
999
- . iter ( )
1000
- . enumerate ( )
1001
- . for_each ( |( i, hash_val) | {
1002
- let position = i + offset;
1003
- let intervals: & mut Vec < SequilaInterval > = hash_map. entry ( * hash_val) . or_insert_with ( || Vec :: with_capacity ( 4096 ) ) ;
1004
- intervals. push ( SequilaInterval :: new ( start. value ( i) , end. value ( i) , position) )
1005
- } ) ;
1013
+ hash_values. iter ( ) . enumerate ( ) . for_each ( |( i, hash_val) | {
1014
+ let position: Position = i + offset;
1015
+ let intervals: & mut Vec < SequilaInterval > = hash_map
1016
+ . entry ( * hash_val)
1017
+ . or_insert_with ( || Vec :: with_capacity ( 4096 ) ) ;
1018
+ intervals. push ( SequilaInterval :: new ( start. value ( i) , end. value ( i) , position) )
1019
+ } ) ;
1006
1020
1007
1021
Ok ( ( ) )
1008
1022
}
0 commit comments