Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dbcon/execplan/calpontselectexecutionplan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const
b << timeZone;
b << fPron;
b << (uint8_t)fWithRollup;
b << (uint8_t)fIsRecursiveWithTable;
b << (uint8_t)fIsRecursiveQuery;
b << (uint8_t)fContainsRecursiveQuery;

b << fMaxRecursiveDepth;
}

void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b)
Expand Down Expand Up @@ -832,6 +837,13 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b)
utils::Pron::instance().pron(fPron);
b >> tmp8;
fWithRollup = tmp8;
b >> tmp8;
fIsRecursiveWithTable = tmp8;
b >> tmp8;
fIsRecursiveQuery = tmp8;
b >> tmp8;
fContainsRecursiveQuery = tmp8;
b >> fMaxRecursiveDepth;
}

bool CalpontSelectExecutionPlan::operator==(const CalpontSelectExecutionPlan& t) const
Expand Down
55 changes: 54 additions & 1 deletion dbcon/execplan/calpontselectexecutionplan.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
/** @file */

#pragma once
#include <cstdint>
#include <vector>
#include <map>
#include <iosfwd>
Expand Down Expand Up @@ -496,7 +497,12 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
{
return fDerivedTableList;
}
void derivedTableList(const SelectList& derivedTableList)

SelectList& derivedTableList()
{
return fDerivedTableList;
}
void derivedTableList(SelectList& derivedTableList)
{
fDerivedTableList = derivedTableList;
}
Expand All @@ -523,10 +529,12 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
{
fUnionVec = unionVec;
}

const SelectList& unionVec() const
{
return fUnionVec;
}

SelectList& unionVec()
{
return fUnionVec;
Expand Down Expand Up @@ -765,6 +773,46 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
return fTimeZone;
}

void isRecursiveWithTable(bool b)
{
fIsRecursiveWithTable = b;
}

bool isRecursiveWithTable()
{
return fIsRecursiveWithTable;
}

void isRecursiveQuery(bool b)
{
fIsRecursiveQuery = b;
}

bool isRecursiveQuery()
{
return fIsRecursiveQuery;
}

void containsRecursiveQuery(bool b)
{
fContainsRecursiveQuery = b;
}

bool containsRecursiveQuery()
{
return fContainsRecursiveQuery;
}

void maxRecursiveDepth(uint32_t i)
{
fMaxRecursiveDepth = i;
}

int maxRecursiveDepth()
{
return fMaxRecursiveDepth;
}

/**
* The serialization interface
*/
Expand Down Expand Up @@ -985,6 +1033,11 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
* A flag to compute subtotals, related to GROUP BY operation.
*/
bool fWithRollup;
bool fIsRecursiveWithTable = false;
bool fIsRecursiveQuery = false;
bool fContainsRecursiveQuery = false;

uint32_t fMaxRecursiveDepth;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion dbcon/joblist/fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class FIFO : public DataListImpl<std::vector<element_t>, element_t>
}

inline void dropToken() {};
inline void dropToken(uint32_t){};
inline void dropToken(uint32_t) {};

// Counters that reflect how many many times this FIFO blocked on reads/writes
uint64_t blockedWriteCount() const;
Expand Down
10 changes: 9 additions & 1 deletion dbcon/joblist/jlf_subquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <iostream>
#include <stack>
#include <iterator>
//#define NDEBUG
// #define NDEBUG
#include <cassert>
#include <vector>
using namespace std;
Expand Down Expand Up @@ -748,6 +748,10 @@ int doFromSubquery(CalpontExecutionPlan* ep, const string& alias, const string&
SJSTEP subQueryStep = transformer.makeSubQueryStep(csep, true);
subQueryStep->view(view);
SJSTEP subAd(new SubAdapterStep(subQueryStep, jobInfo));
if (csep->isRecursiveQuery())
{
dynamic_cast<SubAdapterStep*>(subAd.get())->isRecursiveStep(true);
}
jobInfo.selectAndFromSubs.push_back(subAd);

return CNX_VTABLE_ID;
Expand Down Expand Up @@ -870,6 +874,10 @@ SJSTEP doUnionSub(CalpontExecutionPlan* ep, JobInfo& jobInfo)
transformer.setVarbinaryOK();
SJSTEP subQueryStep = transformer.makeSubQueryStep(csep, false);
SJSTEP subAd(new SubAdapterStep(subQueryStep, jobInfo));
if (csep->isRecursiveQuery())
{
dynamic_cast<SubAdapterStep*>(subAd.get())->isRecursiveStep(true);
}
return subAd;
}

