add pool-to-pool ETL #30
Conversation
This commit is a rough first draft for Debezium-style ETL on CDC logs. It handles denormalization of two tables into one as well as stateless transforms on CDC logs. We renamed the "sync from" and "sync to" sub-commands to "from-kafka" and "to-kafka", respectively. We also updated the zed pointer and ported the code to use zed.Value instead of zed.Record. The README contains a demo walkthrough of the basics.
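To make "stateless transforms on CDC logs" concrete, here is a minimal sketch in Go. The `cdcRecord` type and `transform` function are hypothetical illustrations, not zync's actual types: a Debezium-style CDC record carries an operation code plus before/after row images, and a stateless transform rewrites or drops each record independently, with no cross-record state.

```go
package main

import "fmt"

// cdcRecord is a hypothetical Debezium-style change record (not a zync type):
// an operation code plus before/after images of the changed row.
type cdcRecord struct {
	Op     string            // "c" = create, "u" = update, "d" = delete
	Before map[string]string // row image before the change (nil for creates)
	After  map[string]string // row image after the change (nil for deletes)
}

// transform is a stateless pass over CDC records: it drops delete tombstones
// and keeps only the after-image of each surviving change. Each record is
// handled independently, which is what makes the transform stateless.
func transform(in []cdcRecord) []map[string]string {
	var out []map[string]string
	for _, rec := range in {
		if rec.Op == "d" {
			continue // deletes carry no after-image
		}
		out = append(out, rec.After)
	}
	return out
}

func main() {
	recs := []cdcRecord{
		{Op: "c", After: map[string]string{"id": "1", "name": "alice"}},
		{Op: "d", Before: map[string]string{"id": "1"}},
	}
	fmt.Println(len(transform(recs))) // prints 1
}
```

Denormalization of two tables into one is the stateful counterpart (it must join records across tables), which is why the PR handles it separately.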
README.md (outdated)

Suggested change:

    -This transform the Zed input to Avro and posts it to the topic.
    -The consumer then converts the Avro back to Zed and displays it.
    +This transforms the ZSON input to Avro and posts it to the topic.
    +The consumer then converts the Avro back to ZSON and displays it.
README.md (outdated)

Suggested change:

    -`zync from-kafka` formats records received from Kafka using the Zed envelope
    +`zync from-kafka` encapsulates records received from Kafka using the envelope
README.md (outdated)

Suggested change:

     overlapping offsets, only one will succeed. The others will detect the conflict,
     recompute the `kafka.offset`'s accounting for the data provided in the
     conflicting commit, and retry the commit.
    -all data committed by zync writers must have monotonically increasing `kafka.offset`
    +all data committed by `zync` writers must have monotonically increasing `kafka.offset`
README.md (outdated)

Suggested change:

     of multiple tables into one.

     The model here is that `zync etl` processes data from an input pool to an output
    -pool where `from-kafka` is populating the input pool and `to-kafka` is processing
    +pool where `zync from-kafka` is populating the input pool and `zync to-kafka` is processing
etl/pool.go (outdated)

Suggested change:

     }

     func (*adaptor) NewScheduler(context.Context, *zed.Context, dag.Source, extent.Span, zbuf.Filter, *dag.Filter) (proc.Scheduler, error) {
    -	return nil, fmt.Errorf("mock.Lake.NewScheduler() should not be called")
    +	return nil, errors.New("etl.adaptor.NewScheduler should not be called")
etl/pool.go (outdated)

Suggested change:

     type adaptor struct{}

    -func (*adaptor) Layout(_ context.Context, src dag.Source) order.Layout {
    +func (*adaptor) Layout(context.Context, dag.Source) order.Layout {
etl/pool.go (outdated)

Suggested change:

    -func (*adaptor) Open(_ context.Context, _ *zed.Context, _ string, _ zbuf.Filter) (zbuf.PullerCloser, error) {
    -	return nil, fmt.Errorf("mock.Lake.Open() should not be called")
    +func (*adaptor) Open(context.Context, *zed.Context, string, zbuf.Filter) (zbuf.PullerCloser, error) {
    +	return nil, errors.New("etl.adaptor.Open should not be called")
etl/pool.go (outdated)

Suggested change:

     	return ksuid.Nil, nil
     }

    -func (*adaptor) CommitObject(_ context.Context, _ ksuid.KSUID, _ string) (ksuid.KSUID, error) {
    +func (*adaptor) CommitObject(context.Context, ksuid.KSUID, string) (ksuid.KSUID, error) {
etl/pool.go (outdated)

Suggested change:

    -func (*batchDriver) Warn(warning string) error { return nil }
    -func (*batchDriver) Stats(stats zbuf.ScannerStats) error { return nil }
    -func (*batchDriver) ChannelEnd(cid int) error { return nil }
    +func (*batchDriver) Warn(string) error { return nil }
    +func (*batchDriver) Stats(zbuf.ScannerStats) error { return nil }
    +func (*batchDriver) ChannelEnd(int) error { return nil }