modify "zync from-kafka" for continuous operation
* Run indefinitely, unless the -exitafter flag is specified.

* Sync multiple Kafka topics.  Topics and their target pools are read
  from the inputs section of ETL YAML files specified on the command
  line, as sketched below.  (One topic/pool pair may also be specified
  via -topic and -pool.)

* Limit the interval between receiving and committing a record with
  the -interval flag.

* Limit the number of records per commit with the -thresh flag.
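
A minimal sketch of such an ETL YAML file. The `topic` and `pool` field
names are assumptions inferred from this commit's handling of
`transform.Inputs`, `i.Topic`, and `i.Pool` in command.go, not documented
zync syntax:

```yaml
# etl.yaml (hypothetical) -- one entry per topic/pool pair.
inputs:
  - topic: MyTopic     # Kafka topic to consume
    pool: PoolA        # target Zed pool; key must be kafka.offset, ascending
  - topic: OtherTopic
    pool: PoolB
```

With a file like this, an invocation such as
`zync from-kafka -interval 10s -thresh 500000 etl.yaml` would sync both
topics continuously, committing whenever a batch reaches 500,000 records
or 10 seconds after the first uncommitted record arrives.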
nwt committed Jul 16, 2022
1 parent 8387674 commit 234a11c
Showing 6 changed files with 245 additions and 157 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -61,7 +61,7 @@ zed serve -lake scratch
Now, in your first shell, sync data from Kafka to a Zed lake:
```
zed create -orderby kafka.offset PoolA
zync from-kafka -topic MyTopic -pool PoolA
zync from-kafka -topic MyTopic -pool PoolA -exitafter 1s
```
See the data in the Zed pool:
```
@@ -80,7 +80,7 @@ echo 'value:={upper:to_upper(value.s),words:split(value.s, ",")}' > shape.zed
And shape the record from `MyTopic` into a new `PoolB`:
```
zed create -orderby kafka.offset PoolB
zync from-kafka -topic MyTopic -pool PoolB -shaper shape.zed
zync from-kafka -topic MyTopic -pool PoolB -shaper shape.zed -exitafter 1s
zed query -Z "from PoolB"
```

@@ -105,7 +105,7 @@ access credentials.

`zync` has two sub-commands for synchronizing data to and from Kafka:
* `zync to-kafka` - syncs data from a Zed data pool to a Kafka topic
* `zync from-kafka` - syncs data from a Kafka topic to a Zed data pool
* `zync from-kafka` - syncs data from Kafka topics to Zed data pools

Currently, only the binary
[Kafka/Avro format](https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format)
206 changes: 161 additions & 45 deletions cmd/zync/from-kafka/command.go
@@ -5,14 +5,19 @@ import (
"errors"
"flag"
"fmt"
"os/signal"
"syscall"
"time"

"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zync/cli"
"github.com/brimdata/zync/cmd/zync/root"
"github.com/brimdata/zync/etl"
"github.com/brimdata/zync/fifo"
"github.com/riferrei/srclient"
"golang.org/x/sync/errgroup"
)

func init() {
@@ -21,94 +26,205 @@ func init() {

var FromSpec = &charm.Spec{
Name: "from-kafka",
Usage: "from-kafka [options]",
Short: "sync a Kafka topic to a Zed lake pool",
Usage: "from-kafka [options] [config.yaml ...]",
Short: "sync Kafka topics to Zed lake pools",
Long: `
The "from-kafka" command syncs data on a Kafka topic to a Zed lake pool.
The Zed records are transcoded from Avro into Zed and synced
to the main branch of the Zed data pool specified.
The "from-kafka" command syncs data from Kafka topics to Zed lake pools.
Topics and their target pools are read from the inputs section of the
config.yaml files. (One topic/pool pair may also be specified via -topic
and -pool.) The Kafka records are transcoded into Zed and synced to the
main branch of the target pools.
The data pool's key must be "kafka.offset" sorted in ascending order.
The key for each pool must be "kafka.offset" in ascending order.
See https://github.com/brimdata/zync/README.md for a description
of how this works.
`,
New: NewFrom,
}

type From struct {
*root.Command
flags cli.Flags
lakeFlags cli.LakeFlags
shaper cli.ShaperFlags

flags cli.Flags
lakeFlags cli.LakeFlags
shaperFlags cli.ShaperFlags

exitAfter time.Duration
pool string
thresh int
timeout time.Duration
interval time.Duration
}

func NewFrom(parent charm.Command, fs *flag.FlagSet) (charm.Command, error) {
f := &From{Command: parent.(*root.Command)}
fs.StringVar(&f.pool, "pool", "", "name of Zed data pool")
fs.IntVar(&f.thresh, "thresh", 10*1024*1024, "exit after syncing at least this many records")
fs.DurationVar(&f.timeout, "timeout", 5*time.Second, "exit after waiting this long for new records")
f.flags.SetFlags(fs)
f.lakeFlags.SetFlags(fs)
f.shaper.SetFlags(fs)
f.shaperFlags.SetFlags(fs)
fs.DurationVar(&f.exitAfter, "exitafter", 0, "if >0, exit after this duration")
fs.StringVar(&f.pool, "pool", "", "name of Zed pool")
fs.IntVar(&f.thresh, "thresh", 1024*1024, "maximum number of records per commit")
fs.DurationVar(&f.interval, "interval", 5*time.Second,
"maximum interval between receiving and committing a record")
return f, nil
}

func (f *From) Run(args []string) error {
if f.flags.Topic == "" {
return errors.New("no topic provided")
poolToTopics := map[string]map[string]struct{}{}
if f.pool != "" || f.flags.Topic != "" {
if f.pool == "" || f.flags.Topic == "" {
return errors.New("both -pool and -topic must be set")
}
poolToTopics[f.pool] = map[string]struct{}{f.flags.Topic: {}}
}
if f.pool == "" {
return errors.New("no pool provided")

for _, a := range args {
transform, err := etl.Load(a)
if err != nil {
return fmt.Errorf("%s: %w", a, err)
}
for _, i := range transform.Inputs {
topics, ok := poolToTopics[i.Pool]
if !ok {
topics = map[string]struct{}{}
poolToTopics[i.Pool] = topics
}
topics[i.Topic] = struct{}{}
}
}
shaper, err := f.shaper.Load()
if err != nil {
return err
if len(poolToTopics) == 0 {
if len(args) > 0 {
return errors.New("YAML config files contain no inputs")
}
return errors.New("provide YAML config files or set -pool and -topic")
}
ctx := context.Background()
service, err := f.lakeFlags.Open(ctx)

shaper, err := f.shaperFlags.Load()
if err != nil {
return err
}
lk, err := fifo.NewLake(ctx, f.pool, "", service)

url, credentials, err := cli.SchemaRegistryEndpoint()
if err != nil {
return err
}
consumerOffset, err := lk.NextConsumerOffset(ctx, f.flags.Topic)
registry := srclient.CreateSchemaRegistryClient(url)
registry.SetCredentials(credentials.User, credentials.Password)

config, err := cli.LoadKafkaConfig()
if err != nil {
return err
}
url, secret, err := cli.SchemaRegistryEndpoint()

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
lake, err := f.lakeFlags.Open(ctx)
if err != nil {
return err
}
config, err := cli.LoadKafkaConfig()
if err != nil {
return err
group, ctx := errgroup.WithContext(ctx)
timeoutCtx := ctx
if f.exitAfter > 0 {
timeoutCtx, cancel = context.WithTimeout(ctx, f.exitAfter)
defer cancel()
}
registry := srclient.CreateSchemaRegistryClient(url)
registry.SetCredentials(secret.User, secret.Password)
zctx := zed.NewContext()
consumer, err := fifo.NewConsumer(zctx, config, registry, f.flags.Format, f.flags.Topic, consumerOffset, true)
if err != nil {
return err
for pool, topics := range poolToTopics {
fifoLake, err := fifo.NewLake(ctx, pool, "", lake)
if err != nil {
return err
}
ch := make(chan *zed.Value)
zctx := zed.NewContext()
for t := range topics {
t := t
cOffset, err := fifoLake.NextConsumerOffset(ctx, t)
if err != nil {
return err
}
consumer, err := fifo.NewConsumer(zctx, config, registry, f.flags.Format, t, cOffset, true)
if err != nil {
return err
}
group.Go(func() error {
return f.runRead(timeoutCtx, consumer, ch)
})
}
group.Go(func() error {
return f.runLoad(ctx, timeoutCtx, zctx, fifoLake, shaper, ch)
})
}
from := fifo.NewFrom(zctx, lk, consumer, shaper)
ncommit, nrec, err := from.Sync(ctx, f.thresh, f.timeout)
if ncommit != 0 {
fmt.Printf("synchronized %d record%s in %d commit%s\n", nrec, plural(nrec), ncommit, plural(ncommit))
} else {
fmt.Println("nothing new found to synchronize")
return group.Wait()
}

func (f *From) runRead(ctx context.Context, c *fifo.Consumer, ch chan<- *zed.Value) error {
for {
val, err := c.ReadValue(ctx)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
// Return nil so we don't cancel the context
// from errgroup.WithContext.
return nil
}
return err
}
select {
case ch <- val:
case <-ctx.Done():
}
}
}

func (f *From) runLoad(ctx, timeoutCtx context.Context, zctx *zed.Context, fifoLake *fifo.Lake, shaper string,
ch <-chan *zed.Value) error {
ticker := time.NewTicker(f.interval)
defer ticker.Stop()
// Stop ticker until data arrives.
ticker.Stop()
a := &zbuf.Array{}
for {
select {
case val := <-ch:
a.Append(val)
if n := len(a.Values()); n < f.thresh {
if n == 1 {
// Start ticker.
ticker.Reset(f.interval)
}
continue
}
case <-ticker.C:
if len(a.Values()) == 0 {
continue
}
case <-ctx.Done():
return ctx.Err()
case <-timeoutCtx.Done():
if len(a.Values()) == 0 {
return nil
}
}
// Stop ticker until more data arrives.
ticker.Stop()
if shaper != "" {
var err error
a, err = fifo.RunLocalQuery(ctx, zctx, a, shaper)
if err != nil {
return err
}
}
n := len(a.Values())
if n == 0 {
// Shaper dropped everything.
continue
}
commit, err := fifoLake.LoadBatch(ctx, zctx, a)
if err != nil {
return err
}
fmt.Printf("commit %s %d record%s\n", commit, n, plural(n))
}
return err
}
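
The runLoad loop above is a stock Go batching pattern: keep the ticker
stopped while the batch is empty, start it on the first arrival, and
flush when the batch reaches -thresh records, when -interval elapses, or
at shutdown. A self-contained distillation of just that pattern, with
plain ints standing in for zync's values (illustrative only, not zync's
API):

```go
package main

import (
	"fmt"
	"time"
)

// flushBatches drains items from ch, flushing when the batch reaches
// thresh items or when interval has passed since the first unflushed
// item arrived.  Closing done triggers a final flush and return.
func flushBatches(ch <-chan int, done <-chan struct{}, thresh int, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	ticker.Stop() // keep the ticker stopped until data arrives
	var batch []int
	for {
		select {
		case v := <-ch:
			batch = append(batch, v)
			if len(batch) < thresh {
				if len(batch) == 1 {
					ticker.Reset(interval) // first item: start the clock
				}
				continue
			}
		case <-ticker.C:
			if len(batch) == 0 {
				continue
			}
		case <-done:
			if len(batch) > 0 {
				fmt.Println("final flush:", batch)
			}
			return
		}
		ticker.Stop() // flushed: stop the clock until more data arrives
		fmt.Println("flush:", batch)
		batch = nil
	}
}

func main() {
	ch := make(chan int)
	done := make(chan struct{})
	go func() {
		for i := 1; i <= 10; i++ {
			ch <- i
			time.Sleep(30 * time.Millisecond)
		}
		close(done)
	}()
	flushBatches(ch, done, 4, 100*time.Millisecond)
}
```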

func plural(n int64) string {
func plural(n int) string {
if n == 1 {
return ""
}
60 changes: 25 additions & 35 deletions fifo/consumer.go
@@ -7,7 +7,6 @@ import (
"time"

"github.com/brimdata/zed"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zcode"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zson"
@@ -25,6 +24,8 @@ type Consumer struct {
topic string
metaType zed.Type
types map[zed.Type]map[zed.Type]zed.Type

recordIter kgo.FetchesRecordIter
}

type decoder interface {
@@ -71,48 +72,37 @@ func (c *Consumer) Close() {
c.kclient.Close()
}

type Flusher interface {
Flush() error
// ReadValue returns the next value. Unlike zio.Reader.Read, the caller
// receives ownership of zed.Value.Bytes.
func (c *Consumer) ReadValue(ctx context.Context) (*zed.Value, error) {
for {
if !c.recordIter.Done() {
return c.handle(c.recordIter.Next())
}
fetches := c.kclient.PollFetches(ctx)
for _, e := range fetches.Errors() {
if e.Topic != "" {
return nil, fmt.Errorf("topic %s, partition %d: %w", e.Topic, e.Partition, e.Err)
}
return nil, e.Err
}
c.recordIter = *fetches.RecordIter()
}
}
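
The ownership comment on ReadValue above is the point of this refactor:
many readers reuse an internal buffer, so a value they return is
invalidated by the next read, while ReadValue hands the bytes to the
caller. A self-contained illustration of the reuse hazard being avoided
(generic byte slices; nothing here is zync's or zed's API):

```go
package main

import "fmt"

// reusingReader reuses one buffer across reads, like many zio.Reader
// implementations: each Read invalidates the previous result.
type reusingReader struct {
	buf []byte
	n   int
}

func (r *reusingReader) Read() []byte {
	r.n++
	r.buf = append(r.buf[:0], fmt.Sprintf("record-%d", r.n)...)
	return r.buf
}

func main() {
	r := &reusingReader{}
	first := r.Read()
	kept := first                           // aliases the shared buffer
	copied := append([]byte(nil), first...) // caller-owned copy
	r.Read()                                // overwrites the shared buffer
	fmt.Println(string(kept))   // "record-2": clobbered by buffer reuse
	fmt.Println(string(copied)) // "record-1": safe to retain
}
```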

func (c *Consumer) Run(ctx context.Context, w zio.Writer, timeout time.Duration) error {
return c.run(ctx, w, -1, timeout)
}

func (c *Consumer) Read(ctx context.Context, thresh int, timeout time.Duration) (*zbuf.Array, error) {
var a zbuf.Array
return &a, c.run(ctx, &a, thresh, timeout)
}

func (c *Consumer) run(ctx context.Context, w zio.Writer, thresh int, timeout time.Duration) error {
var size int
for {
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
fetches := c.kclient.PollFetches(timeoutCtx)
timedOut := timeoutCtx.Err() == context.DeadlineExceeded
val, err := c.ReadValue(timeoutCtx)
cancel()
if err := ctx.Err(); err != nil {
return err
}
if timedOut {
return nil
}
cancel()
for _, e := range fetches.Errors() {
return fmt.Errorf("topic %s, partition %d: %w", e.Topic, e.Partition, e.Err)
}
for it := fetches.RecordIter(); !it.Done(); {
rec, err := c.handle(it.Next())
if err != nil {
return err
}
if err := w.Write(rec); err != nil {
return err
if err != nil {
if ctx.Err() == nil && timeoutCtx.Err() == context.DeadlineExceeded {
return nil
}
size += len(rec.Bytes)
return err
}
if thresh > -1 && size > thresh {
return nil
if err := w.Write(val); err != nil {
return err
}
}
}
(3 more changed files not shown)
