Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-692968: Async queries support #787

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
763153f
Add async query support
sfc-gh-ext-simba-nl Nov 28, 2024
974613b
Add async test file
sfc-gh-ext-simba-nl Nov 28, 2024
a2f2701
Fix linux build by including unistd.h
sfc-gh-ext-simba-nl Nov 28, 2024
3829b60
Remove unnecessary test
sfc-gh-ext-simba-nl Nov 28, 2024
d947159
Add query status to C API, refactor some query parameters, move getti…
sfc-gh-ext-simba-nl Dec 3, 2024
8ab3758
Fix typo
sfc-gh-ext-simba-nl Dec 3, 2024
43c5653
Fix typo
sfc-gh-ext-simba-nl Dec 3, 2024
76ac1f1
Add more test cases, fix async fetching bugs
sfc-gh-ext-simba-nl Dec 4, 2024
c4fa76d
Fix bug with normal queries that go async after a while
sfc-gh-ext-simba-nl Dec 4, 2024
387a59f
fix typo
sfc-gh-ext-simba-nl Dec 4, 2024
cb067ec
Remove status check from fake table
sfc-gh-ext-simba-nl Dec 4, 2024
6cd5c78
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Dec 10, 2024
582913e
Fix linux warnings
sfc-gh-ext-simba-nl Dec 10, 2024
98cb876
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-jszczerbinski Dec 12, 2024
d3b94a0
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Dec 12, 2024
f84fc25
Improve error handling and logging
sfc-gh-ext-simba-nl Dec 12, 2024
97531c8
Merge branch 'SNOW-692968-async-queries-support' of https://github.co…
sfc-gh-ext-simba-nl Dec 12, 2024
f32ada0
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Dec 19, 2024
942ae0a
Fix memory issues in test cases
sfc-gh-ext-simba-nl Dec 19, 2024
ed33af6
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Dec 20, 2024
75e6565
organize enums, add test
sfc-gh-ext-simba-nl Dec 21, 2024
9e457ae
Lower the rowcount for the test
sfc-gh-ext-simba-nl Dec 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions include/snowflake/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,25 @@ typedef enum SF_STMT_ATTRIBUTE {
SF_STMT_USER_REALLOC_FUNC
} SF_STMT_ATTRIBUTE;

/**
* The query status
*/
typedef enum SF_QUERY_STATUS {
SF_QUERY_STATUS_RUNNING,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's sort statuses alphabetically

SF_QUERY_STATUS_ABORTING,
SF_QUERY_STATUS_SUCCESS,
SF_QUERY_STATUS_FAILED_WITH_ERROR,
SF_QUERY_STATUS_ABORTED,
SF_QUERY_STATUS_QUEUED,
SF_QUERY_STATUS_FAILED_WITH_INCIDENT,
SF_QUERY_STATUS_DISCONNECTED,
SF_QUERY_STATUS_RESUMING_WAREHOUSE,
SF_QUERY_STATUS_QUEUED_REPAIRING_WAREHOUSE,
SF_QUERY_STATUS_RESTARTED,
SF_QUERY_STATUS_BLOCKED,
SF_QUERY_STATUS_NO_DATA
} SF_QUERY_STATUS;

/**
* Snowflake Error
*/
Expand Down Expand Up @@ -613,6 +632,16 @@ SF_STATUS STDCALL snowflake_get_attribute(
*/
SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf);

/**
* Creates sf SNOWFLAKE_STMT context for async queries.
*
* @param sf The SF_CONNECT context.
* @param query_id the query id of the async query.
*
* @return sfstmt SNOWFLAKE_STMT context for async queries.
*/
SF_STMT* STDCALL snowflake_async_stmt(SF_CONNECT *sf, const char *query_id);
Copy link
Collaborator

@sfc-gh-ext-simba-hx sfc-gh-ext-simba-hx Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name sounds confusing since it seems to be an equivalent of snowflake_stmt. Maybe use the similar function name as JDBC, create_resultset or to be more specific, create_async_query_result? Still return a SF_STMT instance since libsfclient doesn't have an struct specifically for result set, but would be easier to understand the functionality of the function.


/**
* Frees the memory used by a SF_QUERY_RESULT_CAPTURE struct.
* Note that this only frees the struct itself, and *not* the underlying
Expand Down Expand Up @@ -775,6 +804,14 @@ snowflake_stmt_get_attr(SF_STMT *sfstmt, SF_STMT_ATTRIBUTE type, void **value);
*/
SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt);

