Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snow 855088 Set row mode array to mapping result set as array. Add suffix for duplicated columns during mapping to js objects. #591

Merged
merged 2 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions lib/connection/connection_config.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const NativeTypes = require('./result/data_types').NativeTypes;
const GlobalConfig = require('../global_config');
const authenticationTypes = require('../authentication/authentication').authenticationTypes;
const stringSimilarity = require("string-similarity");
const RowMode = require('./../constants/row_mode');
const WAIT_FOR_BROWSER_ACTION_TIMEOUT = 120000;
const DEFAULT_PARAMS =
[
Expand All @@ -33,6 +34,7 @@ const DEFAULT_PARAMS =
'database',
'schema',
'role',
'rowMode',
'streamResult',
'fetchAsString',
'clientSessionKeepAlive',
Expand Down Expand Up @@ -363,6 +365,11 @@ function ConnectionConfig(options, validateCredentials, qaMode, clientInfo)
ErrorCodes.ERR_CONN_CREATE_INVALID_FETCH_AS_STRING_VALUES,
JSON.stringify(fetchAsString[invalidValueIndex]));
}
// Row mode is optional, can be undefined
const rowMode = options.rowMode;
if (Util.exists(rowMode)) {
RowMode.checkRowModeValid(rowMode);
}

// check for invalid clientSessionKeepAlive
var clientSessionKeepAlive = options.clientSessionKeepAlive;
Expand Down Expand Up @@ -621,6 +628,15 @@ function ConnectionConfig(options, validateCredentials, qaMode, clientInfo)
return fetchAsString;
};

/**
* Returns the rowMode string value ('array', 'object' or 'object_with_renamed_duplicated_columns'). Could be null or undefined.
*
* @returns {String}
*/
this.getRowMode = function () {
return rowMode ;
};