Expand Down
196 changes: 196 additions & 0 deletions dbcon/joblist/jlf_tuplejoblist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5262,7 +5262,203 @@ SJSTEP unionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo&

return SJSTEP(unionStep);
}
SJSTEP recursiveUnionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo& jobInfo,
JobStepVector& recurQueries, uint32_t keyCount)
{
vector<RowGroup> inputRGs;
vector<bool> distinct;
uint64_t colCount = jobInfo.deliveredCols.size();

vector<uint32_t> oids;
vector<uint32_t> keys;
vector<uint32_t> scale;
vector<uint32_t> precision;
vector<uint32_t> width;
vector<CalpontSystemCatalog::ColDataType> types;
vector<uint32_t> csNums;
JobStepAssociation jsaToUnion;

// bug4388, share code with connector for column type coversion
vector<vector<CalpontSystemCatalog::ColType>> queryColTypes;

for (uint64_t j = 0; j < colCount; ++j)
queryColTypes.push_back(vector<CalpontSystemCatalog::ColType>(queries.size() + recurQueries.size()));

for (uint64_t i = 0; i < queries.size(); i++)
{
SJSTEP& spjs = queries[i];
TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(spjs.get());

if (tds == NULL)
{
throw runtime_error("Not a deliverable step.");
}

const RowGroup& rg = tds->getDeliveredRowGroup();
inputRGs.push_back(rg);

const vector<uint32_t>& scaleIn = rg.getScale();
const vector<uint32_t>& precisionIn = rg.getPrecision();
const vector<CalpontSystemCatalog::ColDataType>& typesIn = rg.getColTypes();
const vector<uint32_t>& csNumsIn = rg.getCharsetNumbers();

for (uint64_t j = 0; j < colCount; ++j)
{
queryColTypes[j][i].colDataType = typesIn[j];
queryColTypes[j][i].charsetNumber = csNumsIn[j];
queryColTypes[j][i].scale = scaleIn[j];
queryColTypes[j][i].precision = precisionIn[j];
queryColTypes[j][i].colWidth = rg.getColumnWidth(j);
}

if (i == 0)
{
const vector<uint32_t>& oidsIn = rg.getOIDs();
const vector<uint32_t>& keysIn = rg.getKeys();
oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + colCount);
keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + colCount);
}

// if all union types are UNION_ALL, distinctUnionNum is 0.
distinct.push_back(distinctUnionNum > i);

AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
spdl->rowGroupDL(dl);
dl->OID(CNX_VTABLE_ID);
JobStepAssociation jsa;
jsa.outAdd(spdl);
spjs->outputAssociation(jsa);
jsaToUnion.outAdd(spdl);
}

for (uint64_t i = 0; i < recurQueries.size(); i++)
{
SJSTEP spjs = recurQueries[i];
TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(spjs.get());

if (tds == NULL)
{
throw runtime_error("Not a deliverable step.");
}

const RowGroup& rg = tds->getDeliveredRowGroup();
inputRGs.push_back(rg);

const vector<uint32_t>& scaleIn = rg.getScale();
const vector<uint32_t>& precisionIn = rg.getPrecision();
const vector<CalpontSystemCatalog::ColDataType>& typesIn = rg.getColTypes();
const vector<uint32_t>& csNumsIn = rg.getCharsetNumbers();

for (uint64_t j = 0; j < colCount; ++j)
{
queryColTypes[j][i + queries.size()].colDataType = typesIn[j];
queryColTypes[j][i + queries.size()].charsetNumber = csNumsIn[j];
queryColTypes[j][i + queries.size()].scale = scaleIn[j];
queryColTypes[j][i + queries.size()].precision = precisionIn[j];
queryColTypes[j][i + queries.size()].colWidth = rg.getColumnWidth(j);
}

// if all union types are UNION_ALL, distinctUnionNum is 0.
distinct.push_back(distinctUnionNum > i);

// mostly should have initialised DLs hence the change
if (i < recurQueries.size() - 1)
{
AnyDataListSPtr spdl = spjs->outputAssociation().outAt(0);
spdl->rowGroupDL()->setNumConsumers(2);
jsaToUnion.outAdd(spdl);
}
else
{
AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
spdl->rowGroupDL(dl);
dl->OID(CNX_VTABLE_ID);
JobStepAssociation jsa;
jsa.outAdd(spdl);
spjs->outputAssociation(jsa);
jsaToUnion.outAdd(spdl);
}
}

AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
spdl->rowGroupDL(dl);
dl->OID(CNX_VTABLE_ID);
JobStepAssociation jsa;
jsa.outAdd(spdl);
TupleRecursiveUnion* unionStep = new TupleRecursiveUnion(CNX_VTABLE_ID, jobInfo, keyCount);
unionStep->inputAssociation(jsaToUnion);
unionStep->outputAssociation(jsa);

// This return code in the call to convertUnionColType() below would
// always be 0. This is because convertUnionColType() is also called
// in the connector code in getSelectPlan() which handle
// the non-zero return code scenarios from this function call and error
// out, in which case, the execution does not even get to ExeMgr.
unsigned int dummyUnionedTypeRc = 0;

// get unioned column types
for (uint64_t j = 0; j < colCount; ++j)
{
CalpontSystemCatalog::ColType colType =
CalpontSystemCatalog::ColType::convertUnionColType(queryColTypes[j], dummyUnionedTypeRc);
types.push_back(colType.colDataType);
csNums.push_back(colType.charsetNumber);
scale.push_back(colType.scale);
precision.push_back(colType.precision);
width.push_back(colType.colWidth);
}

vector<uint32_t> pos;
pos.push_back(2);

for (uint64_t i = 0; i < oids.size(); ++i)
pos.push_back(pos[i] + width[i]);

unionStep->setInputRowGroups(inputRGs);
unionStep->setDistinctFlags(distinct);
unionStep->setOutputRowGroup(
RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold));

unionStep->recursiveSteps(recurQueries);
// Fix for bug 4388 adjusts the result type at connector side, this workaround is obsolete.
// bug 3067, update the returned column types.
// This is a workaround as the connector always uses the first query' returned columns.
// ct.colDataType = types[i];
// ct.scale = scale[i];
// ct.colWidth = width[i];

for (size_t i = 0; i < jobInfo.deliveredCols.size(); i++)
{
CalpontSystemCatalog::ColType ct = jobInfo.deliveredCols[i]->resultType();
// XXX remove after connector change
ct.colDataType = types[i];
ct.scale = scale[i];
ct.colWidth = width[i];

// varchar/varbinary column width has been fudged, see fudgeWidth in jlf_common.cpp.
if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
ct.colWidth--;
else if (ct.colDataType == CalpontSystemCatalog::VARBINARY)
ct.colWidth -= 2;

jobInfo.deliveredCols[i]->resultType(ct);
}

if (jobInfo.trace)
{
cout << boldStart << "\ninput RGs: (distinct=" << distinctUnionNum << ")\n" << boldStop;

for (vector<RowGroup>::iterator i = inputRGs.begin(); i != inputRGs.end(); i++)
cout << i->toString() << endl << endl;

cout << boldStart << "output RG:\n" << boldStop << unionStep->getDeliveredRowGroup().toString() << endl;
}

return SJSTEP(unionStep);
}
} // namespace joblist

#ifdef __clang__
Expand Down
3 changes: 2 additions & 1 deletion dbcon/joblist/jlf_tuplejoblist.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ void orExpresssion(const execplan::Operator* op, JobInfo& jobInfo);

// union the queries and return the tuple union step
SJSTEP unionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo& jobInfo, uint32_t keyCount);

SJSTEP recursiveUnionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo& jobInfo,
JobStepVector& recurQueries, uint32_t keyCount);
void addAnnexStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, JobInfo& jobInfo,
IDBQueryType queryType = execplan::IDBQueryType::SELECT);

Expand Down
Loading