/**
* Executes a statement asynchronously.
* @param sfstmt SNOWFLAKE_STMT context.
*
* @return 0 if success, otherwise an errno is returned.
*/
SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt);

/**
* Executes a statement with capture.
* @param sfstmt SNOWFLAKE_STMT context.
Expand Down
210 changes: 206 additions & 4 deletions lib/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <Shellapi.h>
#define strncasecmp _strnicmp
#define strcasecmp _stricmp
#else
#include <unistd.h>
#endif

#define curl_easier_escape(curl, string) curl_easy_escape(curl, string, 0)
Expand Down Expand Up @@ -54,12 +56,173 @@ static SF_STATUS STDCALL
_reset_connection_parameters(SF_CONNECT *sf, cJSON *parameters,
cJSON *session_info, sf_bool do_validate);

static const char* query_status_names[] = {
"RUNNING",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep the statuses sorted?

"ABORTING",
"SUCCESS",
"FAILED_WITH_ERROR",
"ABORTED",
"QUEUED",
"FAILED_WITH_INCIDENT",
"DISCONNECTED",
"RESUMING_WAREHOUSE",
"QUEUED_REPAIRING_WAREHOUSE",
"RESTARTED",
"BLOCKED",
"NO_DATA"
};

/**
* Validate partner application name.
* @param application partner application name
*/
sf_bool validate_application(const char *application);

/**
* Helper function to get SF_QUERY_STATUS given the string representation
* @param query_status the string representation of the query status
*/
SF_QUERY_STATUS get_status_from_string(const char *query_status) {
if (query_status == NULL) {
return SF_QUERY_STATUS_NO_DATA;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it no data or more UNKNOWN?

}
int idx = 0, last = 0;
for (idx = 0, last = (int)SF_QUERY_STATUS_NO_DATA; idx <= last; ++idx) {
size_t len = strlen(query_status_names[idx]);
if (sf_strncasecmp(query_status_names[idx], query_status, len) == 0) {
return (SF_QUERY_STATUS)idx;
}
}
return SF_QUERY_STATUS_NO_DATA;
}

/**
* Get the metadata of the query
* @param sf the SF_CONNECT context
* @param query_id the query id
*/
char *get_query_metadata(SF_CONNECT *sf, const char *query_id) {
sfc-gh-ext-simba-hx marked this conversation as resolved.
Show resolved Hide resolved
cJSON *resp = NULL;
cJSON *data = NULL;
cJSON *queries = NULL;
char *s_resp = NULL;
const char *error_msg;
size_t url_size = strlen(QUERY_MONITOR_URL) -2 + strlen(query_id) + 1;
char *status_query = (char*)SF_CALLOC(1, url_size);
sf_sprintf(status_query, url_size, QUERY_MONITOR_URL, query_id);

if (request(sf, &resp, status_query, NULL, 0, NULL, NULL,
GET_REQUEST_TYPE, &sf->error, SF_BOOLEAN_TRUE,
0, sf->retry_count, get_retry_timeout(sf),
NULL, NULL, NULL, SF_BOOLEAN_FALSE)) {

s_resp = snowflake_cJSON_Print(resp);
log_info("Here is JSON response:\n%s", s_resp);
sfc-gh-ext-simba-hx marked this conversation as resolved.
Show resolved Hide resolved

data = snowflake_cJSON_GetObjectItem(resp, "data");

queries = snowflake_cJSON_GetObjectItem(data, "queries");
cJSON* query = snowflake_cJSON_GetArrayItem(queries, 0);

char *metadata = snowflake_cJSON_Print(query);
snowflake_cJSON_Delete(resp);
SF_FREE(s_resp);
SF_FREE(status_query);
return metadata;
}
SF_FREE(status_query);
log_trace("Error getting query metadata.");
sfc-gh-ext-simba-hx marked this conversation as resolved.
Show resolved Hide resolved
return NULL;
}

