46
46
#include "optimizer/tlist.h"
47
47
#include "parser/parse_clause.h"
48
48
#include "parser/parsetree.h"
49
+ #include "partitioning/partdesc.h"
49
50
#include "partitioning/partprune.h"
50
51
#include "utils/lsyscache.h"
51
52
#include "utils/uri.h"
@@ -102,6 +103,11 @@ typedef struct
102
103
bool result ;
103
104
} contain_motion_walk_context ;
104
105
106
+ typedef struct
107
+ {
108
+ bool computeOnSlice ; /* does root slice contain computation node (Sort, Join, Agg) */
109
+ } offload_entry_to_qe_plan_walk_context ;
110
+
105
111
static Plan * create_scan_plan (PlannerInfo * root , Path * best_path ,
106
112
int flags );
107
113
static List * build_path_tlist (PlannerInfo * root , Path * path );
@@ -9063,4 +9069,160 @@ push_locus_down_after_elide_motion(Plan* plan)
9063
9069
plan = plan -> lefttree ;
9064
9070
}
9065
9071
}
9066
- }
9072
+ }
9073
+
9074
+ /*
9075
+ * Restore Entry locus to SingleQE in the root slice.
9076
+ * This is simply a reverse of push_locus_down_after_elide_motion.
9077
+ * The difference is that it's NOT used when creating a plan but rather
9078
+ * after a plan gets created, it's used to modify the plan in offload_entry_to_qe.
9079
+ */
9080
+ static void
9081
+ replace_entry_locus_with_singleqe (Plan * plan )
9082
+ {
9083
+ while (plan && (plan -> locustype == CdbLocusType_Entry ))
9084
+ {
9085
+ plan -> locustype = CdbLocusType_SingleQE ;
9086
+ switch (nodeTag (plan ))
9087
+ {
9088
+ case T_Motion :
9089
+ return ;
9090
+ case T_Append :
9091
+ {
9092
+ List * subplans = NIL ;
9093
+ ListCell * cell ;
9094
+ subplans = ((Append * ) (plan ))-> appendplans ;
9095
+ foreach (cell , subplans )
9096
+ {
9097
+ replace_entry_locus_with_singleqe (lfirst (cell ));
9098
+ }
9099
+ break ;
9100
+ }
9101
+ case T_SubqueryScan :
9102
+ plan = ((SubqueryScan * )(plan ))-> subplan ;
9103
+ break ;
9104
+ case T_NestLoop :
9105
+ case T_MergeJoin :
9106
+ case T_HashJoin :
9107
+ replace_entry_locus_with_singleqe (plan -> righttree );
9108
+ /* FALLTHROUGH */
9109
+ default :
9110
+ plan = plan -> lefttree ;
9111
+ break ;
9112
+ }
9113
+ }
9114
+ }
9115
+
9116
+ /*
9117
+ * Check whether we can safely offload root slice on QD to a QE.
9118
+ */
9119
+ static bool
9120
+ safe_to_offload_entry_to_qe_rte_walker (List * rtes )
9121
+ {
9122
+ ListCell * lc ;
9123
+ foreach (lc , rtes )
9124
+ {
9125
+ RangeTblEntry * rte = lfirst_node (RangeTblEntry , lc );
9126
+ if (rte -> rtekind == RTE_RELATION )
9127
+ {
9128
+ // check if any partition of a partitioned table is a coordinator-only external/foreign table
9129
+ if (rte -> relkind == RELKIND_PARTITIONED_TABLE )
9130
+ {
9131
+ Relation rel ;
9132
+ PartitionDesc desc ;
9133
+
9134
+ rel = relation_open (rte -> relid , NoLock );
9135
+ desc = RelationGetPartitionDesc (rel , true);
9136
+ relation_close (rel , NoLock );
9137
+ for (int i = 0 ; i < desc -> nparts ; i ++ )
9138
+ {
9139
+ if (GpPolicyIsEntry (GpPolicyFetch (desc -> oids [i ])))
9140
+ return false;
9141
+ }
9142
+ return true;
9143
+ }
9144
+ else
9145
+ return !GpPolicyIsEntry (GpPolicyFetch (rte -> relid ));
9146
+ }
9147
+ else if (rte -> rtekind == RTE_SUBQUERY )
9148
+ {
9149
+ if (!safe_to_offload_entry_to_qe_rte_walker (rte -> subquery -> rtable ))
9150
+ return false;
9151
+ }
9152
+ }
9153
+ return true;
9154
+ }
9155
+
9156
+ /*
9157
+ * Check if there are multiple Motion in which the root slice contains computation (Sort, Join or Aggregate).
9158
+ */
9159
+ static bool
9160
+ should_offload_entry_to_qe_plan_walker (Plan * plan , offload_entry_to_qe_plan_walk_context * ctx )
9161
+ {
9162
+ while (plan && plan -> locustype == CdbLocusType_Entry )
9163
+ {
9164
+ switch (nodeTag (plan ))
9165
+ {
9166
+ case T_Motion :
9167
+ return ctx -> computeOnSlice ;
9168
+ case T_SubqueryScan :
9169
+ plan = ((SubqueryScan * ) plan )-> subplan ;
9170
+ break ;
9171
+ /* join */
9172
+ case T_Join :
9173
+ case T_MergeJoin :
9174
+ case T_HashJoin :
9175
+ case T_NestLoop :
9176
+ ctx -> computeOnSlice = true;
9177
+ if (should_offload_entry_to_qe_plan_walker (plan -> righttree , ctx ))
9178
+ return true;
9179
+ plan = plan -> lefttree ;
9180
+ break ;
9181
+ /* sort */
9182
+ case T_Sort :
9183
+ /* aggregates*/
9184
+ case T_Agg :
9185
+ case T_WindowAgg :
9186
+ ctx -> computeOnSlice = true;
9187
+ /* FALLTHROUGH */
9188
+ default :
9189
+ plan = plan -> lefttree ;
9190
+ break ;
9191
+ }
9192
+ }
9193
+ return false;
9194
+ }
9195
+
9196
+ Plan *
9197
+ offload_entry_to_qe (PlannerInfo * root , Plan * plan , int sendslice_parallel )
9198
+ {
9199
+ offload_entry_to_qe_plan_walk_context plan_walk_ctx ;
9200
+ plan_walk_ctx .computeOnSlice = false;
9201
+
9202
+ if (root -> parse -> commandType == CMD_SELECT &&
9203
+ should_offload_entry_to_qe_plan_walker (plan , & plan_walk_ctx ) &&
9204
+ safe_to_offload_entry_to_qe_rte_walker (root -> parse -> rtable ) &&
9205
+ !contain_volatile_functions ((Node * ) root -> parse ))
9206
+ {
9207
+ CdbPathLocus entrylocus ;
9208
+ PlanSlice * sendSlice ;
9209
+ sendSlice = (PlanSlice * ) palloc0 (sizeof (PlanSlice ));
9210
+ sendSlice -> gangType = GANGTYPE_SINGLETON_READER ;
9211
+ sendSlice -> numsegments = 1 ;
9212
+ sendSlice -> sliceIndex = -1 ;
9213
+ sendSlice -> parallel_workers = sendslice_parallel ;
9214
+ sendSlice -> segindex = gp_session_id % getgpsegmentCount ();
9215
+
9216
+ replace_entry_locus_with_singleqe (plan );
9217
+
9218
+ plan = (Plan * ) make_union_motion (plan );
9219
+ ((Motion * ) plan )-> senderSliceInfo = sendSlice ;
9220
+
9221
+ plan -> locustype = CdbLocusType_Entry ;
9222
+ CdbPathLocus_MakeEntry (& entrylocus );
9223
+ if (plan -> flow )
9224
+ pfree (plan -> flow );
9225
+ plan -> flow = cdbpathtoplan_create_flow (root , entrylocus );
9226
+ }
9227
+ return plan ;
9228
+ }
0 commit comments