Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline.Clone and p.Run(data interface{}) to allow reusable pipe… #7

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

type buffer struct {
// single writer
in chan string
in chan []byte
// multiple readers
out []chan string
out []chan []byte
progress []chan int64
}

Expand Down Expand Up @@ -59,7 +59,7 @@ func (bfs *buffers) remove(key string) {
delete(bfs.bufferMap, key)
}

func (bfs *buffers) appendOutBuffer(key string, o chan string) error {
func (bfs *buffers) appendOutBuffer(key string, o chan []byte) error {
bfs.Lock()
defer bfs.Unlock()
val, ok := bfs.bufferMap[key]
Expand Down Expand Up @@ -97,12 +97,12 @@ func (bfs *buffers) all() map[string]*buffer {

var buffersMap *buffers

func send(pipelineKey string, line string) {
func send(pipelineKey string, data []byte) {
// line = fmt.Sprintf(time.Now().Format("2006-01-02 15:04:05")) + " " + line
buf, ok := buffersMap.get(pipelineKey)
if !ok {
return
}
buf.in <- line
buf.in <- data

}
28 changes: 14 additions & 14 deletions examples/advanced/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func newDownloadStep(fileName string, bytes int64, fail bool) *downloadStep {

func (d *downloadStep) Exec(request *pipeline.Request) *pipeline.Result {

d.Status(fmt.Sprintf("%+v", request))
d.Status([]byte(fmt.Sprintf("%+v", request)))

d.Status(fmt.Sprintf("Started downloading file %s", d.fileName))
d.Status([]byte(fmt.Sprintf("Started downloading file %s", d.fileName)))

client := &http.Client{}

Expand All @@ -60,19 +60,17 @@ func (d *downloadStep) Exec(request *pipeline.Request) *pipeline.Result {
return &pipeline.Result{Error: fmt.Errorf("File download failed %s", d.fileName)}
}

d.Status(fmt.Sprintf("Successfully downloaded file %s", d.fileName))
d.Status([]byte(fmt.Sprintf("Successfully downloaded file %s", d.fileName)))

return &pipeline.Result{
Error: nil,
Data: struct{ bytesDownloaded int64 }{bytesDownloaded: n},
KeyVal: map[string]interface{}{"bytesDownloaded": n},
Error: nil,
Data: struct{ bytesDownloaded int64 }{bytesDownloaded: n},
}
}

func (d *downloadStep) Cancel() error {
d.Status(fmt.Sprintf("Cancel downloading file %s", d.fileName))
func (d *downloadStep) Cancel() {
d.Status([]byte(fmt.Sprintf("Cancel downloading file %s", d.fileName)))
d.cancel()
return nil
}

func readPipeline(pipe *pipeline.Pipeline) {
Expand All @@ -98,13 +96,15 @@ func readPipeline(pipe *pipeline.Pipeline) {

func main() {

workflow := pipeline.NewProgress("getfiles", 1000, time.Second*7)
workflow := pipeline.New("getfiles",
pipeline.OutBufferSize(1000),
pipeline.ExpectedDuration(7*time.Second))
//stages
stage := pipeline.NewStage("stage", false, false)
stage := pipeline.NewStage("stage", pipeline.Concurrent(true))
// in this stage, steps will be executed concurrently
concurrentStage := pipeline.NewStage("con_stage", true, false)
concurrentStage := pipeline.NewStage("con_stage", pipeline.Concurrent(true))
// another concurrent stage
concurrentErrStage := pipeline.NewStage("con_err_stage", true, false)
concurrentErrStage := pipeline.NewStage("con_err_stage", pipeline.Concurrent(true))

//steps
fileStep1mb := newDownloadStep("1mbfile", 1e6, false)
Expand All @@ -129,7 +129,7 @@ func main() {
go readPipeline(workflow)

// execute pipeline
result := workflow.Run()
result := workflow.Run(nil)
if result.Error != nil {
fmt.Println(result.Error)
}
Expand Down
78 changes: 78 additions & 0 deletions examples/functional/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"context"
"fmt"

"github.com/myntra/pipeline"
)

// TransformStep ...
func TransformStep(context context.Context, request *pipeline.Request) *pipeline.Result {
request.Status([]byte("Starting transformstep"))
fmt.Println(request.Data)
// handle cancel step: https://blog.golang.org/context
//<-context.Done() is unblocked when step is cancelled on error returned by sibling concurrent step
return nil
}

func readPipeline(pipe *pipeline.Pipeline) {
out, err := pipe.Out()
if err != nil {
return
}

progress, err := pipe.GetProgressPercent()
if err != nil {
return
}

for {
select {
case line := <-out:
fmt.Println(line)
case p := <-progress:
fmt.Println("percent done: ", p)
}
}
}

func main() {

logParserPipe := pipeline.New("")
stageOne := pipeline.NewStage("")

transfromStep := pipeline.NewStep(TransformStep)

stageOne.AddStep(transfromStep)
logParserPipe.AddStage(stageOne)

go readPipeline(logParserPipe)

data := map[string]string{
"a": "b",
}

result := logParserPipe.Run(data)
if result.Error != nil {
fmt.Println(result.Error)
}

fmt.Println("timeTaken:", logParserPipe.GetDuration())

clonedlogParserPipe := logParserPipe.Clone("")

go readPipeline(clonedlogParserPipe)

data2 := map[string]string{
"c": "d",
}

result = clonedlogParserPipe.Run(data2)
if result.Error != nil {
fmt.Println(result.Error)
}

fmt.Println("timeTaken:", clonedlogParserPipe.GetDuration())

}
20 changes: 10 additions & 10 deletions examples/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@ type work struct {

func (w *work) Exec(request *pipeline.Request) *pipeline.Result {

w.Status(fmt.Sprintf("%+v", request))
w.Status([]byte(fmt.Sprintf("%+v", request)))
duration := time.Duration(1000 * w.id)
time.Sleep(time.Millisecond * duration)
msg := fmt.Sprintf("work %d", w.id)
return &pipeline.Result{
Error: nil,
Data: map[string]string{"msg": msg},
KeyVal: map[string]interface{}{"msg": msg},
Error: nil,
Data: map[string]string{"msg": msg},
}
}

func (w *work) Cancel() error {
w.Status("cancel step")
return nil
func (w *work) Cancel() {
w.Status([]byte("cancel step"))
}

func readPipeline(pipe *pipeline.Pipeline) {
Expand All @@ -53,8 +51,10 @@ func readPipeline(pipe *pipeline.Pipeline) {

func main() {

workpipe := pipeline.NewProgress("myProgressworkpipe", 1000, time.Second*3)
stage := pipeline.NewStage("mypworkstage", false, false)
workpipe := pipeline.New("myProgressworkpipe",
pipeline.OutBufferSize(1000),
pipeline.ExpectedDuration(1e3*time.Millisecond))
stage := pipeline.NewStage("mypworkstage", pipeline.Concurrent(true))
step1 := &work{id: 1}
step2 := &work{id: 2}

Expand All @@ -65,7 +65,7 @@ func main() {

go readPipeline(workpipe)

result := workpipe.Run()
result := workpipe.Run(nil)
if result.Error != nil {
fmt.Println(result.Error)
}
Expand Down
73 changes: 73 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package pipeline

import "time"

// DefaultOutDrainTimeout time to wait for all readers to finish consuming output
const DefaultOutDrainTimeout = time.Second * 5

// DefaultOutBufferSize size of the pipeline.Out channel
const DefaultOutBufferSize = 1000

type pipelineConfig struct {
outBufferSize int
outDrainTimeout time.Duration
expectedDuration time.Duration
}

// Option represents an option for the pipeline. It must be used as an arg
// to New(...) or Clone(...)
type Option func(*pipelineConfig)

// OutBufferSize is size of the pipeline.Out channel
func OutBufferSize(size int) Option {
return Option(func(c *pipelineConfig) {
c.outBufferSize = size
})
}

// OutDrainTimeout is the time to wait for all readers to finish consuming output
func OutDrainTimeout(timeout time.Duration) Option {
return Option(func(c *pipelineConfig) {
c.outDrainTimeout = timeout
})
}

// ExpectedDuration is the expected time for the pipeline to finish
func ExpectedDuration(timeout time.Duration) Option {
return Option(func(c *pipelineConfig) {
c.expectedDuration = timeout
})
}

type stageConfig struct {
concurrent bool
disableStrictMode bool
mergeFunc func([]*Result) *Result
}

// StageOption represents an option for a stage. It must be used as an arg
// to NewStage(...)
type StageOption func(*stageConfig)

// Concurrent enables concurrent execution of steps
func Concurrent(enable bool) StageOption {
return StageOption(func(c *stageConfig) {
c.concurrent = enable
})
}

// DisableStrictMode disables cancellation of concurrently executing steps if
// there is an error
func DisableStrictMode(disable bool) StageOption {
return StageOption(func(c *stageConfig) {
c.disableStrictMode = disable
})
}

// MergeFunc is used to merge results from concurrent steps and is only called
// when concurrent steps are enabled.
func MergeFunc(fn func([]*Result) *Result) StageOption {
return StageOption(func(c *stageConfig) {
c.mergeFunc = fn
})
}
Loading