Skip to content

Commit

Permalink
NCR-14625 add apply example
Browse files Browse the repository at this point in the history
  • Loading branch information
solokirrik committed Jan 24, 2024
1 parent 8aa1d7a commit 36a27b2
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 18 deletions.
79 changes: 61 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,47 @@ If you have another common use case you would like to see covered by this packag

Apply connects two processes, applying the second to each item of the first output

```golang
transform := pipeline.NewProcessor(func(_ context.Context, s string) ([]string, error) {
return strings.Split(s, ","), nil
}, nil)

double := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return s + s, nil
}, nil)

addLeadingZero := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return "0" + s, nil
}, nil)

apply := pipeline.Apply(
transform,
pipeline.Sequence(
double,
addLeadingZero,
double,
),
)

input := "1,2,3,4,5"

for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
for j := range out {
fmt.Printf("process: %s\n", out[j])
}
}
```

Output:

```
process: 011011
process: 022022
process: 033033
process: 044044
process: 055055
```

### func [Buffer](/buffer.go#L5)

`func Buffer[Item any](size int, in <-chan Item) <-chan Item`
Expand Down Expand Up @@ -238,7 +279,6 @@ ProcessBatchConcurrently fans the in channel out to multiple batch Processors ru
then it fans the out channels of the batch Processors back into a single out chan

```golang

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -260,16 +300,18 @@ p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProces
for result := range p {
fmt.Printf("result: %d\n", result)
}
```

// Example Output:
// result: 1
// result: 2
// result: 3
// result: 5
// error: could not process [7 8], context deadline exceeded
// error: could not process [4 6], context deadline exceeded
// error: could not process [9], context deadline exceeded
Output:

```
result: 1
result: 2
result: 3
result: 5
error: could not process [7 8], context deadline exceeded
error: could not process [4 6], context deadline exceeded
error: could not process [9], context deadline exceeded
```

### func [ProcessConcurrently](/process.go#L26)
Expand All @@ -280,7 +322,6 @@ ProcessConcurrently fans the in channel out to multiple Processors running concu
then it fans the out channels of the Processors back into a single out chan

```golang

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -302,16 +343,18 @@ p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.
for result := range p {
log.Printf("result: %d\n", result)
}
```

// Example Output:
// result: 2
// result: 1
// result: 4
// result: 3
// error: could not process 6, process was canceled
// error: could not process 5, process was canceled
// error: could not process 7, context deadline exceeded
Output:

```
result: 2
result: 1
result: 4
result: 3
error: could not process 6, process was canceled
error: could not process 5, process was canceled
error: could not process 7, context deadline exceeded
```

### func [Split](/split.go#L4)
Expand Down
47 changes: 47 additions & 0 deletions apply_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package pipeline_test

import (
"context"
"fmt"
"strings"

"github.com/deliveryhero/pipeline/v2"
)

func ExampleApply() {
transform := pipeline.NewProcessor(func(_ context.Context, s string) ([]string, error) {
return strings.Split(s, ","), nil
}, nil)

double := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return s + s, nil
}, nil)

addLeadingZero := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return "0" + s, nil
}, nil)

apply := pipeline.Apply(
transform,
pipeline.Sequence(
double,
addLeadingZero,
double,
),
)

input := "1,2,3,4,5"

for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
for j := range out {
fmt.Printf("process: %s\n", out[j])
}
}

// Output:
// process: 011011
// process: 022022
// process: 033033
// process: 044044
// process: 055055
}

0 comments on commit 36a27b2

Please sign in to comment.