[SPARK-54593][SQL] Enable DPP for materialized filtering sides #56071
sunchao wants to merge 2 commits into
MaxGekk
left a comment
1 blocking, 2 non-blocking.
One design gap (InMemoryRelation exclusion); two comment-only fixes.
Design / architecture (1)
- PartitionPruning.scala:208: InMemoryRelation excluded without explanation (see inline)
Correctness (2)
- PartitionPruning.scala:221: "with known statistics" qualifier not enforced — see inline
- PartitionPruning.scala:203: "carrying source statistics" mischaracterizes the criterion — see inline
Verification
Traced the eligibility gate change: extending hasSelectivePredicateOrMaterializedInput to LocalRelation and LogicalRDD(isCheckpointedInput) is safe. The prune() method's transformUp has no double-substitution risk (the inserted Filter is not re-visited). The existing safety gates — onlyInBroadcast fallback to TrueLiteral and canReuseExchange broadcast-match in PlanDynamicPruningFilters — are unchanged; with the default REUSE_BROADCAST_ONLY=true, a non-broadcast input degrades to no pruning, which is semantically identical to the original plan.
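To make the "degrades to no pruning" argument concrete, here is a minimal toy model in plain Scala. It is a sketch under stated assumptions, not Spark's implementation: `PruningFilter`, `NoPruning`, `KeyFilter`, `planFilter`, `scan`, and `join` are hypothetical names, with `NoPruning` standing in for the `TrueLiteral` fallback and partitions reduced to plain key strings.

```scala
object DppFallbackSketch {
  // Toy stand-ins; none of these are Spark classes.
  sealed trait PruningFilter
  case object NoPruning extends PruningFilter // plays the role of TrueLiteral
  final case class KeyFilter(keys: Set[String]) extends PruningFilter

  // Hypothetical, simplified planning decision: when the broadcast cannot be
  // reused and pruning is only allowed via broadcast, fall back to a no-op.
  def planFilter(
      canReuseBroadcast: Boolean,
      onlyInBroadcast: Boolean,
      buildKeys: Set[String]): PruningFilter = {
    if (canReuseBroadcast) KeyFilter(buildKeys)
    else if (onlyInBroadcast) NoPruning
    else KeyFilter(buildKeys)
  }

  // Scan the partitioned side, skipping partitions the filter rules out.
  def scan(partitions: Seq[String], filter: PruningFilter): Seq[String] =
    filter match {
      case NoPruning       => partitions
      case KeyFilter(keys) => partitions.filter(keys.contains)
    }

  // The join itself keeps only partitions whose key matches the build side.
  def join(scanned: Seq[String], buildKeys: Set[String]): Seq[String] =
    scanned.filter(buildKeys.contains)
}
```

In this model the join result is the same whether the scan was pruned or not; the pruning filter only changes how many partitions are read, which is the property the verification note relies on.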
   * as a broadcast build side for DPP without evaluating an upstream computation again.
   */
-  private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
+  private def hasSelectivePredicateOrMaterializedInput(plan: LogicalPlan): Boolean = {
InMemoryRelation (the result of df.cache() / df.persist()) is also a materialized input that does not require upstream recomputation, yet it is excluded from this check. The same file already treats InMemoryRelation as zero-overhead in calculatePlanOverhead, so it is acknowledged as already-materialized for cost purposes.
In practice: sampledKeys.cache() followed by a partitioned join does NOT get DPP without a filter; sampledKeys.localCheckpoint() in the same pattern DOES — despite both being materialized. Is this intentional?
If so, the Scaladoc should document why (one plausible rationale: InMemoryRelation can be evicted under memory pressure and re-executed, while a checkpoint is immutable for the query's lifetime). If not, adding case _: InMemoryRelation => true would make the behavior consistent.
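The asymmetry can be illustrated with a toy model of the eligibility gate. This is a sketch in plain Scala, not the code in PartitionPruning.scala: the `Plan` hierarchy below is a hypothetical simplification that only reuses the names from the discussion, and the `includeCached` flag is an assumption added to show what the suggested `case _: InMemoryRelation => true` would change.

```scala
object EligibilityGateSketch {
  // Toy logical-plan nodes; only the shape matters, not Spark's real classes.
  sealed trait Plan
  final case class Filter(selective: Boolean, child: Plan) extends Plan
  final case class Project(child: Plan) extends Plan
  case object TableScan extends Plan
  case object LocalRelation extends Plan
  final case class LogicalRDD(isCheckpointedInput: Boolean) extends Plan
  case object InMemoryRelation extends Plan

  // Eligible when the plan holds a selective filter or a materialized leaf.
  // includeCached = false mirrors the behavior under review (cache excluded).
  def eligible(plan: Plan, includeCached: Boolean = false): Boolean =
    plan match {
      case Filter(selective, child)   => selective || eligible(child, includeCached)
      case Project(child)             => eligible(child, includeCached)
      case LocalRelation              => true
      case LogicalRDD(isCheckpointed) => isCheckpointed
      case InMemoryRelation           => includeCached
      case TableScan                  => false
    }
}
```

With the default flag, `eligible(LogicalRDD(isCheckpointedInput = true))` holds while `eligible(InMemoryRelation)` does not, which is the `localCheckpoint()` versus `cache()` difference described above.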
   * meet the following requirements:
   * (1) it can not be a stream
-  * (2) it needs to contain a selective predicate used for filtering
+  * (2) it needs to contain a selective predicate or a materialized input with known statistics
"with known statistics" is not enforced by the code — the actual gate is isCheckpointedInput, a boolean flag that is independent of statistics availability. A checkpointed LogicalRDD with originStats = None still qualifies (falling back to defaultSizeInBytes in computeStats()). The qualifier is misleading.
-  * (2) it needs to contain a selective predicate or a materialized input with known statistics
+  * (2) it needs to contain a selective predicate or a materialized input
  /**
-  * Search a filtering predicate in a given logical plan
+  * Search for a selective filtering operation or a precomputed input carrying source statistics.
"carrying source statistics" mischaracterizes the criterion. The function checks materialization (LocalRelation holds in-memory rows; LogicalRDD.isCheckpointedInput flags a checkpoint), not statistics quality. The second sentence already captures the real property correctly: "without evaluating an upstream computation again."
-  * Search for a selective filtering operation or a precomputed input carrying source statistics.
+  * Search for a selective filtering operation or a precomputed, materialized input.
290e23d to ad9d6e9
MaxGekk
left a comment
3 addressed, 0 remaining, 0 new.
All three prior-round comment findings on PartitionPruning.scala are resolved: the InMemoryRelation exclusion is now documented (eviction -> upstream recomputation), and the two mischaracterizing doc phrases ("with known statistics", "carrying source statistics") were rewritten to describe materialization rather than statistics.
Verification
Traced the widened eligibility gate. The newly eligible inputs -- LocalRelation and checkpoint-derived LogicalRDD (isCheckpointedInput) -- are deterministic materialized snapshots, so the DPP semi-join filter is redundant with the join's own key match and produces identical results. prune()'s transformUp is unchanged (no double-substitution; the inserted Filter is not re-matched). The REUSE_BROADCAST_ONLY=true default still degrades a non-broadcast filtering side to no pruning (PartitionPruning.scala:123). The fromCheckpoint marker is propagated across every copy path (newInstance/withStream/otherCopyArgs/fromDataset), set true only in Dataset.checkpoint, and explicitly false in UnionLoopExec, so recursive-CTE and arbitrary LogicalRDDs stay ineligible.
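The point about the marker surviving every copy path can be sketched with a toy case class. This is an illustration in plain Scala under stated assumptions: `ToyLogicalRDD`, `fromCheckpointedDataset`, `fromArbitraryRdd`, and `dppEligible` are hypothetical names, and the real `LogicalRDD` has more fields and different signatures.

```scala
object CheckpointMarkerSketch {
  // Toy stand-in for a LogicalRDD leaf; only the marker matters here.
  final case class ToyLogicalRDD(
      id: Long,
      isStreaming: Boolean,
      fromCheckpoint: Boolean) {
    // Copy paths change other fields but must carry the marker along.
    def newInstance(): ToyLogicalRDD = copy(id = id + 1)
    def withStream(): ToyLogicalRDD = copy(isStreaming = true)
  }

  // Hypothetical constructors: only the checkpoint path sets the marker.
  def fromCheckpointedDataset(id: Long): ToyLogicalRDD =
    ToyLogicalRDD(id, isStreaming = false, fromCheckpoint = true)

  def fromArbitraryRdd(id: Long): ToyLogicalRDD =
    ToyLogicalRDD(id, isStreaming = false, fromCheckpoint = false)

  // Toy gate: a non-streaming leaf qualifies only if it came from a checkpoint.
  def dppEligible(rdd: ToyLogicalRDD): Boolean =
    !rdd.isStreaming && rdd.fromCheckpoint
}
```

If any copy path reset the flag to a default, a checkpointed input would silently lose eligibility after plan rewriting; if a default of `true` leaked in, arbitrary inputs would gain it. Defaulting to `false` everywhere except the checkpoint constructor avoids both.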
+1, LGTM. Merging to master/4.x.
### What changes were proposed in this pull request?

This PR extends dynamic partition pruning (DPP) eligibility for small, already materialized filtering sides:

- `LocalRelation`, which represents locally available rows.
- `LogicalRDD` produced by `checkpoint()` or `localCheckpoint()`.

Checkpoint-created `LogicalRDD`s carry an explicit marker so that DPP is not enabled for arbitrary `LogicalRDD` inputs that may require recomputing an upstream query. This also keeps recursive CTE and `foreachBatch`-constructed inputs outside the new eligibility rule.

This supersedes the unmerged approach in #53324 with narrower `LogicalRDD` handling while addressing SPARK-54593.

### Why are the changes needed?

DPP currently requires a filtering predicate in the build-side logical plan. When a small filtering side is already materialized as a `LocalRelation` or a checkpointed `LogicalRDD`, that predicate is no longer present, so Spark misses partition pruning opportunities.

This occurs for joins where a partition expression is matched to a small set of keys, for example `concat_ws("||", hour, category) = hc_key`. Although the expression is composed only from partition columns, the partitioned scan is not dynamically pruned when the filtering side is materialized.

### Does this PR introduce _any_ user-facing change?

Yes. Queries joining a partitioned file-source table with a small `LocalRelation` or checkpointed filtering side may now perform dynamic partition pruning and scan fewer partitions. There is no API change.

### How was this patch tested?

- Added positive coverage for DPP using a `LocalRelation` build side with an expression over partition columns.
- Added positive coverage for DPP using a `localCheckpoint()` build side with the same expression form.
- Added negative coverage confirming that a non-checkpointed `LogicalRDD` does not become DPP-eligible.
- Ran `build/sbt 'sql/testOnly org.apache.spark.sql.DynamicPartitionPruningV1SuiteAEOn org.apache.spark.sql.DynamicPartitionPruningV1SuiteAEOff'`.
- Ran `build/sbt sql/scalastyle sql/Test/scalastyle`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: OpenAI Codex

Closes #56071 from sunchao/dev/chao/codex/dpp-materialized-build-side.

Authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>

(cherry picked from commit 0255283)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Hi @sunchao and @MaxGekk , thanks for getting this merged. A note on attribution. The core move in this PR, making hasSelectivePredicate return true for an already-materialized leaf that has no Filter, was first introduced in #53263 (SPARK-54554) on Nov 29, 2025 (CommandResult), and extended to LocalRelation and LogicalRDD in #53324 (SPARK-54593). This PR references #53324 but not #53263, which is the earlier PR where the mechanism and the materialized-input rationale originated. Two small asks. First, could #53263 / SPARK-54554 be referenced alongside #53324 so the lineage is on the record? Second, given the merged work generalizes the approach from those PRs, would a co-author acknowledgment for #53263 (co-authored with @mc8max and @dwsmith1983) be appropriate, whether on the Jira or in a follow-up note? I understand the commit itself is already in history; even a reference on the ticket would be great. Thanks for considering it. Prior PRs:
Jira:
Thanks @MaxGekk for reviewing and merging this! @dwsmith1983 sure! I've updated the PR description to include #53263. Given the PR is already committed, it's hard to update it and add extra co-authors, but I'll add both of you if there are follow-up PRs to this, which is very likely.
What changes were proposed in this pull request?

This PR extends dynamic partition pruning (DPP) eligibility for small, already materialized filtering sides:

- `LocalRelation`, which represents locally available rows.
- `LogicalRDD` produced by `checkpoint()` or `localCheckpoint()`.

Checkpoint-created `LogicalRDD`s carry an explicit marker so that DPP is not enabled for arbitrary `LogicalRDD` inputs that may require recomputing an upstream query. This also keeps recursive CTE and `foreachBatch`-constructed inputs outside the new eligibility rule.

This supersedes the unmerged approach in #53324 as well as #53263 with narrower `LogicalRDD` handling while addressing SPARK-54593.

Why are the changes needed?

DPP currently requires a filtering predicate in the build-side logical plan. When a small filtering side is already materialized as a `LocalRelation` or a checkpointed `LogicalRDD`, that predicate is no longer present, so Spark misses partition pruning opportunities.

This occurs for joins where a partition expression is matched to a small set of keys, for example `concat_ws("||", hour, category) = hc_key`. Although the expression is composed only from partition columns, the partitioned scan is not dynamically pruned when the filtering side is materialized.

Does this PR introduce any user-facing change?

Yes. Queries joining a partitioned file-source table with a small `LocalRelation` or checkpointed filtering side may now perform dynamic partition pruning and scan fewer partitions. There is no API change.

How was this patch tested?

- Added positive coverage for DPP using a `LocalRelation` build side with an expression over partition columns.
- Added positive coverage for DPP using a `localCheckpoint()` build side with the same expression form.
- Added negative coverage confirming that a non-checkpointed `LogicalRDD` does not become DPP-eligible.
- Ran `build/sbt 'sql/testOnly org.apache.spark.sql.DynamicPartitionPruningV1SuiteAEOn org.apache.spark.sql.DynamicPartitionPruningV1SuiteAEOff'`.
- Ran `build/sbt sql/scalastyle sql/Test/scalastyle`.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: OpenAI Codex