Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,11 @@ config_namespace! {
/// using the provided `target_partitions` level
pub repartition_joins: bool, default = true

/// When set to true, use the Grace hash join operator instead of in-memory hash joins.
/// The Grace hash join operator repartitions both inputs to disk before performing the join.
/// This trades additional IO for predictable memory usage on very large joins.
pub enable_grace_hash_join: bool, default = false

/// Should DataFusion allow symmetric hash joins for unbounded data sources even when
/// its inputs do not have any ordering or filtering. If the flag is not enabled,
/// the SymmetricHashJoin operator will be unable to prune its internal buffers,
Expand Down
11 changes: 11 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ impl SessionConfig {
self.options.optimizer.repartition_joins
}

/// Returns `true` when spillable hash joins should be executed with the
/// Grace hash join operator instead of the in-memory hash join.
///
/// See [`Self::with_enable_grace_hash_join`] to change the setting.
pub fn enable_grace_hash_join(&self) -> bool {
    self.options.optimizer.enable_grace_hash_join
}

/// Are aggregates repartitioned during execution?
pub fn repartition_aggregations(&self) -> bool {
self.options.optimizer.repartition_aggregations
Expand Down Expand Up @@ -298,6 +303,12 @@ impl SessionConfig {
self
}

/// Enables or disables the Grace hash join operator for spillable hash joins.
///
/// When enabled, the optimizer may plan hash joins as Grace hash joins,
/// which repartition both inputs to disk for predictable memory usage.
pub fn with_enable_grace_hash_join(mut self, enabled: bool) -> Self {
    let optimizer = &mut self.options_mut().optimizer;
    optimizer.enable_grace_hash_join = enabled;
    self
}

/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.options_mut().optimizer.repartition_aggregations = enabled;
Expand Down
158 changes: 116 additions & 42 deletions datafusion/physical-optimizer/src/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ use crate::PhysicalOptimizerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, JoinSide, JoinType};
use datafusion_common::{DataFusionError, internal_err, JoinSide, JoinType};
use datafusion_expr_common::sort_properties::SortProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::joins::utils::ColumnIndex;
use datafusion_physical_plan::joins::{
    CrossJoinExec, GraceHashJoinExec, HashJoinExec, NestedLoopJoinExec,
    PartitionMode, StreamJoinPartitionMode, SymmetricHashJoinExec,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use std::sync::Arc;
Expand Down Expand Up @@ -134,12 +134,14 @@ impl PhysicalOptimizerRule for JoinSelection {
let config = &config.optimizer;
let collect_threshold_byte_size = config.hash_join_single_partition_threshold;
let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
let enable_grace_hash_join = config.enable_grace_hash_join;
new_plan
.transform_up(|plan| {
statistical_join_selection_subrule(
plan,
collect_threshold_byte_size,
collect_threshold_num_rows,
enable_grace_hash_join,
)
})
.data()
Expand Down Expand Up @@ -187,33 +189,23 @@ pub(crate) fn try_collect_left(
if hash_join.join_type().supports_swap()
&& should_swap_join_order(&**left, &**right)?
{
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
match hash_join.swap_inputs(PartitionMode::CollectLeft) {
Ok(plan) => Ok(Some(plan)),
Err(err) if is_missing_join_columns(&err) => Ok(None),
Err(err) => Err(err),
}
} else {
Ok(Some(Arc::new(HashJoinExec::try_new(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().cloned(),
hash_join.join_type(),
hash_join.projection.clone(),
PartitionMode::CollectLeft,
hash_join.null_equality(),
)?)))
build_collect_left_exec(hash_join, left, right)
}
}
(true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().cloned(),
hash_join.join_type(),
hash_join.projection.clone(),
PartitionMode::CollectLeft,
hash_join.null_equality(),
)?))),
(true, false) => build_collect_left_exec(hash_join, left, right),
(false, true) => {
if hash_join.join_type().supports_swap() {
hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
match hash_join.swap_inputs(PartitionMode::CollectLeft) {
Ok(plan) => Ok(Some(plan)),
Err(err) if is_missing_join_columns(&err) => Ok(None),
Err(err) => Err(err),
}
} else {
Ok(None)
}
Expand All @@ -222,18 +214,67 @@ pub(crate) fn try_collect_left(
}
}


/// Returns `true` when `err` is the planner error raised while building a
/// hash join whose left or right input lacks the required join columns.
///
/// NOTE(review): this inspects the error message text, which is brittle —
/// it silently stops matching if the upstream wording changes. A structured
/// error variant would be more robust; confirm the exact message upstream.
fn is_missing_join_columns(err: &DataFusionError) -> bool {
    if let DataFusionError::Plan(msg) = err {
        msg.contains("The left or right side of the join does not have all columns")
    } else {
        false
    }
}

/// Builds a `CollectLeft` hash join over `left` and `right` without swapping
/// the inputs, reusing the join keys, filter, type, projection, and null
/// equality of the original `hash_join`.
///
/// Returns `Ok(None)` when construction fails because one join side is
/// missing required columns, so the caller can fall back to another
/// strategy; any other construction error is propagated to the caller.
fn build_collect_left_exec(
    hash_join: &HashJoinExec,
    left: &Arc<dyn ExecutionPlan>,
    right: &Arc<dyn ExecutionPlan>,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let built = HashJoinExec::try_new(
        Arc::clone(left),
        Arc::clone(right),
        hash_join.on().to_vec(),
        hash_join.filter().cloned(),
        hash_join.join_type(),
        hash_join.projection.clone(),
        PartitionMode::CollectLeft,
        hash_join.null_equality(),
    );
    match built {
        Ok(exec) => Ok(Some(Arc::new(exec))),
        Err(err) if is_missing_join_columns(&err) => Ok(None),
        Err(err) => Err(err),
    }
}

/// Creates a partitioned hash join execution plan, swapping inputs if beneficial.
///
/// Checks if the join order should be swapped based on the join type and input statistics.
/// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise,
/// creates a standard partitioned hash join.
pub(crate) fn partitioned_hash_join(
hash_join: &HashJoinExec,
enable_grace: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let left = hash_join.left();
let right = hash_join.right();
if hash_join.join_type().supports_swap() && should_swap_join_order(&**left, &**right)?
{
let should_swap = hash_join.join_type().supports_swap()
&& should_swap_join_order(&**left, &**right)?;
if enable_grace {
let grace = Arc::new(GraceHashJoinExec::try_new(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().cloned(),
hash_join.join_type(),
hash_join.projection.clone(),
hash_join.null_equality(),
)?);
return if should_swap {
grace.swap_inputs(PartitionMode::Partitioned)
} else {
Ok(grace)
};
}

if should_swap {
hash_join.swap_inputs(PartitionMode::Partitioned)
} else {
Ok(Arc::new(HashJoinExec::try_new(
Expand All @@ -255,25 +296,58 @@ fn statistical_join_selection_subrule(
plan: Arc<dyn ExecutionPlan>,
collect_threshold_byte_size: usize,
collect_threshold_num_rows: usize,
enable_grace_hash_join: bool,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let transformed =
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
match hash_join.partition_mode() {
PartitionMode::Auto => try_collect_left(
hash_join,
false,
collect_threshold_byte_size,
collect_threshold_num_rows,
)?
.map_or_else(
|| partitioned_hash_join(hash_join).map(Some),
|v| Ok(Some(v)),
)?,
PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)?
.map_or_else(
|| partitioned_hash_join(hash_join).map(Some),
|v| Ok(Some(v)),
)?,
PartitionMode::Auto => {
if enable_grace_hash_join
{
Some(partitioned_hash_join(
hash_join,
enable_grace_hash_join,
)?)
} else {
try_collect_left(
hash_join,
false,
collect_threshold_byte_size,
collect_threshold_num_rows,
)?
.map_or_else(
|| {
partitioned_hash_join(
hash_join,
enable_grace_hash_join,
)
.map(Some)
},
|v| Ok(Some(v)),
)?
}
}
PartitionMode::CollectLeft => {
if enable_grace_hash_join
{
Some(partitioned_hash_join(
hash_join,
enable_grace_hash_join,
)?)
} else {
try_collect_left(hash_join, true, 0, 0)?
.map_or_else(
|| {
partitioned_hash_join(
hash_join,
enable_grace_hash_join,
)
.map(Some)
},
|v| Ok(Some(v)),
)?
}
}
PartitionMode::Partitioned => {
let left = hash_join.left();
let right = hash_join.right();
Expand Down
Loading
Loading