Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mp: fix flb_mp_accessor_keys_remove for sibling keys removal #10013

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 41 additions & 18 deletions src/flb_mp.c
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,20 @@ struct flb_mp_accessor *flb_mp_accessor_create(struct mk_list *slist_patterns)
return mpa;
}

static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
msgpack_object *key)
/**
* Finds matches for a given key in the list of record accessor patterns.
* Stores the indexes of the matches in the provided array.
*
* @return The number of matches found.
*/
static inline int accessor_key_find_matches(struct flb_mp_accessor *mpa,
msgpack_object *key,
int* matched_indexes)
{
int i;
int count;
int match_count = 0;
int out_index = 0;
struct flb_mp_accessor_match *match;

count = mk_list_size(&mpa->ra_list);
Expand All @@ -516,14 +525,17 @@ static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
}

if (match->start_key == key) {
return i;
match_count++;
matched_indexes[out_index++] = i;
}
}

return -1;
return match_count;
}

static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
static inline int accessor_sub_pack(struct flb_mp_accessor *mpa,
int* matched_indexes,
int match_count,
msgpack_packer *mp_pck,
msgpack_object *key,
msgpack_object *val)
Expand All @@ -533,9 +545,13 @@ static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
msgpack_object *k;
msgpack_object *v;
struct flb_mp_map_header mh;
struct flb_mp_accessor_match *match;

