Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_http: parse msgpack payloads emitted by out_http. #8499

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions plugins/in_http/http_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#define HTTP_CONTENT_JSON 0
#define HTTP_CONTENT_URLENCODED 1
#define HTTP_CONTENT_MSGPACK 2

static inline char hex2nibble(char c)
{
Expand Down Expand Up @@ -508,6 +509,101 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag,
return ret;
}

static ssize_t parse_payload_msgpack(struct flb_http *ctx, flb_sds_t tag,
char *payload, size_t size)
{
int ret = FLB_EVENT_ENCODER_SUCCESS;
struct flb_time tm;
size_t offset = 0;
msgpack_unpacked result;
msgpack_object *record;
msgpack_object *metadata;
msgpack_object *data;
flb_sds_t tag_from_record = NULL;


msgpack_unpacked_init(&result);
pwhelan marked this conversation as resolved.
Show resolved Hide resolved

while (ret == FLB_EVENT_ENCODER_SUCCESS &&
msgpack_unpack_next(&result, payload, size, &offset) == MSGPACK_UNPACK_SUCCESS) {

if (result.data.type != MSGPACK_OBJECT_ARRAY) {
msgpack_unpacked_destroy(&result);
return -1;
}

record = &result.data;
metadata = &record->via.array.ptr[0];
data = &record->via.array.ptr[1];

if (ctx->tag_key) {
tag_from_record = tag_key(ctx, data);
}

ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_time_msgpack_to_time(&tm, &metadata->via.array.ptr[0]);

if (ret == -1) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_log_event_encoder_set_timestamp(
&ctx->log_encoder,
&tm);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_log_event_encoder_set_body_from_msgpack_object(&ctx->log_encoder, data);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

if (tag_from_record) {
ret = flb_input_log_append(ctx->ins, tag_from_record,
flb_sds_len(tag_from_record),
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}
else if (tag) {
ret = flb_input_log_append(ctx->ins, tag, flb_sds_len(tag),
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}
else {
ret = flb_input_log_append(ctx->ins, NULL, 0,
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}

if (ret != 0) {
msgpack_unpacked_destroy(&result);
return -1;
}

flb_log_event_encoder_reset(&ctx->log_encoder);
}

msgpack_unpacked_destroy(&result);
return 0;
}

static int process_payload(struct flb_http *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
Expand All @@ -534,6 +630,11 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
type = HTTP_CONTENT_URLENCODED;
}

if (header->val.len == 19 &&
strncasecmp(header->val.data, "application/msgpack", 19) == 0) {
type = HTTP_CONTENT_MSGPACK;
}

if (type == -1) {
send_response(conn, 400, "error: invalid 'Content-Type'\n");
return -1;
Expand All @@ -550,6 +651,9 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
else if (type == HTTP_CONTENT_URLENCODED) {
ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len);
}
else if (type == HTTP_CONTENT_MSGPACK) {
ret = parse_payload_msgpack(ctx, tag, request->data.data, request->data.len);
}

if (ret != 0) {
send_response(conn, 400, "error: invalid payload\n");
Expand Down Expand Up @@ -919,6 +1023,10 @@ static int process_payload_ng(flb_sds_t tag,
type = HTTP_CONTENT_URLENCODED;
}

if (strcasecmp(request->content_type, "application/msgpack") == 0) {
type = HTTP_CONTENT_MSGPACK;
}

if (type == -1) {
send_response_ng(response, 400, "error: invalid 'Content-Type'\n");
return -1;
Expand All @@ -940,6 +1048,13 @@ static int process_payload_ng(flb_sds_t tag,
return parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload));
}
}
else if (type == HTTP_CONTENT_MSGPACK) {
ctx = (struct flb_http *) request->stream->user_data;
payload = (char *) request->body;
if (payload) {
return parse_payload_msgpack(ctx, tag, payload, cfl_sds_len(payload));
}
}

return 0;
}
Expand Down
150 changes: 150 additions & 0 deletions tests/runtime/in_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#define JSON_CONTENT_TYPE "application/json"
#define JSON_CHARSET_CONTENT_TYPE "application/json; charset=utf-8"
#define MSGPACK_CONTENT_TYPE "application/msgpack"

struct http_client_ctx {
struct flb_upstream *u;
Expand Down Expand Up @@ -278,6 +279,153 @@ void flb_test_http()
flb_upstream_conn_release(ctx->httpc->u_conn);
test_ctx_destroy(ctx);
}

void flb_test_msgpack_legacy()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
struct flb_http_client *c;
int ret;
int num;
size_t b_sent;
char buf[] = "\xdd\x00\x00\x00\x02\xdd\x00\x00"
"\x00\x02\xd7\x00\x65\xd3\x9c\x63"
"\x19\x36\xb8\xd5\x80\x81\xa7\x6d"
"\x65\x73\x73\x61\x67\x65\xa5\x64"
"\x75\x6d\x6d\x79\xbe";


clear_output_num();

cb_data.cb = cb_check_result_json;
cb_data.data = "\"message\":\"dummy\"";

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

flb_input_set(ctx->flb, ctx->i_ffd, "http2", "off", NULL);

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"format", "json",
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

ctx->httpc = http_client_ctx_create();
TEST_CHECK(ctx->httpc != NULL);

c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf),
"127.0.0.1", 9880, NULL, 0);
ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE),
MSGPACK_CONTENT_TYPE, strlen(MSGPACK_CONTENT_TYPE));
TEST_CHECK(ret == 0);
if (!TEST_CHECK(c != NULL)) {
TEST_MSG("http_client failed");
exit(EXIT_FAILURE);
}

