Draft

37 commits
0cac457
Initial test worked
Vedin Oct 8, 2025
de04ffd
Temporary unknown partitioning to make it work
Vedin Oct 10, 2025
9d8d3b3
Add multi-part spill reload
Vedin Oct 14, 2025
622fbc1
Some weird AI fixes. Better revert to proceed
Vedin Oct 16, 2025
fcb7da6
Basic memory + disk spilling
osipovartem Oct 27, 2025
e9441e2
Basic memory + disk spilling
osipovartem Oct 27, 2025
c1d66ff
Basic memory + disk spilling
osipovartem Oct 27, 2025
7833aab
Inner Join works
Vedin Nov 3, 2025
c5949e3
Fix RightSemi Join
Vedin Nov 3, 2025
ca73155
Fix LeftAnti
Vedin Nov 3, 2025
5712df8
DRAFT GraceHashJoin with disk spilling
osipovartem Nov 3, 2025
aa4b439
Comment out printlns
Vedin Nov 4, 2025
07dc4d0
Comment2 remove
Vedin Nov 4, 2025
c083282
Spill less in case we fit in memory
Vedin Nov 5, 2025
072f56a
tracking spilled bytes
Vedin Nov 5, 2025
0932d54
Upd
osipovartem Nov 6, 2025
c87377d
Spilled probe path
Vedin Nov 6, 2025
ddc6a34
scheduler init
Vedin Nov 7, 2025
2e85309
upd
osipovartem Nov 7, 2025
c78d2e9
Partly working scheduler (incorrect results)
Vedin Nov 8, 2025
6d14d4b
Fix duplicating rows with reseting a state
Vedin Nov 9, 2025
4b0a66b
Pass projection and filter
osipovartem Nov 10, 2025
80f2c34
Spill manager support for sharing spill file handle
Vedin Nov 10, 2025
d36af9d
Add spilltodisk collectMode
Vedin Nov 10, 2025
d0a48e0
Try fixing freezzed state
Vedin Nov 10, 2025
505b1cb
Add flag for scheduler
Vedin Nov 11, 2025
3b438f8
Merge branch 'aosipov/grace_hash_join' into hash-join-spillable-with-…
Vedin Nov 11, 2025
fe8e494
Add instrumentation to support grace hash join
Vedin Nov 12, 2025
51654a4
Revert "Add flag for scheduler"
Vedin Nov 12, 2025
dc461ca
Revert "Try fixing freezzed state"
Vedin Nov 12, 2025
3bc28f0
Revert "Add spilltodisk collectMode"
Vedin Nov 12, 2025
c333d2d
Revert "Spill manager support for sharing spill file handle"
Vedin Nov 12, 2025
8a46c67
Fix RightSemi joins
Vedin Nov 13, 2025
293a933
Cleanup
Vedin Nov 13, 2025
50d36b3
Remove HHJ debug
Vedin Nov 13, 2025
29ecfa2
Cleanup helpers and commented code
Vedin Nov 13, 2025
71a51cb
Redundant structures cleanup
Vedin Nov 13, 2025
10 changes: 10 additions & 0 deletions datafusion/common/src/config.rs
@@ -761,6 +761,16 @@ config_namespace! {
/// using the provided `target_partitions` level
pub repartition_joins: bool, default = true

/// Should DataFusion use spillable partitioned hash joins instead of regular partitioned joins
/// when repartitioning is enabled. This allows handling larger datasets by spilling to disk
/// when memory pressure occurs during join execution.
pub enable_spillable_hash_join: bool, default = true

/// When set to true, spillable partitioned hash joins will be replaced with the experimental
/// Grace hash join operator which repartitions both inputs to disk before performing the join.
/// This trades additional IO for predictable memory usage on very large joins.
pub enable_grace_hash_join: bool, default = false

/// Should DataFusion allow symmetric hash joins for unbounded data sources even when
/// its inputs do not have any ordering or filtering. If the flag is not enabled,
/// the SymmetricHashJoin operator will be unable to prune its internal buffers,
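For reviewers, a quick sketch of toggling the two new options programmatically, mirroring how the tests below set them (the `optimizer` field paths are exactly as introduced in this diff):

```rust
use datafusion_common::config::ConfigOptions;

let mut config = ConfigOptions::new();
// Spillable partitioned hash joins default to on.
config.optimizer.enable_spillable_hash_join = true;
// The experimental Grace hash join defaults to off and must be opted into.
config.optimizer.enable_grace_hash_join = true;
```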
74 changes: 73 additions & 1 deletion datafusion/core/tests/physical_optimizer/join_selection.rs
@@ -40,7 +40,9 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::displayable;
use datafusion_physical_plan::joins::utils::ColumnIndex;
use datafusion_physical_plan::joins::utils::JoinFilter;
use datafusion_physical_plan::joins::{HashJoinExec, NestedLoopJoinExec, PartitionMode};
use datafusion_physical_plan::joins::{
GraceHashJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::{
@@ -266,6 +268,76 @@ async fn test_join_with_swap() {
);
}

#[tokio::test]
async fn test_grace_hash_join_enabled() {
let (big, small) = create_big_and_small();
let join = Arc::new(
HashJoinExec::try_new(
Arc::clone(&small),
Arc::clone(&big),
vec![(
Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()),
Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()),
)],
None,
&JoinType::Inner,
None,
PartitionMode::Auto,
NullEquality::NullEqualsNothing,
)
.unwrap(),
);

let mut config = ConfigOptions::new();
config.optimizer.enable_grace_hash_join = true;
config.optimizer.enable_spillable_hash_join = true;
config.optimizer.hash_join_single_partition_threshold = 1;
config.optimizer.hash_join_single_partition_threshold_rows = 1;

let optimized = JoinSelection::new().optimize(join, &config).unwrap();
assert!(
optimized.as_any().is::<GraceHashJoinExec>(),
"expected GraceHashJoinExec when grace hash join is enabled"
);
}

#[tokio::test]
async fn test_grace_hash_join_disabled() {
let (big, small) = create_big_and_small();
let join = Arc::new(
HashJoinExec::try_new(
Arc::clone(&small),
Arc::clone(&big),
vec![(
Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()),
Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()),
)],
None,
&JoinType::Inner,
None,
PartitionMode::Auto,
NullEquality::NullEqualsNothing,
)
.unwrap(),
);

let mut config = ConfigOptions::new();
config.optimizer.enable_grace_hash_join = false;
config.optimizer.enable_spillable_hash_join = true;
config.optimizer.hash_join_single_partition_threshold = 1;
config.optimizer.hash_join_single_partition_threshold_rows = 1;

let optimized = JoinSelection::new().optimize(join, &config).unwrap();
let hash_join = optimized
.as_any()
.downcast_ref::<HashJoinExec>()
.expect("Grace disabled should keep HashJoinExec");
assert_eq!(
hash_join.partition_mode(),
&PartitionMode::PartitionedSpillable
);
}

#[tokio::test]
async fn test_left_join_no_swap() {
let (big, small) = create_big_and_small();
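Taken together, the two tests imply the following operator choice. A hedged decision sketch — the actual `JoinSelection` logic is not part of this diff, and the spillable-off cases are assumptions:

```rust
use datafusion_common::config::ConfigOptions;

// Hypothetical summary of the behavior exercised by the tests above:
// both flags on -> GraceHashJoinExec; only spillable on -> HashJoinExec
// in PartitionMode::PartitionedSpillable; otherwise a regular
// partitioned HashJoinExec.
fn expected_join_operator(opts: &ConfigOptions) -> &'static str {
    match (
        opts.optimizer.enable_spillable_hash_join,
        opts.optimizer.enable_grace_hash_join,
    ) {
        (true, true) => "GraceHashJoinExec",
        (true, false) => "HashJoinExec (PartitionedSpillable)",
        _ => "HashJoinExec (Partitioned)",
    }
}
```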
22 changes: 22 additions & 0 deletions datafusion/execution/src/config.rs
@@ -235,6 +235,16 @@ impl SessionConfig {
self.options.optimizer.repartition_joins
}

/// Are spillable partitioned hash joins enabled?
pub fn enable_spillable_hash_join(&self) -> bool {
self.options.optimizer.enable_spillable_hash_join
}

/// Should spillable hash joins be executed via the Grace hash join operator?
pub fn enable_grace_hash_join(&self) -> bool {
self.options.optimizer.enable_grace_hash_join
}

/// Are aggregates repartitioned during execution?
pub fn repartition_aggregations(&self) -> bool {
self.options.optimizer.repartition_aggregations
@@ -298,6 +308,18 @@ impl SessionConfig {
self
}

/// Enables or disables spillable partitioned hash joins for handling larger datasets
pub fn with_enable_spillable_hash_join(mut self, enabled: bool) -> Self {
self.options_mut().optimizer.enable_spillable_hash_join = enabled;
self
}

/// Enables or disables the Grace hash join operator for spillable hash joins
pub fn with_enable_grace_hash_join(mut self, enabled: bool) -> Self {
self.options_mut().optimizer.enable_grace_hash_join = enabled;
self
}

/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.options_mut().optimizer.repartition_aggregations = enabled;
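A short usage sketch of the new builder methods together with the accessors added above:

```rust
use datafusion_execution::config::SessionConfig;

let config = SessionConfig::new()
    .with_enable_spillable_hash_join(true)
    .with_enable_grace_hash_join(true);

assert!(config.enable_spillable_hash_join());
assert!(config.enable_grace_hash_join());
```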
13 changes: 12 additions & 1 deletion datafusion/execution/src/disk_manager.rs
@@ -26,7 +26,7 @@ use rand::{rng, Rng};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tempfile::{Builder, NamedTempFile, TempDir};
use tempfile::{Builder, NamedTempFile, TempDir, TempPath};

use crate::memory_pool::human_readable_size;

@@ -370,6 +370,17 @@ impl RefCountedTempFile {
pub fn current_disk_usage(&self) -> u64 {
self.current_file_disk_usage
}

/// Creates an additional handle to this temporary file by reopening it at
/// the same path. The clone shares the parent temp directory and disk
/// manager references so lifetime and disk-usage accounting stay consistent.
pub fn clone_refcounted(&self) -> Result<Self> {
let reopened = std::fs::File::open(self.path())?;
let temp_path = TempPath::from_path(self.path());
Ok(Self {
_parent_temp_dir: Arc::clone(&self._parent_temp_dir),
tempfile: NamedTempFile::from_parts(reopened, temp_path),
current_file_disk_usage: self.current_file_disk_usage,
disk_manager: Arc::clone(&self.disk_manager),
})
}
}

/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
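A sketch of how a caller might use the new method to share one spill file between two consumers, per the "Spill manager support for sharing spill file handle" commit; the helper name is hypothetical:

```rust
use datafusion_common::Result;
use datafusion_execution::disk_manager::RefCountedTempFile;

/// Hypothetical helper: produce a second, independent handle to the same
/// spill file so two readers can consume it separately. The clone reopens
/// the file and shares the parent temp-dir and disk-manager references.
fn split_spill_handle(
    spill: RefCountedTempFile,
) -> Result<(RefCountedTempFile, RefCountedTempFile)> {
    let second = spill.clone_refcounted()?;
    Ok((spill, second))
}
```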
8 changes: 6 additions & 2 deletions datafusion/physical-optimizer/src/coalesce_batches.rs
@@ -26,8 +26,11 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
repartition::RepartitionExec, ExecutionPlan,
coalesce_batches::CoalesceBatchesExec,
filter::FilterExec,
joins::{GraceHashJoinExec, HashJoinExec},
repartition::RepartitionExec,
ExecutionPlan,
};

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -62,6 +65,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// See https://github.com/apache/datafusion/issues/139
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<GraceHashJoinExec>().is_some()
// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
|| plan_any
.downcast_ref::<RepartitionExec>()
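With the extra check, a `GraceHashJoinExec` root gets wrapped just like a `HashJoinExec`. A minimal sketch of applying the rule, assuming `plan` is rooted at a Grace join:

```rust
use std::sync::Arc;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;

fn wrap(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    // After the rule runs, the Grace join should sit under a
    // CoalesceBatchesExec sized by `execution.batch_size`.
    CoalesceBatches::new().optimize(plan, &ConfigOptions::new())
}
```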
76 changes: 75 additions & 1 deletion datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -48,7 +48,7 @@ use datafusion_physical_plan::aggregates::{
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::joins::{
CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
CrossJoinExec, GraceHashJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
};
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -348,7 +348,57 @@ pub fn adjust_input_keys_ordering(
// Can not satisfy, clear the current requirements and generate new empty requirements
requirements.data.clear();
}
PartitionMode::PartitionedSpillable => {
// For partitioned spillable, use the same logic as regular partitioned
let join_constructor = |new_conditions: (
Vec<(PhysicalExprRef, PhysicalExprRef)>,
Vec<SortOptions>,
)| {
HashJoinExec::try_new(
Arc::clone(left),
Arc::clone(right),
new_conditions.0,
filter.clone(),
join_type,
// TODO: projection is not used in the join here because projection pushdown
// runs after enforce_distribution; it may need handling later. The same
// applies to the filter.
projection.clone(),
PartitionMode::PartitionedSpillable,
*null_equality,
)
.map(|e| Arc::new(e) as _)
};
return reorder_partitioned_join_keys(
requirements,
on,
&[],
&join_constructor,
)
.map(Transformed::yes);
}
}
} else if let Some(grace_join) = plan.as_any().downcast_ref::<GraceHashJoinExec>() {
let join_constructor = |new_conditions: (
Vec<(PhysicalExprRef, PhysicalExprRef)>,
Vec<SortOptions>,
)| {
GraceHashJoinExec::try_new(
Arc::clone(grace_join.left()),
Arc::clone(grace_join.right()),
new_conditions.0,
grace_join.filter().cloned(),
grace_join.join_type(),
grace_join.projection.clone(),
grace_join.null_equality(),
)
.map(|e| Arc::new(e) as _)
};
return reorder_partitioned_join_keys(
requirements,
grace_join.on(),
&[],
&join_constructor,
)
.map(Transformed::yes);
} else if let Some(CrossJoinExec { left, .. }) =
plan.as_any().downcast_ref::<CrossJoinExec>()
{
@@ -656,6 +706,30 @@ pub fn reorder_join_keys_to_inputs(
)?));
}
}
} else if let Some(grace_join) = plan_any.downcast_ref::<GraceHashJoinExec>() {
let (join_keys, positions) = reorder_current_join_keys(
extract_join_keys(grace_join.on()),
Some(grace_join.left().output_partitioning()),
Some(grace_join.right().output_partitioning()),
grace_join.left().equivalence_properties(),
grace_join.right().equivalence_properties(),
);
if positions.is_some_and(|idxs| !idxs.is_empty()) {
let JoinKeyPairs {
left_keys,
right_keys,
} = join_keys;
let new_join_on = new_join_conditions(&left_keys, &right_keys);
return Ok(Arc::new(GraceHashJoinExec::try_new(
Arc::clone(grace_join.left()),
Arc::clone(grace_join.right()),
new_join_on,
grace_join.filter().cloned(),
grace_join.join_type(),
grace_join.projection.clone(),
grace_join.null_equality(),
)?));
}
} else if let Some(SortMergeJoinExec {
left,
right,
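As background for these distribution rules: per the config docs above, the Grace operator hash-partitions both inputs to disk and then joins matching partition pairs, so only one build partition must fit in memory at a time. A schematic, in-memory sketch of that algorithm (illustrative only, not this PR's implementation):

```rust
use std::collections::HashMap;

// Schematic Grace hash join over (key, payload) rows; spill files are
// modeled as in-memory buckets for brevity.
fn grace_join(
    build: &[(u64, String)],
    probe: &[(u64, String)],
    k: usize,
) -> Vec<(String, String)> {
    // Phase 1: hash-partition both inputs into k buckets ("spill").
    let partition = |rows: &[(u64, String)]| {
        let mut buckets = vec![Vec::new(); k];
        for row in rows {
            buckets[(row.0 as usize) % k].push(row.clone());
        }
        buckets
    };
    let (build_parts, probe_parts) = (partition(build), partition(probe));

    // Phase 2: join each partition pair independently; only the current
    // build partition's hash table is held in memory.
    let mut out = Vec::new();
    for (b, p) in build_parts.into_iter().zip(probe_parts) {
        let mut table: HashMap<u64, Vec<String>> = HashMap::new();
        for (key, val) in b {
            table.entry(key).or_default().push(val);
        }
        for (key, val) in p {
            if let Some(hits) = table.get(&key) {
                for hit in hits {
                    out.push((hit.clone(), val.clone()));
                }
            }
        }
    }
    out
}
```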