if (match->key == key || match->key == val) {
return FLB_FALSE;
for (i = 0; i < match_count; i++) {
match = &mpa->matches[matched_indexes[i]];
if (match->key == key || match->key == val) {
return FLB_FALSE;
}
}

if (key) {
Expand All @@ -548,7 +564,7 @@ static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
k = &val->via.map.ptr[i].key;
v = &val->via.map.ptr[i].val;

ret = accessor_sub_pack(match, mp_pck, k, v);
ret = accessor_sub_pack(mpa, matched_indexes, match_count, mp_pck, k, v);
if (ret == FLB_TRUE) {
flb_mp_map_header_append(&mh);
}
Expand All @@ -559,7 +575,7 @@ static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
flb_mp_array_header_init(&mh, mp_pck);
for (i = 0; i < val->via.array.size; i++) {
v = &val->via.array.ptr[i];
ret = accessor_sub_pack(match, mp_pck, NULL, v);
ret = accessor_sub_pack(mpa, matched_indexes, match_count, mp_pck, NULL, v);
if (ret == FLB_TRUE) {
flb_mp_array_header_append(&mh);
}
Expand All @@ -586,6 +602,7 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
int ret;
int rule_id = 0;
int matches = 0;
int* matched_indexes;
msgpack_object *key;
msgpack_object *val;
msgpack_object *s_key;
Expand Down Expand Up @@ -640,6 +657,13 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
/* Initialize map */
flb_mp_map_header_init(&mh, &mp_pck);

/* Initialize array of matching indexes to properly handle sibling keys */
matched_indexes = flb_malloc(sizeof(int) * matches);
if (!matched_indexes) {
flb_errno();
return -1;
}

for (i = 0; i < map->via.map.size; i++) {
key = &map->via.map.ptr[i].key;
val = &map->via.map.ptr[i].val;
Expand All @@ -648,30 +672,29 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
* For every entry on the path, check if we should do a step-by-step
* repackaging or just pack the whole object.
*
* Just check: does this 'key' exists on any path of the record
* accessor patterns ?
*
* Find if the active key in the map, matches an accessor rule, if
* if match we get the match id as return value, otherwise -1.
* Find all matching rules that match this 'key'. Return the number of matches or 0
* if no matches were found. Found matches are stored in the 'matched_indexes' array.
*/
ret = accessor_key_find_match(mpa, key);
if (ret == -1) {
ret = accessor_key_find_matches(mpa, key, matched_indexes);
if (ret == 0) {
/* No matches, it's ok to pack the kv pair */
flb_mp_map_header_append(&mh);
msgpack_pack_object(&mp_pck, *key);
msgpack_pack_object(&mp_pck, *val);
}
else {
/* The key has a match. Now we do a step-by-step packaging */
match = &mpa->matches[ret];
ret = accessor_sub_pack(match, &mp_pck, key, val);

ret = accessor_sub_pack(mpa, matched_indexes, ret, &mp_pck, key, val);
if (ret == FLB_TRUE) {
flb_mp_map_header_append(&mh);
}
}
}
flb_mp_map_header_end(&mh);

flb_free(matched_indexes);

*out_buf = mp_sbuf.data;
*out_size = mp_sbuf.size;

Expand Down
102 changes: 102 additions & 0 deletions tests/internal/mp.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,107 @@ void test_keys_remove_subkey_key()

}

void test_remove_sibling_subkeys()
{
int len;
int ret;
int type;
size_t off = 0;
char *buf;
size_t size;
char *out_buf;
size_t out_size;
char *json;
char final_json[2048] = {0};
msgpack_unpacked result;
msgpack_unpacked result_final;
msgpack_object map;
struct flb_mp_accessor *mpa;
struct mk_list patterns;

/* Sample JSON message */
json =
"{"
"\"kubernetes\": {"
"\"pod_id\": \"id\","
"\"pod_name\": \"name\","
"\"host\": \"localhost\","
"\"labels\": {"
"\"app\": \"myapp\","
"\"tier\": \"backend\""
"}"
"},"
"\"msg\": \"kernel panic\""
"}";

/* Convert to msgpack */
len = strlen(json);
ret = flb_pack_json(json, len, &buf, &size, &type, NULL);
TEST_CHECK(ret == 0);
if (ret == -1) {
exit(EXIT_FAILURE);
}

/* Unpack the content */
msgpack_unpacked_init(&result);
msgpack_unpack_next(&result, buf, size, &off);
map = result.data;

/* Create list of patterns */
flb_slist_create(&patterns);

/* sub key -> key */
flb_slist_add(&patterns, "$kubernetes['pod_name']");
flb_slist_add(&patterns, "$kubernetes['foo']");
flb_slist_add(&patterns, "$kubernetes['pod_id']");
flb_slist_add(&patterns, "$kubernetes['labels']['tier']");
flb_slist_add(&patterns, "$time");


/* Create mp accessor */
mpa = flb_mp_accessor_create(&patterns);
TEST_CHECK(mpa != NULL);

/* Remove the entry that matches the pattern(s) */
ret = flb_mp_accessor_keys_remove(mpa, &map, (void *) &out_buf, &out_size);
TEST_CHECK(ret == FLB_TRUE);

printf("\n=== ORIGINAL ===\n");
flb_pack_print(buf, size);
flb_free(buf);

printf("=== FINAL MAP ===\n");
if (ret == FLB_TRUE) {
flb_pack_print(out_buf, out_size);
}
msgpack_unpacked_destroy(&result);

off = 0;
msgpack_unpacked_init(&result_final);
msgpack_unpack_next(&result_final, out_buf, out_size, &off);
flb_msgpack_to_json(&final_json[0], sizeof(final_json), &result_final.data);

if (!TEST_CHECK(strstr(&final_json[0] ,"pod_id") == NULL)) {
TEST_MSG("pod_id field should be removed");
}
if (!TEST_CHECK(strstr(&final_json[0] ,"pod_name") == NULL)) {
TEST_MSG("pod_name field should be removed");
}
if (!TEST_CHECK(strstr(&final_json[0] ,"tier") == NULL)) {
TEST_MSG("tier field should be removed");
}
if (!TEST_CHECK(strstr(&final_json[0] ,"app") != NULL)) {
TEST_MSG("app field should not be removed");
}

msgpack_unpacked_destroy(&result_final);

flb_free(out_buf);
flb_mp_accessor_destroy(mpa);
flb_slist_destroy(&patterns);

}

void remove_subkey_keys(char *list[], int list_size, int index_start)
{
int len;
Expand Down Expand Up @@ -444,5 +545,6 @@ TEST_LIST = {
{"accessor_keys_remove_subkey_key" , test_keys_remove_subkey_key},
{"accessor_keys_remove_subkey_keys" , test_keys_remove_subkey_keys},
{"object_to_cfl_to_msgpack" , test_object_to_cfl_to_msgpack},
{"test_remove_sibling_subkeys" , test_remove_sibling_subkeys},
{ 0 }
};