/**
* Returns the client type.
*
Expand Down
2 changes: 1 addition & 1 deletion lib/connection/result/column.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var NULL_UPPERCASE = 'NULL';
*/
function Column(options, index, statementParameters, resultVersion)
{
var name = options.name;
var name = options.overriddenName || options.name;
var nullable = options.nullable;
var scale = options.scale;
var type = options.type;
Expand Down
22 changes: 13 additions & 9 deletions lib/connection/result/result.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ var Chunk = require('./chunk');
var ResultStream = require('./result_stream');
var ChunkCache = require('./chunk_cache');
var Column = require('./column');
var Parameters = require('../../parameters');
var StatementType = require('./statement_type');
const ColumnNamesCreator = require('./unique_column_name_creator');
const RowMode = require('../../constants/row_mode');

/**
* Creates a new Result.
*
* @param {Object} options
* @constructor
*/
function Result(options)
{
function Result(options) {
var data;
var chunkHeaders;
var parametersMap;
Expand Down Expand Up @@ -56,8 +56,7 @@ function Result(options)
// if no chunk headers were specified, but a query-result-master-key (qrmk)
// was specified, build the chunk headers from the qrmk
chunkHeaders = data.chunkHeaders;
if (!Util.isObject(chunkHeaders) && Util.isString(data.qrmk))
{
if (!Util.isObject(chunkHeaders) && Util.isString(data.qrmk)) {
chunkHeaders =
{
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
Expand All @@ -74,8 +73,7 @@ function Result(options)
// convert the parameters array to a map
parametersMap = {};
parametersArray = data.parameters;
for (index = 0, length = parametersArray.length; index < length; index++)
{
for (index = 0, length = parametersArray.length; index < length; index++) {
parameter = parametersArray[index];
parametersMap[parameter.name] = parameter.value;
}
Expand All @@ -95,8 +93,14 @@ function Result(options)
// index map in which the keys are the column names and the values are the
// indices of the columns with the corresponding names
this._mapColumnNameToIndices = mapColumnNameToIndices = {};
for (index = 0; index < numColumns; index++)
{

const rowMode = options.rowMode;
if (rowMode === RowMode.OBJECT_WITH_RENAMED_DUPLICATED_COLUMNS) {
ColumnNamesCreator.addOverridenNamesForDuplicatedColumns(rowtype);
}

for (let index = 0; index < numColumns; index++) {

// create a new column and add it to the columns array
columns[index] = column =
new Column(rowtype[index], index, parametersMap, version);
Expand Down
56 changes: 30 additions & 26 deletions lib/connection/result/row_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
* Copyright (c) 2015-2019 Snowflake Computing Inc. All rights reserved.
*/

var Readable = require('stream').Readable;
var Util = require('../../util');
var Errors = require('../../errors');
var ResultStream = require('./result_stream');
var DataTypes = require('./data_types');
const Readable = require('stream').Readable;
const Util = require('../../util');
const Errors = require('../../errors');
const ResultStream = require('./result_stream');
const DataTypes = require('./data_types');
const RowMode = require('./../../constants/row_mode');

/**
* Creates a stream that can be used to read a statement result row by row.
Expand All @@ -30,7 +31,7 @@ function RowStream(statement, context, options)
});

// extract streaming options
var start, end, fetchAsString;
let start, end, fetchAsString, rowMode;
if (Util.isObject(options))
{
start = options.start;
Expand All @@ -40,22 +41,23 @@ function RowStream(statement, context, options)

// if a fetchAsString value is not specified in the stream options, try the
// statement and connection options (in that order)
if (!Util.exists(fetchAsString))
{
if (!Util.exists(fetchAsString)) {
fetchAsString = context.fetchAsString;
}
if (!Util.exists(fetchAsString))
{
if (!Util.exists(fetchAsString)) {
fetchAsString = context.connectionConfig.getFetchAsString();
}
if (!Util.exists(rowMode)) {
rowMode = context.rowMode || context.connectionConfig.getRowMode();
}

var resultStream = null, numResultStreamInterrupts = 0;
var rowBuffer = null, rowIndex = 0;
var columns, mapColumnIdToExtractFnName;
var initialized = false;
var previousChunk = null;
let resultStream = null, numResultStreamInterrupts = 0;
let rowBuffer = null, rowIndex = 0;
let columns, mapColumnIdToExtractFnName;
let initialized = false;
let previousChunk = null;

var self = this;
const self = this;

/**
* Reads the next row in the result.
Expand Down Expand Up @@ -185,7 +187,7 @@ function RowStream(statement, context, options)

// if buffer has reached the threshold based on the highWaterMark value then
// push() will return false and pause sending data to the buffer until the data is read from the buffer
if (!self.push(externalizeRow(row, columns, mapColumnIdToExtractFnName)))
if (!self.push(externalizeRow(row, columns, mapColumnIdToExtractFnName, rowMode)))
{
break;
}
Expand All @@ -202,7 +204,7 @@ function RowStream(statement, context, options)
else // No more rows left in the buffer
{
// Push the last row in the buffer
self.push(externalizeRow(row, columns, mapColumnIdToExtractFnName));
self.push(externalizeRow(row, columns, mapColumnIdToExtractFnName, rowMode));
}
});
};
Expand Down Expand Up @@ -428,17 +430,19 @@ function buildMapColumnExtractFnNames(columns, fetchAsString)
* @param {Object} row
* @param {Object[]} columns
* @param {Object} [mapColumnIdToExtractFnName]
* @param {String?} rowMode - string value ('array', 'object' or 'object_with_renamed_duplicated_columns'). Default is 'object' when parameter isn't set.
*
* @returns {Object}
*/
function externalizeRow(row, columns, mapColumnIdToExtractFnName)
{
var externalizedRow = {};
for (var index = 0, length = columns.length; index < length; index++)
{
var column = columns[index];
var extractFnName = mapColumnIdToExtractFnName[column.getId()];
externalizedRow[column.getName()] = row[extractFnName](column.getId());
function externalizeRow(row, columns, mapColumnIdToExtractFnName, rowMode) {
const isArrayRowMode = rowMode === RowMode.ARRAY;

const externalizedRow = isArrayRowMode ? [] : {};

for (let index = 0, length = columns.length; index < length; index++) {
const column = columns[index];
const extractFnName = mapColumnIdToExtractFnName[column.getId()];
externalizedRow[isArrayRowMode ? index : column.getName()] = row[extractFnName](column.getId());
}

return externalizedRow;
Expand Down
38 changes: 38 additions & 0 deletions lib/connection/result/unique_column_name_creator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2015-2023 Snowflake Computing Inc. All rights reserved.
*/

const resultContainsDuplicatedColumns = (rowtype) => {
const columnNames = rowtype.map(rt => rt.name);
return columnNames.length !== new Set(columnNames).size;
};

function addOverriddenNamesForDuplicatedColumns(rowtype) {

//Prepare renamed columns for duplicates if row mode was set to 'object_with_renamed_duplicated_columns'
if (resultContainsDuplicatedColumns(rowtype)) {

const columnNames = new Set(rowtype.map(el => el.name));
const quntityOfColumnNames = new Map();

for (let index = 0; index < rowtype.length; index++) {
const columnName = rowtype[index].name;
if (columnName) {
if (quntityOfColumnNames.has(columnName)) {
let times = quntityOfColumnNames.get(columnName) + 1;
let newColumnName = columnName + '_' + times;
while (columnNames.has(newColumnName)) {
times += 1;
newColumnName = columnName + '_' + times;
}
quntityOfColumnNames.set(columnName, times);
rowtype[index].overriddenName = newColumnName;
columnNames.add(newColumnName);
} else {
quntityOfColumnNames.set(columnName, 1);
}
}
}
}
}
exports.addOverridenNamesForDuplicatedColumns = addOverriddenNamesForDuplicatedColumns;
49 changes: 33 additions & 16 deletions lib/connection/statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
var NativeTypes = require('./result/data_types').NativeTypes;
var file_transfer_agent = require('.././file_transfer_agent/file_transfer_agent');
var Bind = require('./bind_uploader');
const RowMode = require('./../constants/row_mode');

var states =
{
Expand Down Expand Up @@ -187,6 +188,11 @@
JSON.stringify(fetchAsString[invalidValueIndex]));
}

const rowMode = statementOptions.rowMode;
if (Util.exists(rowMode)) {
RowMode.checkRowModeValid(rowMode);
}

// validate non-user-specified arguments
Errors.assertInternal(Util.isObject(services));
Errors.assertInternal(Util.isObject(connectionConfig));
Expand All @@ -200,6 +206,7 @@
statementContext.fetchAsString = statementOptions.fetchAsString;
statementContext.multiResultIds = statementOptions.multiResultIds;
statementContext.multiCurId = statementOptions.multiCurId;
statementContext.rowMode = statementOptions.rowMode;

// set the statement type
statementContext.type = (statementContext.type == statementTypes.ROW_PRE_EXEC) ? statementTypes.ROW_POST_EXEC : statementTypes.FILE_POST_EXEC;
Expand Down Expand Up @@ -353,6 +360,10 @@
Errors.checkArgumentValid(Util.isBoolean(statementOptions.internal),
ErrorCodes.ERR_CONN_EXEC_STMT_INVALID_INTERNAL);
}
const rowMode = statementOptions.rowMode;
if (Util.exists(rowMode)) {
RowMode.checkRowModeValid(rowMode);
}

// create a statement context
var statementContext = createStatementContext();
Expand All @@ -363,6 +374,7 @@
statementContext.fetchAsString = statementOptions.fetchAsString;
statementContext.multiResultIds = statementOptions.multiResultIds;
statementContext.multiCurId = statementOptions.multiCurId;
statementContext.rowMode = statementOptions.rowMode;

// if a binds array is specified, add it to the statement context
if (Util.exists(statementOptions.binds))
Expand Down Expand Up @@ -436,6 +448,7 @@
context.services = services;
context.connectionConfig = connectionConfig;
context.isFetchingResult = true;
context.rowMode = statementOptions.rowMode || connectionConfig.getRowMode();

// TODO: add the parameters map to the statement context

Expand Down Expand Up @@ -659,21 +672,16 @@
// find out if the result will be streamed;
// if a value is not specified, get it from the connection
var streamResult = context.streamResult;
if (!Util.exists(streamResult))
{
if (!Util.exists(streamResult)) {
streamResult = context.connectionConfig.getStreamResult();
}

// if the result will be streamed later,
// invoke the complete callback right away
if (streamResult)
{
if (streamResult) {
context.complete(Errors.externalize(context.resultError), statement);
}
else
{
process.nextTick(function ()
{
} else {
process.nextTick(function () {
// aggregate all the rows into an array and pass this
// array to the complete callback as the last argument
var rows = [];
Expand All @@ -683,17 +691,14 @@
let row;

// while there are rows available to read, push row to results array
while ((row = this.read()) !== null)
{
while ((row = this.read()) !== null) {
rows.push(row);
}
})
.on('end', function ()
{
.on('end', function () {
context.complete(null, statement, rows);
})
.on('error', function (err)
{
.on('error', function (err) {
context.complete(Errors.externalize(err), statement);
});
});
Expand Down Expand Up @@ -786,7 +791,8 @@
response: body,
statement: statement,
services: context.services,
connectionConfig: context.connectionConfig
connectionConfig: context.connectionConfig,
rowMode: context.rowMode
});
// save the statement id
context.statementId = context.result.getStatementId();
Expand Down Expand Up @@ -1051,6 +1057,11 @@
Errors.checkArgumentValid(Util.isFunction(options.end),
ErrorCodes.ERR_STMT_FETCH_ROWS_INVALID_END);

const rowMode = options.rowMode;
if (Util.exists(rowMode)) {
RowMode.checkRowModeValid(rowMode);
}

// if we're still trying to fetch the result, create an error of our own
// and invoke the end() callback
if (context.isFetchingResult)
Expand Down Expand Up @@ -1146,6 +1157,12 @@
ErrorCodes.ERR_STMT_STREAM_ROWS_INVALID_FETCH_AS_STRING_VALUES,
JSON.stringify(fetchAsString[invalidValueIndex]));
}

const rowMode = context.rowMode;
if (Util.exists(rowMode))
{
RowMode.checkRowModeValid(rowMode);

Check warning on line 1164 in lib/connection/statement.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/statement.js#L1164

Added line #L1164 was not covered by tests
}
}

return new RowStream(statement, context, options);
Expand Down
Loading
Loading