diff --git a/config/template_examples/processor_hydration.yaml b/config/template_examples/processor_hydration.yaml index fc78937eb..e1f8b9543 100644 --- a/config/template_examples/processor_hydration.yaml +++ b/config/template_examples/processor_hydration.yaml @@ -50,7 +50,7 @@ mapping: | "cache": { "operator": "set", "resource": this.cache, - "key": """${! meta("id") }""", + "key": """${! metadata("id") }""", "value": "${! content() }", } } @@ -89,7 +89,7 @@ tests: - cache: operator: set resource: foocache - key: ${! meta("id") } + key: ${! metadata("id") } value: ${! content() } - branch: diff --git a/config/template_examples/processor_log_message.yaml b/config/template_examples/processor_log_message.yaml index 049970f0d..98d9bfac0 100644 --- a/config/template_examples/processor_log_message.yaml +++ b/config/template_examples/processor_log_message.yaml @@ -11,5 +11,5 @@ fields: mapping: | root.log.level = this.level root.log.message = "${! content() }" - root.log.fields.metadata = "${! meta() }" + root.log.fields.metadata = "${! metadata() }" root.log.fields.error = "${! error() }" diff --git a/config/test/deduplicate_by_batch.yaml b/config/test/deduplicate_by_batch.yaml index c24429100..5398304c2 100644 --- a/config/test/deduplicate_by_batch.yaml +++ b/config/test/deduplicate_by_batch.yaml @@ -6,7 +6,7 @@ pipeline: } - dedupe: cache: local - key: ${! meta("batch_tag").from(0) + content() } + key: ${! metadata("batch_tag").from(0) + content() } cache_resources: - label: local diff --git a/internal/bloblang/parser/mapping_parser_test.go b/internal/bloblang/parser/mapping_parser_test.go index e5f83add0..5d55d6aeb 100644 --- a/internal/bloblang/parser/mapping_parser_test.go +++ b/internal/bloblang/parser/mapping_parser_test.go @@ -284,7 +284,7 @@ zed = deleted() }, "test mapping metadata and json": { mapping: `meta foo = foo -bar.baz = meta("bar baz") +bar.baz = metadata("bar baz") meta "bar baz" = deleted()`, input: []part{ { @@ -331,7 +331,7 @@ meta "bar baz" = "test1"`, }, "test mapping delete and json": { mapping: `meta foo = foo -bar.baz = meta("bar baz") +bar.baz = metadata("bar baz") meta = deleted()`, input: []part{ { diff --git a/internal/bloblang/parser/query_arithmetic_parser_test.go b/internal/bloblang/parser/query_arithmetic_parser_test.go index 9f2f03421..53d9a1459 100644 --- a/internal/bloblang/parser/query_arithmetic_parser_test.go +++ b/internal/bloblang/parser/query_arithmetic_parser_test.go @@ -122,7 +122,7 @@ func TestArithmeticParser(t *testing.T) { }, }, "two ints and a string": { - input: `json("foo") + json("bar") + meta("baz").number(0)`, + input: `json("foo") + json("bar") + metadata("baz").number(0)`, output: `17`, messages: []easyMsg{ { @@ -141,7 +141,7 @@ func TestArithmeticParser(t *testing.T) { }, }, "add three ints": { - input: `json("foo") + json("bar") + meta("baz").number()`, + input: `json("foo") + json("bar") + metadata("baz").number()`, output: `20`, messages: []easyMsg{ { @@ -167,7 +167,7 @@ func TestArithmeticParser(t *testing.T) { }, }, "sub and add two ints": { - input: `json("foo") + json("bar") - meta("foo").number() - meta("bar").number()`, + input: `json("foo") + json("bar") - metadata("foo").number() - metadata("bar").number()`, output: `6`, messages: []easyMsg{ { diff --git a/internal/bloblang/parser/query_function_parser_test.go b/internal/bloblang/parser/query_function_parser_test.go index b7a98d164..0370b5c8b 100644 --- a/internal/bloblang/parser/query_function_parser_test.go +++ b/internal/bloblang/parser/query_function_parser_test.go @@ -120,7 +120,7 @@ func TestFunctionQueries(t *testing.T) { }, }, "json function dynamic arg": { - input: `json(meta("path"))`, + input: `json(metadata("path"))`, output: `this`, messages: []easyMsg{ { @@ -162,7 +162,7 @@ func TestFunctionQueries(t *testing.T) { }, }, "metadata triple quote string arg 1": { - input: `meta("""foo""")`, + input: `metadata("""foo""")`, output: `bar`, index: 1, messages: []easyMsg{ @@ -177,7 +177,7 @@ func TestFunctionQueries(t *testing.T) { }, }, "metadata triple quote string arg 2": { - input: `meta("""foo + input: `metadata("""foo bar""")`, output: `bar`, index: 1, @@ -193,7 +193,7 @@ bar""")`, }, }, "metadata 1": { - input: `meta("foo")`, + input: `metadata("foo")`, output: `bar`, index: 1, messages: []easyMsg{ @@ -208,7 +208,7 @@ bar""")`, }, }, "metadata 2": { - input: `meta("bar")`, + input: `metadata("bar")`, output: "null", messages: []easyMsg{ { @@ -221,7 +221,7 @@ bar""")`, }, }, "metadata 3": { - input: `meta()`, + input: `metadata()`, output: `{"baz":"qux","duck,1":"quack","foo":"bar"}`, messages: []easyMsg{ { @@ -234,7 +234,7 @@ bar""")`, }, }, "metadata 4": { - input: `meta("duck,1")`, + input: `metadata("duck,1")`, output: "quack", messages: []easyMsg{ { @@ -247,7 +247,7 @@ bar""")`, }, }, "metadata 5": { - input: `meta("foo").from(1)`, + input: `metadata("foo").from(1)`, output: "bar", index: 0, messages: []easyMsg{ @@ -262,7 +262,7 @@ bar""")`, }, }, "metadata 6": { - input: `meta("foo")`, + input: `metadata("foo")`, output: `null`, index: 1, messages: []easyMsg{ @@ -277,7 +277,7 @@ bar""")`, }, }, "metadata 7": { - input: `meta().from(1)`, + input: `metadata().from(1)`, output: `{}`, messages: []easyMsg{ { @@ -290,7 +290,7 @@ bar""")`, }, }, "metadata 8": { - input: `meta().from(1)`, + input: `metadata().from(1)`, output: `{"baz":"qux","duck,1":"quack","foo":"bar"}`, messages: []easyMsg{ {}, diff --git a/internal/bloblang/parser/query_method_parser_test.go b/internal/bloblang/parser/query_method_parser_test.go index 2931c8841..55c30229f 100644 --- a/internal/bloblang/parser/query_method_parser_test.go +++ b/internal/bloblang/parser/query_method_parser_test.go @@ -145,7 +145,7 @@ func TestMethodParser(t *testing.T) { }, }, "meta from all": { - input: `meta("foo").from_all()`, + input: `metadata("foo").from_all()`, output: `["bar",null,"baz"]`, messages: []easyMsg{ {meta: map[string]any{"foo": "bar"}}, @@ -185,7 +185,7 @@ func TestMethodParser(t *testing.T) { }, }, "or boolean from metadata": { - input: `meta("foo").or( meta("bar") == "yep" ).from_all()`, + input: `metadata("foo").or( metadata("bar") == "yep" ).from_all()`, output: `["from foo",true,false,"from foo 2"]`, messages: []easyMsg{ {meta: map[string]any{"foo": "from foo"}}, @@ -331,14 +331,14 @@ func TestMethodParser(t *testing.T) { messages: []easyMsg{{}}, }, "test format method 2": { - input: `"foo %v bar".format(meta("foo"))`, + input: `"foo %v bar".format(metadata("foo"))`, output: `foo test bar`, messages: []easyMsg{{ meta: map[string]any{"foo": "test"}, }}, }, "test format method 3": { - input: `json().("foo %v, %v, %v bar".format(value, meta("foo"), 3))`, + input: `json().("foo %v, %v, %v bar".format(value, metadata("foo"), 3))`, output: `foo yup, bar, 3 bar`, messages: []easyMsg{{ content: `{"value":"yup"}`, @@ -516,7 +516,7 @@ func TestMethodMaps(t *testing.T) { }}, }, "map dynamic": { - input: `json().apply(meta("dyn_map"))`, + input: `json().apply(metadata("dyn_map"))`, output: "this value", maps: map[string]query.Function{ "foo": query.NewFieldFunction("foo"), @@ -530,7 +530,7 @@ func TestMethodMaps(t *testing.T) { }}, }, "map dynamic 2": { - input: `json().apply(meta("dyn_map"))`, + input: `json().apply(metadata("dyn_map"))`, output: "and this value", maps: map[string]query.Function{ "foo": query.NewFieldFunction("foo"), diff --git a/internal/bloblang/parser/query_parser_test.go b/internal/bloblang/parser/query_parser_test.go index 637d5dff1..79c6475b7 100644 --- a/internal/bloblang/parser/query_parser_test.go +++ b/internal/bloblang/parser/query_parser_test.go @@ -63,7 +63,7 @@ func TestFunctionParserErrors(t *testing.T) { err: `line 1 char 16: expected query`, }, "bad expression 3": { - input: `(json("foo") + meta("bar") `, + input: `(json("foo") + metadata("bar") `, err: `line 1 char 28: required: expected closing bracket`, }, "bad method": { @@ -141,19 +141,19 @@ func TestFunctionParserLimits(t *testing.T) { remaining string }{ "nothing": { - input: `json("foo") + meta("bar")`, + input: `json("foo") + metadata("bar")`, remaining: ``, }, "space before": { - input: ` json("foo") + meta("bar")`, + input: ` json("foo") + metadata("bar")`, remaining: ``, }, "space before 2": { - input: ` json("foo") + meta("bar")`, + input: ` json("foo") + metadata("bar")`, remaining: ``, }, "unfinished comment": { - input: `json("foo") + meta("bar") # Here's a comment`, + input: `json("foo") + metadata("bar") # Here's a comment`, remaining: ` # Here's a comment`, }, "extra text": { @@ -161,15 +161,15 @@ func TestFunctionParserLimits(t *testing.T) { remaining: ` and this`, }, "extra text 2": { - input: `json("foo") + meta("bar") and this`, + input: `json("foo") + metadata("bar") and this`, remaining: ` and this`, }, "extra text 3": { - input: `json("foo")+meta("bar")and this`, + input: `json("foo")+metadata("bar")and this`, remaining: `and this`, }, "extra text 4": { - input: `json("foo")+meta("bar") and this`, + input: `json("foo")+metadata("bar") and this`, remaining: ` and this`, }, "squiggly bracket": { diff --git a/internal/bloblang/query/functions.go b/internal/bloblang/query/functions.go index 22273d661..f8a5f592b 100644 --- a/internal/bloblang/query/functions.go +++ b/internal/bloblang/query/functions.go @@ -557,12 +557,12 @@ var _ = registerFunction( "meta", "Returns the value of a metadata key from the input message as a string, or `null` if the key does not exist. Since values are extracted from the read-only input message they do NOT reflect changes made from within the map. In order to query metadata mutations made within a mapping use the <>. This function supports extracting metadata from other messages of a batch with the `from` method.", NewExampleSpec("", - `root.topic = meta("kafka_topic")`, - `root.topic = meta("nope") | meta("also nope") | "default"`, + `root.topic = metadata("kafka_topic")`, + `root.topic = metadata("nope") | metadata("also nope") | "default"`, ), NewExampleSpec( "The key parameter is optional and if omitted the entire metadata contents are returned as an object.", - `root.all_metadata = meta()`, + `root.all_metadata = metadata()`, ), ).Param(ParamString("key", "An optional key of a metadata value to obtain.").Default("")), func(args *ParsedParams) (Function, error) { @@ -609,12 +609,12 @@ var _ = registerFunction( "root_meta", "Returns the value of a metadata key from the new message being created as a string, or `null` if the key does not exist. Changes made to metadata during a mapping will be reflected by this function.", NewExampleSpec("", - `root.topic = root_meta("kafka_topic")`, - `root.topic = root_meta("nope") | root_meta("also nope") | "default"`, + `root.topic = root_metadata("kafka_topic")`, + `root.topic = root_metadata("nope") | root_metadata("also nope") | "default"`, ), NewExampleSpec( "The key parameter is optional and if omitted the entire metadata contents are returned as an object.", - `root.all_metadata = root_meta()`, + `root.all_metadata = root_metadata()`, ), ).Param(ParamString("key", "An optional key of a metadata value to obtain.").Default("")), func(args *ParsedParams) (Function, error) { diff --git a/internal/cli/test/processors_provider_test.go b/internal/cli/test/processors_provider_test.go index 2e6655648..7e7292198 100644 --- a/internal/cli/test/processors_provider_test.go +++ b/internal/cli/test/processors_provider_test.go @@ -88,7 +88,7 @@ pipeline: resource: foocache operator: set key: defaultkey - value: ${! meta("foo") } + value: ${! metadata("foo") } - cache: resource: foocache operator: get diff --git a/internal/component/metrics/namespaced_test.go b/internal/component/metrics/namespaced_test.go index 768b36c7b..f32ebb187 100644 --- a/internal/component/metrics/namespaced_test.go +++ b/internal/component/metrics/namespaced_test.go @@ -210,12 +210,12 @@ func TestNamespacedPrefixStaticLabelsWithMappings(t *testing.T) { func TestNamespacedPrefixStaticLabelsWithMappingLabels(t *testing.T) { prom, handler := getTestMetrics(t) - mappingFooToBar, err := metrics.NewMapping(`meta = meta().map_each(kv -> kv.value.replace_all("value","bar")) + mappingFooToBar, err := metrics.NewMapping(`meta = metadata().map_each(kv -> kv.value.replace_all("value","bar")) meta extra1 = "extravalue1" root = this.replace_all("foo","bar")`, log.Noop()) require.NoError(t, err) - mappingBarToBaz, err := metrics.NewMapping(`meta = meta().map_each(kv -> kv.value.replace_all("bar","baz")) + mappingBarToBaz, err := metrics.NewMapping(`meta = metadata().map_each(kv -> kv.value.replace_all("bar","baz")) meta extra2 = "extravalue2" root = this.replace_all("bar","baz")`, log.Noop()) require.NoError(t, err) diff --git a/internal/config/test/docs.adoc b/internal/config/test/docs.adoc index be870e5f3..ae09e1310 100644 --- a/internal/config/test/docs.adoc +++ b/internal/config/test/docs.adoc @@ -32,7 +32,7 @@ pipeline: output: aws_s3: bucket: TODO - path: '${! meta("kafka_topic") }/${! json("message.id") }.json' + path: '${! metadata("kafka_topic") }/${! json("message.id") }.json' ``` One way to write our unit tests for this config is to accompany it with a file of the same name and extension but suffixed with `_benthos_test`, which in this case would be `foo_benthos_test.yaml`. diff --git a/internal/impl/io/input_http_server.go b/internal/impl/io/input_http_server.go index 3c6521dc2..7b4369351 100644 --- a/internal/impl/io/input_http_server.go +++ b/internal/impl/io/input_http_server.go @@ -255,7 +255,7 @@ You can access these metadata fields using xref:configuration:interpolation.adoc service.NewObjectField(hsiFieldResponse, service.NewInterpolatedStringField(hsiFieldResponseStatus). Description("Specify the status code to return with synchronous responses. This is a string value, which allows you to customize it based on resulting payloads and their metadata."). - Examples(`${! json("status") }`, `${! meta("status") }`). + Examples(`${! json("status") }`, `${! metadata("status") }`). Default("200"), service.NewInterpolatedStringMapField(hsiFieldResponseHeaders). Description("Specify headers to return with synchronous responses."). diff --git a/internal/impl/io/input_http_server_test.go b/internal/impl/io/input_http_server_test.go index bc98acd11..e2ba9186f 100644 --- a/internal/impl/io/input_http_server_test.go +++ b/internal/impl/io/input_http_server_test.go @@ -1195,7 +1195,7 @@ func TestHTTPSyncResponseHeadersStatus(t *testing.T) { http_server: path: /testpost sync_response: - status: '${! meta("status").or("200") }' + status: '${! metadata("status").or("200") }' headers: Content-Type: application/json foo: '${!json("field1")}' diff --git a/internal/impl/pure/buffer_system_window.go b/internal/impl/pure/buffer_system_window.go index 5cfd4d242..2a71d7fdb 100644 --- a/internal/impl/pure/buffer_system_window.go +++ b/internal/impl/pure/buffer_system_window.go @@ -53,7 +53,7 @@ A xref:guides:bloblang/about.adoc[Bloblang mapping] applied to each message duri The timestamp value assigned to `+"`root`"+` must either be a numerical unix time in seconds (with up to nanosecond precision via decimals), or a string in ISO 8601 format. If the mapping fails or provides an invalid result the message will be dropped (with logging to describe the problem). `). Default("root = now()"). - Example("root = this.created_at").Example(`root = meta("kafka_timestamp_unix").number()`)). + Example("root = this.created_at").Example(`root = metadata("kafka_timestamp_unix").number()`)). Field(service.NewStringField("size"). Description("A duration string describing the size of each window. By default windows are aligned to the zeroth minute and zeroth hour on the UTC clock, meaning windows of 1 hour duration will match the turn of each hour in the day, this can be adjusted with the `offset` field."). Example("30s").Example("10m")). @@ -110,7 +110,7 @@ pipeline: root = if batch_index() == 0 { { "traffic_light": this.traffic_light, - "created_at": meta("window_end_timestamp"), + "created_at": metadata("window_end_timestamp"), "total_cars": json("registration_plate").from_all().unique().length(), "passengers": json("passengers").from_all().sum(), } diff --git a/internal/impl/pure/output_cache.go b/internal/impl/pure/output_cache.go index cdfbbd210..2959616ef 100644 --- a/internal/impl/pure/output_cache.go +++ b/internal/impl/pure/output_cache.go @@ -53,7 +53,7 @@ In order to create a unique `+"`key`"+` value per item you should use function i Examples( `${!count("items")}-${!timestamp_unix_nano()}`, `${!json("doc.id")}`, - `${!meta("kafka_key")}`, + `${!metadata("kafka_key")}`, ). Default(`${!count("items")}-${!timestamp_unix_nano()}`), service.NewInterpolatedStringField(coFieldTTL). diff --git a/internal/impl/pure/processor_archive.go b/internal/impl/pure/processor_archive.go index 74973844d..22ef3b341 100644 --- a/internal/impl/pure/processor_archive.go +++ b/internal/impl/pure/processor_archive.go @@ -36,7 +36,7 @@ The functionality of this processor depends on being applied across messages tha Field(service.NewInterpolatedStringField("path"). Description("The path to set for each message in the archive (when applicable)."). Example("${!count(\"files\")}-${!timestamp_unix_nano()}.txt"). - Example("${!meta(\"kafka_key\")}-${!json(\"id\")}.json"). + Example("${!metadata(\"kafka_key\")}-${!json(\"id\")}.json"). Default("")). Example("Tar Archive", ` If we had JSON messages in a batch each of the form: diff --git a/internal/impl/pure/processor_archive_test.go b/internal/impl/pure/processor_archive_test.go index 846509523..67cba211c 100644 --- a/internal/impl/pure/processor_archive_test.go +++ b/internal/impl/pure/processor_archive_test.go @@ -29,7 +29,7 @@ format: does not exist func TestArchiveTar(t *testing.T) { conf, err := archiveProcConfig().ParseYAML(` format: tar -path: 'foo-${!meta("path")}' +path: 'foo-${!metadata("path")}' `, nil) require.NoError(t, err) diff --git a/internal/impl/pure/processor_bloblang_test.go b/internal/impl/pure/processor_bloblang_test.go index fcb3e5945..661b4d027 100644 --- a/internal/impl/pure/processor_bloblang_test.go +++ b/internal/impl/pure/processor_bloblang_test.go @@ -35,8 +35,8 @@ func TestBloblangCrossfire(t *testing.T) { foo = json("foo").from(0) foo.bar_new = "this is swapped now" foo.bar.baz = "and this changed" - meta foo = meta("foo").from(0) - meta bar = meta("bar").from(0) + meta foo = metadata("foo").from(0) + meta bar = metadata("bar").from(0) meta baz = "new meta" ` proc, err := mock.NewManager().NewProcessor(conf) diff --git a/internal/impl/pure/processor_branch_test.go b/internal/impl/pure/processor_branch_test.go index 69ccd13f6..ac56b2d11 100644 --- a/internal/impl/pure/processor_branch_test.go +++ b/internal/impl/pure/processor_branch_test.go @@ -62,9 +62,9 @@ func TestBranchBasic(t *testing.T) { }, }, "copy metadata over only": { - requestMap: `meta foo = meta("foo")`, - processorMap: `meta foo = meta("foo") + " and this"`, - resultMap: `meta new_foo = meta("foo")`, + requestMap: `meta foo = metadata("foo")`, + processorMap: `meta foo = metadata("foo") + " and this"`, + resultMap: `meta new_foo = metadata("foo")`, input: []mockMsg{ msg( `{"value":"foobar"}`, diff --git a/internal/impl/pure/processor_cached.go b/internal/impl/pure/processor_cached.go index edd42514d..6b28ff9ea 100644 --- a/internal/impl/pure/processor_cached.go +++ b/internal/impl/pure/processor_cached.go @@ -29,8 +29,8 @@ func newCachedProcessorConfigSpec() *service.ConfigSpec { Description("A key to be resolved for each message, if the key already exists in the cache then the cached result is used, otherwise the processors are applied and the result is cached under this key. The key could be static and therefore apply generally to all messages or it could be an interpolated expression that is potentially unique for each message."). Example("my_foo_result"). Example(`${! this.document.id }`). - Example(`${! meta("kafka_key") }`). - Example(`${! meta("kafka_topic") }`)). + Example(`${! metadata("kafka_key") }`). + Example(`${! metadata("kafka_topic") }`)). Field(service.NewInterpolatedStringField("ttl"). Description("An optional expiry period to set for each cache entry. Some caches only have a general TTL and will therefore ignore this setting."). Optional()). @@ -44,12 +44,12 @@ pipeline: - branch: processors: - cached: - key: '${! meta("kafka_topic") }-${! meta("kafka_partition") }' + key: '${! metadata("kafka_topic") }-${! metadata("kafka_partition") }' cache: foo_cache processors: - mapping: 'root = ""' - http: - url: http://example.com/enrichment/${! meta("kafka_topic") }/${! meta("kafka_partition") } + url: http://example.com/enrichment/${! metadata("kafka_topic") }/${! metadata("kafka_partition") } verb: GET result_map: 'root.enrichment = this' diff --git a/internal/impl/pure/processor_cached_test.go b/internal/impl/pure/processor_cached_test.go index a27b0f4a2..6298c37a9 100644 --- a/internal/impl/pure/processor_cached_test.go +++ b/internal/impl/pure/processor_cached_test.go @@ -92,8 +92,8 @@ processors: func TestCachedHappyBatched(t *testing.T) { conf, err := newCachedProcessorConfigSpec().ParseYAML(` -key: ${! meta("key") } -ttl: ${! meta("ttl").or("60s")} +key: ${! metadata("key") } +ttl: ${! metadata("ttl").or("60s")} cache: foo processors: - bloblang: 'root = this.map_each(ele -> ele + " FOO")' diff --git a/internal/impl/pure/processor_dedupe.go b/internal/impl/pure/processor_dedupe.go index 14e58c7f6..f3550eeb6 100644 --- a/internal/impl/pure/processor_dedupe.go +++ b/internal/impl/pure/processor_dedupe.go @@ -49,7 +49,7 @@ pipeline: processors: - dedupe: cache: keycache - key: ${! meta("kafka_key") } + key: ${! metadata("kafka_key") } cache_resources: - label: keycache @@ -62,7 +62,7 @@ cache_resources: Description("The xref:components:caches/about.adoc[`cache` resource] to target with this processor."), service.NewInterpolatedStringField(dedupFieldKey). Description("An interpolated string yielding the key to deduplicate by for each message."). - Examples(`${! meta("kafka_key") }`, `${! content().hash("xxhash64") }`), + Examples(`${! metadata("kafka_key") }`, `${! content().hash("xxhash64") }`), service.NewBoolField(dedupFieldDropOnCacheErr). Description("Whether messages should be dropped when the cache returns a general error such as a network issue."). Default(true), diff --git a/internal/impl/pure/processor_group_by.go b/internal/impl/pure/processor_group_by.go index c0e177bb2..c2c68d5bd 100644 --- a/internal/impl/pure/processor_group_by.go +++ b/internal/impl/pure/processor_group_by.go @@ -46,7 +46,7 @@ pipeline: output: switch: cases: - - check: meta("grouping") == "foo" + - check: metadata("grouping") == "foo" output: gcp_pubsub: project: foo_prod diff --git a/internal/impl/pure/processor_group_by_value.go b/internal/impl/pure/processor_group_by_value.go index 2a01d19e7..3c537df04 100644 --- a/internal/impl/pure/processor_group_by_value.go +++ b/internal/impl/pure/processor_group_by_value.go @@ -36,7 +36,7 @@ If we were consuming Kafka messages and needed to group them by their key, archi pipeline: processors: - group_by_value: - value: ${! meta("kafka_key") } + value: ${! metadata("kafka_key") } - archive: format: tar - compress: @@ -44,11 +44,11 @@ pipeline: output: aws_s3: bucket: TODO - path: docs/${! meta("kafka_key") }/${! count("files") }-${! timestamp_unix_nano() }.tar.gz + path: docs/${! metadata("kafka_key") }/${! count("files") }-${! timestamp_unix_nano() }.tar.gz `+"```"+``). Field(service.NewInterpolatedStringField(gbvpFieldValue). Description("The interpolated string to group based on."). - Examples("${! meta(\"kafka_key\") }", "${! json(\"foo.bar\") }-${! meta(\"baz\") }")), + Examples("${! metadata(\"kafka_key\") }", "${! json(\"foo.bar\") }-${! metadata(\"baz\") }")), func(conf *service.ParsedConfig, res *service.Resources) (service.BatchProcessor, error) { valueStr, err := conf.FieldString(gbvpFieldValue) if err != nil { diff --git a/internal/impl/pure/processor_log.go b/internal/impl/pure/processor_log.go index 1c4352368..86e822d60 100644 --- a/internal/impl/pure/processor_log.go +++ b/internal/impl/pure/processor_log.go @@ -45,7 +45,7 @@ pipeline: root.reason = "cus I wana" root.id = this.id root.age = this.user.age - root.kafka_topic = meta("kafka_topic") + root.kafka_topic = metadata("kafka_topic") `+"```"+` `). Fields( @@ -59,7 +59,7 @@ pipeline: `root.reason = "cus I wana" root.id = this.id root.age = this.user.age.number() -root.kafka_topic = meta("kafka_topic")`, +root.kafka_topic = metadata("kafka_topic")`, ). Optional(), service.NewInterpolatedStringField(logPFieldMessage). diff --git a/internal/impl/pure/processor_mapping_test.go b/internal/impl/pure/processor_mapping_test.go index 298c89f88..50777007f 100644 --- a/internal/impl/pure/processor_mapping_test.go +++ b/internal/impl/pure/processor_mapping_test.go @@ -35,8 +35,8 @@ func TestMappingCreateCrossfire(t *testing.T) { foo = json("foo").from(0) foo.bar_new = "this is swapped now" foo.bar.baz = "and this changed" -meta foo = meta("foo").from(0) -meta bar = meta("bar").from(0) +meta foo = metadata("foo").from(0) +meta bar = metadata("bar").from(0) meta baz = "new meta" `) require.NoError(t, err) diff --git a/internal/impl/pure/processor_metric.go b/internal/impl/pure/processor_metric.go index 7acd8a460..cf37e3214 100644 --- a/internal/impl/pure/processor_metric.go +++ b/internal/impl/pure/processor_metric.go @@ -89,8 +89,8 @@ pipeline: name: Foos type: counter labels: - topic: ${! meta("kafka_topic") } - partition: ${! meta("kafka_partition") } + topic: ${! metadata("kafka_topic") } + partition: ${! metadata("kafka_partition") } type: ${! json("document.type").or("unknown") } metrics: @@ -114,7 +114,7 @@ pipeline: name: FooSize type: gauge labels: - topic: ${! meta("kafka_topic") } + topic: ${! metadata("kafka_topic") } value: ${! json("foo.size") } metrics: @@ -131,7 +131,7 @@ metrics: Description("A map of label names and values that can be used to enrich metrics. Labels are not supported by some metric destinations, in which case the metrics series are combined."). Example(map[string]any{ "type": "${! json(\"doc.type\") }", - "topic": "${! meta(\"kafka_topic\") }", + "topic": "${! metadata(\"kafka_topic\") }", }). Optional(), service.NewInterpolatedStringField(metProcFieldValue). diff --git a/internal/impl/pure/processor_mutation_test.go b/internal/impl/pure/processor_mutation_test.go index 2138be1a0..d3cb38c8d 100644 --- a/internal/impl/pure/processor_mutation_test.go +++ b/internal/impl/pure/processor_mutation_test.go @@ -36,8 +36,8 @@ a = batch_index() foo = json("foo").from(0) foo.bar_new = "this is swapped now" foo.bar.baz = "and this changed" -meta foo = meta("foo").from(0) -meta bar = meta("bar").from(0) +meta foo = metadata("foo").from(0) +meta bar = metadata("bar").from(0) meta baz = "new meta" `) require.NoError(t, err) diff --git a/public/bloblang/executor_test.go b/public/bloblang/executor_test.go index e9051b959..edf15411e 100644 --- a/public/bloblang/executor_test.go +++ b/public/bloblang/executor_test.go @@ -17,7 +17,7 @@ func TestExecutorQuery(t *testing.T) { }{ { name: "no metadata get", - mapping: `root = meta("foo")`, + mapping: `root = metadata("foo")`, output: nil, }, { @@ -76,7 +76,7 @@ func TestExecutorOverlay(t *testing.T) { }{ { name: "no metadata get", - mapping: `root = meta("foo")`, + mapping: `root = metadata("foo")`, output: nil, }, { diff --git a/public/service/benchmark_test.go b/public/service/benchmark_test.go index 50b661887..022bab10b 100644 --- a/public/service/benchmark_test.go +++ b/public/service/benchmark_test.go @@ -359,7 +359,7 @@ pipeline: processors: - branch: request_map: | - root.foo = meta("foo") + root.foo = metadata("foo") root.email = this.email processors: - mapping: root = content().uppercase() @@ -367,7 +367,7 @@ pipeline: root.foo_stuff = content().string() - branch: request_map: | - root.bar = meta("bar") + root.bar = metadata("bar") root.name = this.name processors: - mapping: root = content().uppercase() @@ -400,7 +400,7 @@ pipeline: branches: foo_stuff: request_map: | - root.foo = meta("foo") + root.foo = metadata("foo") root.email = this.email processors: - mapping: root = content().uppercase() @@ -408,7 +408,7 @@ pipeline: root.foo_stuff = content().string() bar_stuff: request_map: | - root.bar = meta("bar") + root.bar = metadata("bar") root.name = this.name processors: - mapping: root = content().uppercase() diff --git a/public/service/config_interpolated_string_test.go b/public/service/config_interpolated_string_test.go index 681789c69..5f46f964d 100644 --- a/public/service/config_interpolated_string_test.go +++ b/public/service/config_interpolated_string_test.go @@ -10,7 +10,7 @@ func TestFieldInterpolatedStringList(t *testing.T) { conf := ` listfield: - hello ${! json("name").uppercase() } - - see you in ${! meta("ttl_days") } days + - see you in ${! metadata("ttl_days") } days ` spec := NewConfigSpec().Field(NewInterpolatedStringListField("listfield")) @@ -39,7 +39,7 @@ func TestFieldInterpolatedStringList_InvalidInterpolation(t *testing.T) { conf := ` listfield: - hello ${! json("name")$$uppercas } - - see you in ${! meta("ttl_days") } days + - see you in ${! metadata("ttl_days") } days ` spec := NewConfigSpec().Field(NewInterpolatedStringListField("listfield")) diff --git a/public/service/interpolated_string_test.go b/public/service/interpolated_string_test.go index 502c0d31b..cafc5f3d1 100644 --- a/public/service/interpolated_string_test.go +++ b/public/service/interpolated_string_test.go @@ -30,7 +30,7 @@ func TestInterpolatedString(t *testing.T) { }, { name: "metadata interpolation", - expr: `foo ${! meta("var1") } bar`, + expr: `foo ${! metadata("var1") } bar`, msg: func() *Message { m := NewMessage([]byte("hello world")) m.MetaSet("var1", "value1") @@ -78,7 +78,7 @@ func TestInterpolatedString(t *testing.T) { func TestInterpolatedStringCtor(t *testing.T) { t.Parallel() - i, err := NewInterpolatedString(`foo ${! meta("var1") bar`) + i, err := NewInterpolatedString(`foo ${! metadata("var1") bar`) assert.EqualError(t, err, "required: expected end of expression, got: bar") assert.Nil(t, i) @@ -87,7 +87,7 @@ func TestInterpolatedStringCtor(t *testing.T) { func TestInterpolatedStringMethods(t *testing.T) { t.Parallel() - i, err := NewInterpolatedString(`foo ${! meta("var1") + 1 } bar`) + i, err := NewInterpolatedString(`foo ${! metadata("var1") + 1 } bar`) require.NoError(t, err) m := NewMessage([]byte("hello world"))