Skip to content

Commit

Permalink
implement handling of Elastic's SQL cursor
Browse files Browse the repository at this point in the history
- the driver can take a MaxFetchSize parameter (in connection string)
  that will limit the number of rows received from server.
- implemented handling of the cursor string received in the result sets,
  irrespective if a fetch size was previously set or if the server
  decides to chunk the response.
- added logging macros to always add the statement address in messages
  (simplifies tracking).
  • Loading branch information
bpintea committed Feb 19, 2018
1 parent fab7b0c commit ccb0686
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 196 deletions.
116 changes: 76 additions & 40 deletions driver/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@
#define HTTP_ACCEPT_JSON "Accept: application/json"
#define HTTP_CONTENT_TYPE_JSON "Content-Type: application/json; charset=utf-8"

#define JSON_SQL_TEMPLATE_START "{\"query\":\""
#define JSON_SQL_TEMPLATE_MID_QUERY "\""
#define JSON_SQL_TEMPLATE_MID_FETCH \
JSON_SQL_TEMPLATE_MID_QUERY ",\"fetch_size\":"
#define JSON_SQL_TEMPLATE_END "}"
#define JSON_SQL_QUERY_START "{\"query\":\""
#define JSON_SQL_QUERY_MID "\""
#define JSON_SQL_QUERY_MID_FETCH JSON_SQL_QUERY_MID ",\"fetch_size\":"
#define JSON_SQL_QUERY_END "}"
#define JSON_SQL_CURSOR_START "{\"cursor\":\""
#define JSON_SQL_CURSOR_END "\"}"

/*
* HTTP headers used for all requests (Content-Type, Accept).
Expand Down Expand Up @@ -155,7 +156,7 @@ static size_t write_callback(char *ptr, size_t size, size_t nmemb,
memcpy(dbc->abuff + dbc->apos, ptr, have);
dbc->apos += have;
DBG("libcurl: DBC@0x%p, copied: %zd bytes.", dbc, have);
//DBG("libcurl: DBC@0x%p, copied: `%.*s`.", dbc, have, ptr);
DBG("libcurl: DBC@0x%p, copied: `%.*s`.", dbc, have, ptr);


/*
Expand Down Expand Up @@ -328,7 +329,7 @@ static SQLRETURN post_request(esodbc_stmt_st *stmt,
size_t apos;
long to, code;

DBG("STMT@0x%p posting request `%.*s` (%zd).", stmt, blen, u8body, blen);
DBGSTMT(stmt, "posting request `%.*s` (%zd).", blen, u8body, blen);

if (! dbc->curl)
init_curl(dbc);
Expand Down Expand Up @@ -405,8 +406,8 @@ static SQLRETURN post_request(esodbc_stmt_st *stmt,
return attach_answer(stmt, abuff, apos);

err:
ERR("libcurl: request failed STMT@0x%p (timeout:%ld, body:`%.*s`) "
"failed: '%s' (%d).", stmt, timeout, blen, u8body,
ERRSTMT(stmt, "libcurl: request failed (timeout:%ld, body:`%.*s`) "
"failed: '%s' (%d).", timeout, blen, u8body,
res != CURLE_OK ? curl_easy_strerror(res) : "<unspecified>", res);
err_net: /* the error occured after the request hit hit the network */
if (abuff)
Expand Down Expand Up @@ -510,27 +511,54 @@ static size_t json_escape(const char *jin, size_t inlen, char *jout,
#undef I16TOA
}


