Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Zed to v1.12.0 #119

Merged
merged 2 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ a data warehouse.

## Installation

To install `zync`, make sure you have Go 1.18 or better installed and then run
To install `zync`, make sure you have Go 1.21 or better installed and then run
```
go install github.com/brimdata/zync/cmd/zync@main
```
Expand Down
8 changes: 4 additions & 4 deletions connectjson/connectjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Encode(val *zed.Value) ([]byte, error) {
Schema *connectSchema `json:"schema"`
Payload interface{} `json:"payload"`
}{
schema, marshalPayload(val.Type, val.Bytes),
schema, marshalPayload(val.Type, val.Bytes()),
})
}

Expand Down Expand Up @@ -87,9 +87,9 @@ func marshalPayload(typ zed.Type, bytes zcode.Bytes) interface{} {
panic("union type unsupported")
case *zed.TypeEnum:
// Trim leading "%".
return zson.MustFormatValue(zed.NewValue(typ, bytes))[1:]
return zson.FormatValue(zed.NewValue(typ, bytes))[1:]
case *zed.TypeError:
return zson.MustFormatValue(zed.NewValue(typ, bytes))
return zson.FormatValue(zed.NewValue(typ, bytes))
default:
panic(fmt.Sprintf("%T unsupported", typ))
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func (c *Decoder) decodeBytes(val *zed.Value) *zed.Value {
return val
}
c.builder.Truncate()
err := Walk(val.Type, val.Bytes, func(typ zed.Type, bytes zcode.Bytes) error {
err := Walk(val.Type, val.Bytes(), func(typ zed.Type, bytes zcode.Bytes) error {
if bytes == nil {
c.builder.Append(nil)
} else if zed.IsContainerType(typ) {
Expand Down
7 changes: 2 additions & 5 deletions etl/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,7 @@ func insertOffsets(ctx context.Context, zctx *zed.Context, doneType zed.Type, ba
if vals[k].Deref("left") != nil {
continue
}
rec, err := zson.FormatValue(&vals[k])
if err != nil {
return nil, err
}
rec := zson.FormatValue(&vals[k])
topic, _, err := getKafkaMeta(&vals[k])
if err != nil {
return nil, err
Expand All @@ -172,7 +169,7 @@ func getKafkaMeta(rec *zed.Value) (string, int64, error) {
// XXX this API should be simplified in zed package
kafkaRec := rec.Deref("kafka")
if kafkaRec == nil {
return "", 0, fmt.Errorf("value missing 'kafka' metadata field: %s", zson.MustFormatValue(rec))
return "", 0, fmt.Errorf("value missing 'kafka' metadata field: %s", zson.FormatValue(rec))
}
topic, err := FieldAsString(kafkaRec, "topic")
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions etl/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func OpenPool(ctx context.Context, poolName string, server lakeapi.Interface) (*
return nil, err
}
// The sync algorithm relies on the pool key being kafka.offset asc.
if pool.Layout.Order != order.Asc || len(pool.Layout.Keys) == 0 || !pool.Layout.Keys[0].Equal(field.Dotted("kafka.offset")) {
if pool.SortKey.Order != order.Asc || len(pool.SortKey.Keys) == 0 || !pool.SortKey.Keys[0].Equal(field.Dotted("kafka.offset")) {
return nil, ErrBadPoolKey
}
return &Pool{
Expand Down Expand Up @@ -93,10 +93,10 @@ func NewArrayFromReader(zr zio.Reader) (*zbuf.Array, error) {
func Field(val *zed.Value, field string) (*zed.Value, error) {
fieldVal := val.Deref(field)
if fieldVal == nil {
return nil, fmt.Errorf("field %q not found in %q", field, zson.MustFormatValue(val))
return nil, fmt.Errorf("field %q not found in %q", field, zson.FormatValue(val))
}
if fieldVal.IsNull() {
return nil, fmt.Errorf("field %q null in %q", field, zson.MustFormatValue(val))
return nil, fmt.Errorf("field %q null in %q", field, zson.FormatValue(val))
}
return fieldVal, nil
}
Expand All @@ -107,7 +107,7 @@ func FieldAsInt(val *zed.Value, field string) (int64, error) {
return 0, err
}
if !zed.IsInteger(fieldVal.Type.ID()) {
return 0, fmt.Errorf("field %q not an interger in %q", field, zson.MustFormatValue(val))
return 0, fmt.Errorf("field %q not an interger in %q", field, zson.FormatValue(val))
}
return fieldVal.AsInt(), nil
}
Expand All @@ -118,7 +118,7 @@ func FieldAsString(val *zed.Value, field string) (string, error) {
return "", err
}
if fieldVal.Type.ID() != zed.IDString {
return "", fmt.Errorf("field %q not a string in %q", field, zson.MustFormatValue(val))
return "", fmt.Errorf("field %q not a string in %q", field, zson.FormatValue(val))
}
return fieldVal.AsString(), nil
}
4 changes: 2 additions & 2 deletions fifo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ func (c *Consumer) handle(krec *kgo.Record) (*zed.Value, error) {
return nil, err
}
keyType := key.Type
b.Append(key.Bytes)
b.Append(key.Bytes())
val, err := c.decoder.Decode(krec.Value)
if err != nil {
return nil, err
}
b.Append(val.Bytes)
b.Append(val.Bytes())
outerType, err := c.outerType(keyType, val.Type)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion fifo/lake.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewLake(ctx context.Context, poolName, shaper string, server lakeapi.Interf
return nil, err
}
// The sync algorithm relies on the pool key being kafka.offset asc.
if pool.Layout.Order != order.Asc || len(pool.Layout.Keys) == 0 || !pool.Layout.Keys[0].Equal(field.Dotted("kafka.offset")) {
if pool.SortKey.Order != order.Asc || len(pool.SortKey.Keys) == 0 || !pool.SortKey.Keys[0].Equal(field.Dotted("kafka.offset")) {
return nil, ErrBadPoolKey
}
return &Lake{
Expand Down
53 changes: 27 additions & 26 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,65 +1,66 @@
module github.com/brimdata/zync

go 1.19
go 1.21

require (
github.com/brimdata/zed v1.5.0
github.com/brimdata/zed v1.12.0
github.com/buger/jsonparser v1.1.1
github.com/go-avro/avro v0.0.0-20171219232920-444163702c11
github.com/riferrei/srclient v0.4.0
github.com/segmentio/ksuid v1.0.2
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.8.4
github.com/twmb/franz-go v1.9.1
github.com/twmb/franz-go/pkg/kadm v0.0.0-20220331035613-01d0c45d69d2
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.4.0
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/v11 v11.0.0-20221214174703-0dfec8e98f4f // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v14 v14.0.0 // indirect
github.com/apache/thrift v0.17.0 // indirect
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de // indirect
github.com/aws/aws-sdk-go v1.36.17 // indirect
github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
github.com/fraugster/parquet-go v0.10.1-0.20220222153523-e6b70a8a7212 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/linkedin/goavro/v2 v2.9.7 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.2.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.49.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/grpc v1.58.2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading