Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-692968: Async queries support #787

Merged
merged 62 commits into from
Mar 5, 2025
Merged
Changes from 1 commit
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
763153f
Add async query support
sfc-gh-ext-simba-nl Nov 28, 2024
974613b
Add async test file
sfc-gh-ext-simba-nl Nov 28, 2024
a2f2701
Fix linux build by including unistd.h
sfc-gh-ext-simba-nl Nov 28, 2024
3829b60
Remove unnecessary test
sfc-gh-ext-simba-nl Nov 28, 2024
d947159
Add query status to C API, refactor some query parameters, move getti…
sfc-gh-ext-simba-nl Dec 3, 2024
8ab3758
Fix typo
sfc-gh-ext-simba-nl Dec 3, 2024
43c5653
Fix typo
sfc-gh-ext-simba-nl Dec 3, 2024
76ac1f1
Add more test cases, fix async fetching bugs
sfc-gh-ext-simba-nl Dec 4, 2024
c4fa76d
Fix bug with normal queries that go async after a while
sfc-gh-ext-simba-nl Dec 4, 2024
387a59f
fix typo
sfc-gh-ext-simba-nl Dec 4, 2024
cb067ec
Remove status check from fake table
sfc-gh-ext-simba-nl Dec 4, 2024
6cd5c78
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Dec 10, 2024
582913e
Fix linux warnings
sfc-gh-ext-simba-nl Dec 10, 2024
98cb876
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-jszczerbinski Dec 12, 2024
d3b94a0
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Dec 12, 2024
f84fc25
Improve error handling and logging
sfc-gh-ext-simba-nl Dec 12, 2024
97531c8
Merge branch 'SNOW-692968-async-queries-support' of https://github.co…
sfc-gh-ext-simba-nl Dec 12, 2024
f32ada0
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Dec 19, 2024
942ae0a
Fix memory issues in test cases
sfc-gh-ext-simba-nl Dec 19, 2024
ed33af6
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Dec 20, 2024
75e6565
organize enums, add test
sfc-gh-ext-simba-nl Dec 21, 2024
9e457ae
Lower the rowcount for the test
sfc-gh-ext-simba-nl Dec 21, 2024
2aaea2a
Have get_query_metadata return a struct instead of a string
sfc-gh-ext-simba-nl Jan 20, 2025
9003f03
merge mastre
sfc-gh-ext-simba-nl Jan 20, 2025
eb4ae08
Fix build issue with merge master
sfc-gh-ext-simba-nl Jan 20, 2025
a793549
Fix merge issue
sfc-gh-ext-simba-nl Jan 20, 2025
45fa5a3
Fix formatting
sfc-gh-ext-simba-nl Jan 20, 2025
734a064
remove sf_sleep_ms from platform header file
sfc-gh-ext-simba-nl Jan 22, 2025
67e6db3
Move sf_sleep_ms from platform to util
sfc-gh-ext-simba-nl Jan 22, 2025
7671e14
Fix linux compilation and remove extra newline in platform.h
sfc-gh-ext-simba-nl Jan 22, 2025
4efbd8b
Fix build errors
sfc-gh-ext-simba-nl Jan 22, 2025
04100c3
Add util.h to test file
sfc-gh-ext-simba-nl Jan 22, 2025
c89f086
Merge master
sfc-gh-ext-simba-nl Jan 29, 2025
8b39c03
Add missing files from merge
sfc-gh-ext-simba-nl Jan 29, 2025
c9d9bbb
Fix formatting, add test case for retries
sfc-gh-ext-simba-nl Jan 30, 2025
17d6236
Minor logic fix, uncomment test cases
sfc-gh-ext-simba-nl Jan 30, 2025
f298aa2
change get results logic to query /queries/{sfqid}/result instead of …
sfc-gh-ext-simba-nl Jan 31, 2025
73a1a98
merge master
sfc-gh-ext-simba-nl Jan 31, 2025
79a8df0
Fix async timeout test
sfc-gh-ext-simba-nl Feb 1, 2025
2f99bdd
Remove unused variables
sfc-gh-ext-simba-nl Feb 1, 2025
31ac782
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Feb 3, 2025
ae46e5a
Forgot to close connection on test case
sfc-gh-ext-simba-nl Feb 4, 2025
fa9b3e0
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Feb 4, 2025
32f127a
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Feb 4, 2025
6891894
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Feb 5, 2025
5788a0f
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Feb 6, 2025
bde8ccc
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Feb 10, 2025
2030329
Make less unnecessary calls to get query metadata, rename to make thi…
sfc-gh-ext-simba-nl Feb 11, 2025
d2371b7
Uncomment test cases
sfc-gh-ext-simba-nl Feb 11, 2025
5b1198f
Fix formatting
sfc-gh-ext-simba-nl Feb 11, 2025
a6c707c
Minor fix
sfc-gh-ext-simba-nl Feb 12, 2025
222b94b
Fix code quality warnings
sfc-gh-ext-simba-nl Feb 13, 2025
79e5e20
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-dprzybysz Feb 19, 2025
11b2c40
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Feb 19, 2025
2313d73
Merge branch 'SNOW-692968-async-queries-support' of https://github.co…
sfc-gh-ext-simba-nl Feb 19, 2025
6b37e6a
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-ext-simba-nl Feb 20, 2025
e86aa64
Fix build error
sfc-gh-ext-simba-nl Feb 21, 2025
e711c27
Fix filename
sfc-gh-ext-simba-nl Feb 21, 2025
2c27a38
Fix windows VS17 build error
sfc-gh-ext-simba-nl Mar 3, 2025
463e016
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-jszczerbinski Mar 4, 2025
284de79
Revert fix in SnowflakeCommon, remove unused stats from SF_QUERY_META…
sfc-gh-ext-simba-nl Mar 4, 2025
6e448c8
Merge branch 'master' into SNOW-692968-async-queries-support
sfc-gh-jszczerbinski Mar 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
merge mastre
sfc-gh-ext-simba-nl committed Jan 20, 2025
commit 9003f0357983a37d54e275d074eb6ca227dcd8cf
4 changes: 4 additions & 0 deletions include/snowflake/client.h
Original file line number Diff line number Diff line change
@@ -541,6 +541,10 @@ typedef struct SF_STMT {
SF_STATS *stats;
void *stmt_attrs;
sf_bool is_dml;
sf_bool is_multi_stmt;
void* multi_stmt_result_ids;
int64 multi_stmt_count;
int64 paramset_size;
sf_bool is_async;
sf_bool is_async_initialized;

173 changes: 28 additions & 145 deletions lib/client.c
Original file line number Diff line number Diff line change
@@ -2790,22 +2790,22 @@ snowflake_prepare(SF_STMT *sfstmt, const char *command, size_t command_size) {

SF_STATUS STDCALL snowflake_describe_with_capture(SF_STMT *sfstmt,
SF_QUERY_RESULT_CAPTURE *result_capture) {
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_TRUE, SF_BOOLEAN_FALSE);
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_FALSE, SF_BOOLEAN_TRUE, SF_BOOLEAN_FALSE);
}

SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt) {
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), SF_BOOLEAN_TRUE, NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);
}

SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt) {
if (!sfstmt->is_async) {
sfstmt->is_async = SF_BOOLEAN_TRUE;
}
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_TRUE);
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), SF_BOOLEAN_TRUE, NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_TRUE);
}

SF_STATUS STDCALL snowflake_execute_with_capture(SF_STMT *sfstmt, SF_QUERY_RESULT_CAPTURE *result_capture) {
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), SF_BOOLEAN_TRUE, result_capture, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);
}

SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
@@ -2873,11 +2873,17 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
// Create Body
body = create_query_json_body(sfstmt->sql_text, sfstmt->sequence_counter,
is_string_empty(sfstmt->connection->directURL) ?
NULL : sfstmt->request_id, is_describe_only);

snowflake_cJSON_AddBoolToObject(body, "asyncExec", is_async_exec);

if (bindings != NULL) {
NULL : sfstmt->request_id, is_describe_only,
sfstmt->multi_stmt_count);

snowflake_cJSON_AddBoolToObject(body, "asyncExec", is_async_exec);

if (bind_stage)
{
snowflake_cJSON_AddStringToObject(body, "bindStage", bind_stage);
SF_FREE(bind_stage);
}
else if (bindings != NULL) {
/* binding parameters if exists */
snowflake_cJSON_AddItemToObject(body, "bindings", bindings);
}
@@ -3019,149 +3025,26 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
} else {
int64 stmt_type_id;
if (json_copy_int(&stmt_type_id, data, "statementTypeId")) {
/* failed to get statement type id */
sfstmt->is_dml = SF_BOOLEAN_FALSE;
} else {
sfstmt->is_dml = detect_stmt_type(stmt_type_id);
/* failed to get statement type id */
sfstmt->is_multi_stmt = SF_BOOLEAN_FALSE;
}
rowtype = snowflake_cJSON_GetObjectItem(data, "rowtype");
if (snowflake_cJSON_IsArray(rowtype)) {
sfstmt->total_fieldcount = snowflake_cJSON_GetArraySize(
rowtype);
_snowflake_stmt_desc_reset(sfstmt);
sfstmt->desc = set_description(rowtype);
else {
sfstmt->is_multi_stmt = (_SF_STMT_TYPE_MULTI_STMT == stmt_type_id);
}
stats = snowflake_cJSON_GetObjectItem(data, "stats");
if (snowflake_cJSON_IsObject(stats)) {
_snowflake_stmt_row_metadata_reset(sfstmt);
sfstmt->stats = set_stats(stats);
} else {
sfstmt->stats = NULL;
}

// Determine query result format and detach rowset object from data.
cJSON * qrf = snowflake_cJSON_GetObjectItem(data, "queryResultFormat");
if (qrf) {
char* qrf_str = snowflake_cJSON_GetStringValue(qrf);
sfstmt->qrf = SF_CALLOC(1, sizeof(QueryResultFormat_t));
cJSON* rowset = NULL;

if (strcmp(qrf_str, "arrow") == 0 || strcmp(qrf_str, "arrow_force") == 0) {
#ifdef SF_WIN32
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT,
"Query results were fetched using Arrow, "
"but the client library does not yet support decoding Arrow results", "",
sfstmt->sfqid);

return SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT;
#endif
* ((QueryResultFormat_t*)sfstmt->qrf) = ARROW_FORMAT;
rowset = snowflake_cJSON_DetachItemFromObject(data, "rowsetBase64");
if (!rowset)
if (sfstmt->is_multi_stmt)
{
if (!setup_multi_stmt_result(sfstmt, data))
{
log_error("No valid rowset found in response");
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
SF_STATUS_ERROR_BAD_JSON,
"Missing rowset from response. No results found.",
SF_SQLSTATE_APP_REJECT_CONNECTION,
sfstmt->sfqid);
goto cleanup;
goto cleanup;
}
}
else if (strcmp(qrf_str, "json") == 0) {
*((QueryResultFormat_t*)sfstmt->qrf) = JSON_FORMAT;
if (json_detach_array_from_object((cJSON**)(&rowset), data, "rowset"))
}
else
{
if (!setup_result_with_json_resp(sfstmt, data))
{
log_error("No valid rowset found in response");
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
SF_STATUS_ERROR_BAD_JSON,
"Missing rowset from response. No results found.",
SF_SQLSTATE_APP_REJECT_CONNECTION,
sfstmt->sfqid);
goto cleanup;
}
}
else {
log_error("Unsupported query result format: %s", qrf_str);
}

// Index starts at 0 and incremented each fetch
sfstmt->total_row_index = 0;

// When the result set is sufficient large, the server response will contain
// an empty "rowset" object. Instead, it will have a "chunks" object that contains,
// among other fields, a URL from which the result set can be downloaded in chunks.
// In this case, we initialize the chunk downloader, which will download in the
// background as calls to snowflake_fetch() are made.
if ((chunks = snowflake_cJSON_GetObjectItem(data, "chunks")) != NULL) {
// We don't care if there is no qrmk, so ignore return code
json_copy_string(&qrmk, data, "qrmk");
chunk_headers = snowflake_cJSON_GetObjectItem(data, "chunkHeaders");
NON_JSON_RESP* (*callback_create_resp)(void) = NULL;
if (ARROW_FORMAT == *((QueryResultFormat_t*)sfstmt->qrf)) {
callback_create_resp = callback_create_arrow_resp;
}

sfstmt->chunk_downloader = chunk_downloader_init(
qrmk,
chunk_headers,
chunks,
2, // thread count
4, // fetch slot
&sfstmt->error,
sfstmt->connection->insecure_mode,
sfstmt->connection->ocsp_fail_open,
callback_create_resp,
sfstmt->connection->proxy,
sfstmt->connection->no_proxy,
get_retry_timeout(sfstmt->connection),
sfstmt->connection->retry_count);
if (!sfstmt->chunk_downloader) {
// Unable to create chunk downloader.
// Error is set in chunk_downloader_init function.
goto cleanup;
}

// Even when the result set is split into chunks, JSON format will still
// response with the first chunk in "rowset", so be sure to include it.
sfstmt->result_set = rs_create_with_json_result(
rowset,
sfstmt->desc,
(QueryResultFormat_t*)sfstmt->qrf,
sfstmt->connection->timezone);

// Update chunk row count. Controls the chunk downloader.
sfstmt->chunk_rowcount = rs_get_row_count_in_chunk(
sfstmt->result_set,
(QueryResultFormat_t*)sfstmt->qrf);

// Update total row count. Used in snowflake_num_rows().
if (json_copy_int(&sfstmt->total_rowcount, data, "total")) {
log_warn(
"No total count found in response. Reverting to using array size of results");
sfstmt->total_rowcount = sfstmt->chunk_rowcount;
}
}
else {
// Create a result set object and update the total rowcount.
sfstmt->result_set = rs_create_with_json_result(
rowset,
sfstmt->desc,
(QueryResultFormat_t*)sfstmt->qrf,
sfstmt->connection->timezone);

// Update chunk row count. Controls the chunk downloader.
sfstmt->chunk_rowcount = rs_get_row_count_in_chunk(
sfstmt->result_set,
(QueryResultFormat_t*)sfstmt->qrf);

// Update total row count. Used in snowflake_num_rows().
if (json_copy_int(&sfstmt->total_rowcount, data, "total")) {
log_warn(
"No total count found in response. Reverting to using array size of results");
sfstmt->total_rowcount = sfstmt->chunk_rowcount;
goto cleanup;
}
}
}
}
} else if (json_error != SF_JSON_ERROR_NONE) {
1 change: 1 addition & 0 deletions lib/client_int.h
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@
#define QUERY_URL "/queries/v1/query-request"
#define RENEW_SESSION_URL "/session/token-request"
#define DELETE_SESSION_URL "/session"
#define QUERY_RESULT_URL_FORMAT "/queries/%s/result"
#define QUERY_MONITOR_URL "/monitoring/queries/%s"
// not used for now but add for URL checking on connection requests
#define AUTHENTICATOR_URL "/session/authenticator-request"
You are viewing a condensed version of this merge commit. You can view the full changes here.