Skip to content

Commit

Permalink
in_http: add application/msgpack for http2 (backport of #8499).
Browse files Browse the repository at this point in the history
Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan committed Oct 21, 2024
1 parent 652fa29 commit 9dc2b83
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 0 deletions.
115 changes: 115 additions & 0 deletions plugins/in_http/http_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#define HTTP_CONTENT_JSON 0
#define HTTP_CONTENT_URLENCODED 1
#define HTTP_CONTENT_MSGPACK 2

static inline char hex2nibble(char c)
{
Expand Down Expand Up @@ -508,6 +509,101 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag,
return ret;
}

static ssize_t parse_payload_msgpack(struct flb_http *ctx, flb_sds_t tag,
char *payload, size_t size)
{
int ret = FLB_EVENT_ENCODER_SUCCESS;
struct flb_time tm;
size_t offset = 0;
msgpack_unpacked result;
msgpack_object *record;
msgpack_object *metadata;
msgpack_object *data;
flb_sds_t tag_from_record = NULL;


msgpack_unpacked_init(&result);

while (ret == FLB_EVENT_ENCODER_SUCCESS &&
msgpack_unpack_next(&result, payload, size, &offset) == MSGPACK_UNPACK_SUCCESS) {

if (result.data.type != MSGPACK_OBJECT_ARRAY) {
msgpack_unpacked_destroy(&result);
return -1;
}

record = &result.data;
metadata = &record->via.array.ptr[0];
data = &record->via.array.ptr[1];

if (ctx->tag_key) {
tag_from_record = tag_key(ctx, data);
}

ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_time_msgpack_to_time(&tm, &metadata->via.array.ptr[0]);

if (ret == -1) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_log_event_encoder_set_timestamp(
&ctx->log_encoder,
&tm);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_log_event_encoder_set_body_from_msgpack_object(&ctx->log_encoder, data);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

if (tag_from_record) {
ret = flb_input_log_append(ctx->ins, tag_from_record,
flb_sds_len(tag_from_record),
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}
else if (tag) {
ret = flb_input_log_append(ctx->ins, tag, flb_sds_len(tag),
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}
else {
ret = flb_input_log_append(ctx->ins, NULL, 0,
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}

if (ret != 0) {
msgpack_unpacked_destroy(&result);
return -1;
}

flb_log_event_encoder_reset(&ctx->log_encoder);
}

msgpack_unpacked_destroy(&result);
return 0;
}

static int process_payload(struct flb_http *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
Expand All @@ -534,6 +630,11 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
type = HTTP_CONTENT_URLENCODED;
}

if (header->val.len == 19 &&
strncasecmp(header->val.data, "application/msgpack", 19) == 0) {
type = HTTP_CONTENT_MSGPACK;
}

if (type == -1) {
send_response(conn, 400, "error: invalid 'Content-Type'\n");
return -1;
Expand All @@ -550,6 +651,9 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
else if (type == HTTP_CONTENT_URLENCODED) {
ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len);
}
else if (type == HTTP_CONTENT_MSGPACK) {
ret = parse_payload_msgpack(ctx, tag, request->data.data, request->data.len);
}

if (ret != 0) {
send_response(conn, 400, "error: invalid payload\n");
Expand Down Expand Up @@ -919,6 +1023,10 @@ static int process_payload_ng(flb_sds_t tag,
type = HTTP_CONTENT_URLENCODED;
}

if (strcasecmp(request->content_type, "application/msgpack") == 0) {
type = HTTP_CONTENT_MSGPACK;
}

if (type == -1) {
send_response_ng(response, 400, "error: invalid 'Content-Type'\n");
return -1;
Expand All @@ -940,6 +1048,13 @@ static int process_payload_ng(flb_sds_t tag,
return parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload));
}
}
else if (type == HTTP_CONTENT_MSGPACK) {
ctx = (struct flb_http *) request->stream->user_data;
payload = (char *) request->body;
if (payload) {
return parse_payload_msgpack(ctx, tag, payload, cfl_sds_len(payload));
}
}

return 0;
}
Expand Down
150 changes: 150 additions & 0 deletions tests/runtime/in_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#define JSON_CONTENT_TYPE "application/json"
#define JSON_CHARSET_CONTENT_TYPE "application/json; charset=utf-8"
#define MSGPACK_CONTENT_TYPE "application/msgpack"

struct http_client_ctx {
struct flb_upstream *u;
Expand Down Expand Up @@ -278,6 +279,153 @@ void flb_test_http()
flb_upstream_conn_release(ctx->httpc->u_conn);
test_ctx_destroy(ctx);
}

void flb_test_msgpack_legacy()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
struct flb_http_client *c;
int ret;
int num;
size_t b_sent;
char buf[] = "\xdd\x00\x00\x00\x02\xdd\x00\x00"
"\x00\x02\xd7\x00\x65\xd3\x9c\x63"
"\x19\x36\xb8\xd5\x80\x81\xa7\x6d"
"\x65\x73\x73\x61\x67\x65\xa5\x64"
"\x75\x6d\x6d\x79\xbe";


clear_output_num();

cb_data.cb = cb_check_result_json;
cb_data.data = "\"message\":\"dummy\"";

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

flb_input_set(ctx->flb, ctx->i_ffd, "http2", "off", NULL);

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"format", "json",
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

ctx->httpc = http_client_ctx_create();
TEST_CHECK(ctx->httpc != NULL);

c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf),
"127.0.0.1", 9880, NULL, 0);
ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE),
MSGPACK_CONTENT_TYPE, strlen(MSGPACK_CONTENT_TYPE));
TEST_CHECK(ret == 0);
if (!TEST_CHECK(c != NULL)) {
TEST_MSG("http_client failed");
exit(EXIT_FAILURE);
}

