diff --git a/README.md b/README.md index 5626712..652f113 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,98 @@ # zinger -`zinger` is a connector between Kafka topics and Zed lakes. -It can run in either direction: syncing a Zed lake to a Kafka topic or -syncing a Kafka topic to a Zed lake. +`zinger` is a connector between [Kafka](https://kafka.apache.org/) and +[Zed lakes](https://github.com/brimdata/zed/tree/main/docs/lake). +It can run in either direction, syncing a Kafka topic to a Zed lake data pool +or vice versa. + +## Installation + +To install `zinger`, clone this repo and run `make install`: +``` +git clone https://github.com/brimdata/zinger.git +make -C zinger install +``` +Make sure you have Go 1.16 or later installed in your environment and +that your shell path includes Go. + +## Quick Start + +For built-in help, run +``` +zinger -h +``` +Make sure your config files are setup for the Kafka cluster +and schema registry (see below), then run some tests. + +List schemas in the registry: +``` +zinger ls +``` +Create a topic called `MyTopic` with one partition using your Kafka admin tools, +then post some data to a topic: +``` +echo '{s:"hello,world"}' | zinger produce -topic MyTopic - +``` +See the record you created: +``` +zinger consume -topic MyTopic +``` +> Hit Ctrl-C to interrupt `zinger consume` as it will wait indefinitely +> for data to arrive on the topic. + +In another shell, run a Zed lake service: +``` +mkdir scratch +zed lake serve -R scratch +``` +Now, sync data from Kafka to a Zed lake: +``` +zapi create -orderby kafka.offset:desc PoolA +zinger sync from -topic MyTopic -pool PoolA +``` +See the data in the Zed pool: +``` +zapi query "from PoolA" +``` +Next, create a topic called `MyTarget` with one partition using your Kafka admin tools, +sync data from a Zed pool back to Kafka, and check that it made it: +``` +zinger sync to -topic MyTarget -pool PoolA +zinger consume -topic MyTarget +``` +Finally, try out shaping. Put a Zed script in `shape.zed`, e.g., +``` +echo 'value:={upper:to_upper(value.s),words:split(value.s, ",")}' > shape.zed +``` +And shape the record from `MyTopic` into a new `PoolB`: +``` +zapi create -orderby kafka.offset:desc PoolB +zinger sync from -topic MyTopic -pool PoolB -shaper shape.zed +zapi query -Z "from PoolB" +``` + +## Configuration + +To configure `zinger` to talk to a Kafka cluster and a schema registry, +you must create two files in `$HOME/.zinger`: +[`kafka.json`](kafka.json) and +[`schema_registry.json`](schema_registry.json). + +This Kafka config file contains the Kafka bootstrap server +addresses and access credentials. + +This schema registry config file contains the URI of the service and +access credentials. + +> We currently support just SASL authentication though it will be easy +> to add other authentication options (or no auth). Please let us know if +> you have a requirement here. + +## Description + +`zinger` has two sub-commands for synchronizing data: +* `zinger sync from` - syncs data from a Kafka topic to a Zed data pool +* `zinger sync to` - syncs data from a Zed data pool to a Kafka topic Currently, only the binary [Kavka/Avro format](https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format) @@ -11,36 +101,100 @@ is supported where the Avro schemas are obtained from a configured An arbitrary Zed script can be applied to the Zed records in either direction. 
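+For example, the `shape.zed` script from the Quick Start above can be applied
+on either path. The first command below is taken from the Quick Start; the
+second reuses the same names purely for illustration:
+```
+zinger sync from -topic MyTopic -pool PoolB -shaper shape.zed
+zinger sync to -topic MyTarget -pool PoolA -shaper shape.zed
+```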
-Zinger formats records received from Kafka using the Zed envelope +The Zed pool used by `zinger` must have its pool key set to `kafka.offset` in +descending order. `zinger` will detect and report an error if syncing +is attempted using a pool without this configuration. + +Each Kafka topic must have a single partition as the system relies upon +the offset to indicate the FIFO order of all records. + +### Sync From + +`zinger sync from` formats records received from Kafka using the Zed envelope ``` { - kafka: {topic:string,offset:int64,partition:int32}, - record: {...} + kafka: {topic:string,partition:int64,offset:int64,input_offset:int64}, + key: {...} + value: {...} } ``` -where `...` indicates the Zed record that results from the configured Zed script -applied to the decoded Avro record. -If there is no such script, the record is simply the verbatim result obtained -from decoding Avro into Zed. +where the `key` and `value` fields represent the key/value data pair pulled from +Kafka and transcoded from Avro to Zed. -By including the Kafka offset in the Zed records, `zinger` can query the Zed -lake for the largest offset seen and resume synchronization in a reliable and -consistent fashion. +If a Zed script is provided, it is applied to each such record before +syncing the data to the Zed pool. While the script has access to the +metadata in the `kafka` field, it should not modify these values as this +would cause the synchronization algorithm to fail. -## Installation +After optionally shaping each record with a Zed script, the data is committed +into the Zed data pool in a transactionally consistent fashion where any and +all data committed by zinger writers has monotonically increasing `kafka.offset`. +If multiple writers attempt to commit to records at the same time containing +overlapping offsets, only one will succeed. The others will detect the conflict, +recompute the `kafka.offset`'s accounting for the data provided in the +conflicting commit, and retry the commit. -To install `zinger`, clone this repo and run `make install`: -``` -git clone https://github.com/brimdata/zinger.git -cd zinger -make install -``` -Make sure you have Go installed in your environment and that GOPATH is -in your shell path. +`sync from` records the original input offset in `kafka.input_offset` so when +it comes up, it can query the maximum input offset in the pool and resume +syncing from where it last left off. -## Usage +To avoid the inefficiencies of write-sharing conflicts and retries, +it is best to configure `zinger` with a single writer per pool. -For built-in help, run +> Note: the optimisitic locking and retry algorithm is not yet implemented +> and requires a small change to the Zed lake load endpoint. In the meantime, +> if you run with a single `zinger` writer per pool, this will not be a problem. + +### Sync To + +`zinger sync to` formats data from a Zed data pool as Avro and "produces" +records that arrive in the pool to the Kafka topic specified. + +The synchronization algorithm is very simple: when `sync to` comes up, +it queries the pool for the largest `kafka.offset` present and queries +the Kafka topic for its high-water mark, then it reads, shapes, and +produces all records from the Zed pool at the high-water mark and beyond. + +There is currently no logic to detect multiple concurrent writers, so +care must be taken to only run a single `sync to` process at a time +on any given Zed topic. + +> Currently, `sync to` exits after syncing to the highest offset. 
+> We plan to soon modify it so it will run continuously, listening for +> commits to the pool, then push any new to Kafka with minimal latency. + +## Debezium Integration + +`zinger` can be used with [Debezium](https://debezium.io) to perform database ETL +and replication by syncing Debezium's CDC logs to a Zed data pool with `sync from`, +shaping the logs for a target database schema, +and replicating the shaped CDC logs to a Kafka database +sink connector using `sync to`. + +Debezium recommends using a single Kakfa topic for database table. +In this same way, we can scale out the Zed lake and `zinger` processes. + +It might be desirable to sync multiple downstream databases with different +schemas to a single upstream database with a unified schema. This can be +accomplished by having `sync from` read from multiple Kafka topics in parallel +(e,g., reading multiple table formats from different downstream databases), +shape each downstream table accordingly, and store the shaped data in the +unified pool. + +A Zed script to shape different schemas to a unified schema is as simple +as a switch statement on the name field of the inbound Kafka topic, e.g., ``` -zinger -h +switch kafka.topic ( + "legacy-oracle-1" => ... ; + "legacy-oracle-2" => ... ; + "legacy-mysql-1" => ... ; + default => ... ; +) ``` + +> Note that `zinger sync from` does not currently support multiplexing multiple +> inbound topics, but support for this is straightforward and we will add it soon. +> +> We also need to adapt `sync from` so it updates the consumer commit offsets, +> allowing aggressive Kafka retention policies to drop data that has been +> safely replicated into the Zed lake. diff --git a/cli/flags.go b/cli/flags.go index 1b6e2f4..e43b926 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -10,8 +10,9 @@ import ( ) type Flags struct { - Topic string - Namespace string + Topic string + Namespace string + ZedLakeHost string } type Credentials struct { @@ -19,9 +20,20 @@ type Credentials struct { Password string } +const HostEnv = "ZED_LAKE_HOST" + +func DefaultHost() string { + host := os.Getenv(HostEnv) + if host == "" { + host = "localhost:9867" + } + return host +} + func (f *Flags) SetFlags(fs *flag.FlagSet) { - fs.StringVar(&f.Topic, "t", "", "Kafka topic name") - fs.StringVar(&f.Namespace, "n", "io.brimdata.zinger", "Kafka name space for new schemas") + fs.StringVar(&f.Topic, "topic", "", "Kafka topic name") + fs.StringVar(&f.Namespace, "namespace", "io.brimdata.zinger", "Kafka name space for new schemas") + fs.StringVar(&f.ZedLakeHost, "host", DefaultHost(), "host[:port] of Zed lake service") } func SchemaRegistryEndpoint() (string, Credentials, error) { @@ -39,12 +51,11 @@ type apiKey struct { } func getKey() (apiKey, error) { - //XXX move this to CLI flags home, err := os.UserHomeDir() if err != nil { return apiKey{}, err } - path := filepath.Join(home, ".confluent", "schema_registry.json") + path := filepath.Join(home, ".zinger", "schema_registry.json") b, err := os.ReadFile(path) if err != nil { return apiKey{}, err @@ -54,7 +65,6 @@ func getKey() (apiKey, error) { return key, err } -//XXX use ccloud code instead? 
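+// config mirrors the expected shape of the Kafka config file loaded by
+// LoadKafkaConfig below (see kafka.json in this repo for a sample).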
type config struct { BootstrapServers string `json:"bootstrap_servers"` SecurityProtocol string `json:"security_protocol"` @@ -68,7 +78,7 @@ func LoadKafkaConfig() (*kafka.ConfigMap, error) { if err != nil { return nil, err } - path := filepath.Join(home, ".confluent", "kafka_config.json") + path := filepath.Join(home, ".zinger", "kafka.json") b, err := os.ReadFile(path) if err != nil { return nil, err diff --git a/cmd/zinger/consume/command.go b/cmd/zinger/consume/command.go index 7d5dbc4..742f273 100644 --- a/cmd/zinger/consume/command.go +++ b/cmd/zinger/consume/command.go @@ -4,15 +4,20 @@ import ( "context" "errors" "flag" + "fmt" + "math" + "time" "github.com/brimdata/zed/cli/outputflags" "github.com/brimdata/zed/pkg/charm" + "github.com/brimdata/zed/pkg/nano" "github.com/brimdata/zed/pkg/storage" "github.com/brimdata/zed/zson" "github.com/brimdata/zinger/cli" "github.com/brimdata/zinger/cmd/zinger/root" "github.com/brimdata/zinger/fifo" "github.com/riferrei/srclient" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" ) var Consume = &charm.Spec{ @@ -20,7 +25,20 @@ var Consume = &charm.Spec{ Usage: "consume [options]", Short: "consume data from a Kafka topic and format as Zed output", Long: ` -TBD`, +The consume command reads Avro records from a Kafka topic from the current +position in the provided consumer group. It does not perform "consume commits" +so the consumer group's queue position is not affected. If no consumer group +is given, then a random name is chosen so that reading will begin at offset 0. + +Consume reads each record as Avro and transcodes it to Zed using the configured +schema registry. Any of the output formats used by the "zed" command may be +specified in the same way as in the zed query commands (i.e., zq, zapi query, etc). + +Once consume reaches the head of the Kafka topic, it blocks and waits for more +data and gives up and exits if a timeout is provided. Note that if the duration +is too short, consume may exit before any available records are ready +asynchronously from the topic. 
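+
+For example, using the topic from the README's Quick Start and a 5-second
+timeout:
+
+  zinger consume -topic MyTopic -timeout 5s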
+`, New: New, } @@ -30,25 +48,30 @@ func init() { type Command struct { *root.Command - wrap bool flags cli.Flags + timeout string + group string + offset int64 outputFlags outputflags.Flags } -func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { +func New(parent charm.Command, fs *flag.FlagSet) (charm.Command, error) { c := &Command{Command: parent.(*root.Command)} - f.BoolVar(&c.wrap, "k", false, "wrap Kafka meta data around value") - c.flags.SetFlags(f) - c.outputFlags.SetFlags(f) + fs.StringVar(&c.timeout, "timeout", "", "timeout in ZSON duration syntax (5s, 1m30s, ...)") + fs.StringVar(&c.group, "group", "", "Kafka consumer group name") + fs.Int64Var(&c.offset, "offset", 0, "Kafka offset in topic to begin at") + c.flags.SetFlags(fs) + c.outputFlags.SetFlags(fs) return c, nil } func (c *Command) Run(args []string) error { + ctx := context.Background() if len(args) != 0 { return errors.New("extra arguments not allowed") } if c.flags.Topic == "" { - return errors.New("no topic (-t) provided") + return errors.New("no topic provided") } if err := c.outputFlags.Init(); err != nil { return err @@ -63,22 +86,28 @@ func (c *Command) Run(args []string) error { } registry := srclient.CreateSchemaRegistryClient(url) registry.SetCredentials(secret.User, secret.Password) - writer, err := c.outputFlags.Open(context.TODO(), storage.NewLocalEngine()) + writer, err := c.outputFlags.Open(ctx, storage.NewLocalEngine()) if err != nil { return err } - consumer, err := fifo.NewConsumer(config, registry, c.flags.Topic, c.wrap) + zctx := zson.NewContext() + consumer, err := fifo.NewConsumer(zctx, config, registry, c.flags.Topic, c.group, kafka.Offset(0), false) if err != nil { return err } - err = consumer.Run(zson.NewContext(), writer) + timeout := nano.Duration(math.MaxInt64) + if c.timeout != "" { + timeout, err = nano.ParseDuration(c.timeout) + if err != nil { + return fmt.Errorf("parse error with -timeout option: %w", err) + } + } + // Note that we do not close the Kafka consumer here as that involves + // an extra timeout and since we are notting committing consumer offsets, + // there is no need to shutdown in this fashion. + err = consumer.Run(ctx, writer, time.Duration(timeout)) if closeErr := writer.Close(); err == nil { err = closeErr } - // XXX skip close for now since we are not committing any new offsets - // and there is a long timeout in the close path... - //if err == nil { - // consumer.Close() - //} return err } diff --git a/cmd/zinger/info/command.go b/cmd/zinger/info/command.go new file mode 100644 index 0000000..5473520 --- /dev/null +++ b/cmd/zinger/info/command.go @@ -0,0 +1,67 @@ +package ls + +import ( + "flag" + "fmt" + + "github.com/brimdata/zed/pkg/charm" + "github.com/brimdata/zed/zson" + "github.com/brimdata/zinger/cli" + "github.com/brimdata/zinger/cmd/zinger/root" + "github.com/brimdata/zinger/fifo" + "github.com/riferrei/srclient" +) + +var Info = &charm.Spec{ + Name: "info", + Usage: "info -topic topic", + Short: "show info about a topic", + Long: ` +The info command displays information about a Kafka topic. +Currently it simply prints the low and high water marks for the +indicated consumer group, or the abslute low and high water marks +if no group is given. 
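+
+For example:
+
+  zinger info -topic MyTopic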
+`, + New: New, +} + +func init() { + root.Zinger.Add(Info) +} + +type Command struct { + *root.Command + group string + flags cli.Flags +} + +func New(parent charm.Command, fs *flag.FlagSet) (charm.Command, error) { + c := &Command{Command: parent.(*root.Command)} + fs.StringVar(&c.group, "group", "", "Kafka consumer group name") + c.flags.SetFlags(fs) + return c, nil +} + +func (c *Command) Run(args []string) error { + url, secret, err := cli.SchemaRegistryEndpoint() + if err != nil { + return err + } + config, err := cli.LoadKafkaConfig() + if err != nil { + return err + } + registry := srclient.CreateSchemaRegistryClient(url) + registry.SetCredentials(secret.User, secret.Password) + zctx := zson.NewContext() + consumer, err := fifo.NewConsumer(zctx, config, registry, c.flags.Topic, c.group, 0, false) + if err != nil { + return err + } + low, high, err := consumer.Watermarks() + if err != nil { + return err + } + fmt.Printf("low %d high %d\n", low, high) + return nil +} diff --git a/cmd/zinger/ls/command.go b/cmd/zinger/ls/command.go index bb4cd72..d2d6255 100644 --- a/cmd/zinger/ls/command.go +++ b/cmd/zinger/ls/command.go @@ -18,6 +18,8 @@ var Ls = &charm.Spec{ The ls command prints the schema registry subjects and schema information for the latest schema in each subject. The endpoint URL and credentials are obtained from $HOME/.confluent/schema_registry.json. + +This runs a bit slow as each schema entry is fetched synchronously from the registry. `, New: New, } diff --git a/cmd/zinger/main.go b/cmd/zinger/main.go index 60f501c..fb32fff 100644 --- a/cmd/zinger/main.go +++ b/cmd/zinger/main.go @@ -5,6 +5,7 @@ import ( "os" _ "github.com/brimdata/zinger/cmd/zinger/consume" + _ "github.com/brimdata/zinger/cmd/zinger/info" _ "github.com/brimdata/zinger/cmd/zinger/ls" _ "github.com/brimdata/zinger/cmd/zinger/produce" "github.com/brimdata/zinger/cmd/zinger/root" diff --git a/cmd/zinger/produce/command.go b/cmd/zinger/produce/command.go index 244bb6d..4923835 100644 --- a/cmd/zinger/produce/command.go +++ b/cmd/zinger/produce/command.go @@ -1,6 +1,7 @@ package produce import ( + "context" "errors" "flag" @@ -21,7 +22,7 @@ var Produce = &charm.Spec{ Short: "produce Zed data into a Kafka topic", Long: ` The produce command copies the input Zed data into a Kafka topic. -No effort is made to provide synchronization as data as simply coped from +No effort is made to provide synchronization as data as simply copied from input to the topic and any failures are not recovered from. 
Use the "zinger sync" command to provide synchronization and fail-safe, restartable operation.`, @@ -46,11 +47,12 @@ func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { } func (c *Command) Run(args []string) error { + ctx := context.Background() if len(args) == 0 { return errors.New("no inputs provided") } if c.flags.Topic == "" { - return errors.New("no topic (-t) provided") + return errors.New("no topic provided") } if err := c.inputFlags.Init(); err != nil { return err @@ -65,7 +67,7 @@ func (c *Command) Run(args []string) error { } registry := srclient.CreateSchemaRegistryClient(url) registry.SetCredentials(secret.User, secret.Password) - readers, err := c.inputFlags.Open(zson.NewContext(), storage.NewLocalEngine(), args, true) + readers, err := c.inputFlags.Open(ctx, zson.NewContext(), storage.NewLocalEngine(), args, true) if err != nil { return err } @@ -74,5 +76,5 @@ func (c *Command) Run(args []string) error { if err != nil { return err } - return producer.Run(zio.ConcatReader(readers...)) + return producer.Run(ctx, zio.ConcatReader(readers...)) } diff --git a/cmd/zinger/root/command.go b/cmd/zinger/root/command.go index 3718dd3..6c6560d 100644 --- a/cmd/zinger/root/command.go +++ b/cmd/zinger/root/command.go @@ -15,11 +15,27 @@ var ( var Zinger = &charm.Spec{ Name: "zinger", - Usage: "zinger [global options] command [options] [arguments...]", - Short: "use zinger to receive, store, and transform zeek logs", + Usage: "zinger command [options] [arguments...]", + Short: "synchronize a Zed lake with Kafka", Long: ` -Zinger interconnects zeek and Kafka/Acvro using the Kafka Schema Registery. +Zinger interconnects a Zed lake with Kafka/Avro using the Kafka Schema Registery. +Syncronization can flow in either direction with the "sync from" or "sync to" +subcommands. Zinger also includes some simple support for consuming and +producing data on Kafka topic as Avro to/from the Zed formats. See the +"consume" and "produce" subcommands for more details. +Zinger requires a schema registry and is configured using two files +in ~/.zinger. + +Currently, zinger supports only SASL authentication but it will be +easy to add support for any other schemes supported by your Kafka +installation. + +The kafka.json file should have the form +of http://github.com/brimdata/zinger/kafka.json. + +The schema_registry.json file should have the form +of http://github.com/brimdata/zinger/schema_registry.json. `, New: New, } diff --git a/cmd/zinger/sync/command.go b/cmd/zinger/sync/command.go index ae59b71..eb637e3 100644 --- a/cmd/zinger/sync/command.go +++ b/cmd/zinger/sync/command.go @@ -3,12 +3,13 @@ package sync import ( "errors" "flag" + "os" "github.com/brimdata/zed/pkg/charm" "github.com/brimdata/zinger/cmd/zinger/root" ) -var Sync = &charm.Spec{ +var SyncSpec = &charm.Spec{ Name: "sync", Usage: "sync [options]", Short: "sync from a Zed lake pool to a Kafka topic", @@ -18,24 +19,39 @@ as a source of Zed data for Kafka. The Zed records are transcoded into Avro and published on the specified Kafka topic. -XXX document technique and expected format of lake records. +See http://github.com/brimdata/zinger/README.md for a description +of the sync algorithm and the layout of the Zed records in the Zed data pool. 
`, - New: New, + New: NewSync, } func init() { - root.Zinger.Add(Sync) + SyncSpec.Add(FromSpec) + SyncSpec.Add(ToSpec) + root.Zinger.Add(SyncSpec) } -type Command struct { +type Sync struct { *root.Command + pool string + shaper string } -func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { - c := &Command{Command: parent.(*root.Command)} +func NewSync(parent charm.Command, fs *flag.FlagSet) (charm.Command, error) { + c := &Sync{Command: parent.(*root.Command)} + fs.StringVar(&c.pool, "pool", "", "name of Zed data pool") + fs.StringVar(&c.shaper, "shaper", "", "path of optional Zed script for shaping") return c, nil } -func (c *Command) Run(args []string) error { +func (s *Sync) Run(args []string) error { return errors.New("TBD") } + +func (s *Sync) loadShaper() (string, error) { + if s.shaper == "" { + return "", nil + } + b, err := os.ReadFile(s.shaper) + return string(b), err +} diff --git a/cmd/zinger/sync/from.go b/cmd/zinger/sync/from.go new file mode 100644 index 0000000..ab8c5f0 --- /dev/null +++ b/cmd/zinger/sync/from.go @@ -0,0 +1,103 @@ +package sync + +import ( + "context" + "errors" + "flag" + "fmt" + + lakeapi "github.com/brimdata/zed/lake/api" + "github.com/brimdata/zed/pkg/charm" + "github.com/brimdata/zed/zson" + "github.com/brimdata/zinger/cli" + "github.com/brimdata/zinger/fifo" + "github.com/riferrei/srclient" +) + +var FromSpec = &charm.Spec{ + Name: "from", + Usage: "from [options]", + Short: "sync a Kafka topic to a Zed lake pool", + Long: ` +The "from" command syncs data on a Kafka topic to a Zed lake pool. +The Zed records are transcoded from Avro into Zed and synced +to the main branch of the Zed data pool specified. + +The data pool's key must be "kafka.offset" sorted in descending order. + +See https://github.com/brimdata/zinger/README.md for a description +of how this works. 
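+
+For example, the Quick Start's topic and pool can be synced with:
+
+  zinger sync from -topic MyTopic -pool PoolA
+
+An optional -shaper Zed script is applied to each record before it is loaded
+into the pool.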
+ +`, + New: NewFrom, +} + +type From struct { + *Sync + group string + flags cli.Flags +} + +func NewFrom(parent charm.Command, fs *flag.FlagSet) (charm.Command, error) { + f := &From{Sync: parent.(*Sync)} + fs.StringVar(&f.group, "group", "", "Kafka consumer group name") + f.flags.SetFlags(fs) + return f, nil +} + +func (f *From) Run(args []string) error { + if f.flags.Topic == "" { + return errors.New("no topic provided") + } + if f.pool == "" { + return errors.New("no pool provided") + + } + shaper, err := f.loadShaper() + if err != nil { + return err + } + ctx := context.Background() + service, err := lakeapi.OpenRemoteLake(ctx, f.flags.ZedLakeHost) + if err != nil { + return err + } + lk, err := fifo.NewLake(ctx, f.pool, "", service) + if err != nil { + return err + } + consumerOffset, err := lk.NextConsumerOffset(f.flags.Topic) + if err != nil { + return err + } + url, secret, err := cli.SchemaRegistryEndpoint() + if err != nil { + return err + } + config, err := cli.LoadKafkaConfig() + if err != nil { + return err + } + registry := srclient.CreateSchemaRegistryClient(url) + registry.SetCredentials(secret.User, secret.Password) + zctx := zson.NewContext() + consumer, err := fifo.NewConsumer(zctx, config, registry, f.flags.Topic, f.group, consumerOffset, true) + if err != nil { + return err + } + from := fifo.NewFrom(zctx, lk, consumer, shaper) + ncommit, nrec, err := from.Sync(ctx) + if ncommit != 0 { + fmt.Printf("synchronized %d record%s in %d commit%s\n", nrec, plural(nrec), ncommit, plural(ncommit)) + } else { + fmt.Println("nothing new found to synchronize") + } + return err +} + +func plural(n int64) string { + if n == 1 { + return "" + } + return "s" +} diff --git a/cmd/zinger/sync/to.go b/cmd/zinger/sync/to.go new file mode 100644 index 0000000..9fcae4e --- /dev/null +++ b/cmd/zinger/sync/to.go @@ -0,0 +1,85 @@ +package sync + +import ( + "context" + "errors" + "flag" + + lakeapi "github.com/brimdata/zed/lake/api" + "github.com/brimdata/zed/pkg/charm" + "github.com/brimdata/zed/zson" + "github.com/brimdata/zinger/cli" + "github.com/brimdata/zinger/fifo" + "github.com/riferrei/srclient" +) + +var ToSpec = &charm.Spec{ + Name: "to", + Usage: "to [options]", + Short: "sync a Zed lake pool to a Kafka topic", + Long: ` +The "to" command syncs data from a Zed lake to a Kafka topic acting +as a source of Zed data for Kafka. +The Zed records are transcoded from Zed to Avro and synced +to the target Kafka topic. +The data pool is expected to have the pool key "kafka.offset" sorted +in descending order. + +Only a single writer is allowed at any given time to the Kafka topic. +At start up, the to command queries the topic for its high-water mark +and continues replicating data from the source data pool at that offset +according to the kafka.offset value in the data pool. 
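+
+For example, replicating the Quick Start's pool back out to a topic:
+
+  zinger sync to -topic MyTarget -pool PoolA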
+`, + New: NewTo, +} + +type To struct { + *Sync + flags cli.Flags +} + +func NewTo(parent charm.Command, fs *flag.FlagSet) (charm.Command, error) { + f := &To{Sync: parent.(*Sync)} + f.flags.SetFlags(fs) + return f, nil +} + +func (t *To) Run(args []string) error { + if t.flags.Topic == "" { + return errors.New("no topic provided") + } + if t.pool == "" { + return errors.New("no pool provided") + + } + shaper, err := t.loadShaper() + if err != nil { + return err + } + ctx := context.Background() + service, err := lakeapi.OpenRemoteLake(ctx, t.flags.ZedLakeHost) + if err != nil { + return err + } + lk, err := fifo.NewLake(ctx, t.pool, shaper, service) + if err != nil { + return err + } + url, secret, err := cli.SchemaRegistryEndpoint() + if err != nil { + return err + } + config, err := cli.LoadKafkaConfig() + if err != nil { + return err + } + registry := srclient.CreateSchemaRegistryClient(url) + registry.SetCredentials(secret.User, secret.Password) + zctx := zson.NewContext() + producer, err := fifo.NewProducer(config, registry, t.flags.Topic, t.flags.Namespace) + if err != nil { + return err + } + to := fifo.NewTo(zctx, producer, lk) + return to.Sync(ctx) +} diff --git a/fifo/consumer.go b/fifo/consumer.go index af58dea..715ec81 100644 --- a/fifo/consumer.go +++ b/fifo/consumer.go @@ -1,11 +1,12 @@ package fifo import ( + "context" "encoding/binary" - "errors" "fmt" "time" + "github.com/brimdata/zed/zbuf" "github.com/brimdata/zed/zcode" "github.com/brimdata/zed/zio" "github.com/brimdata/zed/zng" @@ -18,20 +19,40 @@ import ( ) type Consumer struct { - consumer *kafka.Consumer - registry *srclient.SchemaRegistryClient - highWater kafka.Offset - wrap bool - types map[zng.Type]zng.Type + zctx *zson.Context + consumer *kafka.Consumer + registry *srclient.SchemaRegistryClient + topic string + metaType zng.Type + types map[zng.Type]map[zng.Type]zng.Type + schemas map[int]typeSchema } -func NewConsumer(config *kafka.ConfigMap, reg *srclient.SchemaRegistryClient, topic string, wrap bool) (*Consumer, error) { - if err := config.SetKey("group.id", ksuid.New().String()); err != nil { +type typeSchema struct { + zng.Type + avro.Schema +} + +func NewConsumer(zctx *zson.Context, config *kafka.ConfigMap, reg *srclient.SchemaRegistryClient, topic, group string, startAt kafka.Offset, meta bool) (*Consumer, error) { + var metaType zng.Type + if meta { + var err error + metaType, err = zson.ParseType(zctx, "{topic:string,partition:int64,offset:int64}") + if err != nil { + return nil, err + } + } + if group == "" { + group = ksuid.New().String() + } + if err := config.SetKey("group.id", group); err != nil { return nil, err } - if err := config.SetKey("auto.offset.reset", "earliest"); err != nil { + if err := config.SetKey("go.events.channel.enable", true); err != nil { return nil, err } + // Note that we do not conifgure "auto.offset.reset" since we assign + // the topic offset explicitly instead of doing a dynamic subscribe. if err := config.SetKey("enable.auto.commit", false); err != nil { return nil, err } @@ -39,20 +60,21 @@ func NewConsumer(config *kafka.ConfigMap, reg *srclient.SchemaRegistryClient, to if err != nil { return nil, err } - c.SubscribeTopics([]string{topic}, nil) - // Get the last offset at start time and read up to that point. 
- // XXX we should add an option to stream or stop at the end, - // and should check that there is only one partition - _, offset, err := c.QueryWatermarkOffsets(topic, 0, 5*1000) - if err != nil { + partition := kafka.TopicPartition{ + Topic: &topic, + Offset: kafka.Offset(startAt), + } + if err := c.Assign([]kafka.TopicPartition{partition}); err != nil { return nil, err } return &Consumer{ - consumer: c, - registry: reg, - highWater: kafka.Offset(offset), - wrap: wrap, - types: make(map[zng.Type]zng.Type), + zctx: zctx, + consumer: c, + registry: reg, + topic: topic, + metaType: metaType, + types: make(map[zng.Type]map[zng.Type]zng.Type), + schemas: make(map[int]typeSchema), }, nil } @@ -60,87 +82,181 @@ func (c *Consumer) Close() { c.consumer.Close() } -func (c *Consumer) Run(zctx *zson.Context, w zio.Writer) error { - metaType, err := zson.ParseType(zctx, "{topic:string,partition:int64,offset:int64}") - if err != nil { - return err - } +type Flusher interface { + Flush() error +} + +func (c *Consumer) Run(ctx context.Context, w zio.Writer, timeout time.Duration) error { + events := c.consumer.Events() for { - msg, err := c.consumer.ReadMessage(time.Second) - if err != nil { - //XXX - fmt.Printf("Error consuming the message: %v (%v)\n", err, msg) - } else { - if len(msg.Value) < 6 { - return fmt.Errorf("bad kafka-avro value in topic: len %d", len(msg.Value)) - } - schemaID := binary.BigEndian.Uint32(msg.Value[1:5]) - schema, err := c.registry.GetSchema(int(schemaID)) - if err != nil { - return fmt.Errorf("could not retrieve schema id %d: %w", schemaID, err) + select { + case ev := <-events: + if ev == nil { + // channel closed + return nil } - avroSchema, err := avro.ParseSchema(schema.Schema()) + rec, err := c.handle(ev) if err != nil { return err } - typ, err := zavro.DecodeSchema(zctx, avroSchema) - if err != nil { - return err + if rec == nil { + // unknown event + continue } - recType := zng.TypeRecordOf(typ) - if recType == nil { - return errors.New("avro schema not a Zed record") + if err := w.Write(rec); err != nil { + return err } - avroTypeRecord, ok := avroSchema.(*avro.RecordSchema) - if !ok { - return errors.New("schema not an avrod record") + case <-ctx.Done(): + return ctx.Err() + case <-time.After(timeout): + return nil + } + } +} + +func (c *Consumer) Read(ctx context.Context, thresh int, timeout time.Duration) (zbuf.Array, error) { + var batch zbuf.Array + var size int + events := c.consumer.Events() + for { + select { + case ev := <-events: + if ev == nil { + // channel closed + return batch, nil } - bytes, err := zavro.Decode(msg.Value[5:], avroTypeRecord) + rec, err := c.handle(ev) if err != nil { - return err + return nil, err } - var rec *zng.Record - if c.wrap { - rec, err = c.wrapRecord(zctx, metaType, recType, bytes, msg.TopicPartition) - if err != nil { - return err - } - } else { - rec = zng.NewRecord(recType, bytes) + if rec == nil { + // unknown event + continue } - if err := w.Write(rec); err != nil { - return err - } - if (msg.TopicPartition.Offset + 1) >= c.highWater { - break + batch.Append(rec) + size += len(rec.Bytes) + if size > thresh { + return batch, nil } + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(timeout): + return batch, nil } } - return nil } -func (c *Consumer) wrapRecord(zctx *zson.Context, metaType, typ zng.Type, bytes []byte, meta kafka.TopicPartition) (*zng.Record, error) { - outerType, ok := c.types[typ] - if !ok { - cols := []zng.Column{ - {"kafka", metaType}, - //{"key", keyType}, XXX not yet - {"value", typ}, 
+func (c *Consumer) handle(ev kafka.Event) (*zng.Record, error) { + switch ev := ev.(type) { + case kafka.Error: + return nil, ev + case *kafka.Message: + key, err := c.decodeAvro(ev.Key) + if err != nil { + return nil, err } - var err error - outerType, err = zctx.LookupTypeRecord(cols) + val, err := c.decodeAvro(ev.Value) if err != nil { return nil, err } - c.types[typ] = outerType + return c.wrapRecord(key, val, ev.TopicPartition) + default: + return nil, nil + } +} + +func (c *Consumer) wrapRecord(key, val zng.Value, meta kafka.TopicPartition) (*zng.Record, error) { + outerType, err := c.outerType(key.Type, val.Type) + if err != nil { + return nil, err } var b zcode.Builder - // {topic:string,partition:int64,offset:int64} - b.BeginContainer() - b.AppendPrimitive([]byte(*meta.Topic)) - b.AppendPrimitive(zng.EncodeInt(int64(meta.Partition))) - b.AppendPrimitive(zng.EncodeInt(int64(meta.Offset))) - b.EndContainer() - b.AppendContainer(bytes) + if c.metaType != nil { + // kafka:{topic:string,partition:int64,offset:int64} + b.BeginContainer() + b.AppendPrimitive([]byte(*meta.Topic)) + b.AppendPrimitive(zng.EncodeInt(int64(meta.Partition))) + b.AppendPrimitive(zng.EncodeInt(int64(meta.Offset))) + b.EndContainer() + } + b.AppendContainer(key.Bytes) + b.AppendContainer(val.Bytes) return zng.NewRecord(outerType, b.Bytes()), nil } + +func (c *Consumer) decodeAvro(b []byte) (zng.Value, error) { + if len(b) == 0 { + return zng.Value{Type: zng.TypeNull}, nil + } + if len(b) < 5 { + return zng.Value{}, fmt.Errorf("Kafka-Avro header is too short: len %d", len(b)) + } + schemaID := binary.BigEndian.Uint32(b[1:5]) + schema, typ, err := c.getSchema(int(schemaID)) + if err != nil { + return zng.Value{}, fmt.Errorf("could not retrieve schema id %d: %w", schemaID, err) + } + bytes, err := zavro.Decode(b[5:], schema) + if err != nil { + return zng.Value{}, err + } + return zng.Value{typ, bytes}, nil +} + +func (c *Consumer) getSchema(id int) (avro.Schema, zng.Type, error) { + if both, ok := c.schemas[id]; ok { + return both.Schema, both.Type, nil + } + schema, err := c.registry.GetSchema(id) + if err != nil { + return nil, nil, fmt.Errorf("could not retrieve schema id %d: %w", id, err) + } + avroSchema, err := avro.ParseSchema(schema.Schema()) + if err != nil { + return nil, nil, err + } + typ, err := zavro.DecodeSchema(c.zctx, avroSchema) + if err != nil { + return nil, nil, err + } + c.schemas[id] = typeSchema{Type: typ, Schema: avroSchema} + return avroSchema, typ, nil +} + +func (c *Consumer) outerType(key, val zng.Type) (zng.Type, error) { + m, ok := c.types[key] + if !ok { + c.makeType(key, val) + } else if typ, ok := m[val]; ok { + return typ, nil + } else { + c.makeType(key, val) + } + return c.types[key][val], nil +} + +func (c *Consumer) makeType(key, val zng.Type) (*zng.TypeRecord, error) { + cols := []zng.Column{ + {"kafka", c.metaType}, + {"key", key}, + {"value", val}, + } + if c.metaType == nil { + cols = cols[1:] + } + typ, err := c.zctx.LookupTypeRecord(cols) + if err != nil { + return nil, err + } + m, ok := c.types[key] + if !ok { + m = make(map[zng.Type]zng.Type) + c.types[key] = m + } + m[val] = typ + return typ, nil +} + +func (c *Consumer) Watermarks() (int64, int64, error) { + return c.consumer.QueryWatermarkOffsets(c.topic, 0, 5*1000) +} diff --git a/fifo/from.go b/fifo/from.go new file mode 100644 index 0000000..b8c42b2 --- /dev/null +++ b/fifo/from.go @@ -0,0 +1,115 @@ +package fifo + +import ( + "context" + "fmt" + "time" + + "github.com/brimdata/zed/zbuf" + 
"github.com/brimdata/zed/zng" + "github.com/brimdata/zed/zson" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" +) + +// From syncs data from a Kafka topic to a Zed lake in a +// consistent and crash-recoverable fashion. The data synced to the lake +// is assigned a target offset in the lake that may be used to then sync +// the merged lake's data back to another Kafka topic using To. +type From struct { + zctx *zson.Context + dst *Lake + src *Consumer + shaper string + batch zbuf.Batch +} + +func NewFrom(zctx *zson.Context, dst *Lake, src *Consumer, shaper string) *From { + return &From{ + zctx: zctx, + dst: dst, + src: src, + shaper: shaper, + } +} + +// These should be configurable. See issue #18. +const BatchThresh = 10 * 1024 * 1024 +const BatchTimeout = 5 * time.Second + +func (f *From) Sync(ctx context.Context) (int64, int64, error) { + offset, err := f.dst.NextProducerOffset() + if err != nil { + return 0, 0, err + } + // Loop over the records from the Kafka consumer and + // commit a batch at a time to the lake. + var ncommit, nrec int64 + for { + batch, err := f.src.Read(ctx, BatchThresh, BatchTimeout) + if err != nil { + return 0, 0, err + } + batchLen := batch.Length() + if batchLen == 0 { + break + } + batch, err = AdjustOffsetsAndShape(f.zctx, batch, offset, f.shaper) + if err != nil { + return 0, 0, err + } + //XXX We need to track the commitID and use new commit-only-if + // constraint and recompute offsets if needed. See zinger issue #16. + commit, err := f.dst.LoadBatch(batch) + if err != nil { + return 0, 0, err + } + fmt.Printf("commit %s %d record%s\n", commit, batchLen, plural(batchLen)) + offset += kafka.Offset(batchLen) + nrec += int64(batchLen) + ncommit++ + } + return ncommit, nrec, nil +} + +// AdjustOffsetsAndShape runs a local Zed program to adjust the Kafka offset fields +// for insertion into correct position in the lake and remember the original +// offset along with applying a user-defined shaper. +func AdjustOffsetsAndShape(zctx *zson.Context, batch zbuf.Array, offset kafka.Offset, shaper string) (zbuf.Array, error) { + rec := batch.Index(0) + kafkaRec, err := batch.Index(0).Access("kafka") + if err != nil { + s, err := zson.FormatValue(rec.Value) + if err != nil { + // This should not happen. + err = fmt.Errorf("[ERR! %w]", err) + } + // This shouldn't happen since the consumer automatically adds + // this field. + return nil, fmt.Errorf("value read from Kafka topic missing 'kafka' metadata field: %s", s) + } + // XXX this should be simplified in zed package + first, err := zng.NewRecord(kafkaRec.Type, kafkaRec.Bytes).AccessInt("offset") + if err != nil { + s, err := zson.FormatValue(kafkaRec) + if err != nil { + // This should not happen. + err = fmt.Errorf("[ERR! %w]", err) + } + return nil, fmt.Errorf("'kafka' metadata field is missing 'offset' field: %s", s) + } + // Send the batch of Zed records through this query to adjust the save + // the original input offset and adjust the offset so it fits in sequetentially + // we everything else in the target pool. 
+ query := fmt.Sprintf("kafka.input_offset:=kafka.offset,kafka.offset:=kafka.offset-%d+%d", first, offset) + if shaper != "" { + query = fmt.Sprintf("%s | %s", query, shaper) + } + return RunLocalQuery(zctx, batch, query) +} + +func plural(n int) string { + if n == 1 { + return "" + } + return "s" +} diff --git a/fifo/lake.go b/fifo/lake.go new file mode 100644 index 0000000..479aa52 --- /dev/null +++ b/fifo/lake.go @@ -0,0 +1,149 @@ +package fifo + +import ( + "context" + "errors" + "fmt" + + "github.com/brimdata/zed/api" + "github.com/brimdata/zed/compiler" + "github.com/brimdata/zed/driver" + "github.com/brimdata/zed/field" + lakeapi "github.com/brimdata/zed/lake/api" + "github.com/brimdata/zed/order" + "github.com/brimdata/zed/zbuf" + "github.com/brimdata/zed/zson" + "github.com/segmentio/ksuid" + "go.uber.org/zap" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" +) + +var ErrBadPoolKey = errors.New("pool key must be 'kafka.offset' in descending order") + +type Lake struct { + service *lakeapi.RemoteSession + shaper string + pool string + poolID ksuid.KSUID +} + +func NewLake(ctx context.Context, poolName, shaper string, server *lakeapi.RemoteSession) (*Lake, error) { + pool, err := lakeapi.LookupPoolByName(ctx, server, poolName) + if err != nil { + return nil, err + } + // The sync algorithm relies on the pool key being kafka.offset desc. + if pool.Layout.Order != order.Desc || len(pool.Layout.Keys) == 0 || !pool.Layout.Keys[0].Equal(field.Dotted("kafka.offset")) { + return nil, ErrBadPoolKey + } + return &Lake{ + pool: poolName, + poolID: pool.ID, + service: server, + shaper: shaper, + }, nil +} + +func (l *Lake) Query(q string) (zbuf.Array, error) { + query := fmt.Sprintf("from '%s' | %s", l.pool, q) + //XXX We need to make this API easier in package zed... + result := &batchDriver{} + _, err := l.service.Query(context.TODO(), result, nil, query) + if err != nil { + return nil, err + } + return result.Array, nil +} + +func (l *Lake) LoadBatch(batch zbuf.Array) (ksuid.KSUID, error) { + return l.service.Load(context.TODO(), l.poolID, "main", &batch, api.CommitMessage{}) +} + +func (l *Lake) NextProducerOffset() (kafka.Offset, error) { + // Run a query against the pool to get the max output offset. + // We assume the pool-key is kafka.offset so we just run a head 1. + batch, err := l.Query("head 1 | offset:=kafka.offset") + if err != nil { + return 0, err + } + n := batch.Length() + if n == 0 { + return 0, nil + } + if n != 1 { + // This should not happen. + return 0, errors.New("'head 1' returned more than one record") + } + offset, err := batch.Index(0).AccessInt("offset") + if err != nil { + return 0, err + } + return kafka.Offset(offset + 1), nil +} + +func (l *Lake) NextConsumerOffset(topic string) (kafka.Offset, error) { + // Find the largest input_offset for the given topic. Since these + // values are monotonically increasing, we can just do head 1. + query := fmt.Sprintf("kafka.topic=='%s' | head 1 | offset:=kafka.input_offset", topic) + batch, err := l.Query(query) + if err != nil { + return 0, err + } + n := batch.Length() + if n == 0 { + return 0, nil + } + if n != 1 { + // This should not happen. 
+ return 0, errors.New("'head 1' returned more than one record") + } + offset, err := batch.Index(0).AccessInt("offset") + if err != nil { + return 0, err + } + return kafka.Offset(offset + 1), nil +} + +func (l *Lake) ReadBatch(ctx context.Context, offset kafka.Offset, size int) (zbuf.Batch, error) { + query := fmt.Sprintf("kafka.offset >= %d | head %d", offset, size) + if l.shaper != "" { + query = fmt.Sprintf("%s | %s | sort kafka.offset", query, l.shaper) + } else { + query += "| sort kafka.offset" + } + return l.Query(query) +} + +func RunLocalQuery(zctx *zson.Context, batch zbuf.Array, query string) (zbuf.Array, error) { + //XXX We need to make this API easier in package zed... + program, err := compiler.ParseProc(query) + if err != nil { + return nil, err + } + var result zbuf.Array + if err := driver.Copy(context.TODO(), &result, program, zctx, &batch, zap.NewNop()); err != nil { + return nil, err + } + return result, nil +} + +type batchDriver struct { + zbuf.Array +} + +func (b *batchDriver) Write(cid int, batch zbuf.Batch) error { + if cid != 0 { + return errors.New("internal error: multiple tails not allowed") + } + for i := 0; i < batch.Length(); i++ { + rec := batch.Index(i) + rec.Keep() + b.Append(rec) + } + batch.Unref() + return nil +} + +func (*batchDriver) Warn(warning string) error { return nil } +func (*batchDriver) Stats(stats api.ScannerStats) error { return nil } +func (*batchDriver) ChannelEnd(cid int) error { return nil } diff --git a/fifo/producer.go b/fifo/producer.go index 279b6bd..42b65cb 100644 --- a/fifo/producer.go +++ b/fifo/producer.go @@ -1,14 +1,14 @@ package fifo import ( + "context" "encoding/json" - "errors" "fmt" + "github.com/brimdata/zed/zbuf" "github.com/brimdata/zed/zio" "github.com/brimdata/zed/zng" "github.com/brimdata/zinger/zavro" - "github.com/go-avro/avro" "github.com/riferrei/srclient" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" ) @@ -22,6 +22,9 @@ type Producer struct { } func NewProducer(config *kafka.ConfigMap, reg *srclient.SchemaRegistryClient, topic, namespace string) (*Producer, error) { + if err := config.SetKey("enable.idempotence", true); err != nil { + return nil, err + } p, err := kafka.NewProducer(config) if err != nil { return nil, err @@ -35,77 +38,169 @@ func NewProducer(config *kafka.ConfigMap, reg *srclient.SchemaRegistryClient, to }, nil } -func (p *Producer) Run(reader zio.Reader) error { +func (p *Producer) HeadOffset() (kafka.Offset, error) { + _, high, err := p.producer.QueryWatermarkOffsets(p.topic, 0, 1000*10) + return kafka.Offset(high), err +} + +func (p *Producer) Run(ctx context.Context, reader zio.Reader) error { + fmt.Printf("producing messages to topic %q...\n", p.topic) + ctx, cancel := context.WithCancel(ctx) + done := make(chan error) go func() { - //XXX need to handle errors gracefully from this goroutine - for e := range p.producer.Events() { - switch ev := e.(type) { - case *kafka.Message: - if ev.TopicPartition.Error != nil { - fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition) - } else { - //fmt.Printf("Successfully produced record to topic %s partition [%d] @ offset %v\n", - // *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset) + defer close(done) + for { + select { + case ev := <-p.producer.Events(): + msg, ok := ev.(*kafka.Message) + if !ok { + continue } + if msg.TopicPartition.Error != nil { + done <- msg.TopicPartition.Error + return + } + case <-ctx.Done(): + return } } }() var n int var closeErr error +loop: for { - rec, err := 
reader.Read() - if rec == nil || err != nil { + select { + case err := <-done: closeErr = err - break + break loop + default: + rec, err := reader.Read() + if rec == nil || err != nil { + closeErr = err + break loop + } + if err := p.write(rec); err != nil { + closeErr = err + break loop + } + n++ } - if err := p.write(rec); err != nil { - closeErr = err + } + fmt.Println("waiting for Kafka flush...") + for { + nleft := p.producer.Flush(1000) + if nleft == 0 { break } - n++ + fmt.Printf("waiting for %d Kafka events...\n", nleft) } - // Wait for all messages to be delivered XXX - p.producer.Flush(5 * 1000) + cancel() fmt.Printf("%d messages produced to topic %q\n", n, p.topic) p.producer.Close() return closeErr } -func (p *Producer) write(rec *zng.Record) error { - id, ok := p.mapper[rec.Type] - if !ok { - s, err := zavro.EncodeSchema(zng.TypeRecordOf(rec.Type), p.namespace) - if err != nil { - return err - } - record, ok := s.(*avro.RecordSchema) - if !ok { - return errors.New("internal error: avro schema not of type record") - } - schema, err := json.Marshal(record) - if err != nil { - return err +func (p *Producer) Send(ctx context.Context, offset kafka.Offset, batch zbuf.Batch) error { + batchLen := batch.Length() + done := make(chan error) + ctx, cancel := context.WithCancel(ctx) + go func(start, end kafka.Offset) { + defer close(done) + off := start + for { + select { + case ev := <-p.producer.Events(): + msg, ok := ev.(*kafka.Message) + if !ok { + continue + } + if msg.TopicPartition.Error != nil { + done <- msg.TopicPartition.Error + return + } + if msg.TopicPartition.Offset != off { + done <- fmt.Errorf("out of sync: expected %d, got %d (in batch %d,%d)", off, msg.TopicPartition.Offset, start, end) + return + } + off++ + if off >= end { + return + } + case <-ctx.Done(): + return + } } - id, err = p.CreateSchema(string(schema)) - if err != nil { + }(offset, offset+kafka.Offset(batchLen)) + for k := 0; k < batchLen; k++ { + rec := batch.Index(k) + if err := p.write(rec); err != nil { + cancel() return err } - p.mapper[rec.Type] = id } - b, err := zavro.Encode(nil, uint32(id), rec) + // Wait for all messages to be delivered. 
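+	// Flush returns the number of events still queued, so keep calling it
+	// until it reports that none remain.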
+ for p.producer.Flush(1000) != 0 { + } + cancel() + return <-done +} + +func (p *Producer) write(rec *zng.Record) error { + key, err := rec.Access("key") + if err != nil { + key = zng.Value{Type: zng.TypeNull} + } + keySchemaID, err := p.lookupSchema(key.Type) + if err != nil { + return err + } + val, err := rec.Access("value") + if err != nil { + val = rec.Value + } + valSchemaID, err := p.lookupSchema(val.Type) + if err != nil { + return err + } + keyBytes, err := zavro.Encode(nil, uint32(keySchemaID), key) + if err != nil { + return err + } + valBytes, err := zavro.Encode(nil, uint32(valSchemaID), val) if err != nil { return err } p.producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &p.topic, - Partition: kafka.PartitionAny, + Partition: 0, }, - Key: nil, - Value: b}, nil) + Key: keyBytes, + Value: valBytes}, + nil) return err } +func (p *Producer) lookupSchema(typ zng.Type) (int, error) { + id, ok := p.mapper[typ] + if !ok { + s, err := zavro.EncodeSchema(typ, p.namespace) + if err != nil { + return 0, err + } + schema, err := json.Marshal(s) + if err != nil { + return 0, err + } + id, err = p.CreateSchema(string(schema)) + if err != nil { + return 0, err + } + p.mapper[typ] = id + } + return id, nil +} + func (p *Producer) CreateSchema(schema string) (int, error) { // We use RecordNameStrategy for the subject name so we can have // different schemas on the same topic. diff --git a/fifo/to.go b/fifo/to.go new file mode 100644 index 0000000..c592e8c --- /dev/null +++ b/fifo/to.go @@ -0,0 +1,56 @@ +package fifo + +import ( + "context" + "fmt" + + "github.com/brimdata/zed/zbuf" + "github.com/brimdata/zed/zson" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" +) + +// To provides a means to sync from a Zed data pool to a Kafka topic in a +// consistent and crash-recoverable fashion. The data synced to the topic +// has the same offset as the kafka.offset field in the records in the pool. +type To struct { + zctx *zson.Context + dst *Producer + src *Lake + batch zbuf.Batch +} + +func NewTo(zctx *zson.Context, dst *Producer, src *Lake) *To { + return &To{ + zctx: zctx, + dst: dst, + src: src, + } +} + +const BatchSize = 200 + +func (t *To) Sync(ctx context.Context) error { + offset, err := t.dst.HeadOffset() + if err != nil { + return err + } + for { + // Query of batch of records that start at the given offset. + batch, err := t.src.ReadBatch(ctx, offset, BatchSize) + if err != nil { + return err + } + batchLen := batch.Length() + if batchLen == 0 { + fmt.Printf("reached sync at offset %d\n", offset) + //XXX should pause and poll again... 
for now, exit
+			break
+		}
+		if err := t.dst.Send(ctx, offset, batch); err != nil {
+			return err
+		}
+		fmt.Printf("committed %d record%s at offset %d to output topic\n", batchLen, plural(batchLen), offset)
+		offset += kafka.Offset(batchLen)
+	}
+	return nil
+}
diff --git a/go.mod b/go.mod
index 062c097..0a21923 100644
--- a/go.mod
+++ b/go.mod
@@ -3,11 +3,12 @@ module github.com/brimdata/zinger
 go 1.16
 
 require (
-	github.com/brimdata/zed v0.30.0
+	github.com/brimdata/zed v0.30.1-0.20210926201001-72e72fa090e7
 	github.com/confluentinc/confluent-kafka-go v1.7.0 // indirect
 	github.com/go-avro/avro v0.0.0-20171219232920-444163702c11
 	github.com/riferrei/srclient v0.4.0
 	github.com/segmentio/ksuid v1.0.2
+	go.uber.org/zap v1.16.0
 	gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183 // indirect
 	gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0
 )
diff --git a/go.sum b/go.sum
index a438d62..090dc89 100644
--- a/go.sum
+++ b/go.sum
@@ -18,8 +18,8 @@ github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f/go.mod h1:2stg
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
-github.com/brimdata/zed v0.30.0 h1:9xWdoeNo/+uL7bCahBNg6PqgXYdtFPW4eLfFjIpBl/M=
-github.com/brimdata/zed v0.30.0/go.mod h1:4osPv9ipyN80eM8ejJp4dVh115p6K5O+bIhN574ubHI=
+github.com/brimdata/zed v0.30.1-0.20210926201001-72e72fa090e7 h1:ZOP5hJcOAAE+PGnhuTS6MJfyV5NzZxlMub02ZM1ntFE=
+github.com/brimdata/zed v0.30.1-0.20210926201001-72e72fa090e7/go.mod h1:5GA5oBi8QsnfQO09zypYBFORSe4DLa21LW4J2QdBQiA=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
@@ -46,6 +46,7 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-redis/redis/v8 v8.4.11/go.mod h1:d5yY/TlkQyYBSBHnXUmnf1OrHbyQere5JV4dLKwvXmo=
+github.com/go-resty/resty/v2 v2.2.0 h1:vgZ1cdblp8Aw4jZj3ZsKh6yKAlMg3CHMrqFSFFd+jgY=
 github.com/go-resty/resty/v2 v2.2.0/go.mod h1:nYW/8rxqQCmI3bPz9Fsmjbr2FBjGuR2Mzt6kDh3zZ7w=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -109,7 +110,6 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
-github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -273,7 +273,6 @@ gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U=
 gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0 h1:+RlmciBLDd/XwM1iudiG3HtCg45purnsOxEoY/+JZdQ=
 gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
diff --git a/kafka.json b/kafka.json
new file mode 100644
index 0000000..34d93e6
--- /dev/null
+++ b/kafka.json
@@ -0,0 +1,7 @@
+{
+    "bootstrap_servers": "",
+    "security_protocol": "SASL_SSL",
+    "sasl_mechanisms": "PLAIN",
+    "sasl_username": "",
+    "sasl_password": ""
+}
diff --git a/schema_registry.json b/schema_registry.json
new file mode 100644
index 0000000..e73e645
--- /dev/null
+++ b/schema_registry.json
@@ -0,0 +1,5 @@
+{
+    "url": "",
+    "user":"",
+    "password":""
+}
diff --git a/zavro/decoder.go b/zavro/decoder.go
index 6423d21..f133507 100644
--- a/zavro/decoder.go
+++ b/zavro/decoder.go
@@ -10,16 +10,19 @@ import (
 	"github.com/go-avro/avro"
 )
 
-func Decode(in []byte, schema *avro.RecordSchema) (zcode.Bytes, error) {
+func Decode(in []byte, schema avro.Schema) (zcode.Bytes, error) {
 	var b zcode.Builder
-	in, err := decodeRecord(&b, in, schema)
+	in, err := decodeAny(&b, in, schema)
 	if err != nil {
 		return nil, err
 	}
 	if len(in) != 0 {
 		return nil, fmt.Errorf("avro decoder: extra data of length %d", len(in))
 	}
-	return b.Bytes().ContainerBody()
+	if _, ok := schema.(*avro.RecordSchema); ok {
+		return b.Bytes().ContainerBody()
+	}
+	return b.Bytes(), nil
 }
 
 func decodeAny(b *zcode.Builder, in []byte, schema avro.Schema) ([]byte, error) {
@@ -38,9 +41,8 @@ func decodeAny(b *zcode.Builder, in []byte, schema avro.Schema) ([]byte, error)
 func decodeRecord(b *zcode.Builder, in []byte, schema *avro.RecordSchema) ([]byte, error) {
 	b.BeginContainer()
 	for _, avroField := range schema.Fields {
-		avroType := avroField.Type
 		var err error
-		in, err = decodeAny(b, in, avroType)
+		in, err = decodeAny(b, in, avroField.Type)
 		if err != nil {
 			return nil, err
 		}
@@ -121,6 +123,9 @@ func decodeCountedValue(in []byte) ([]byte, []byte) {
 
 func decodeScalar(b *zcode.Builder, in []byte, schema avro.Schema) ([]byte, error) {
 	switch schema := schema.(type) {
+	case *avro.NullSchema:
+		b.AppendNull()
+		return in, nil
 	case *avro.BooleanSchema:
 		if len(in) == 0 {
 			return nil, errors.New("end of input decoding bool")
@@ -136,7 +141,7 @@ func decodeScalar(b *zcode.Builder, in []byte, schema avro.Schema) ([]byte, erro
 		}
 		b.AppendPrimitive(zng.EncodeInt(v))
 		return in, nil
-	case *avro.FloatSchema, *avro.DoubleSchema: //XXX
+	case *avro.FloatSchema, *avro.DoubleSchema: //XXX see zinger issue #19
 		// avro says this is Java's doubleToLongBits...
 		// we need to check if Go math lib is the same
 		if len(in) < 8 {
@@ -151,16 +156,6 @@ func decodeScalar(b *zcode.Builder, in []byte, schema avro.Schema) ([]byte, erro
 		}
 		b.AppendPrimitive(body)
 		return in, nil
-	/*
-	case zng.IDTime:
-		// XXX map a nano to a microsecond time
-		ts, err := zng.DecodeInt(body)
-		if err != nil {
-			return nil, err
-		}
-		us := ts / 1000
-		return appendVarint(dst, us), nil
-	*/
 	default:
 		return nil, fmt.Errorf("unsupported avro schema: %T", schema)
 	}
diff --git a/zavro/encoder.go b/zavro/encoder.go
index 7d9f0e9..1ccb22c 100644
--- a/zavro/encoder.go
+++ b/zavro/encoder.go
@@ -12,22 +12,16 @@ import (
 // These errors shouldn't happen because the input should be type checked.
 var ErrBadValue = errors.New("bad zng value in kavro translator")
 
-func Encode(dst []byte, id uint32, r *zng.Record) ([]byte, error) {
+func Encode(dst []byte, id uint32, zv zng.Value) ([]byte, error) {
 	// build kafka/avro header
 	var hdr [5]byte
 	hdr[0] = 0
 	binary.BigEndian.PutUint32(hdr[1:], uint32(id))
 	dst = append(dst, hdr[:]...)
-	// write value body seralized as avro
-	typ, ok := r.Type.(*zng.TypeRecord)
-	if !ok {
-		//XXX shouldn't happen
-		return nil, errors.New("not a record")
-	}
-	return encodeRecord(dst, typ, r.Bytes)
+	return encodeAny(dst, zv)
 }
 
-//XXX move this to zval
+//XXX move this to zed/zcode.
 func zlen(zv zcode.Bytes) (int, error) {
 	it := zcode.Iter(zv)
 	cnt := 0
@@ -41,72 +35,24 @@ func zlen(zv zcode.Bytes) (int, error) {
 	return cnt, nil
 }
 
-func encodeArray(dst []byte, typ *zng.TypeArray, body zcode.Bytes) ([]byte, error) {
-	if body == nil {
-		return dst, nil
-	}
-	cnt, err := zlen(body)
-	if err != nil {
-		return nil, err
-	}
-	dst = appendVarint(dst, int64(cnt))
-	inner := zng.InnerType(typ)
-	it := zcode.Iter(body)
-	for !it.Done() {
-		body, container, err := it.Next()
-		if err != nil {
-			return nil, err
-		}
-		switch v := inner.(type) {
-		case *zng.TypeRecord:
-			if !container {
-				return nil, ErrBadValue
-			}
-			dst, err = encodeRecord(dst, v, body)
-			if err != nil {
-				return nil, err
-			}
-		case *zng.TypeArray:
-			if !container {
-				return nil, ErrBadValue
-			}
-			dst, err = encodeArray(dst, v, body)
-			if err != nil {
-				return nil, err
-			}
-		case *zng.TypeSet:
-			if !container {
-				return nil, ErrBadValue
-			}
-			dst, err = encodeSet(dst, v, body)
-			if err != nil {
-				return nil, err
-			}
-		default:
-			if container {
-				return nil, ErrBadValue
-			}
-			dst, err = encodeScalar(dst, v, body)
-			if err != nil {
-				return nil, err
-			}
-		}
-	}
-	if cnt != 0 {
-		// append 0-length block to indicate end of array
-		dst = appendVarint(dst, int64(0))
+func encodeAny(dst []byte, zv zng.Value) ([]byte, error) {
+	switch typ := zv.Type.(type) {
+	case *zng.TypeRecord:
+		return encodeRecord(dst, typ, zv.Bytes)
+	case *zng.TypeArray:
+		return encodeArray(dst, typ.Type, zv.Bytes)
+	case *zng.TypeSet:
+		// encode set as array
+		return encodeArray(dst, typ.Type, zv.Bytes)
+	default:
+		return encodeScalar(dst, typ, zv.Bytes)
 	}
-	return dst, nil
 }
 
-func encodeSet(dst []byte, typ *zng.TypeSet, body zcode.Bytes) ([]byte, error) {
+func encodeArray(dst []byte, elemType zng.Type, body zcode.Bytes) ([]byte, error) {
 	if body == nil {
 		return dst, nil
 	}
-	inner := zng.InnerType(typ)
-	if zng.IsContainerType(inner) {
-		return nil, ErrBadValue
-	}
 	cnt, err := zlen(body)
 	if err != nil {
 		return nil, err
@@ -114,14 +60,11 @@ func encodeSet(dst []byte, typ *zng.TypeSet, body zcode.Bytes) ([]byte, error) {
 	dst = appendVarint(dst, int64(cnt))
 	it := zcode.Iter(body)
 	for !it.Done() {
-		body, container, err := it.Next()
+		body, _, err := it.Next()
 		if err != nil {
 			return nil, err
 		}
-		if container {
-			return nil, ErrBadValue
-		}
-		dst, err = encodeScalar(dst, inner, body)
+		dst, err = encodeAny(dst, zng.Value{elemType, body})
 		if err != nil {
 			return nil, err
 		}
@@ -142,7 +85,7 @@ func encodeRecord(dst []byte, typ *zng.TypeRecord, body zcode.Bytes) ([]byte, er
 		if it.Done() {
 			return nil, ErrBadValue
 		}
-		body, container, err := it.Next()
+		body, _, err := it.Next()
 		if err != nil {
 			return nil, err
 		}
@@ -154,61 +97,22 @@ func encodeRecord(dst []byte, typ *zng.TypeRecord, body zcode.Bytes) ([]byte, er
 		// field is present. encode the field union by referencing
 		// the type's position in the union.
 		dst = appendVarint(dst, 1)
-		switch v := col.Type.(type) {
-		case *zng.TypeRecord:
-			if !container {
-				return nil, ErrBadValue
-			}
-			dst, err = encodeRecord(dst, v, body)
-			if err != nil {
-				return nil, err
-			}
-		case *zng.TypeArray:
-			if !container {
-				return nil, ErrBadValue
-			}
-			dst, err = encodeArray(dst, v, body)
-			if err != nil {
-				return nil, err
-			}
-		case *zng.TypeSet:
-			if !container {
-				return nil, ErrBadValue
-			}
-			dst, err = encodeSet(dst, v, body)
-			if err != nil {
-				return nil, err
-			}
-		default:
-			if container {
-				return nil, ErrBadValue
-			}
-			dst, err = encodeScalar(dst, col.Type, body)
-			if err != nil {
-				return nil, err
-			}
+		dst, err = encodeAny(dst, zng.Value{col.Type, body})
+		if err != nil {
+			return nil, err
 		}
 	}
 	return dst, nil
 }
 
-func appendVarint(dst []byte, v int64) []byte {
-	var encoding [binary.MaxVarintLen64]byte
-	n := binary.PutVarint(encoding[:], v)
-	return append(dst, encoding[:n]...)
-}
-
-func appendCountedValue(dst, val []byte) []byte {
-	dst = appendVarint(dst, int64(len(val)))
-	return append(dst, val...)
-}
-
 func encodeScalar(dst []byte, typ zng.Type, body zcode.Bytes) ([]byte, error) {
 	if body == nil {
 		//XXX need to encode empty stuff
 		return dst, nil
 	}
 	switch typ.ID() {
+	case zng.IDNull:
+		return dst, nil
 	case zng.IDIP:
 		// IP addresses are turned into strings...
 		ip, err := zng.DecodeIP(body)
@@ -258,7 +162,6 @@ func encodeScalar(dst []byte, typ zng.Type, body zcode.Bytes) ([]byte, error) {
 	case zng.IDString, zng.IDBstring:
 		return appendCountedValue(dst, []byte(body)), nil
 	case zng.IDNet:
-		// IP subnets are turned into strings...
 		net, err := zng.DecodeNet(body)
 		if err != nil {
 			return nil, err
@@ -266,7 +169,7 @@ func encodeScalar(dst []byte, typ zng.Type, body zcode.Bytes) ([]byte, error) {
 		b := []byte(net.String())
 		return appendCountedValue(dst, b), nil
 	case zng.IDTime:
-		// XXX map a nano to a microsecond time
+		// map a nano to a microsecond time
 		ts, err := zng.DecodeInt(body)
 		if err != nil {
 			return nil, err
@@ -277,3 +180,14 @@
 		return nil, fmt.Errorf("encodeScalar: unknown type: %q", typ)
 	}
 }
+
+func appendVarint(dst []byte, v int64) []byte {
+	var encoding [binary.MaxVarintLen64]byte
+	n := binary.PutVarint(encoding[:], v)
+	return append(dst, encoding[:n]...)
+}
+
+func appendCountedValue(dst, val []byte) []byte {
+	dst = appendVarint(dst, int64(len(val)))
+	return append(dst, val...)
+}
diff --git a/zavro/schema.go b/zavro/schema.go
index f6cc888..852ca74 100644
--- a/zavro/schema.go
+++ b/zavro/schema.go
@@ -9,8 +9,6 @@ import (
 	"github.com/go-avro/avro"
 )
 
-//XXX need to add maps
-
 func EncodeSchema(typ zng.Type, namespace string) (avro.Schema, error) {
 	switch typ := typ.(type) {
 	case *zng.TypeRecord:
@@ -73,6 +71,8 @@ func encodeSetSchema(typ *zng.TypeSet, namespace string) (avro.Schema, error) {
 
 func encodeScalarSchema(typ zng.Type) (avro.Schema, error) {
 	switch typ.ID() {
+	case zng.IDNull:
+		return &avro.NullSchema{}, nil
 	case zng.IDIP:
 		// IP addresses are turned into strings...
 		return &avro.StringSchema{}, nil
@@ -170,8 +170,8 @@ func decodeUnionSchema(zctx *zson.Context, schema *avro.UnionSchema) (zng.Type,
 }
 
 func decodeScalarSchema(schema avro.Schema) (zng.Type, error) {
-	//XXX IPs need meta-data/alias, could also try to parse string as option
-	//XXX meta-data, alias to recover unsigneds?
+	//XXX IPs need metadata/alias, could also try to parse string as option
+	//XXX metadata, alias to recover unsigneds?
 	switch schema := schema.(type) {
 	case *avro.BooleanSchema:
 		return zng.TypeBool, nil
@@ -181,6 +181,8 @@
 		return zng.TypeFloat64, nil
 	case *avro.StringSchema:
 		return zng.TypeString, nil
+	case *avro.NullSchema:
+		return zng.TypeNull, nil
 	default:
 		return nil, fmt.Errorf("unsupported avro schema type: %T", schema)
 	}