diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 7f13418d26f7..774e553c84b7 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -95,12 +95,17 @@ impl BatchPartitioner { /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] /// /// The time spent repartitioning will be recorded to `timer` - pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result { + pub fn try_new( + partitioning: Partitioning, + partition: usize, + num_input_partitions: usize, + timer: metrics::Time, + ) -> Result { let state = match partitioning { Partitioning::RoundRobinBatch(num_partitions) => { BatchPartitionerState::RoundRobin { num_partitions, - next_idx: 0, + next_idx: (num_partitions / num_input_partitions) * partition, } } Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash { @@ -492,8 +497,12 @@ impl RepartitionExec { r_metrics: RepartitionMetrics, context: Arc, ) -> Result<()> { - let mut partitioner = - BatchPartitioner::try_new(partitioning, r_metrics.repart_time.clone())?; + let mut partitioner = BatchPartitioner::try_new( + partitioning, + i, + input.output_partitioning().partition_count(), + r_metrics.repart_time.clone(), + )?; // execute the child operator let timer = r_metrics.fetch_time.timer(); diff --git a/datafusion/core/src/scheduler/pipeline/repartition.rs b/datafusion/core/src/scheduler/pipeline/repartition.rs index 7eeb3c31de10..2f2593639a86 100644 --- a/datafusion/core/src/scheduler/pipeline/repartition.rs +++ b/datafusion/core/src/scheduler/pipeline/repartition.rs @@ -43,7 +43,8 @@ impl RepartitionPipeline { assert_ne!(output_count, 0); // TODO: metrics support - let partitioner = BatchPartitioner::try_new(output, Default::default())?; + let partitioner = + BatchPartitioner::try_new(output, 0, output_count, Default::default())?; let state = Mutex::new(RepartitionState { partitioner,