ret = flb_http_do(c, &b_sent);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("ret error. ret=%d\n", ret);
}
else if (!TEST_CHECK(b_sent > 0)){
TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent);
}
else if (!TEST_CHECK(c->resp.status == 201)) {
TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status);
}

/* waiting to flush */
flb_time_msleep(1500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}
flb_http_client_destroy(c);
flb_upstream_conn_release(ctx->httpc->u_conn);
test_ctx_destroy(ctx);
}

void flb_test_msgpack()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
struct flb_http_client *c;
int ret;
int num;
size_t b_sent;
char buf[] = "\xdd\x00\x00\x00\x02\xdd\x00\x00"
"\x00\x02\xd7\x00\x65\xd3\x9c\x63"
"\x19\x36\xb8\xd5\x80\x81\xa7\x6d"
"\x65\x73\x73\x61\x67\x65\xa5\x64"
"\x75\x6d\x6d\x79\xbe";


clear_output_num();

cb_data.cb = cb_check_result_json;
cb_data.data = "\"message\":\"dummy\"";

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"format", "json",
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

ctx->httpc = http_client_ctx_create();
TEST_CHECK(ctx->httpc != NULL);

c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf),
"127.0.0.1", 9880, NULL, 0);
ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE),
MSGPACK_CONTENT_TYPE, strlen(MSGPACK_CONTENT_TYPE));
TEST_CHECK(ret == 0);
if (!TEST_CHECK(c != NULL)) {
TEST_MSG("http_client failed");
exit(EXIT_FAILURE);
}

ret = flb_http_do(c, &b_sent);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("ret error. ret=%d\n", ret);
}
else if (!TEST_CHECK(b_sent > 0)){
TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent);
}
else if (!TEST_CHECK(c->resp.status == 201)) {
TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status);
}

/* waiting to flush */
flb_time_msleep(1500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}
flb_http_client_destroy(c);
flb_upstream_conn_release(ctx->httpc->u_conn);
test_ctx_destroy(ctx);
}

void flb_test_http_successful_response_code(char *response_code)
{
struct flb_lib_out_cb cb_data;
Expand Down Expand Up @@ -662,6 +810,8 @@ void flb_test_http_tag_key()

TEST_LIST = {
{"http", flb_test_http},
{"msgpack_legacy", flb_test_msgpack_legacy},
{"msgpack", flb_test_msgpack},
{"successful_response_code_200", flb_test_http_successful_response_code_200},
{"successful_response_code_204", flb_test_http_successful_response_code_204},
{"failure_response_code_400_bad_json", flb_test_http_failure_400_bad_json},
Expand Down

0 comments on commit 9dc2b83

Please sign in to comment.