/**
* Get the status of the query
* @param sf the SF_CONNECT context
* @param query_id the query id
*/
SF_QUERY_STATUS get_query_status(SF_CONNECT *sf, const char *query_id) {
sfc-gh-ext-simba-hx marked this conversation as resolved.
Show resolved Hide resolved
SF_QUERY_STATUS ret = SF_QUERY_STATUS_NO_DATA;
char *metadata = get_query_metadata(sf, query_id);
sfc-gh-ext-simba-hx marked this conversation as resolved.
Show resolved Hide resolved
if (metadata) {
cJSON* metadataJson = snowflake_cJSON_Parse(metadata);

cJSON* status = snowflake_cJSON_GetObjectItem(metadataJson, "status");
if (snowflake_cJSON_IsString(status))
{
char* queryStatus = snowflake_cJSON_GetStringValue(status);
ret = get_status_from_string(queryStatus);
}
snowflake_cJSON_Delete(metadataJson);
}

return ret;
}

/**
* Helper function to determine if the query is still running
* @param query_status the query status
*/
sf_bool is_query_still_running(SF_QUERY_STATUS query_status) {
return (query_status == SF_QUERY_STATUS_RUNNING) ||
(query_status == SF_QUERY_STATUS_QUEUED) ||
(query_status == SF_QUERY_STATUS_RESUMING_WAREHOUSE) ||
(query_status == SF_QUERY_STATUS_QUEUED_REPAIRING_WAREHOUSE) ||
(query_status == SF_QUERY_STATUS_NO_DATA);
}

/**
* Get the results of the async query
* @param sfstmt The SF_STMT context
*/
void get_real_results(SF_STMT * sfstmt) {
SF_QUERY_STATUS query_status = get_query_status(sfstmt->connection, sfstmt->sfqid);
int retry = 0;
int no_data_retry = 0;
int no_data_max_retries = 30;
int retry_pattern[] = {1, 1, 2, 3, 4, 8, 10};
int max_retries = 7;
while (query_status != SF_QUERY_STATUS_SUCCESS) {
if (!is_query_still_running(query_status) && query_status != SF_QUERY_STATUS_SUCCESS) {
log_error("Query status is done running and did not succeed. Status is %s", query_status_names[query_status]);
return;
}
if (query_status == SF_QUERY_STATUS_NO_DATA) {
no_data_retry++;
if (no_data_retry >= no_data_max_retries) {
log_error(
"Cannot retrieve data on the status of this query. No information returned from server for queryID=%s", sfstmt->sfqid);
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
SF_STATUS_ERROR_GENERAL,
"Cannot retrieve data on the status of this query.",
NULL,
sfstmt->sfqid);
return;
}
}
}
int sleep_time = retry_pattern[retry] * 500;
#ifdef _WIN32
Sleep(sleep_time);
#else
usleep(sleep_time * 1000);
#endif
if (retry < max_retries) {
retry++;
} else {
log_error(
"Cannot retrieve data on the status of this query. Max retries hit with queryID=%s", sfstmt->sfqid);
}
query_status = get_query_status(sfstmt->connection, sfstmt->sfqid);

char query[1024];
char* query_template = "select * from table(result_scan('%s'))";
sf_sprintf(query, strlen(query_template) - 2 + strlen(sfstmt->sfqid) + 1, query_template, sfstmt->sfqid);
SF_STATUS ret = snowflake_query(sfstmt, query, strlen(query));
if (ret != SF_STATUS_SUCCESS) {
snowflake_propagate_error(sfstmt->connection, sfstmt);
}
}

#define _SF_STMT_TYPE_DML 0x3000
#define _SF_STMT_TYPE_INSERT (_SF_STMT_TYPE_DML + 0x100)
#define _SF_STMT_TYPE_UPDATE (_SF_STMT_TYPE_DML + 0x200)
Expand Down Expand Up @@ -1576,6 +1739,33 @@ SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf) {
return sfstmt;
}

SF_STMT *STDCALL snowflake_async_stmt(SF_CONNECT *sf, const char *query_id) {
if (!sf) {
return NULL;
}

SF_STMT *sfstmt = (SF_STMT *)SF_CALLOC(1, sizeof(SF_STMT));
if (sfstmt) {
_snowflake_stmt_reset(sfstmt);
sfstmt->connection = sf;
sf_strcpy(sfstmt->sfqid, SF_UUID4_LEN, query_id);
}

get_real_results(sfstmt);
sfc-gh-ext-simba-hx marked this conversation as resolved.
Show resolved Hide resolved

char *metadata_str = get_query_metadata(sfstmt->connection, query_id);
if (metadata_str) {
cJSON* metadata = snowflake_cJSON_Parse(metadata_str);
cJSON* stats = snowflake_cJSON_GetObjectItem(metadata, "stats");
if (snowflake_cJSON_IsObject(stats)) {
_snowflake_stmt_row_metadata_reset(sfstmt);
sfstmt->stats = set_stats(stats);
}
}

return sfstmt;
}

