Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace deprecated meta #13

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/template_examples/processor_hydration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ mapping: |
"cache": {
"operator": "set",
"resource": this.cache,
"key": """${! meta("id") }""",
"key": """${! metadata("id") }""",
"value": "${! content() }",
}
}
Expand Down Expand Up @@ -89,7 +89,7 @@ tests:
- cache:
operator: set
resource: foocache
key: ${! meta("id") }
key: ${! metadata("id") }
value: ${! content() }

- branch:
Expand Down
2 changes: 1 addition & 1 deletion config/template_examples/processor_log_message.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ fields:
mapping: |
root.log.level = this.level
root.log.message = "${! content() }"
root.log.fields.metadata = "${! meta() }"
root.log.fields.metadata = "${! metadata() }"
root.log.fields.error = "${! error() }"
2 changes: 1 addition & 1 deletion config/test/deduplicate_by_batch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pipeline:
}
- dedupe:
cache: local
key: ${! meta("batch_tag").from(0) + content() }
key: ${! metadata("batch_tag").from(0) + content() }

cache_resources:
- label: local
Expand Down
4 changes: 2 additions & 2 deletions internal/bloblang/parser/mapping_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ zed = deleted()
},
"test mapping metadata and json": {
mapping: `meta foo = foo
bar.baz = meta("bar baz")
bar.baz = metadata("bar baz")
meta "bar baz" = deleted()`,
input: []part{
{
Expand Down Expand Up @@ -331,7 +331,7 @@ meta "bar baz" = "test1"`,
},
"test mapping delete and json": {
mapping: `meta foo = foo
bar.baz = meta("bar baz")
bar.baz = metadata("bar baz")
meta = deleted()`,
input: []part{
{
Expand Down
6 changes: 3 additions & 3 deletions internal/bloblang/parser/query_arithmetic_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestArithmeticParser(t *testing.T) {
},
},
"two ints and a string": {
input: `json("foo") + json("bar") + meta("baz").number(0)`,
input: `json("foo") + json("bar") + metadata("baz").number(0)`,
output: `17`,
messages: []easyMsg{
{
Expand All @@ -141,7 +141,7 @@ func TestArithmeticParser(t *testing.T) {
},
},
"add three ints": {
input: `json("foo") + json("bar") + meta("baz").number()`,
input: `json("foo") + json("bar") + metadata("baz").number()`,
output: `20`,
messages: []easyMsg{
{
Expand All @@ -167,7 +167,7 @@ func TestArithmeticParser(t *testing.T) {
},
},
"sub and add two ints": {
input: `json("foo") + json("bar") - meta("foo").number() - meta("bar").number()`,
input: `json("foo") + json("bar") - metadata("foo").number() - metadata("bar").number()`,
output: `6`,
messages: []easyMsg{
{
Expand Down
22 changes: 11 additions & 11 deletions internal/bloblang/parser/query_function_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestFunctionQueries(t *testing.T) {
},
},
"json function dynamic arg": {
input: `json(meta("path"))`,
input: `json(metadata("path"))`,
output: `this`,
messages: []easyMsg{
{
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestFunctionQueries(t *testing.T) {
},
},
"metadata triple quote string arg 1": {
input: `meta("""foo""")`,
input: `metadata("""foo""")`,
output: `bar`,
index: 1,
messages: []easyMsg{
Expand All @@ -177,7 +177,7 @@ func TestFunctionQueries(t *testing.T) {
},
},
"metadata triple quote string arg 2": {
input: `meta("""foo
input: `metadata("""foo
bar""")`,
output: `bar`,
index: 1,
Expand All @@ -193,7 +193,7 @@ bar""")`,
},
},
"metadata 1": {
input: `meta("foo")`,
input: `metadata("foo")`,
output: `bar`,
index: 1,
messages: []easyMsg{
Expand All @@ -208,7 +208,7 @@ bar""")`,
},
},
"metadata 2": {
input: `meta("bar")`,
input: `metadata("bar")`,
output: "null",
messages: []easyMsg{
{
Expand All @@ -221,7 +221,7 @@ bar""")`,
},
},
"metadata 3": {
input: `meta()`,
input: `metadata()`,
output: `{"baz":"qux","duck,1":"quack","foo":"bar"}`,
messages: []easyMsg{
{
Expand All @@ -234,7 +234,7 @@ bar""")`,
},
},
"metadata 4": {
input: `meta("duck,1")`,
input: `metadata("duck,1")`,
output: "quack",
messages: []easyMsg{
{
Expand All @@ -247,7 +247,7 @@ bar""")`,
},
},
"metadata 5": {
input: `meta("foo").from(1)`,
input: `metadata("foo").from(1)`,
output: "bar",
index: 0,
messages: []easyMsg{
Expand All @@ -262,7 +262,7 @@ bar""")`,
},
},
"metadata 6": {
input: `meta("foo")`,
input: `metadata("foo")`,
output: `null`,
index: 1,
messages: []easyMsg{
Expand All @@ -277,7 +277,7 @@ bar""")`,
},
},
"metadata 7": {
input: `meta().from(1)`,
input: `metadata().from(1)`,
output: `{}`,
messages: []easyMsg{
{
Expand All @@ -290,7 +290,7 @@ bar""")`,
},
},
"metadata 8": {
input: `meta().from(1)`,
input: `metadata().from(1)`,
output: `{"baz":"qux","duck,1":"quack","foo":"bar"}`,
messages: []easyMsg{
{},
Expand Down
12 changes: 6 additions & 6 deletions internal/bloblang/parser/query_method_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestMethodParser(t *testing.T) {
},
},
"meta from all": {
input: `meta("foo").from_all()`,
input: `metadata("foo").from_all()`,
output: `["bar",null,"baz"]`,
messages: []easyMsg{
{meta: map[string]any{"foo": "bar"}},
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestMethodParser(t *testing.T) {
},
},
"or boolean from metadata": {
input: `meta("foo").or( meta("bar") == "yep" ).from_all()`,
input: `metadata("foo").or( metadata("bar") == "yep" ).from_all()`,
output: `["from foo",true,false,"from foo 2"]`,
messages: []easyMsg{
{meta: map[string]any{"foo": "from foo"}},
Expand Down Expand Up @@ -331,14 +331,14 @@ func TestMethodParser(t *testing.T) {
messages: []easyMsg{{}},
},
"test format method 2": {
input: `"foo %v bar".format(meta("foo"))`,
input: `"foo %v bar".format(metadata("foo"))`,
output: `foo test bar`,
messages: []easyMsg{{
meta: map[string]any{"foo": "test"},
}},
},
"test format method 3": {
input: `json().("foo %v, %v, %v bar".format(value, meta("foo"), 3))`,
input: `json().("foo %v, %v, %v bar".format(value, metadata("foo"), 3))`,
output: `foo yup, bar, 3 bar`,
messages: []easyMsg{{
content: `{"value":"yup"}`,
Expand Down Expand Up @@ -516,7 +516,7 @@ func TestMethodMaps(t *testing.T) {
}},
},
"map dynamic": {
input: `json().apply(meta("dyn_map"))`,
input: `json().apply(metadata("dyn_map"))`,
output: "this value",
maps: map[string]query.Function{
"foo": query.NewFieldFunction("foo"),
Expand All @@ -530,7 +530,7 @@ func TestMethodMaps(t *testing.T) {
}},
},
"map dynamic 2": {
input: `json().apply(meta("dyn_map"))`,
input: `json().apply(metadata("dyn_map"))`,
output: "and this value",
maps: map[string]query.Function{
"foo": query.NewFieldFunction("foo"),
Expand Down
16 changes: 8 additions & 8 deletions internal/bloblang/parser/query_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestFunctionParserErrors(t *testing.T) {
err: `line 1 char 16: expected query`,
},
"bad expression 3": {
input: `(json("foo") + meta("bar") `,
input: `(json("foo") + metadata("bar") `,
err: `line 1 char 28: required: expected closing bracket`,
},
"bad method": {
Expand Down Expand Up @@ -141,35 +141,35 @@ func TestFunctionParserLimits(t *testing.T) {
remaining string
}{
"nothing": {
input: `json("foo") + meta("bar")`,
input: `json("foo") + metadata("bar")`,
remaining: ``,
},
"space before": {
input: ` json("foo") + meta("bar")`,
input: ` json("foo") + metadata("bar")`,
remaining: ``,
},
"space before 2": {
input: ` json("foo") + meta("bar")`,
input: ` json("foo") + metadata("bar")`,
remaining: ``,
},
"unfinished comment": {
input: `json("foo") + meta("bar") # Here's a comment`,
input: `json("foo") + metadata("bar") # Here's a comment`,
remaining: ` # Here's a comment`,
},
"extra text": {
input: `json("foo") and this`,
remaining: ` and this`,
},
"extra text 2": {
input: `json("foo") + meta("bar") and this`,
input: `json("foo") + metadata("bar") and this`,
remaining: ` and this`,
},
"extra text 3": {
input: `json("foo")+meta("bar")and this`,
input: `json("foo")+metadata("bar")and this`,
remaining: `and this`,
},
"extra text 4": {
input: `json("foo")+meta("bar") and this`,
input: `json("foo")+metadata("bar") and this`,
remaining: ` and this`,
},
"squiggly bracket": {
Expand Down
12 changes: 6 additions & 6 deletions internal/bloblang/query/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,12 +557,12 @@ var _ = registerFunction(
"meta",
"Returns the value of a metadata key from the input message as a string, or `null` if the key does not exist. Since values are extracted from the read-only input message they do NOT reflect changes made from within the map. In order to query metadata mutations made within a mapping use the <<root_meta, `root_meta` function>>. This function supports extracting metadata from other messages of a batch with the `from` method.",
NewExampleSpec("",
`root.topic = meta("kafka_topic")`,
`root.topic = meta("nope") | meta("also nope") | "default"`,
`root.topic = metadata("kafka_topic")`,
`root.topic = metadata("nope") | metadata("also nope") | "default"`,
),
NewExampleSpec(
"The key parameter is optional and if omitted the entire metadata contents are returned as an object.",
`root.all_metadata = meta()`,
`root.all_metadata = metadata()`,
),
).Param(ParamString("key", "An optional key of a metadata value to obtain.").Default("")),
func(args *ParsedParams) (Function, error) {
Expand Down Expand Up @@ -609,12 +609,12 @@ var _ = registerFunction(
"root_meta",
"Returns the value of a metadata key from the new message being created as a string, or `null` if the key does not exist. Changes made to metadata during a mapping will be reflected by this function.",
NewExampleSpec("",
`root.topic = root_meta("kafka_topic")`,
`root.topic = root_meta("nope") | root_meta("also nope") | "default"`,
Copy link
Collaborator

@mihaitodor mihaitodor Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the cleanup! Note that root_meta() wasn't renamed to root_metadata(): https://docs.redpanda.com/redpanda-connect/guides/bloblang/functions/#root_meta. In some cases, replacing meta() with metadata() can break things because metadata() can return any type of object, not just strings. Some of these changes will have to be tested before merging.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS: I just recalled that I already did this stuff in redpanda-data/connect#2588. I can port it over and clean it up in the next few days so it can be merged.

`root.topic = root_metadata("kafka_topic")`,
`root.topic = root_metadata("nope") | root_metadata("also nope") | "default"`,
),
NewExampleSpec(
"The key parameter is optional and if omitted the entire metadata contents are returned as an object.",
`root.all_metadata = root_meta()`,
`root.all_metadata = root_metadata()`,
),
).Param(ParamString("key", "An optional key of a metadata value to obtain.").Default("")),
func(args *ParsedParams) (Function, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/cli/test/processors_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pipeline:
resource: foocache
operator: set
key: defaultkey
value: ${! meta("foo") }
value: ${! metadata("foo") }
- cache:
resource: foocache
operator: get
Expand Down
4 changes: 2 additions & 2 deletions internal/component/metrics/namespaced_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ func TestNamespacedPrefixStaticLabelsWithMappings(t *testing.T) {
func TestNamespacedPrefixStaticLabelsWithMappingLabels(t *testing.T) {
prom, handler := getTestMetrics(t)

mappingFooToBar, err := metrics.NewMapping(`meta = meta().map_each(kv -> kv.value.replace_all("value","bar"))
mappingFooToBar, err := metrics.NewMapping(`meta = metadata().map_each(kv -> kv.value.replace_all("value","bar"))
meta extra1 = "extravalue1"
root = this.replace_all("foo","bar")`, log.Noop())
require.NoError(t, err)

mappingBarToBaz, err := metrics.NewMapping(`meta = meta().map_each(kv -> kv.value.replace_all("bar","baz"))
mappingBarToBaz, err := metrics.NewMapping(`meta = metadata().map_each(kv -> kv.value.replace_all("bar","baz"))
meta extra2 = "extravalue2"
root = this.replace_all("bar","baz")`, log.Noop())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion internal/config/test/docs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pipeline:
output:
aws_s3:
bucket: TODO
path: '${! meta("kafka_topic") }/${! json("message.id") }.json'
path: '${! metadata("kafka_topic") }/${! json("message.id") }.json'
```

One way to write our unit tests for this config is to accompany it with a file of the same name and extension but suffixed with `_benthos_test`, which in this case would be `foo_benthos_test.yaml`.
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/io/input_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ You can access these metadata fields using xref:configuration:interpolation.adoc
service.NewObjectField(hsiFieldResponse,
service.NewInterpolatedStringField(hsiFieldResponseStatus).
Description("Specify the status code to return with synchronous responses. This is a string value, which allows you to customize it based on resulting payloads and their metadata.").
Examples(`${! json("status") }`, `${! meta("status") }`).
Examples(`${! json("status") }`, `${! metadata("status") }`).
Default("200"),
service.NewInterpolatedStringMapField(hsiFieldResponseHeaders).
Description("Specify headers to return with synchronous responses.").
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/io/input_http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ func TestHTTPSyncResponseHeadersStatus(t *testing.T) {
http_server:
path: /testpost
sync_response:
status: '${! meta("status").or("200") }'
status: '${! metadata("status").or("200") }'
headers:
Content-Type: application/json
foo: '${!json("field1")}'
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/pure/buffer_system_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ A xref:guides:bloblang/about.adoc[Bloblang mapping] applied to each message duri
The timestamp value assigned to `+"`root`"+` must either be a numerical unix time in seconds (with up to nanosecond precision via decimals), or a string in ISO 8601 format. If the mapping fails or provides an invalid result the message will be dropped (with logging to describe the problem).
`).
Default("root = now()").
Example("root = this.created_at").Example(`root = meta("kafka_timestamp_unix").number()`)).
Example("root = this.created_at").Example(`root = metadata("kafka_timestamp_unix").number()`)).
Field(service.NewStringField("size").
Description("A duration string describing the size of each window. By default windows are aligned to the zeroth minute and zeroth hour on the UTC clock, meaning windows of 1 hour duration will match the turn of each hour in the day, this can be adjusted with the `offset` field.").
Example("30s").Example("10m")).
Expand Down Expand Up @@ -110,7 +110,7 @@ pipeline:
root = if batch_index() == 0 {
{
"traffic_light": this.traffic_light,
"created_at": meta("window_end_timestamp"),
"created_at": metadata("window_end_timestamp"),
"total_cars": json("registration_plate").from_all().unique().length(),
"passengers": json("passengers").from_all().sum(),
}
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/pure/output_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ In order to create a unique `+"`key`"+` value per item you should use function i
Examples(
`${!count("items")}-${!timestamp_unix_nano()}`,
`${!json("doc.id")}`,
`${!meta("kafka_key")}`,
`${!metadata("kafka_key")}`,
).
Default(`${!count("items")}-${!timestamp_unix_nano()}`),
service.NewInterpolatedStringField(coFieldTTL).
Expand Down
Loading
Loading