ret = flb_http_do(c, &b_sent);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("ret error. ret=%d\n", ret);
}
else if (!TEST_CHECK(b_sent > 0)){
TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent);
}
else if (!TEST_CHECK(c->resp.status == 201)) {
TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status);
}

/* waiting to flush */
flb_time_msleep(1500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}
flb_http_client_destroy(c);
flb_upstream_conn_release(ctx->httpc->u_conn);
test_ctx_destroy(ctx);
}

void flb_test_msgpack()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
struct flb_http_client *c;
int ret;
int num;
size_t b_sent;
char buf[] = "\xdd\x00\x00\x00\x02\xdd\x00\x00"
"\x00\x02\xd7\x00\x65\xd3\x9c\x63"
"\x19\x36\xb8\xd5\x80\x81\xa7\x6d"
"\x65\x73\x73\x61\x67\x65\xa5\x64"
"\x75\x6d\x6d\x79\xbe";


clear_output_num();

cb_data.cb = cb_check_result_json;
cb_data.data = "\"message\":\"dummy\"";

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"format", "json",
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

ctx->httpc = http_client_ctx_create();
TEST_CHECK(ctx->httpc != NULL);

c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf),
"127.0.0.1", 9880, NULL, 0);
ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE),
MSGPACK_CONTENT_TYPE, strlen(MSGPACK_CONTENT_TYPE));
TEST_CHECK(ret == 0);
if (!TEST_CHECK(c != NULL)) {
TEST_MSG("http_client failed");
exit(EXIT_FAILURE);
}

ret = flb_http_do(c, &b_sent);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("ret error. ret=%d\n", ret);
}
else if (!TEST_CHECK(b_sent > 0)){
TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent);
}
else if (!TEST_CHECK(c->resp.status == 201)) {
TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status);
}

/* waiting to flush */
flb_time_msleep(1500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}
flb_http_client_destroy(c);
flb_upstream_conn_release(ctx->httpc->u_conn);
test_ctx_destroy(ctx);
}

void flb_test_http_successful_response_code(char *response_code)
{
struct flb_lib_out_cb cb_data;
Expand Down Expand Up @@ -662,6 +810,8 @@ void flb_test_http_tag_key()

TEST_LIST = {
{"http", flb_test_http},
{"msgpack_legacy", flb_test_msgpack_legacy},
{"msgpack", flb_test_msgpack},
{"successful_response_code_200", flb_test_http_successful_response_code_200},
{"successful_response_code_204", flb_test_http_successful_response_code_204},
{"failure_response_code_400_bad_json", flb_test_http_failure_400_bad_json},
Expand Down
Loading