/*
* Build a JSON query or cursor object (if statment having one) and send it as
* the body of a REST POST requrest.
*/
SQLRETURN post_statement(esodbc_stmt_st *stmt)
{
SQLRETURN ret;
esodbc_dbc_st *dbc = stmt->dbc;
size_t bodylen, pos;
size_t bodylen, pos, u8len;
char *body, buff[ESODBC_BODY_BUF_START_SIZE];
char u8curs[ESODBC_BODY_BUF_START_SIZE];

// TODO: add params (if any)

bodylen = sizeof(JSON_SQL_TEMPLATE_START) - /*\0*/1;
if (dbc->fetch.slen) {
bodylen += sizeof(JSON_SQL_TEMPLATE_MID_FETCH) - /*\0*/1;
bodylen += dbc->fetch.slen;
} else {
bodylen += sizeof(JSON_SQL_TEMPLATE_MID_QUERY) - /*\0*/1;
/* TODO: move escaping/x-coding (to JSON or CBOR) in attach_sql() and/or
* attach_answer() to avoid these operations for each execution of the
* statement (especially for the SQL statement; the cursor might not
* always be used - if app decides to no longer fetch - but would then
* clean this function). */

/* evaluate how long the stringified REST object will be */
if (stmt->rset.eccnt) { /* eval CURSOR object lenght */
/* convert cursor to C [mb]string. */
/* TODO: ansi_w2c() fits better for Base64 encoded cursors. */
u8len = WCS2U8(stmt->rset.ecurs, (int)stmt->rset.eccnt, u8curs,
sizeof(u8curs));
if (u8len <= 0) {
ERRSTMT(stmt, "failed to convert cursor `" LTPDL "` to UTF8: %d.",
stmt->rset.eccnt, stmt->rset.ecurs, WCS2U8_ERRNO());
RET_HDIAGS(stmt, SQL_STATE_24000);
}

bodylen = sizeof(JSON_SQL_CURSOR_START) - /*\0*/1;
bodylen += json_escape(u8curs, u8len, NULL, 0);
bodylen += sizeof(JSON_SQL_CURSOR_END) - /*\0*/1;
} else { /* eval QUERY object lenght */
bodylen = sizeof(JSON_SQL_QUERY_START) - /*\0*/1;
if (dbc->fetch.slen) {
bodylen += sizeof(JSON_SQL_QUERY_MID_FETCH) - /*\0*/1;
bodylen += dbc->fetch.slen;
} else {
bodylen += sizeof(JSON_SQL_QUERY_MID) - /*\0*/1;
}
bodylen += json_escape(stmt->u8sql, stmt->sqllen, NULL, 0);
bodylen += sizeof(JSON_SQL_QUERY_END) - /*\0*/1;
}
//bodylen += stmt->sqllen;
bodylen += json_escape(stmt->u8sql, stmt->sqllen, NULL, 0);
bodylen += sizeof(JSON_SQL_TEMPLATE_END) - /*\0*/1;

/* allocate memory for the stringified buffer, if needed */
if (sizeof(buff) < bodylen) {
WARN("local buffer too small (%zd), need %zdB; will alloc.",
sizeof(buff), bodylen);
Expand All @@ -544,29 +572,36 @@ SQLRETURN post_statement(esodbc_stmt_st *stmt)
body = buff;
}

pos = sizeof(JSON_SQL_TEMPLATE_START) - 1;
memcpy(body, JSON_SQL_TEMPLATE_START, pos);
//memcpy(body + pos, stmt->u8sql, stmt->sqllen);
//pos += stmt->sqllen;
/* TODO: if not adding CBOR, move escaping in attach_sql */
pos += json_escape(stmt->u8sql, stmt->sqllen, body + pos, bodylen - pos);
/* build the actual stringified JSON object */
if (stmt->rset.eccnt) { /* copy CURSOR object */
pos = sizeof(JSON_SQL_CURSOR_START) - /*\0*/1;
memcpy(body, JSON_SQL_CURSOR_START, pos);
pos += json_escape(u8curs, u8len, body + pos, bodylen - pos);
memcpy(body + pos, JSON_SQL_CURSOR_END,
sizeof(JSON_SQL_CURSOR_END) - /*WITH \0*/0);
pos += sizeof(JSON_SQL_CURSOR_END) - /* but don't account for it */1;
} else { /* copy QUERY object */
pos = sizeof(JSON_SQL_QUERY_START) - 1;
memcpy(body, JSON_SQL_QUERY_START, pos);
pos += json_escape(stmt->u8sql, stmt->sqllen, body + pos, bodylen-pos);

if (dbc->fetch.slen) {
memcpy(body + pos, JSON_SQL_QUERY_MID_FETCH,
sizeof(JSON_SQL_QUERY_MID_FETCH) - 1);
pos += sizeof(JSON_SQL_QUERY_MID_FETCH) - 1;
memcpy(body + pos, dbc->fetch.str, dbc->fetch.slen);
pos += dbc->fetch.slen;
} else {
memcpy(body + pos, JSON_SQL_QUERY_MID,
sizeof(JSON_SQL_QUERY_MID) - 1);
pos += sizeof(JSON_SQL_QUERY_MID) - 1;
}

if (dbc->fetch.slen) {
memcpy(body + pos, JSON_SQL_TEMPLATE_MID_FETCH,
sizeof(JSON_SQL_TEMPLATE_MID_FETCH) - 1);
pos += sizeof(JSON_SQL_TEMPLATE_MID_FETCH) - 1;
memcpy(body + pos, dbc->fetch.str, dbc->fetch.slen);
pos += dbc->fetch.slen;
} else {
memcpy(body + pos, JSON_SQL_TEMPLATE_MID_QUERY,
sizeof(JSON_SQL_TEMPLATE_MID_QUERY) - 1);
pos += sizeof(JSON_SQL_TEMPLATE_MID_QUERY) - 1;
memcpy(body + pos, JSON_SQL_QUERY_END,
sizeof(JSON_SQL_QUERY_END) - /*WITH \0*/0);
pos += sizeof(JSON_SQL_QUERY_END) - /* but don't account for it */1;
}

memcpy(body + pos, JSON_SQL_TEMPLATE_END,
sizeof(JSON_SQL_TEMPLATE_END) - /*WITH \0*/0);
pos += sizeof(JSON_SQL_TEMPLATE_END) - /* but don't account for it */1;

ret = post_request(stmt, ESODBC_DEFAULT_TIMEOUT, body, pos);

if (body != buff)
Expand All @@ -575,6 +610,7 @@ SQLRETURN post_statement(esodbc_stmt_st *stmt)
return ret;
}


static SQLRETURN test_connect(CURL *curl)
{
CURLcode res;
Expand Down
2 changes: 1 addition & 1 deletion driver/error.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ SQLRETURN post_diagnostic(esodbc_diag_st *dest, esodbc_state_et state,
wcsncpy(dest->text + pos, text, tlen + /* 0-term */1);
dest->text_len = (int)(pos + tlen);
}
DBG("diagnostic message: `" LTPD "` (%d), native code: %d.", dest->text,
DBG("diagnostic message: `" LTPD "` [%d], native code: %d.", dest->text,
dest->text_len, dest->native_code);

RET_STATE(state);
Expand Down
2 changes: 2 additions & 0 deletions driver/handles.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ typedef struct struct_resultset {

void *state; /* top UJSON decoder state */
void *rows_iter; /* UJSON array with the result set */
const wchar_t *ecurs; /* Elastic's cursor object */
size_t eccnt; /* cursor char count */

size_t nrows; /* (count of) number of rows in current result set */
size_t vrows; /* (count of) visited rows in current result set */
Expand Down
38 changes: 4 additions & 34 deletions driver/info.c
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ SQLRETURN EsSQLGetDiagRecW
_Out_opt_ SQLSMALLINT *TextLength
)
{
esodbc_diag_st *diag;
esodbc_diag_st *diag, dummy;

if (RecNumber <= 0) {
ERR("record number must be >=1; received: %d.", RecNumber);
Expand Down Expand Up @@ -616,42 +616,12 @@ SQLRETURN EsSQLGetDiagRecW
/* API assumes there's always enough buffer here.. */
/* no err indicator */
if (Sqlstate)
wcsncpy(Sqlstate, esodbc_errors[diag->state].code, SQL_SQLSTATE_SIZE);
wcscpy(Sqlstate, esodbc_errors[diag->state].code);
if (NativeError)
*NativeError = diag->native_code;

/* always return how many we would need, or have used */
*TextLength = diag->text_len; /* count needed in characters */

if (MessageText && diag->text_len) {
if (diag->text_len < BufferLength) {
if ((BufferLength % 2) && (1 < sizeof(SQLTCHAR))) {
/* not specified in API for this function, but pretty much for
* any other wide-char using ones */
ERR("BufferLength not an even number: %d.", BufferLength);
return SQL_ERROR;
}
/* no error indication exists */
wcsncpy(MessageText, diag->text, diag->text_len);
DBG("diagnostic text: `"LTPD"` (%d).", MessageText,diag->text_len);
return SQL_SUCCESS;
} else {
if (BufferLength < 0) {
ERR("buffer length must be non-negative; received: %d.",
BufferLength);
return SQL_ERROR;
}
/* no error indication exists */
wcsncpy(MessageText, diag->text, BufferLength);
INFO("not enough space to copy diagnostic message; "
"have: %d, need: %d.", BufferLength, *TextLength);
return SQL_SUCCESS_WITH_INFO;
}
}

DBG("call only asking for available diagnostic characters to return (%d).",
*TextLength);
return SQL_SUCCESS;
return write_tstr(&dummy, MessageText, diag->text, BufferLength,
TextLength);
}


Expand Down
4 changes: 4 additions & 0 deletions driver/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ extern int _esodbc_log_level;
#define INFO(fmt, ...) LOG(LOG_LEVEL_INFO, fmt, __VA_ARGS__)
#define DBG(fmt, ...) LOG(LOG_LEVEL_DBG, fmt, __VA_ARGS__)

#define ERRSTMT(stmt, fmt, ...) ERR("STMT@0x%p: " fmt, stmt, __VA_ARGS__)
#define WARNSTMT(stmt, fmt, ...) WARN("STMT@0x%p: " fmt, stmt, __VA_ARGS__)
#define DBGSTMT(stmt, fmt, ...) DBG("STMT@0x%p: " fmt, stmt, __VA_ARGS__)

#define BUG(fmt, ...) \
do { \
LOG(LOG_LEVEL_ERR, fmt, __VA_ARGS__); \
Expand Down
Loading

0 comments on commit ccb0686

Please sign in to comment.