Skip to content

Commit 5c96491

Browse files
committed
mp: fix flb_mp_accessor_keys_remove for subkey siblings
Signed-off-by: Arkady Dyakonov <[email protected]>
1 parent 951881a commit 5c96491

File tree

1 file changed

+41
-18
lines changed

1 file changed

+41
-18
lines changed

src/flb_mp.c

+41-18
Original file line numberDiff line numberDiff line change
@@ -501,11 +501,20 @@ struct flb_mp_accessor *flb_mp_accessor_create(struct mk_list *slist_patterns)
501501
return mpa;
502502
}
503503

504-
static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
505-
msgpack_object *key)
504+
/**
505+
* Finds matches for a given key in the list of record accessor patterns.
506+
* Stores the indexes of the matches in the provided array.
507+
*
508+
* @return The number of matches found.
509+
*/
510+
static inline int accessor_key_find_matches(struct flb_mp_accessor *mpa,
511+
msgpack_object *key,
512+
int* matched_indexes)
506513
{
507514
int i;
508515
int count;
516+
int match_count = 0;
517+
int out_index = 0;
509518
struct flb_mp_accessor_match *match;
510519

511520
count = mk_list_size(&mpa->ra_list);
@@ -516,14 +525,17 @@ static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
516525
}
517526

518527
if (match->start_key == key) {
519-
return i;
528+
match_count++;
529+
matched_indexes[out_index++] = i;
520530
}
521531
}
522532

523-
return -1;
533+
return match_count;
524534
}
525535

526-
static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
536+
static inline int accessor_sub_pack(struct flb_mp_accessor *mpa,
537+
int* matched_indexes,
538+
int match_count,
527539
msgpack_packer *mp_pck,
528540
msgpack_object *key,
529541
msgpack_object *val)
@@ -533,9 +545,13 @@ static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
533545
msgpack_object *k;
534546
msgpack_object *v;
535547
struct flb_mp_map_header mh;
548+
struct flb_mp_accessor_match *match;
536549

537-
if (match->key == key || match->key == val) {
538-
return FLB_FALSE;
550+
for (i = 0; i < match_count; i++) {
551+
match = &mpa->matches[matched_indexes[i]];
552+
if (match->key == key || match->key == val) {
553+
return FLB_FALSE;
554+
}
539555
}
540556

541557
if (key) {
@@ -548,7 +564,7 @@ static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
548564
k = &val->via.map.ptr[i].key;
549565
v = &val->via.map.ptr[i].val;
550566

551-
ret = accessor_sub_pack(match, mp_pck, k, v);
567+
ret = accessor_sub_pack(mpa, matched_indexes, match_count, mp_pck, k, v);
552568
if (ret == FLB_TRUE) {
553569
flb_mp_map_header_append(&mh);
554570
}
@@ -559,7 +575,7 @@ static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
559575
flb_mp_array_header_init(&mh, mp_pck);
560576
for (i = 0; i < val->via.array.size; i++) {
561577
v = &val->via.array.ptr[i];
562-
ret = accessor_sub_pack(match, mp_pck, NULL, v);
578+
ret = accessor_sub_pack(mpa, matched_indexes, match_count, mp_pck, NULL, v);
563579
if (ret == FLB_TRUE) {
564580
flb_mp_array_header_append(&mh);
565581
}
@@ -586,6 +602,7 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
586602
int ret;
587603
int rule_id = 0;
588604
int matches = 0;
605+
int* matched_indexes;
589606
msgpack_object *key;
590607
msgpack_object *val;
591608
msgpack_object *s_key;
@@ -640,6 +657,13 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
640657
/* Initialize map */
641658
flb_mp_map_header_init(&mh, &mp_pck);
642659

660+
/* Initialize array of matching indexes to properly handle sibling keys */
661+
matched_indexes = flb_malloc(sizeof(int) * matches);
662+
if (!matched_indexes) {
663+
flb_errno();
664+
return -1;
665+
}
666+
643667
for (i = 0; i < map->via.map.size; i++) {
644668
key = &map->via.map.ptr[i].key;
645669
val = &map->via.map.ptr[i].val;
@@ -648,30 +672,29 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
648672
* For every entry on the path, check if we should do a step-by-step
649673
* repackaging or just pack the whole object.
650674
*
651-
* Just check: does this 'key' exists on any path of the record
652-
* accessor patterns ?
653-
*
654-
* Find if the active key in the map, matches an accessor rule, if
655-
* if match we get the match id as return value, otherwise -1.
675+
* Find all matching rules that match this 'key'. Return the number of matches or 0
676+
* if no matches were found. Found matches are stored in the 'matched_indexes' array.
656677
*/
657-
ret = accessor_key_find_match(mpa, key);
658-
if (ret == -1) {
678+
ret = accessor_key_find_matches(mpa, key, matched_indexes);
679+
if (ret == 0) {
659680
/* No matches, it's ok to pack the kv pair */
660681
flb_mp_map_header_append(&mh);
661682
msgpack_pack_object(&mp_pck, *key);
662683
msgpack_pack_object(&mp_pck, *val);
663684
}
664685
else {
665686
/* The key has a match. Now we do a step-by-step packaging */
666-
match = &mpa->matches[ret];
667-
ret = accessor_sub_pack(match, &mp_pck, key, val);
687+
688+
ret = accessor_sub_pack(mpa, matched_indexes, ret, &mp_pck, key, val);
668689
if (ret == FLB_TRUE) {
669690
flb_mp_map_header_append(&mh);
670691
}
671692
}
672693
}
673694
flb_mp_map_header_end(&mh);
674695

696+
flb_free(matched_indexes);
697+
675698
*out_buf = mp_sbuf.data;
676699
*out_size = mp_sbuf.size;
677700

0 commit comments

Comments
 (0)