Skip to content

Commit

Permalink
out_es: tests: Add HTTP response testing
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Oct 10, 2024
1 parent 304524c commit 04b7e15
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 0 deletions.
73 changes: 73 additions & 0 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,78 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

static int elasticsearch_response_test(struct flb_config *config,
void *plugin_context,
int status,
const void *data, size_t bytes,
void **out_data, size_t *out_size)
{
int ret = 0;
struct flb_elasticsearch *ctx = plugin_context;
struct flb_connection *u_conn;
struct flb_http_client *c;
size_t b_sent;

/* Not retrieve upstream connection */
u_conn = NULL;

/* Compose HTTP Client request (dummy client) */
c = flb_http_dummy_client(u_conn, FLB_HTTP_POST, ctx->uri,
NULL, 0, NULL, 0, NULL, 0);

flb_http_buffer_size(c, ctx->buffer_size);

/* Just stubbing the HTTP responses */
flb_http_set_response_test(c, "response", data, bytes, status, NULL, NULL);

ret = flb_http_do(c, &b_sent);
if (ret != 0) {
flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
goto error;
}
if (ret != 0) {
flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
goto error;
}
else {
/* The request was issued successfully, validate the 'error' field */
flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri);
if (c->resp.status != 200 && c->resp.status != 201) {
if (c->resp.payload_size > 0) {
flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n",
c->resp.status, ctx->uri, c->resp.payload);
}
else {
flb_plg_error(ctx->ins, "HTTP status=%i URI=%s",
c->resp.status, ctx->uri);
}
goto error;
}

if (c->resp.payload_size > 0) {
/*
* Elasticsearch payload should be JSON, we convert it to msgpack
* and lookup the 'error' field.
*/
ret = elasticsearch_error_check(ctx, c);
}
else {
goto error;
}
}

/* Cleanup */
flb_http_client_destroy(c);

return ret;

error:
/* Cleanup */
flb_http_client_destroy(c);

return -2;
}

static int cb_es_exit(void *data, struct flb_config *config)
{
struct flb_elasticsearch *ctx = data;
Expand Down Expand Up @@ -1231,6 +1303,7 @@ struct flb_output_plugin out_es_plugin = {

/* Test */
.test_formatter.callback = elasticsearch_format,
.test_response.callback = elasticsearch_response_test,

/* Plugin flags */
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
Expand Down
34 changes: 34 additions & 0 deletions tests/runtime/data/es/json_es.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,37 @@
#define JSON_DOTS \
"[1448403340," \
"{\".le.vel\":\"error\", \".fo.o\":[{\".o.k\": [{\".b.ar\": \"baz\"}]}]}]"

#define JSON_RESPONSE_SUCCESSES "{\"errors\":false,\"took\":0,\"items\":[" \
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dcfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":6,\"_primary_term\":1,\"status\":201}}," \
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dsfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"d8fJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}," \
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"eMfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":9,\"_primary_term\":1,\"status\":201}}]}"

#define JSON_RESPONSE_SUCCESSES_SIZE 783

#define JSON_RESPONSE_PARTIALLY_SUCCESS "{\"errors\":true,\"took\":316737025,\"items\":" \
"[{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"hxELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iBELapEB_XqxG5Ydupgb\",\"status\":400," \
"\"error\":{\"type\":\"document_parsing_exception\"," \
"\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iBELapEB_XqxG5Ydupgb'. " \
"Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \
"\"caused_by\":{\"type\":\"document_parsing_exception\"," \
"\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \
"Use the index API request parameters.\"}}}}," \
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iRELapEB_XqxG5Ydupgb\",\"status\":400," \
"\"error\":{\"type\":\"document_parsing_exception\"," \
"\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iRELapEB_XqxG5Ydupgb'. " \
"Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \
"\"caused_by\":{\"type\":\"document_parsing_exception\"," \
"\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \
"Use the index API request parameters.\"}}}}," \
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"ihELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}]}"

#define JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE 1322
161 changes: 161 additions & 0 deletions tests/runtime/out_elasticsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,164 @@ void flb_test_logstash_prefix_separator()
flb_destroy(ctx);
}

static void cb_check_response_success(void *ctx, int ffd,
int res_ret, void *res_data,
size_t res_size, void *data)
{
TEST_CHECK(res_ret == 1);
}

void flb_test_response_success()
{
int ret;
char *response = "{\"took\":1,\"errors\":false,\"items\":[]}";
int size = 37;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"write_operation", "create",
NULL);

/* Enable test mode */
ret = flb_output_set_http_test(ctx, out_ffd, "response",
cb_check_response_success,
NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
ret = flb_lib_response(ctx, out_ffd, 200, response, size);
TEST_CHECK(ret == 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_response_successes()
{
int ret;
char *response = JSON_RESPONSE_SUCCESSES;
int size = JSON_RESPONSE_SUCCESSES_SIZE;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"write_operation", "create",
NULL);

/* Enable test mode */
ret = flb_output_set_http_test(ctx, out_ffd, "response",
cb_check_response_success,
NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
ret = flb_lib_response(ctx, out_ffd, 200, response, size);
TEST_CHECK(ret == 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

static void cb_check_response_partially_success(void *ctx, int ffd,
int res_ret, void *res_data,
size_t res_size, void *data)
{
int composed_ret = 0;
composed_ret |= (1 << 0);
composed_ret |= (1 << 7);

TEST_CHECK(res_ret == composed_ret);
/* Check whether contains a success flag or not */
TEST_CHECK((res_ret & (1 << 0)));
}

void flb_test_response_partially_success()
{
int ret;
char *response = JSON_RESPONSE_PARTIALLY_SUCCESS;
int size = JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"write_operation", "create",
NULL);

/* Enable test mode */
ret = flb_output_set_http_test(ctx, out_ffd, "response",
cb_check_response_partially_success,
NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
ret = flb_lib_response(ctx, out_ffd, 200, response, size);
TEST_CHECK(ret == 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

/* Test list */
TEST_LIST = {
{"long_index" , flb_test_long_index },
Expand All @@ -814,5 +972,8 @@ TEST_LIST = {
{"replace_dots" , flb_test_replace_dots },
{"id_key" , flb_test_id_key },
{"logstash_prefix_separator" , flb_test_logstash_prefix_separator },
{"response_success" , flb_test_response_success },
{"response_successes", flb_test_response_successes },
{"response_partially_success" , flb_test_response_partially_success },
{NULL, NULL}
};

0 comments on commit 04b7e15

Please sign in to comment.