-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathmain.go
140 lines (108 loc) · 3.26 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main
import (
"context"
"fmt"
"io"
"net/http"
"time"
"io/ioutil"
"github.com/myntra/pipeline"
)
type downloadStep struct {
pipeline.StepContext
fileName string
bytes int64
fail bool
ctx context.Context
cancel context.CancelFunc
}
func newDownloadStep(fileName string, bytes int64, fail bool) *downloadStep {
ctx, cancel := context.WithCancel(context.Background())
d := &downloadStep{fileName: fileName, bytes: bytes, fail: fail}
d.ctx = ctx
d.cancel = cancel
return d
}
func (d *downloadStep) Exec(request *pipeline.Request) *pipeline.Result {
d.Status([]byte(fmt.Sprintf("%+v", request)))
d.Status([]byte(fmt.Sprintf("Started downloading file %s", d.fileName)))
client := &http.Client{}
req, err := http.NewRequest("GET", fmt.Sprintf("http://httpbin.org/bytes/%d", d.bytes), nil)
if err != nil {
return &pipeline.Result{Error: err}
}
req = req.WithContext(d.ctx)
resp, err := client.Do(req)
if err != nil {
return &pipeline.Result{Error: err}
}
defer resp.Body.Close()
n, err := io.Copy(ioutil.Discard, resp.Body)
if err != nil {
return &pipeline.Result{Error: err}
}
if d.fail {
return &pipeline.Result{Error: fmt.Errorf("File download failed %s", d.fileName)}
}
d.Status([]byte(fmt.Sprintf("Successfully downloaded file %s", d.fileName)))
return &pipeline.Result{
Error: nil,
Data: struct{ bytesDownloaded int64 }{bytesDownloaded: n},
}
}
func (d *downloadStep) Cancel() {
d.Status([]byte(fmt.Sprintf("Cancel downloading file %s", d.fileName)))
d.cancel()
}
func readPipeline(pipe *pipeline.Pipeline) {
out, err := pipe.Out()
if err != nil {
return
}
progress, err := pipe.GetProgressPercent()
if err != nil {
return
}
for {
select {
case line := <-out:
fmt.Println(line)
case p := <-progress:
fmt.Println("percent done: ", p)
}
}
}
func main() {
workflow := pipeline.New("getfiles",
pipeline.OutBufferSize(1000),
pipeline.ExpectedDuration(7*time.Second))
//stages
stage := pipeline.NewStage("stage", pipeline.Concurrent(true))
// in this stage, steps will be executed concurrently
concurrentStage := pipeline.NewStage("con_stage", pipeline.Concurrent(true))
// another concurrent stage
concurrentErrStage := pipeline.NewStage("con_err_stage", pipeline.Concurrent(true))
//steps
fileStep1mb := newDownloadStep("1mbfile", 1e6, false)
fileStep1mbFail := newDownloadStep("1mbfileFail", 1e6, true)
fileStep5mb := newDownloadStep("5mbfile", 5e6, false)
fileStep10mb := newDownloadStep("10mbfile", 10e6, false)
//stage with sequential steps
stage.AddStep(fileStep1mb, fileStep5mb, fileStep10mb)
//stage with concurrent steps
concurrentStage.AddStep(fileStep1mb, fileStep5mb, fileStep10mb)
//stage with concurrent steps one of which fails early, prompting a cancellation
//of the other running steps.
concurrentErrStage.AddStep(fileStep1mbFail, fileStep5mb, fileStep10mb)
// add all stages
workflow.AddStage(stage, concurrentStage, concurrentErrStage)
// start a routine to read out and progress
go readPipeline(workflow)
// execute pipeline
result := workflow.Run(nil)
if result.Error != nil {
fmt.Println(result.Error)
}
// one would persist the time taken duration to use as progress scale for the next workflow build
fmt.Println("timeTaken:", workflow.GetDuration())
}