Skip to content

Commit 6f2b7da

Browse files
authored
[fix](cte) unnest subquery before RewriteCteChildren (apache#60717)
### What problem does this PR solve? APPLY_TO_JOIN should be applied on the CTE producer before REWRITE_CTE_CHILDREN. Some rules depend on StatsDerive, and StatsDerive assumes that all slots used by a plan node come from its children's outputs, but LogicalApply breaks this assumption.
1 parent c137bd2 commit 6f2b7da

3 files changed

Lines changed: 209 additions & 125 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java

Lines changed: 134 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -421,67 +421,69 @@ public class Rewriter extends AbstractBatchJobExecutor {
421421
)
422422
);
423423

424+
private static final List<RewriteJob> NORMALIZE_PLAN_JOBS = jobs(
425+
topic("Plan Normalization",
426+
custom(RuleType.FOLD_CONSTANT_FOR_SQL_CACHE, FoldConstantForSqlCache::new),
427+
// move MergeProjects rule from analyze phase
428+
// because SubqueryToApply and BindSink rule may create extra project node
429+
// we need merge them at the beginning of rewrite phase to let later rules happy
430+
topDown(new MergeProjectable()),
431+
topDown(
432+
new EliminateOrderByConstant(),
433+
new EliminateSortUnderSubqueryOrView(),
434+
// MergeProjects depends on this rule
435+
new LogicalSubQueryAliasToLogicalProject(),
436+
// TODO: we should do expression normalization after plan normalization
437+
// because some rewritten depends on sub expression tree matching
438+
// such as group by key matching and replaced
439+
// but we need to do some normalization before subquery unnesting,
440+
// such as extract common expression.
441+
ExpressionNormalizationAndOptimization.FULL_RULE_INSTANCE,
442+
new AvgDistinctToSumDivCount(),
443+
new CountDistinctRewrite(),
444+
new ExtractFilterFromCrossJoin()
445+
),
446+
topDown(
447+
// ExtractSingleTableExpressionFromDisjunction conflict to InPredicateToEqualToRule
448+
// in the ExpressionNormalization, so must invoke in another job, otherwise dead loop.
449+
new ExtractSingleTableExpressionFromDisjunction()
450+
)
451+
),
452+
// subquery unnesting relay on ExpressionNormalization to extract common factor expression
453+
topic("Subquery unnesting",
454+
cascadesContext -> cascadesContext.rewritePlanContainsTypes(LogicalApply.class),
455+
// after doing NormalizeAggregate in analysis job
456+
// we need run the following 2 rules to make AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION work
457+
bottomUp(new PullUpProjectUnderApply()),
458+
topDown(
459+
new PushDownFilterThroughProject(),
460+
// the subquery may have where and having clause
461+
// so there may be two filters we need to merge them
462+
new MergeFilters()
463+
),
464+
// query rewrite support window, so add this rule here
465+
custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION, AggScalarSubQueryToWindowFunction::new),
466+
bottomUp(
467+
new EliminateUselessPlanUnderApply(),
468+
// CorrelateApplyToUnCorrelateApply and ApplyToJoin
469+
// and SelectMaterializedIndexWithAggregate depends on this rule
470+
new MergeProjectable(),
471+
/*
472+
* Subquery unnesting.
473+
* 1. Adjust the plan in correlated logicalApply
474+
* so that there are no correlated columns in the subquery.
475+
* 2. Convert logicalApply to a logicalJoin.
476+
* TODO: group these rules to make sure the result plan is what we expected.
477+
*/
478+
new CorrelateApplyToUnCorrelateApply(),
479+
new ApplyToJoin()
480+
)
481+
)
482+
);
483+
424484
private static final List<RewriteJob> CTE_CHILDREN_REWRITE_JOBS_BEFORE_SUB_PATH_PUSH_DOWN = notTraverseChildrenOf(
425485
ImmutableSet.of(LogicalCTEAnchor.class),
426486
() -> jobs(
427-
topic("Plan Normalization",
428-
custom(RuleType.FOLD_CONSTANT_FOR_SQL_CACHE, FoldConstantForSqlCache::new),
429-
// move MergeProjects rule from analyze phase
430-
// because SubqueryToApply and BindSink rule may create extra project node
431-
// we need merge them at the beginning of rewrite phase to let later rules happy
432-
topDown(new MergeProjectable()),
433-
topDown(
434-
new EliminateOrderByConstant(),
435-
new EliminateSortUnderSubqueryOrView(),
436-
// MergeProjects depends on this rule
437-
new LogicalSubQueryAliasToLogicalProject(),
438-
// TODO: we should do expression normalization after plan normalization
439-
// because some rewritten depends on sub expression tree matching
440-
// such as group by key matching and replaced
441-
// but we need to do some normalization before subquery unnesting,
442-
// such as extract common expression.
443-
ExpressionNormalizationAndOptimization.FULL_RULE_INSTANCE,
444-
new AvgDistinctToSumDivCount(),
445-
new CountDistinctRewrite(),
446-
new ExtractFilterFromCrossJoin()
447-
),
448-
topDown(
449-
// ExtractSingleTableExpressionFromDisjunction conflict to InPredicateToEqualToRule
450-
// in the ExpressionNormalization, so must invoke in another job, otherwise dead loop.
451-
new ExtractSingleTableExpressionFromDisjunction()
452-
)
453-
),
454-
// subquery unnesting relay on ExpressionNormalization to extract common factor expression
455-
topic("Subquery unnesting",
456-
cascadesContext -> cascadesContext.rewritePlanContainsTypes(LogicalApply.class),
457-
// after doing NormalizeAggregate in analysis job
458-
// we need run the following 2 rules to make AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION work
459-
bottomUp(new PullUpProjectUnderApply()),
460-
topDown(
461-
new PushDownFilterThroughProject(),
462-
// the subquery may have where and having clause
463-
// so there may be two filters we need to merge them
464-
new MergeFilters()
465-
),
466-
// query rewrite support window, so add this rule here
467-
custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION, AggScalarSubQueryToWindowFunction::new),
468-
bottomUp(
469-
new EliminateUselessPlanUnderApply(),
470-
// CorrelateApplyToUnCorrelateApply and ApplyToJoin
471-
// and SelectMaterializedIndexWithAggregate depends on this rule
472-
new MergeProjectable(),
473-
/*
474-
* Subquery unnesting.
475-
* 1. Adjust the plan in correlated logicalApply
476-
* so that there are no correlated columns in the subquery.
477-
* 2. Convert logicalApply to a logicalJoin.
478-
* TODO: group these rules to make sure the result plan is what we expected.
479-
*/
480-
new CorrelateApplyToUnCorrelateApply(),
481-
new ApplyToJoin()
482-
)
483-
),
484-
485487
// before `Subquery unnesting` topic, some correlate slots should have appeared at LogicalApply.left,
486488
// but it appeared at LogicalApply.right. After the `Subquery unnesting` topic, all slots is placed in a
487489
// normal position, then we can check column privileges by these steps
@@ -862,92 +864,99 @@ public static Rewriter getCteChildrenRewriter(
862864
public static Rewriter getWholeTreeRewriterWithCustomJobs(
863865
CascadesContext cascadesContext, List<RewriteJob> jobs) {
864866
List<RewriteJob> wholeTreeRewriteJobs = getWholeTreeRewriteJobs(
865-
false, false, jobs, ImmutableList.of(), true);
867+
false, false, jobs, ImmutableList.of(), true, false);
866868
return new Rewriter(cascadesContext, wholeTreeRewriteJobs, true);
867869
}
868870

869871
private static List<RewriteJob> getWholeTreeRewriteJobs(boolean runCboRules) {
870872
return getWholeTreeRewriteJobs(true, true,
871873
CTE_CHILDREN_REWRITE_JOBS_BEFORE_SUB_PATH_PUSH_DOWN,
872-
CTE_CHILDREN_REWRITE_JOBS_AFTER_SUB_PATH_PUSH_DOWN, runCboRules);
874+
CTE_CHILDREN_REWRITE_JOBS_AFTER_SUB_PATH_PUSH_DOWN, runCboRules, true);
873875
}
874876

