Skip to content

Commit

Permalink
add catalog functions and execution support
Browse files Browse the repository at this point in the history
- integrate Elastic/SQL REST endpoint for catalog functions;
- implement SQLGetTypeInfo, SQLTables, SQLColumns catalog functions;
- implement SQLPrepare, SQLExeute

- TODO: input SQL and parameter (of catalog fn's) escaping
  • Loading branch information
bpintea committed Feb 4, 2018
1 parent 494e41a commit 0e287e8
Show file tree
Hide file tree
Showing 10 changed files with 783 additions and 500 deletions.
285 changes: 114 additions & 171 deletions driver/catalogue.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

#include <stdio.h>
#include <string.h>
#include <wchar.h>

Expand All @@ -16,37 +17,14 @@
#include "info.h"
#include "queries.h"

#define SQL_TABLES_TEXT "SYS TABLES;"
#define SQL_TABLES_JSON "{\"table_pattern\" : \"" WPFWP_DESC "\"}"

#define JSON_TABLES_PREFIX \
"{\"" JSON_ANSWER_COLUMNS "\": [" \
"{" \
"\"name\": \"TABLE_CAT\"," \
"\"type\": \"text\"" \
"}," \
"{" \
"\"name\": \"TABLE_SCHEM\"," \
"\"type\": \"text\"" \
"}," \
"{" \
"\"name\": \"TABLE_NAME\"," \
"\"type\": \"text\"" \
"}," \
"{" \
"\"name\": \"TABLE_TYPE\"," \
"\"type\": \"text\"" \
"}," \
"{" \
"\"name\": \"REMARKS\"," \
"\"type\": \"text\"" \
"}" \
"]," \
"\"" JSON_ANSWER_ROWS "\": ["
#define JSON_TABLES_SUFFIX "]}"

#define JSON_TABLES_ROW_TEMPLATE \
"[null,null,\"" LTPDL "\",\"TABLE\",\"\"]"
#if 1
#define SQL_TABLES_START "SYS TABLES LIKE '"
#define SQL_TABLES_END "'"
//#define SQL_TABLES_END "';"
#endif

#define ESODBC_SQL_TABLES "SYS TABLES LIKE"
#define ESODBC_SQL_COLUMNS "SYS COLUMNS TABLES LIKE"

SQLRETURN EsSQLTablesW(
SQLHSTMT StatementHandle,
Expand All @@ -59,37 +37,29 @@ SQLRETURN EsSQLTablesW(
_In_reads_opt_(NameLength4) SQLTCHAR *TableType,
SQLSMALLINT NameLength4)
{
esodbc_stmt_st *stmt = STMH(StatementHandle);
SQLRETURN ret;
SQLTCHAR wbuf[ESODBC_MAX_IDENTIFIER_LEN + sizeof(SQL_TABLES_START) +
sizeof(SQL_TABLES_END)]; /*includes 2x\0*/
SQLTCHAR *table_type, *table_name;
SQLTCHAR tbuf[SQL_MAX_IDENTIFIER_LEN + sizeof(SQL_TABLES_JSON)];
char u8[/*max bytes per character */4 * SQL_MAX_IDENTIFIER_LEN +
sizeof(SQLTCHAR) * sizeof(SQL_TABLES_JSON)];
char *answer, faked[1<<16]; // FIXME: buffer handling
char *dupped;
esodbc_stmt_st *stmt = STMH(StatementHandle);
esodbc_dbc_st *dbc = stmt->dbc;
int i;
long count;
size_t len, pos;
UJObject obj, tables, table;
wchar_t *keys[] = {L"tables"};
const wchar_t *tname;
void *state = NULL, *iter;
size_t count, pos;

if (CatalogName && (wmemcmp(CatalogName, MK_TSTR(SQL_ALL_CATALOGS),
NameLength1) != 0)) {
WARN("filtering by catalog is not supported."); /* TODO? */
ERR("filtering by catalog is not supported."); /* TODO? */
goto empty;
}
if (SchemaName && (wmemcmp(SchemaName, MK_TSTR(SQL_ALL_SCHEMAS),
NameLength2) != 0)) {
WARN("filtering by schemas is not supported.");
ERR("filtering by schemas is not supported.");
goto empty;
}
if (TableType) {
if (TableType) { /* TODO: server support needed */
table_type = _wcsupr(TableType);
if (! wcsstr(table_type, MK_TSTR("TABLE"))) {
INFO("no 'TABLE' type fond in filter `"LTPD"`: empty result set");
if ((! wcsstr(table_type, MK_TSTR("TABLE"))) &&
(! wcsstr(table_type, MK_TSTR("ALIAS")))) {
WARN("no 'TABLE' or 'ALIAS' type fond in filter `"LTPD"`: "
"empty result set");
goto empty;
}
}
Expand All @@ -98,141 +68,114 @@ SQLRETURN EsSQLTablesW(
if (stmt->metadata_id == SQL_TRUE) {
// FIXME: string needs escaping of % \\ _
FIXME;
} else {
/* ES uses * as wildcard instead of % (TODO: only?) */
for (i = 0; i < NameLength3; i ++)
if (TableName[i] == (SQLTCHAR)'%')
TableName[i] = (SQLTCHAR)'*';
DBG("new table name after s/%%/*: `"LTPD"`.", TableName);
}
table_name = TableName;
count = NameLength3;
} else {
table_name = MK_TSTR("*");
table_name = MK_TSTR("%");
count = sizeof("%") - 1;
}

/* print JSON to send to server */
count = _snwprintf(tbuf, sizeof(tbuf), SQL_TABLES_JSON, table_name);
if ((count < 0) || sizeof(tbuf) <= count) {
ERRN("failed to print JSON or buffer too small (%d).", count);
RET_HDIAGS(stmt, SQL_STATE_HY000);
if (ESODBC_MAX_IDENTIFIER_LEN < NameLength3) {
ERR("TableName `" LTPDL "` too long (limit: %zd).", NameLength3,
TableName, ESODBC_MAX_IDENTIFIER_LEN);
RET_HDIAG(stmt, SQL_STATE_HY000, "TableName too long", 0);
}
/* print SQL to send to server */
/* count/pos always indicate number of characters (not bytes) */
pos = sizeof(SQL_TABLES_START) - 1;
memcpy(wbuf, MK_TSTR(SQL_TABLES_START), pos * sizeof(SQLTCHAR));
memcpy(&wbuf[pos], table_name, count * sizeof(SQLTCHAR));
pos += count;
count = sizeof(SQL_TABLES_END) - 1;
memcpy(&wbuf[pos], MK_TSTR(SQL_TABLES_END), count * sizeof(SQLTCHAR));
pos += count;

ret = EsSQLFreeStmt(stmt, ESODBC_SQL_CLOSE);
assert(SQL_SUCCEEDED(ret)); /* can't return error */
ret = attach_sql(stmt, wbuf, pos);
if (SQL_SUCCEEDED(ret))
ret = post_statement(stmt);
return ret;

#ifdef _WIN32 /* TODO a Posix friendlier equivalent */
/* convert UCS2 -> UTF8
* WC_NO_BEST_FIT_CHARS: https://docs.microsoft.com/en-us/sql/odbc/reference/develop-app/unicode-data
*/
count = WideCharToMultiByte(CP_UTF8, WC_NO_BEST_FIT_CHARS, tbuf, count,
u8, sizeof(u8), NULL, NULL);
if (count <= 0) {
ERRN("conversion UCS2/UTF8 failed (%d).", count);
RET_HDIAGS(stmt, SQL_STATE_HY000);
} else {
DBG("UCS2/UTF8 converted on %d bytes.", count);
}
#else /* _WIN32 */
# error "platform not supported"
#endif /* _WIN32 */

/* attach the SQL textual statement to the handler */
ret = attach_sqltext(stmt, MK_TSTR(SQL_TABLES_TEXT),
sizeof(SQL_TABLES_TEXT)/*add 0-term*/);
if (! SQL_SUCCEEDED(ret))
return ret;

#if 0
count = post_sql_tables(dbc, ESODBC_TIMEOUT_DEFAULT, u8, count, answer,
sizeof(answer));
DBG("response received `%.*s` (%ld).", count, answer, count);
#else //0
ret = post_sql_tables(stmt, ESODBC_TIMEOUT_DEFAULT, u8, count);
if (! SQL_SUCCEEDED(ret)) {
ERR("post failed.");
goto err;
}
answer = dbc->wbuf;
count = (long)dbc->wpos;
dbc->wbuf = NULL;
dbc->wlen = 0;
dbc->wpos = 0;
DBG("response received `%.*s` (%ld).", count, answer, count);
#endif //0

/*
* Note: UJDecode will treat UTF8 strings and UJReadString will return
* directly wchar_t.
*/
obj = UJDecode(answer, count, NULL, &state);
free(answer);
answer = NULL;
if (UJObjectUnpack(obj, 1, "A", keys, &tables) < 1) {
ERR("failed to decode JSON answer: %s.",
state ? UJGetError(state) : "<none>");
goto err;
}
DBG("unpacked array of size: %zd.", UJArraySize(tables));
empty:
RET_HDIAG(stmt, SQL_STATE_HYC00, "Table filtering not supported", 0);
}

assert(sizeof(JSON_TABLES_PREFIX) < sizeof(faked));
memcpy(faked, JSON_TABLES_PREFIX, sizeof(JSON_TABLES_PREFIX) - /*0term*/1);
pos = sizeof(JSON_TABLES_PREFIX) - 1;

iter = UJBeginArray(tables);
if (! iter) {
ERR("failed to obtain array iterator: %s.", UJGetError(state));
goto err;
SQLRETURN EsSQLColumnsW
(
SQLHSTMT hstmt,
_In_reads_opt_(cchCatalogName) SQLWCHAR* szCatalogName,
SQLSMALLINT cchCatalogName,
_In_reads_opt_(cchSchemaName) SQLWCHAR* szSchemaName,
SQLSMALLINT cchSchemaName,
_In_reads_opt_(cchTableName) SQLWCHAR* szTableName,
SQLSMALLINT cchTableName,
_In_reads_opt_(cchColumnName) SQLWCHAR* szColumnName,
SQLSMALLINT cchColumnName
)
{
esodbc_stmt_st *stmt = STMH(hstmt);
SQLTCHAR *tablename, *columnname;
SQLSMALLINT tn_clen, cn_clen;
/* 8 for \' and spaces and extra */
SQLTCHAR wbuf[sizeof(ESODBC_SQL_COLUMNS) + 2 * ESODBC_MAX_IDENTIFIER_LEN + 8];
int clen;
SQLRETURN ret;

if (szCatalogName && (wmemcmp(szCatalogName, MK_TSTR(SQL_ALL_CATALOGS),
cchCatalogName) != 0)) {
ERR("filtering by catalog is not supported."); /* TODO? */
goto empty;
}
count = 0;
while (UJIterArray(&iter, &table)) {
if (! UJIsString(table)) {
ERR("received non-string table name element (%d) - skipped.",
UJGetType(table));
continue;
}
tname = UJReadString(table, &len);
DBG("available table: `" LTPDL "`.", len, tname);
count = snprintf(faked + pos, sizeof(faked),
"%s" JSON_TABLES_ROW_TEMPLATE, count ? "," : "",
(int)len, tname);
if (count < 0) {
ERRN("buffer printing failed");
goto err;
} else if (count < sizeof(JSON_TABLES_ROW_TEMPLATE) - 1
- sizeof(LTPDL) - 1 + len) {
ERR("buffer to small (%d) to print faked 'tables' JSON.",
sizeof(faked));
goto err;
if (szSchemaName && (wmemcmp(szSchemaName, MK_TSTR(SQL_ALL_SCHEMAS),
cchSchemaName) != 0)) {
ERR("filtering by schemas is not supported.");
goto empty;
}
if (szTableName && cchTableName) {
if (ESODBC_MAX_IDENTIFIER_LEN < cchTableName) {
ERR("TableName `" LTPDL "` too long (limit: %zd).", cchTableName,
szTableName, ESODBC_MAX_IDENTIFIER_LEN);
RET_HDIAG(stmt, SQL_STATE_HY000, "TableName too long", 0);
}
pos += count;
tablename = szTableName;
tn_clen = cchTableName;
} else {
tablename = MK_TSTR("%");
tn_clen = sizeof("%") - 1;
}
UJFree(state);
state = NULL;

if (sizeof(faked) <= pos + sizeof(JSON_TABLES_SUFFIX)/*w/ \0*/) {
ERR("buffer to small (%d) to print faked 'tables' JSON.",
sizeof(faked));
goto err;
if (szColumnName && cchColumnName) {
if (ESODBC_MAX_IDENTIFIER_LEN < cchColumnName) {
ERR("ColumnName `" LTPDL "` too long (limit: %zd).", cchColumnName,
szColumnName, ESODBC_MAX_IDENTIFIER_LEN);
RET_HDIAG(stmt, SQL_STATE_HY000, "ColumnName too long", 0);
}
columnname = szColumnName;
cn_clen = cchColumnName;
} else {
columnname = MK_TSTR("%");
cn_clen = sizeof("%") - 1;
}
memcpy(faked + pos, JSON_TABLES_SUFFIX, sizeof(JSON_TABLES_SUFFIX));
pos += sizeof(JSON_TABLES_SUFFIX); /* includes \0 */

DBG("faked 'tables' answer: `%.*s`.", pos, faked);

dupped = (char *)malloc(pos);
if (! dupped) {
ERRN("failed to strndup faked answer.");
RET_HDIAGS(stmt, SQL_STATE_HY001);
clen = _snwprintf(wbuf, sizeof(wbuf)/sizeof(SQLTCHAR),
MK_TSTR("%s '%.*s' '%.*s'"), MK_TSTR(ESODBC_SQL_COLUMNS), tn_clen,
tablename, cn_clen, columnname);
if (clen <= 0 || sizeof(wbuf)/sizeof(SQLTCHAR) <= clen) { /* == */
ERRN("SQL printing failed (buffer too small (%zdB)?).", sizeof(wbuf));
RET_HDIAGS(stmt, SQL_STATE_HY000);
}
memcpy(dupped, faked, pos);

return attach_answer(stmt, dupped, pos - /*\0*/1);

ret = EsSQLFreeStmt(stmt, ESODBC_SQL_CLOSE);
assert(SQL_SUCCEEDED(ret)); /* can't return error */
ret = attach_sql(stmt, wbuf, clen);
if (SQL_SUCCEEDED(ret))
ret = post_statement(stmt);
return ret;

empty:
// FIXME: set empty result to statement
FIXME;
return SQL_SUCCESS;

err:
// FIXME
if (state)
UJFree(state);
RET_HDIAGS(stmt, SQL_STATE_HY000);
RET_HDIAG(stmt, SQL_STATE_HYC00, "Table filtering not supported", 0);
}


/* vim: set noet fenc=utf-8 ff=dos sts=0 sw=4 ts=4 : */
12 changes: 12 additions & 0 deletions driver/catalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ SQLRETURN EsSQLTablesW(
_In_reads_opt_(NameLength4) SQLWCHAR *TableType,
SQLSMALLINT NameLength4);

SQLRETURN EsSQLColumnsW
(
SQLHSTMT hstmt,
_In_reads_opt_(cchCatalogName) SQLWCHAR* szCatalogName,
SQLSMALLINT cchCatalogName,
_In_reads_opt_(cchSchemaName) SQLWCHAR* szSchemaName,
SQLSMALLINT cchSchemaName,
_In_reads_opt_(cchTableName) SQLWCHAR* szTableName,
SQLSMALLINT cchTableName,
_In_reads_opt_(cchColumnName) SQLWCHAR* szColumnName,
SQLSMALLINT cchColumnName
);
#endif /* __CATALOGUE_H__ */


Expand Down
Loading

0 comments on commit 0e287e8

Please sign in to comment.