/**
* Initializes an SF_QUERY_RESPONSE_CAPTURE struct.
* Note that these need to be released by calling snowflake_query_result_capture_term().
Expand Down Expand Up @@ -1954,21 +2144,26 @@ snowflake_prepare(SF_STMT *sfstmt, const char *command, size_t command_size) {

SF_STATUS STDCALL snowflake_describe_with_capture(SF_STMT *sfstmt,
SF_QUERY_RESULT_CAPTURE *result_capture) {
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_TRUE);
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_TRUE, SF_BOOLEAN_FALSE);
}

SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt) {
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE);
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);
}

SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt) {
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_TRUE);
}

SF_STATUS STDCALL snowflake_execute_with_capture(SF_STMT *sfstmt, SF_QUERY_RESULT_CAPTURE *result_capture) {
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_FALSE);
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);
}

SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
sf_bool is_put_get_command,
SF_QUERY_RESULT_CAPTURE* result_capture,
sf_bool is_describe_only) {
sf_bool is_describe_only,
sf_bool is_async_exec) {
if (!sfstmt) {
return SF_STATUS_ERROR_STATEMENT_NOT_EXIST;
}
Expand Down Expand Up @@ -2073,6 +2268,13 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
body = create_query_json_body(sfstmt->sql_text, sfstmt->sequence_counter,
is_string_empty(sfstmt->connection->directURL) ?
NULL : sfstmt->request_id, is_describe_only);

if (is_async_exec) {
snowflake_cJSON_AddBoolToObject(body, "asyncExec", SF_BOOLEAN_TRUE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snowflake_cJSON_AddBoolToObject(body, "asyncExec", is_async_exec); ?

} else {
snowflake_cJSON_AddBoolToObject(body, "asyncExec", SF_BOOLEAN_FALSE);
}

if (bindings != NULL) {
/* binding parameters if exists */
snowflake_cJSON_AddItemToObject(body, "bindings", bindings);
Expand Down
7 changes: 5 additions & 2 deletions lib/client_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#define QUERY_URL "/queries/v1/query-request"
#define RENEW_SESSION_URL "/session/token-request"
#define DELETE_SESSION_URL "/session"
#define QUERY_MONITOR_URL "/monitoring/queries/%s"
// not used for now but add for URL checking on connection requests
#define AUTHENTICATOR_URL "/session/authenticator-request"

Expand Down Expand Up @@ -141,15 +142,17 @@ SF_PUT_GET_RESPONSE *STDCALL sf_put_get_response_allocate();
* @param sfstmt SNOWFLAKE_STMT context.
* @param sf_use_application_json_accept type true if this is a put/get command
* @param raw_response_buffer optional pointer to an SF_QUERY_RESULT_CAPTURE,
* @param is_describe_only should the statement be executed in describe only mode
* if the query response is to be captured.
* @param is_describe_only should the statement be executed in describe only mode
* @param is_async_exec should it execute asynchronously
*
* @return 0 if success, otherwise an errno is returned.
*/
SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
sf_bool use_application_json_accept_type,
struct SF_QUERY_RESULT_CAPTURE* result_capture,
sf_bool is_describe_only);
sf_bool is_describe_only,
sf_bool is_async_exec);

/**
* @return true if this is a put/get command, otherwise false
Expand Down
1 change: 0 additions & 1 deletion lib/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ cJSON *STDCALL create_query_json_body(const char *sql_text, int64 sequence_id, c
#endif
body = snowflake_cJSON_CreateObject();
snowflake_cJSON_AddStringToObject(body, "sqlText", sql_text);
snowflake_cJSON_AddBoolToObject(body, "asyncExec", SF_BOOLEAN_FALSE);
snowflake_cJSON_AddNumberToObject(body, "sequenceId", (double) sequence_id);
snowflake_cJSON_AddNumberToObject(body, "querySubmissionTime", submission_time);
snowflake_cJSON_AddBoolToObject(body, "describeOnly", is_describe_only);
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ SET(TESTS_C
test_get_describe_only_query_result
test_stmt_functions
test_unit_oauth
test_async
test_unit_mfa_auth
test_ocsp_fail_open
# FEATURE_INCREASED_MAX_LOB_SIZE_IN_MEMORY is internal switch
Expand Down
Loading