875877
private static List<RewriteJob> getWholeTreeRewriteJobs(
876878
boolean needSubPathPushDown,
877879
boolean needOrExpansion,
878880
List<RewriteJob> beforePushDownJobs,
879881
List<RewriteJob> afterPushDownJobs,
880-
boolean runCboRules) {
882+
boolean runCboRules,
883+
boolean includeNormalizePlanJobs) {
884+
ImmutableList.Builder<RewriteJob> builder = ImmutableList.builder();
885+
if (includeNormalizePlanJobs) {
886+
builder.addAll(NORMALIZE_PLAN_JOBS);
887+
}
888+
builder.addAll(notTraverseChildrenOf(
889+
ImmutableSet.of(LogicalCTEAnchor.class),
890+
() -> {
891+
List<RewriteJob> rewriteJobs = Lists.newArrayListWithExpectedSize(300);
881892

882-
return notTraverseChildrenOf(
883-
ImmutableSet.of(LogicalCTEAnchor.class),
884-
() -> {
885-
List<RewriteJob> rewriteJobs = Lists.newArrayListWithExpectedSize(300);
893+
rewriteJobs.addAll(jobs(
894+
topic("cte inline and pull up all cte anchor",
895+
custom(RuleType.PULL_UP_CTE_ANCHOR, PullUpCteAnchor::new),
896+
custom(RuleType.CTE_INLINE, CTEInline::new)
897+
),
898+
topic("process limit session variables",
899+
custom(RuleType.ADD_DEFAULT_LIMIT, AddDefaultLimit::new)
900+
),
901+
topic("record query tmp plan for mv pre rewrite",
902+
custom(RuleType.RECORD_PLAN_FOR_MV_PRE_REWRITE, RecordPlanForMvPreRewrite::new)
903+
),
904+
topic("rewrite cte sub-tree before sub path push down",
905+
custom(RuleType.REWRITE_CTE_CHILDREN,
906+
() -> new RewriteCteChildren(beforePushDownJobs, runCboRules)
907+
)
908+
)));
909+
rewriteJobs.addAll(jobs(topic("convert outer join to anti",
910+
custom(RuleType.CONVERT_OUTER_JOIN_TO_ANTI, ConvertOuterJoinToAntiJoin::new))));
911+
rewriteJobs.addAll(jobs(topic("eliminate group by key by uniform",
912+
custom(RuleType.ELIMINATE_GROUP_BY_KEY_BY_UNIFORM, EliminateGroupByKeyByUniform::new))));
913+
if (needOrExpansion) {
914+
rewriteJobs.addAll(jobs(topic("or expansion",
915+
custom(RuleType.OR_EXPANSION, () -> OrExpansion.INSTANCE))));
916+
}
917+
rewriteJobs.add(topic("repeat rewrite",
918+
custom(RuleType.DECOMPOSE_REPEAT, () -> DecomposeRepeatWithPreAggregation.INSTANCE)));
919+
920+
rewriteJobs.addAll(jobs(topic("split multi distinct",
921+
custom(RuleType.DISTINCT_AGG_STRATEGY_SELECTOR,
922+
() -> DistinctAggStrategySelector.INSTANCE))));
923+
924+
// Rewrite search function before VariantSubPathPruning
925+
// so that ElementAt expressions from search can be processed
926+
rewriteJobs.addAll(jobs(
927+
bottomUp(new RewriteSearchToSlots())
928+
));
886929

887-
rewriteJobs.addAll(jobs(
888-
topic("cte inline and pull up all cte anchor",
889-
custom(RuleType.PULL_UP_CTE_ANCHOR, PullUpCteAnchor::new),
890-
custom(RuleType.CTE_INLINE, CTEInline::new)
891-
),
892-
topic("process limit session variables",
893-
custom(RuleType.ADD_DEFAULT_LIMIT, AddDefaultLimit::new)
894-
),
895-
topic("record query tmp plan for mv pre rewrite",
896-
custom(RuleType.RECORD_PLAN_FOR_MV_PRE_REWRITE, RecordPlanForMvPreRewrite::new)
897-
),
898-
topic("rewrite cte sub-tree before sub path push down",
899-
custom(RuleType.REWRITE_CTE_CHILDREN,
900-
() -> new RewriteCteChildren(beforePushDownJobs, runCboRules)
930+
if (needSubPathPushDown) {
931+
rewriteJobs.addAll(jobs(
932+
topic("variant element_at push down",
933+
custom(RuleType.VARIANT_SUB_PATH_PRUNING, VariantSubPathPruning::new)
901934
)
902-
)));
903-
rewriteJobs.addAll(jobs(topic("convert outer join to anti",
904-
custom(RuleType.CONVERT_OUTER_JOIN_TO_ANTI, ConvertOuterJoinToAntiJoin::new))));
905-
rewriteJobs.addAll(jobs(topic("eliminate group by key by uniform",
906-
custom(RuleType.ELIMINATE_GROUP_BY_KEY_BY_UNIFORM, EliminateGroupByKeyByUniform::new))));
907-
if (needOrExpansion) {
908-
rewriteJobs.addAll(jobs(topic("or expansion",
909-
custom(RuleType.OR_EXPANSION, () -> OrExpansion.INSTANCE))));
910-
}
911-
rewriteJobs.add(topic("repeat rewrite",
912-
custom(RuleType.DECOMPOSE_REPEAT, () -> DecomposeRepeatWithPreAggregation.INSTANCE)));
913-
rewriteJobs.addAll(jobs(topic("split multi distinct",
914-
custom(RuleType.DISTINCT_AGG_STRATEGY_SELECTOR, () -> DistinctAggStrategySelector.INSTANCE))));
915-
916-
// Rewrite search function before VariantSubPathPruning
917-
// so that ElementAt expressions from search can be processed
918-
rewriteJobs.addAll(jobs(
919-
bottomUp(new RewriteSearchToSlots())
920-
));
921-
922-
if (needSubPathPushDown) {
923-
rewriteJobs.addAll(jobs(
924-
topic("variant element_at push down",
925-
custom(RuleType.VARIANT_SUB_PATH_PRUNING, VariantSubPathPruning::new)
935+
));
936+
}
937+
rewriteJobs.add(
938+
topic("nested column prune",
939+
custom(RuleType.NESTED_COLUMN_PRUNING, NestedColumnPruning::new)
926940
)
941+
);
942+
rewriteJobs.addAll(jobs(
943+
topic("rewrite cte sub-tree after sub path push down",
944+
custom(RuleType.CLEAR_CONTEXT_STATUS, ClearContextStatus::new),
945+
custom(RuleType.REWRITE_CTE_CHILDREN,
946+
() -> new RewriteCteChildren(afterPushDownJobs, runCboRules)
947+
)
948+
),
949+
topic("whole plan check",
950+
custom(RuleType.ADJUST_NULLABLE, () -> new AdjustNullable(false))
951+
),
952+
// NullableDependentExpressionRewrite need to be done after nullable fixed
953+
topic("condition function", bottomUp(ImmutableList.of(
954+
new NullableDependentExpressionRewrite())))
927955
));
956+
return rewriteJobs;
928957
}
929-
rewriteJobs.add(
930-
topic("nested column prune",
931-
custom(RuleType.NESTED_COLUMN_PRUNING, NestedColumnPruning::new)
932-
)
933-
);
934-
rewriteJobs.addAll(jobs(
935-
topic("rewrite cte sub-tree after sub path push down",
936-
custom(RuleType.CLEAR_CONTEXT_STATUS, ClearContextStatus::new),
937-
custom(RuleType.REWRITE_CTE_CHILDREN,
938-
() -> new RewriteCteChildren(afterPushDownJobs, runCboRules)
939-
)
940-
),
941-
topic("whole plan check",
942-
custom(RuleType.ADJUST_NULLABLE, () -> new AdjustNullable(false))
943-
),
944-
// NullableDependentExpressionRewrite need to be done after nullable fixed
945-
topic("condition function", bottomUp(ImmutableList.of(
946-
new NullableDependentExpressionRewrite())))
947-
));
948-
return rewriteJobs;
949-
}
950-
);
958+
));
959+
return builder.build();
951960
}
952961

953962
@Override

fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,14 @@ public PlanChecker applyTopDown(RuleFactory ruleFactory) {
226226
return planChecker;
227227
}
228228

229+
public PlanChecker applyTopDown(RuleFactory... ruleFactories) {
230+
List<Rule> allRules = new ArrayList<>();
231+
for (RuleFactory factory : ruleFactories) {
232+
allRules.addAll(factory.buildRules());
233+
}
234+
return applyTopDown(allRules);
235+
}
236+
229237
public PlanChecker applyTopDown(List<Rule> rule) {
230238
Rewriter.getWholeTreeRewriterWithCustomJobs(cascadesContext,
231239
ImmutableList.of(new RootPlanTreeRewriteJob(new FilteredRules(rule), PlanTreeRewriteTopDownJob::new, true)))

0 commit comments

Comments
 (0)