diff --git a/oap-ape/ape-java/ape-spark/src/resources/linux/64/lib/libparquet_jni.so b/oap-ape/ape-java/ape-spark/src/resources/linux/64/lib/libparquet_jni.so new file mode 100755 index 000000000..3e87e76f3 Binary files /dev/null and b/oap-ape/ape-java/ape-spark/src/resources/linux/64/lib/libparquet_jni.so differ diff --git a/oap-ape/ape-native/build1202/lib/libparquet_jni.so b/oap-ape/ape-native/build1202/lib/libparquet_jni.so new file mode 100755 index 000000000..2ea8e1409 Binary files /dev/null and b/oap-ape/ape-native/build1202/lib/libparquet_jni.so differ diff --git a/oap-ape/ape-native/build1202/lib/libparse.so b/oap-ape/ape-native/build1202/lib/libparse.so new file mode 100755 index 000000000..c24228cf6 Binary files /dev/null and b/oap-ape/ape-native/build1202/lib/libparse.so differ diff --git a/oap-ape/ape-native/buildwithqpl/lib/libparquet_jni.so b/oap-ape/ape-native/buildwithqpl/lib/libparquet_jni.so new file mode 100755 index 000000000..dea36baff Binary files /dev/null and b/oap-ape/ape-native/buildwithqpl/lib/libparquet_jni.so differ diff --git a/oap-ape/ape-native/buildwithqpl/lib/libparse.so b/oap-ape/ape-native/buildwithqpl/lib/libparse.so new file mode 100755 index 000000000..5812f5a84 Binary files /dev/null and b/oap-ape/ape-native/buildwithqpl/lib/libparse.so differ diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc index a1073e2f8..3ef4b7ef9 100644 --- a/oap-ape/ape-native/src/reader.cc +++ b/oap-ape/ape-native/src/reader.cc @@ -17,7 +17,7 @@ #include #include - +#include #include "arrow/util/cpu_info.h" #undef NDEBUG @@ -31,6 +31,23 @@ Reader::Reader() {} void Reader::init(std::string fileName, std::string hdfsHost, int hdfsPort, std::string requiredSchema, int firstRowGroup, int rowGroupToRead) { + readPart = new struct readReady; + readPart->totalRowGroups = 0; + readPart->totalRowGroupsRead = 0; + readPart->totalColumns = 0; + readPart->totalRows = 0; + readPart->firstRowGroupIndex = 0; + readPart->currentRowGroup = 0; + readPart->totalRowsRead = 0; + readPart->totalRowsLoadedSoFar = 0; + readPart->currentBufferedRowGroup = -1; + readPart->currentBatchSize = 0; + readPart->initRequiredColumnCount = 0; + readPart->initPlusFilterRequiredColumnCount = 0; + readPart->dumpAggCursor = 0; + std::cout << "Reader::init new 0106" + << "\n"; + options = new arrow::fs::HdfsOptions(); ARROW_LOG(DEBUG) << "hdfsHost " << hdfsHost << " port " << hdfsPort; @@ -68,23 +85,26 @@ void Reader::init(std::string fileName, std::string hdfsHost, int hdfsPort, fileMetaData = parquetReader->metadata(); - this->firstRowGroupIndex = firstRowGroup; - this->totalRowGroups = rowGroupToRead; + this->readPart->firstRowGroupIndex = firstRowGroup; + this->readPart->totalRowGroups = rowGroupToRead; - totalColumns = fileMetaData->num_columns(); + readPart->totalColumns = fileMetaData->num_columns(); ARROW_LOG(DEBUG) << "schema is " << fileMetaData->schema()->ToString(); ARROW_LOG(DEBUG) << "required schema is " << requiredSchema; convertSchema(requiredSchema); - currentRowGroup = firstRowGroupIndex; + readPart->currentRowGroup = readPart->firstRowGroupIndex; columnReaders.resize(requiredColumnIndex.size()); - initRequiredColumnCount = requiredColumnIndex.size(); - initPlusFilterRequiredColumnCount = initRequiredColumnCount; - ARROW_LOG(DEBUG) << "initRequiredColumnCount is " << initRequiredColumnCount; - - ARROW_LOG(INFO) << "init done, totalRowGroups " << totalRowGroups; + readPart->initRequiredColumnCount = requiredColumnIndex.size(); + readPart->initPlusFilterRequiredColumnCount = readPart->initRequiredColumnCount; + std::cout << "Reader::init 97 initRequiredColumnCount" + << readPart->initRequiredColumnCount << " initPlusFilterRequiredColumnCount" + << readPart->initPlusFilterRequiredColumnCount << "\n"; + ARROW_LOG(DEBUG) << "initRequiredColumnCount is " << readPart->initRequiredColumnCount; + + ARROW_LOG(INFO) << "init done, totalRowGroups " << readPart->totalRowGroups; } void Reader::initCacheManager(std::string fileName, std::string hdfsHost, int hdfsPort) { @@ -147,75 +167,345 @@ void convertBitMap(uint8_t* srcBitMap, uint8_t* dstByteMap, int len) { } } +// void copyStruct(struct readReady* readPart, struct readReady* filterPart) { +// (filterPart->totalRowGroups) = (readPart->totalRowGroups); +// (filterPart->totalRowGroupsRead) = (readPart->totalRowGroupsRead); +// (filterPart->totalColumns) = (readPart->totalColumns); +// (filterPart->totalRows) = (readPart->totalRows); +// (filterPart->firstRowGroupIndex) = (readPart->firstRowGroupIndex); +// (filterPart->currentRowGroup) = (readPart->currentRowGroup); +// (filterPart->totalRowsRead) = (readPart->totalRowsRead); +// (filterPart->totalRowsLoadedSoFar) = (readPart->totalRowsLoadedSoFar); +// (filterPart->rowsToRead) = (readPart->rowsToRead); +// (filterPart->currentBufferedRowGroup) = readPart->currentBufferedRowGroup; + +// filterPart->currentBatchSize = readPart->currentBatchSize; +// filterPart->initRequiredColumnCount = readPart->initRequiredColumnCount; +// filterPart->initPlusFilterRequiredColumnCount = +// readPart->initPlusFilterRequiredColumnCount; +// filterPart->dumpAggCursor = readPart->dumpAggCursor; + +// // allocateExtraBuffers(batchSize, *(filterPart->buffersPtr), *(filterPart->nullsPtr)); +// std::cout << "Reader::readBatch 219" +// << "\n"; +// // std::cout<<(*(readPart->buffersPtr)).size()<<" +// // "<<(*(filterPart->buffersPtr)).size()<<"\n"; +// // (*(filterPart->buffersPtr)).resize((*(readPart->buffersPtr)).size()); +// // std::cout<<(*(readPart->buffersPtr)).size()<<" +// // "<<(*(filterPart->buffersPtr)).size()<<"\n"; + +// // Not all input buffers can be used for column data loading. +// // espeically when agg pushing down is enabled. +// // E.g. input buffers could be in types of "tbl_col_a, sum(tbl_col_b)", +// // in which only the first buffer can be used for column data loading. + +// (filterPart->buffersPtr) = (readPart->buffersPtr); +// (filterPart->nullsPtr) = (readPart->nullsPtr); +// } + int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr_) { + std::cout << "Reader::readBatch start 162" + << "\n"; // Pre buffer row groups. // This is not called in `init` because `requiredColumnIndex` // may be changed by `setFilter` after `init`. + + if (readPart->totalRowsRead == 0) { + preBufferRowGroups(); + // Init grow group readers. + // This should be called after preBufferRowGroups + initRowGroupReaders(); + + // this reader have read all rows + if (readPart->totalRowsRead >= readPart->totalRows && readPart->dumpAggCursor == 0) { + return -1; + } + checkEndOfRowGroup(); + std::cout << "Reader::readBatch 177" + << "\n"; + readPart->buffersPtr = new std::vector(readPart->initRequiredColumnCount); + readPart->nullsPtr = new std::vector(readPart->initRequiredColumnCount); + for (int i = 0; i < usedInitBufferIndex.size(); i++) { + (*(readPart->buffersPtr))[i] = buffersPtr_[usedInitBufferIndex[i]]; + (*(readPart->nullsPtr))[i] = nullsPtr_[usedInitBufferIndex[i]]; + } + std::cout << "read buffer 232 = { "; + for (int n : *(readPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; + + allocateExtraBuffers(batchSize, *(readPart->buffersPtr), *(readPart->nullsPtr)); + + // to + // do_________________________________________________________________________________________________ + if (aggExprs.size() == 0) { // will not do agg + std::cout << "Reader::readBatch 180" + << "\n"; + std::cout << "doReadBatch buffer 181 = { "; + for (int n : *(readPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; + + int rowsToRead = + doReadBatch(batchSize, *(readPart->buffersPtr), *(readPart->nullsPtr)); + readPart->totalRowsRead += rowsToRead; + readPart->rowsToRead = rowsToRead; + ARROW_LOG(DEBUG) << "total rows read yet: " << readPart->totalRowsRead; + } else { + std::cout << "Reader::readBatch 186" + << "\n"; + if (readPart->dumpAggCursor == 0) { + results.resize(aggExprs.size()); + for (int i = 0; i < aggExprs.size(); i++) { + std::vector nullVector(1); + results[i].nullVector = std::make_shared>(nullVector); + } + while (readPart->totalRowsRead < readPart->totalRows && !checkEndOfRowGroup()) { + std::cout << "Reader::readBatch 194" + << "\n"; + int rowsToRead = + doReadBatch(batchSize, *(readPart->buffersPtr), *(readPart->nullsPtr)); + readPart->totalRowsRead += rowsToRead; + readPart->rowsToRead = rowsToRead; + ARROW_LOG(DEBUG) << "total rows read yet: " << readPart->totalRowsRead; + } + } + } + std::cout << "Reader::readBatch 202" + << "\n"; + std::cout << "after read 228 = { "; + for (int n : *(readPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; + } + preBufferRowGroups(); + std::cout << "after read preBufferRowGroups = { "; + for (int n : *(readPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; // Init grow group readers. // This should be called after preBufferRowGroups initRowGroupReaders(); + std::cout << "after read initRowGroupReaders = { "; + for (int n : *(readPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; // this reader have read all rows - if (totalRowsRead >= totalRows && dumpAggCursor == 0) { + if (readPart->totalRowsRead >= readPart->totalRows && readPart->dumpAggCursor == 0) { + std::cout << "302 if in\n "; return -1; } checkEndOfRowGroup(); - - std::vector buffersPtr(initRequiredColumnCount); - std::vector nullsPtr(initRequiredColumnCount); - - // Not all input buffers can be used for column data loading. - // espeically when agg pushing down is enabled. - // E.g. input buffers could be in types of "tbl_col_a, sum(tbl_col_b)", - // in which only the first buffer can be used for column data loading. + std::cout << "after read checkEndOfRowGroup = { "; + for (int n : *(readPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; + struct readReady* filterPart = new struct readReady; + std::cout << "before copyStruct 311 "; + + //copyStruct(readPart, filterPart); + (filterPart->totalRowGroups) = (readPart->totalRowGroups); + (filterPart->totalRowGroupsRead) = (readPart->totalRowGroupsRead); + (filterPart->totalColumns) = (readPart->totalColumns); + (filterPart->totalRows) = (readPart->totalRows); + (filterPart->firstRowGroupIndex) = (readPart->firstRowGroupIndex); + (filterPart->currentRowGroup) = (readPart->currentRowGroup); + (filterPart->totalRowsRead) = (readPart->totalRowsRead); + (filterPart->totalRowsLoadedSoFar) = (readPart->totalRowsLoadedSoFar); + (filterPart->rowsToRead) = (readPart->rowsToRead); + (filterPart->currentBufferedRowGroup) = readPart->currentBufferedRowGroup; + + filterPart->currentBatchSize = readPart->currentBatchSize; + filterPart->initRequiredColumnCount = readPart->initRequiredColumnCount; + filterPart->initPlusFilterRequiredColumnCount = + readPart->initPlusFilterRequiredColumnCount; + filterPart->dumpAggCursor = readPart->dumpAggCursor; + + + std::cout << "Reader::readBatch 219" + << "\n"; + std::cout << "filterPart->buffersPtr before new pointed to "<<(int64_t)(filterPart->buffersPtr) + << "\n"; + (filterPart->buffersPtr) = (readPart->buffersPtr); + (filterPart->nullsPtr) = (readPart->nullsPtr); + std::cout << "filterPart->buffersPtr after new pointed to "<<(int64_t)(filterPart->buffersPtr) + << "\n"; + // for debug________________________________________________________ + (filterPart->buffersPtr_) = (readPart->buffersPtr_); + (filterPart->nullsPtr_) = (readPart->nullsPtr_); + std::cout << "last buffer 271 = { "; + for (int n : *(filterPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; + std::cout << "last buffer should be 271 = { "; + for (int n : *(readPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; + // for debug________________________________________________________ + + readPart->buffersPtr_ = new int64_t[readPart->initRequiredColumnCount]; + readPart->nullsPtr_ = new int64_t[readPart->initRequiredColumnCount]; + + for (int i = 0; i < readPart->initRequiredColumnCount; i++) { + std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; + switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { + case parquet::Type::BOOLEAN: + (((readPart->buffersPtr_))[i]) = (int64_t) new bool[batchSize]; + (((readPart->nullsPtr_))[i]) = (int64_t) new int64_t[batchSize]; + break; + case parquet::Type::INT32: + (((readPart->buffersPtr_))[i]) = (int64_t) new int32_t[batchSize]; + (((readPart->nullsPtr_))[i]) = (int64_t) new int64_t[batchSize]; + break; + case parquet::Type::INT64: + (((readPart->buffersPtr_))[i]) = (int64_t) new int64_t[batchSize]; + (((readPart->nullsPtr_))[i]) = (int64_t) new int64_t[batchSize]; + break; + case parquet::Type::INT96: + (((readPart->buffersPtr_))[i]) = (int64_t) new parquet::Int96[batchSize]; + (((readPart->nullsPtr_))[i]) = (int64_t) new int64_t[batchSize]; + break; + case parquet::Type::FLOAT: + (((readPart->buffersPtr_))[i]) = (int64_t) new float[batchSize]; + (((readPart->nullsPtr_))[i]) = (int64_t) new int64_t[batchSize]; + break; + case parquet::Type::DOUBLE: + (((readPart->buffersPtr_))[i]) = (int64_t) new double[batchSize]; + (((readPart->nullsPtr_))[i]) = (int64_t) new int64_t[batchSize]; + break; + case parquet::Type::BYTE_ARRAY: + (((readPart->buffersPtr_))[i]) = (int64_t) new parquet::ByteArray[batchSize]; + (((readPart->nullsPtr_))[i]) = (int64_t) new int64_t[batchSize]; + break; + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + (((readPart->buffersPtr_))[i]) = + (int64_t) new parquet::FixedLenByteArray[batchSize]; + (((readPart->nullsPtr_))[i]) = (int64_t) new int64_t[batchSize]; + break; + default: + ARROW_LOG(WARNING) << "Unsupported Type!"; + continue; + } + } + std::cout << "Reader::readBatch 344" + << "\n"; + std::cout << "readPart->buffersPtr before new pointed to "<<(int64_t)(readPart->buffersPtr) + << "\n"; + readPart->buffersPtr = new std::vector(readPart->initRequiredColumnCount); + readPart->nullsPtr = new std::vector(readPart->initRequiredColumnCount); + std::cout << "readPart->buffersPtr after new pointed to "<<(int64_t)(readPart->buffersPtr) + << "\n"; + std::cout << "Reader::readBatch 347" + << "\n"; + std::cout << (*(readPart->buffersPtr)).size()<<" "<<(*(readPart->nullsPtr)).size()<<" "<buffersPtr_[usedInitBufferIndex[i]] "<<(int64_t)(((readPart->buffersPtr_))[usedInitBufferIndex[i]])<<"\n"; + (*(readPart->buffersPtr))[i] = ((readPart->buffersPtr_))[usedInitBufferIndex[i]]; + (*(readPart->nullsPtr))[i] = ((readPart->nullsPtr_))[usedInitBufferIndex[i]]; } - - allocateExtraBuffers(batchSize, buffersPtr, nullsPtr); - - currentBatchSize = batchSize; + std::cout << "Reader::readBatch 345" + << "\n"; + allocateExtraBuffers(batchSize, *(readPart->buffersPtr), *(readPart->nullsPtr)); + + // for (int i = 0; i < usedInitBufferIndex.size(); i++) { + // ((*(readPart->buffersPtr))[i]) = + // ((*(readPart->buffersPtr))[usedInitBufferIndex[i]]); + // (*(readPart->nullsPtr))[i] = (*(readPart->nullsPtr))[usedInitBufferIndex[i]]; + // } + std::cout << "Reader::readBatch 231" + << "\n"; + + readPart->currentBatchSize = batchSize; int rowsRet = 0; if (aggExprs.size() == 0) { // will not do agg - int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); - totalRowsRead += rowsToRead; - ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; - rowsRet = doFilter(rowsToRead, buffersPtr, nullsPtr); + std::cout << "Reader::readBatch 239" + << "\n"; + if (readPart->totalRowsLoadedSoFar - readPart->totalRowsRead > 0) { + std::cout << "doReadBatch buffer 283 = { "; + for (int n : *(readPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; + int rowsToRead = + doReadBatch(batchSize, *(readPart->buffersPtr), *(readPart->nullsPtr)); + readPart->totalRowsRead += rowsToRead; + readPart->rowsToRead = rowsToRead; + ARROW_LOG(DEBUG) << "total rows read yet: " << readPart->totalRowsRead; + } + std::cout << "doFilter buffer 292 = { "; + for (int n : *(filterPart->buffersPtr)) { + std::cout << n << ", "; + } + std::cout << "}; \n"; + std::cout << "readPart->rowstoread :" << readPart->rowsToRead + << " readready->rowstoread :" << filterPart->rowsToRead << "\n"; + rowsRet = doFilter(filterPart->rowsToRead, *(filterPart->buffersPtr), + *(filterPart->nullsPtr)); + std::cout << "Reader::readBatch 282" + << "\n"; } else { - if (dumpAggCursor == 0) { // will read a whole RowGroup and do agg + std::cout << "Reader::readBatch 246" + << "\n"; + if (readPart->dumpAggCursor == 0) { // will read a whole RowGroup and do agg results.resize(aggExprs.size()); for (int i = 0; i < aggExprs.size(); i++) { + std::cout << "Reader::readBatch 250" + << "\n"; std::vector nullVector(1); results[i].nullVector = std::make_shared>(nullVector); } - while (totalRowsRead < totalRows && !checkEndOfRowGroup()) { - int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); - totalRowsRead += rowsToRead; - ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; - - int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr); + while (readPart->totalRowsRead < readPart->totalRows && !checkEndOfRowGroup()) { + std::cout << "Reader::readBatch 255" + << "\n"; + if (readPart->totalRowsLoadedSoFar - readPart->totalRowsRead > 0) { + int rowsToRead = + doReadBatch(batchSize, *(readPart->buffersPtr), *(readPart->nullsPtr)); + readPart->totalRowsRead += rowsToRead; + readPart->rowsToRead = rowsToRead; + ARROW_LOG(DEBUG) << "total rows read yet: " << readPart->totalRowsRead; + } + int rowsAfterFilter = doFilter(filterPart->rowsToRead, *(filterPart->buffersPtr), + *(filterPart->nullsPtr)); ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter; - int tmp = - doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr); + int tmp = doAggregation(rowsAfterFilter, map, keys, results, + *(filterPart->buffersPtr), *(filterPart->nullsPtr)); // if the last batch are empty after filter, it will return 0 regard less of the // group num if (tmp != 0) rowsRet = tmp; } + if (readPart->totalRowsRead != 0){ + buffersPtr_ = filterPart->buffersPtr_; + nullsPtr_ = filterPart->nullsPtr_; + } int rowsDump = rowsRet; if (rowsRet > batchSize) { + std::cout << "Reader::readBatch 272" + << "\n"; rowsDump = batchSize; - dumpAggCursor = batchSize; + filterPart->dumpAggCursor = batchSize; } if (aggExprs.size()) { + std::cout << "Reader::readBatch 278" + << "\n"; dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, buffersPtr_, nullsPtr_, 0, rowsDump); } if (rowsRet <= batchSize) { // return all result in one call, so clear buffers here. + std::cout << "Reader::readBatch 284" + << "\n"; map.clear(); keys.clear(); results.clear(); @@ -223,45 +513,123 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr rowsRet = rowsDump; } else { // this row group aggregation result is more than default batch size, we // will return them via mutilple call - rowsRet = ((keys.size() - dumpAggCursor) > batchSize) + std::cout << "Reader::readBatch 292" + << "\n"; + rowsRet = ((keys.size() - filterPart->dumpAggCursor) > batchSize) ? batchSize - : ((keys.size() - dumpAggCursor)); + : ((keys.size() - filterPart->dumpAggCursor)); if (aggExprs.size()) { + std::cout << "Reader::readBatch 297" + << "\n"; dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, - buffersPtr_, nullsPtr_, dumpAggCursor, rowsRet); + buffersPtr_, nullsPtr_, filterPart->dumpAggCursor, rowsRet); } - if ((keys.size() - dumpAggCursor) <= + if ((keys.size() - readPart->dumpAggCursor) <= batchSize) { // the last batch, let's clear buffers + std::cout << "Reader::readBatch 303" + << "\n"; map.clear(); keys.clear(); results.clear(); - dumpAggCursor = 0; + readPart->dumpAggCursor = 0; } else { - dumpAggCursor += batchSize; + std::cout << "Reader::readBatch 309" + << "\n"; + readPart->dumpAggCursor += batchSize; } } } + std::cout << "Reader::readBatch 314" + << "\n"; + if (readPart->totalRowsRead != 0) { + + for (int i = 0; i < sizeof((filterPart->buffersPtr_)) / sizeof((filterPart->buffersPtr_)[0]); i++) { + std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n";} + + for (int i = 0; i < sizeof((filterPart->buffersPtr_)) / sizeof((filterPart->buffersPtr_)[0]); i++) { + std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; + switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { + case parquet::Type::BOOLEAN: + delete ((bool*)(((filterPart->buffersPtr_))[i])); + break; + case parquet::Type::INT32: + delete ((int32_t*)(((filterPart->buffersPtr_))[i])); + break; + case parquet::Type::INT64: + delete ((int64_t*)(((filterPart->buffersPtr_))[i])); + case parquet::Type::INT96: + delete ((parquet::Int96*)(((filterPart->buffersPtr_))[i])); + break; + + case parquet::Type::FLOAT: + delete ((float*)(((filterPart->buffersPtr_))[i])); + break; + + case parquet::Type::DOUBLE: + delete ((double*)(((filterPart->buffersPtr_))[i])); + break; + case parquet::Type::BYTE_ARRAY: + delete ((parquet::ByteArray*)(((filterPart->buffersPtr_))[i])); + break; + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + delete ((parquet::FixedLenByteArray*)(((filterPart->buffersPtr_))[i])); + break; + default: + ARROW_LOG(WARNING) << "Unsupported Type!"; + continue; + } + delete ((int64_t*)(((filterPart->nullsPtr_))[i])); + } + std::cout << "Reader::readBatch 494" + << "\n"; + delete [](filterPart->buffersPtr_); + delete [](filterPart->nullsPtr_); + } + std::cout << "Reader::readBatch 497" + << "\n"; + delete (filterPart->buffersPtr); + delete (filterPart->nullsPtr); + filterPart->buffersPtr_ = NULL; + filterPart->nullsPtr_ = NULL; + filterPart->buffersPtr = NULL; + filterPart->nullsPtr = NULL; + delete(filterPart); + filterPart = NULL; ARROW_LOG(DEBUG) << "ret rows " << rowsRet; + std::cout << "Reader::readBatch 507" + << "\n"; return rowsRet; } int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, std::vector& nullsPtr) { - int rowsToRead = std::min((int64_t)batchSize, totalRowsLoadedSoFar - totalRowsRead); + std::cout << "Reader::doReadBatch 324" + << "\n"; + int rowsToRead = std::min((int64_t)batchSize, + readPart->totalRowsLoadedSoFar - readPart->totalRowsRead); std::vector defLevel(rowsToRead); std::vector repLevel(rowsToRead); std::vector nullBitMap(rowsToRead); ARROW_LOG(DEBUG) << "will read " << rowsToRead << " rows"; for (int i = 0; i < columnReaders.size(); i++) { + std::cout << "Reader::doReadBatch 331" + << "\n"; int64_t levelsRead = 0, valuesRead = 0, nullCount = 0; int rows = 0; int tmpRows = 0; // ReadBatchSpaced API will return rows left in a data page while (rows < rowsToRead) { + std::cout << "Reader::doReadBatch 337" + << "\n"; + std::cout << rows << " " << rowsToRead << "\n"; // TODO: refactor. it's ugly, but didn't find some better way. switch (typeVector[i]) { + std::cout << "Reader::doReadBatch 340" + << "\n"; case parquet::Type::BOOLEAN: { + std::cout << "Reader::doReadBatch 342" + << "\n"; parquet::BoolReader* boolReader = static_cast(columnReaders[i].get()); tmpRows = boolReader->ReadBatchSpaced( @@ -272,6 +640,8 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, } case parquet::Type::INT32: { + std::cout << "Reader::doReadBatch 353" + << "\n"; parquet::Int32Reader* int32Reader = static_cast(columnReaders[i].get()); tmpRows = int32Reader->ReadBatchSpaced( @@ -281,6 +651,8 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, break; } case parquet::Type::INT64: { + std::cout << "Reader::doReadBatch 363" + << "\n"; parquet::Int64Reader* int64Reader = static_cast(columnReaders[i].get()); tmpRows = int64Reader->ReadBatchSpaced( @@ -290,6 +662,8 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, break; } case parquet::Type::INT96: { + std::cout << "Reader::doReadBatch 373" + << "\n"; parquet::Int96Reader* int96Reader = static_cast(columnReaders[i].get()); tmpRows = int96Reader->ReadBatchSpaced( @@ -299,6 +673,8 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, break; } case parquet::Type::FLOAT: { + std::cout << "Reader::doReadBatch 383" + << "\n"; parquet::FloatReader* floatReader = static_cast(columnReaders[i].get()); tmpRows = floatReader->ReadBatchSpaced( @@ -308,6 +684,8 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, break; } case parquet::Type::DOUBLE: { + std::cout << "Reader::doReadBatch 393" + << "\n"; parquet::DoubleReader* doubleReader = static_cast(columnReaders[i].get()); tmpRows = doubleReader->ReadBatchSpaced( @@ -317,6 +695,8 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, break; } case parquet::Type::BYTE_ARRAY: { + std::cout << "Reader::doReadBatch 403" + << "\n"; parquet::ByteArrayReader* byteArrayReader = static_cast(columnReaders[i].get()); tmpRows = byteArrayReader->ReadBatchSpaced( @@ -346,6 +726,8 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, break; } case parquet::Type::FIXED_LEN_BYTE_ARRAY: { + std::cout << "Reader::doReadBatch 433" + << "\n"; parquet::FixedLenByteArrayReader* fixedLenByteArrayReader = static_cast(columnReaders[i].get()); tmpRows = fixedLenByteArrayReader->ReadBatchSpaced( @@ -359,22 +741,35 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, break; } convertBitMap(nullBitMap.data(), (uint8_t*)nullsPtr[i] + rows, tmpRows); + std::cout<<"have read rows = "<& buffersPtr, std::vector& nullsPtr) { + std::cout << "Reader::doFilter 457" + << "\n"; if (filterExpression) { + std::cout << "Reader::doFilter 459" + << "\n"; auto start = std::chrono::steady_clock::now(); std::vector tmp(0); int rowsRet = filterExpression->ExecuteWithParam(batchSize, buffersPtr, nullsPtr, tmp); filterTime += std::chrono::steady_clock::now() - start; + std::cout << "Reader::doFilter 507" + << "\n"; return rowsRet; } return batchSize; @@ -384,9 +779,13 @@ int Reader::doAggregation(int batchSize, ApeHashMap& map, std::vector& keys std::vector& results, std::vector& buffersPtr, std::vector& nullsPtr) { + std::cout << "Reader::doAggregation 457" + << "\n"; int rowsRet = batchSize; if (batchSize > 0 && aggExprs.size()) { // if rows after filter is 0, no need to do agg. + std::cout << "Reader::doAggregation 478" + << "\n"; auto start = std::chrono::steady_clock::now(); int groupBySize = groupByExprs.size(); ARROW_LOG(DEBUG) << "group by size " << groupBySize; @@ -394,13 +793,19 @@ int Reader::doAggregation(int batchSize, ApeHashMap& map, std::vector& keys // build hash map and index if (groupBySize > 0) { + std::cout << "Reader::doAggregation 486" + << "\n"; GroupByUtils::groupBy(map, indexes, batchSize, groupByExprs, buffersPtr, nullsPtr, keys, typeVector); } for (int i = 0; i < aggExprs.size(); i++) { + std::cout << "Reader::doAggregation 492" + << "\n"; auto agg = aggExprs[i]; if (typeid(*agg) == typeid(RootAggExpression)) { + std::cout << "Reader::doAggregation 495" + << "\n"; std::vector tmp(0); agg->ExecuteWithParam(batchSize, buffersPtr, nullsPtr, tmp); if (groupBySize) { // do agg based on indexes @@ -410,13 +815,19 @@ int Reader::doAggregation(int batchSize, ApeHashMap& map, std::vector& keys std::dynamic_pointer_cast(agg)->getResult(results[i]); } } else { + std::cout << "Reader::doAggregation 505" + << "\n"; ARROW_LOG(DEBUG) << "skipping groupBy column when doing aggregation"; } } for (int i = 0; i < aggExprs.size(); i++) { + std::cout << "Reader::doAggregation 511" + << "\n"; auto agg = aggExprs[i]; if (typeid(*agg) == typeid(RootAggExpression)) { + std::cout << "Reader::doAggregation 514" + << "\n"; std::dynamic_pointer_cast(agg)->reset(); } } @@ -453,47 +864,66 @@ int Reader::dumpBufferAfterAgg(int groupBySize, int aggExprsSize, int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr, std::vector& nullsPtr) { + std::cout << "Reader::allocateExtraBuffers 575" + << "\n"; if (filterExpression) { + std::cout << "Reader::allocateExtraBuffers 577" + << "\n"; allocateFilterBuffers(batchSize); } if (aggExprs.size()) { // todo: group by agg size + std::cout << "Reader::allocateExtraBuffers 582" + << "\n"; allocateAggBuffers(batchSize); } - + std::cout << "Reader::allocateExtraBuffers 585" + << "\n"; int filterBufferCount = filterDataBuffers.size(); int aggBufferCount = aggDataBuffers.size(); if (filterBufferCount > 0 || aggBufferCount > 0) { ARROW_LOG(DEBUG) << "use extra filter buffers count: " << filterBufferCount << "use extra agg buffers count: " << aggBufferCount; + std::cout << "Reader::allocateExtraBuffers 592" + << "\n"; - buffersPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); - nullsPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); + buffersPtr.resize(readPart->initRequiredColumnCount + filterBufferCount + + aggBufferCount); + nullsPtr.resize(readPart->initRequiredColumnCount + filterBufferCount + + aggBufferCount); for (int i = 0; i < filterBufferCount; i++) { - buffersPtr[initRequiredColumnCount + i] = (int64_t)filterDataBuffers[i]; - nullsPtr[initRequiredColumnCount + i] = (int64_t)filterNullBuffers[i]; + std::cout << "Reader::allocateExtraBuffers 598" + << "\n"; + buffersPtr[readPart->initRequiredColumnCount + i] = (int64_t)filterDataBuffers[i]; + nullsPtr[readPart->initRequiredColumnCount + i] = (int64_t)filterNullBuffers[i]; } for (int i = 0; i < aggBufferCount; i++) { - buffersPtr[initRequiredColumnCount + filterBufferCount + i] = + std::cout << "Reader::allocateExtraBuffers 604" + << "\n"; + buffersPtr[readPart->initRequiredColumnCount + filterBufferCount + i] = (int64_t)aggDataBuffers[i]; - nullsPtr[initRequiredColumnCount + filterBufferCount + i] = + nullsPtr[readPart->initRequiredColumnCount + filterBufferCount + i] = (int64_t)aggNullBuffers[i]; } } - return initRequiredColumnCount + filterBufferCount + aggBufferCount; + std::cout << "Reader::allocateExtraBuffers 611" + << "\n"; + return readPart->initRequiredColumnCount + filterBufferCount + aggBufferCount; } -bool Reader::hasNext() { return dumpAggCursor > 0 || columnReaders[0]->HasNext(); } +bool Reader::hasNext() { + return readPart->dumpAggCursor > 0 || columnReaders[0]->HasNext(); +} bool Reader::skipNextRowGroup() { - if (totalRowGroupsRead == totalRowGroups) { + if (readPart->totalRowGroupsRead == readPart->totalRowGroups) { return false; } - currentRowGroup++; - totalRowGroupsRead++; + readPart->currentRowGroup++; + readPart->totalRowGroupsRead++; return true; } @@ -521,17 +951,19 @@ void Reader::close() { } void Reader::preBufferRowGroups() { - if (!preBufferEnabled || currentBufferedRowGroup >= currentRowGroup) { + if (!preBufferEnabled || + readPart->currentBufferedRowGroup >= readPart->currentRowGroup) { return; } int maxBufferCount = 100; // TODO std::vector rowGroups; std::vector columns = requiredColumnIndex; - int maxRowGroupIndex = firstRowGroupIndex + totalRowGroups - 1; - for (int i = 0; i < maxBufferCount && currentBufferedRowGroup < maxRowGroupIndex; i++) { - currentBufferedRowGroup = currentRowGroup + i; - rowGroups.push_back(currentBufferedRowGroup); + int maxRowGroupIndex = readPart->firstRowGroupIndex + readPart->totalRowGroups - 1; + for (int i = 0; + i < maxBufferCount && readPart->currentBufferedRowGroup < maxRowGroupIndex; i++) { + readPart->currentBufferedRowGroup = readPart->currentRowGroup + i; + rowGroups.push_back(readPart->currentBufferedRowGroup); } ::arrow::io::AsyncContext ctx; @@ -550,23 +982,26 @@ void Reader::initRowGroupReaders() { return; } - rowGroupReaders.resize(totalRowGroups); - for (int i = 0; i < totalRowGroups; i++) { - rowGroupReaders[i] = parquetReader->RowGroup(firstRowGroupIndex + i); - totalRows += rowGroupReaders[i]->metadata()->num_rows(); + rowGroupReaders.resize(readPart->totalRowGroups); + for (int i = 0; i < readPart->totalRowGroups; i++) { + rowGroupReaders[i] = parquetReader->RowGroup(readPart->firstRowGroupIndex + i); + readPart->totalRows += rowGroupReaders[i]->metadata()->num_rows(); ARROW_LOG(DEBUG) << "this rg have rows: " << rowGroupReaders[i]->metadata()->num_rows(); } } bool Reader::checkEndOfRowGroup() { - if (totalRowsRead != totalRowsLoadedSoFar || dumpAggCursor != 0) return false; + if (readPart->totalRowsRead != readPart->totalRowsLoadedSoFar || + readPart->dumpAggCursor != 0) + return false; // if a splitFile contains rowGroup [2,5], currentRowGroup is 2 // rowGroupReaders index starts from 0 - ARROW_LOG(DEBUG) << "totalRowsLoadedSoFar: " << totalRowsLoadedSoFar; - rowGroupReader = rowGroupReaders[currentRowGroup - firstRowGroupIndex]; - currentRowGroup++; - totalRowGroupsRead++; + ARROW_LOG(DEBUG) << "totalRowsLoadedSoFar: " << readPart->totalRowsLoadedSoFar; + rowGroupReader = + rowGroupReaders[readPart->currentRowGroup - readPart->firstRowGroupIndex]; + readPart->currentRowGroup++; + readPart->totalRowGroupsRead++; // Do not release CacheManager's objects when going to next row group. // release() may free all useful buffers loaded by preBufferRowGroups. @@ -582,7 +1017,7 @@ bool Reader::checkEndOfRowGroup() { } } - totalRowsLoadedSoFar += rowGroupReader->metadata()->num_rows(); + readPart->totalRowsLoadedSoFar += rowGroupReader->metadata()->num_rows(); return true; } @@ -598,10 +1033,10 @@ void Reader::setFilter(std::string filterJsonStr) { setFilterColumnNames(tmpExpression); // reset required columns to initial size - requiredColumnIndex.resize(initRequiredColumnCount); - requiredColumnNames.resize(initRequiredColumnCount); - schema->erase(schema->begin() + initRequiredColumnCount, schema->end()); - columnReaders.resize(initRequiredColumnCount); + requiredColumnIndex.resize(readPart->initRequiredColumnCount); + requiredColumnNames.resize(readPart->initRequiredColumnCount); + schema->erase(schema->begin() + readPart->initRequiredColumnCount, schema->end()); + columnReaders.resize(readPart->initRequiredColumnCount); // Check with filtered column names. Append column if not present in the initial // required columns. @@ -623,21 +1058,38 @@ void Reader::setFilter(std::string filterJsonStr) { filterExpression->setSchema(schema); filterReset = true; - initPlusFilterRequiredColumnCount = requiredColumnIndex.size(); + readPart->initPlusFilterRequiredColumnCount = requiredColumnIndex.size(); + std::cout << "Reader::setFilter 646 initRequiredColumnCount" + << readPart->initRequiredColumnCount << " initPlusFilterRequiredColumnCount" + << readPart->initPlusFilterRequiredColumnCount << "\n"; } int Reader::allocateFilterBuffers(int batchSize) { - if (!filterReset && batchSize <= currentBatchSize) { + std::cout << "Reader::allocateFilterBuffers 757" + << "\n"; + if (!filterReset && batchSize <= readPart->currentBatchSize) { + std::cout << "Reader::allocateFilterBuffers 758" + << "\n"; return 0; } filterReset = false; + std::cout << "Reader::allocateFilterBuffers 762" + << "\n"; // free current filter buffers freeFilterBuffers(); + std::cout << "Reader::allocateFilterBuffers 766" + << "\n"; // allocate new filter buffers int extraBufferNum = 0; - for (int i = initRequiredColumnCount; i < initPlusFilterRequiredColumnCount; i++) { + std::cout << "Reader::allocateFilterBuffers 772 initRequiredColumnCount" + << readPart->initRequiredColumnCount << " initPlusFilterRequiredColumnCount" + << readPart->initPlusFilterRequiredColumnCount << "\n"; + for (int i = readPart->initRequiredColumnCount; + i < readPart->initPlusFilterRequiredColumnCount; i++) { + std::cout << "Reader::allocateFilterBuffers 771" + << "\n"; int columnIndex = requiredColumnIndex[i]; // allocate memory buffer char* dataBuffer; @@ -676,8 +1128,12 @@ int Reader::allocateFilterBuffers(int batchSize) { filterNullBuffers.push_back(nullBuffer); extraBufferNum++; } + std::cout << "Reader::allocateFilterBuffers 810" + << "\n"; ARROW_LOG(INFO) << "create extra filter buffers count: " << extraBufferNum; + std::cout << "Reader::allocateFilterBuffers 813" + << "\n"; return extraBufferNum; } @@ -722,17 +1178,26 @@ void Reader::setFilterColumnNames(std::shared_ptr filter) { } int Reader::allocateAggBuffers(int batchSize) { - if (!aggReset && batchSize <= currentBatchSize) { + std::cout << "Reader::allocateAggBuffers 857" + << "\n"; + if (!aggReset && batchSize <= readPart->currentBatchSize) { + std::cout << "Reader::allocateAggBuffers 859" + << "\n"; return 0; } aggReset = false; // free current agg buffers freeAggBuffers(); + std::cout << "Reader::allocateAggBuffers 866" + << "\n"; // allocate new agg buffers int extraBufferNum = 0; - for (int i = initPlusFilterRequiredColumnCount; i < requiredColumnIndex.size(); i++) { + for (int i = readPart->initPlusFilterRequiredColumnCount; + i < requiredColumnIndex.size(); i++) { + std::cout << "Reader::allocateAggBuffers 871" + << "\n"; int columnIndex = requiredColumnIndex[i]; // allocate memory buffer char* dataBuffer; @@ -771,6 +1236,8 @@ int Reader::allocateAggBuffers(int batchSize) { aggNullBuffers.push_back(nullBuffer); extraBufferNum++; } + std::cout << "Reader::allocateAggBuffers 910" + << "\n"; ARROW_LOG(INFO) << "create extra agg buffers count: " << extraBufferNum; return extraBufferNum; @@ -819,10 +1286,11 @@ void Reader::setAgg(std::string aggStr) { } // reset required columns to initial size - requiredColumnIndex.resize(initPlusFilterRequiredColumnCount); - requiredColumnNames.resize(initPlusFilterRequiredColumnCount); - schema->erase(schema->begin() + initPlusFilterRequiredColumnCount, schema->end()); - columnReaders.resize(initPlusFilterRequiredColumnCount); + requiredColumnIndex.resize(readPart->initPlusFilterRequiredColumnCount); + requiredColumnNames.resize(readPart->initPlusFilterRequiredColumnCount); + schema->erase(schema->begin() + readPart->initPlusFilterRequiredColumnCount, + schema->end()); + columnReaders.resize(readPart->initPlusFilterRequiredColumnCount); // Check with agg column names. Append column if not present in the initial required // columns. diff --git a/oap-ape/ape-native/src/reader.h b/oap-ape/ape-native/src/reader.h index 17930ac86..bb2d1768d 100644 --- a/oap-ape/ape-native/src/reader.h +++ b/oap-ape/ape-native/src/reader.h @@ -37,6 +37,30 @@ #include "src/utils/GroupByUtils.h" #include "src/utils/DumpUtils.h" +struct readReady +{ + std::vector* buffersPtr; + int64_t* buffersPtr_; + std::vector* nullsPtr; + int64_t* nullsPtr_; + int totalRowGroups ; + int totalRowGroupsRead ; + int totalColumns ; + int64_t totalRows ; + int firstRowGroupIndex ; + int currentRowGroup ; + int64_t totalRowsRead ; + int64_t totalRowsLoadedSoFar ; + int rowsToRead; + + int currentBufferedRowGroup ; + + int currentBatchSize = 0; + int initRequiredColumnCount = 0; + int initPlusFilterRequiredColumnCount = 0; + int32_t dumpAggCursor = 0; +}; + namespace ape { class Reader { public: @@ -99,6 +123,7 @@ class Reader { int dumpBufferAfterAgg(int groupBySize, int aggExprsSize, const std::vector& keys, const std::vector& results, int64_t* oriBufferPtr, int64_t* oriNullsPtr, int32_t offset, int32_t length); + //void copyStruct(struct readReady *readPart, struct readReady *filterPart); arrow::Result> fsResult; arrow::fs::HdfsOptions* options; @@ -116,15 +141,9 @@ class Reader { std::vector> columnReaders; std::vector requiredRowGroupId; - int totalRowGroups = 0; - int totalRowGroupsRead = 0; - int totalColumns = 0; - int64_t totalRows = 0; - int firstRowGroupIndex = 0; + struct readReady* readPart; + - int currentRowGroup = 0; - int64_t totalRowsRead = 0; - int64_t totalRowsLoadedSoFar = 0; std::shared_ptr filterExpression; std::chrono::duration filterTime = std::chrono::nanoseconds::zero(); @@ -133,13 +152,10 @@ class Reader { std::vector extraByteArrayBuffers; bool filterReset = false; - int currentBatchSize = 0; - int initRequiredColumnCount = 0; std::vector filterColumnNames; std::vector filterDataBuffers; std::vector filterNullBuffers; - int initPlusFilterRequiredColumnCount = 0; bool aggReset = false; std::vector aggColumnNames; std::vector aggDataBuffers; @@ -154,7 +170,7 @@ class Reader { std::shared_ptr redisConnectionOptions; bool preBufferEnabled = false; - int currentBufferedRowGroup = -1; + std::vector usedInitBufferIndex; std::vector typeVector = std::vector(); @@ -163,6 +179,6 @@ class Reader { std::vector keys = std::vector(); ApeHashMap map; - int32_t dumpAggCursor = 0; + }; } // namespace ape