data_processor.go
package ratchet

import (
	"container/list"
	"context"
	"fmt"
	"sync"

	"github.com/rhansen2/ratchet/data"
)

// DataProcessor is the interface that should be implemented to perform data-related
// tasks within a Pipeline. DataProcessors are responsible for receiving, processing,
// and then sending data on to the next stage of processing.
type DataProcessor interface {
	// ProcessData will be called for each data payload sent from the previous stage.
	// ProcessData is called with a data.JSON instance, which is the data being received,
	// an outputChan, which is the channel to send data to, and a killChan,
	// which is a channel to send unexpected errors to (halting execution of the Pipeline).
	// The ctx carries Pipeline-level cancellation.
	ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error, ctx context.Context)

	// Finish will be called after the previous stage has finished sending data,
	// and no more data will be received by this DataProcessor. Finish can often
	// be an empty implementation, but sometimes it is necessary to perform
	// final data processing.
	Finish(outputChan chan data.JSON, killChan chan error, ctx context.Context)
}

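// A minimal sketch of a DataProcessor implementation (hypothetical; the
// PassThrough name is illustrative and not part of this package):
//
//	// PassThrough forwards each payload to the next stage unchanged.
//	type PassThrough struct{}
//
//	func (p *PassThrough) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error, ctx context.Context) {
//		outputChan <- d
//	}
//
//	func (p *PassThrough) Finish(outputChan chan data.JSON, killChan chan error, ctx context.Context) {
//		// No buffered state, so there is nothing left to send.
//	}
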
// dataProcessor is a type used internally by the Pipeline management
// code, and wraps a DataProcessor instance. DataProcessor is the main
// interface that should be implemented to perform work within the data
// pipeline, and this dataProcessor type simply embeds it and adds some
// helpful channel management and other attributes.
type dataProcessor struct {
	DataProcessor
	executionStat
	concurrentDataProcessor
	chanBrancher
	chanMerger
	outputs    []DataProcessor
	inputChan  chan data.JSON
	outputChan chan data.JSON
	ctx        context.Context
}

type chanBrancher struct {
	branchOutChans []chan data.JSON
}

// branchOut fans data received on dp.outputChan out to every branch output
// channel, copying each payload so concurrent downstream stages can modify
// their copies independently. Once dp.outputChan is closed, all branch
// channels are closed as well; context cancellation stops the goroutine.
func (dp *dataProcessor) branchOut() {
	go func() {
	processLoop:
		for {
			select {
			case d, ok := <-dp.outputChan:
				if !ok {
					break processLoop
				}
				for _, out := range dp.branchOutChans {
					// Make a copy to ensure concurrent stages
					// can alter data as needed.
					dc := make(data.JSON, len(d))
					copy(dc, d)
					select {
					case out <- dc:
					case <-dp.ctx.Done():
						return
					}
				}
				dp.recordDataSent(d)
			case <-dp.ctx.Done():
				return
			}
		}
		// Once all data is received, also close all the outputs.
		for _, out := range dp.branchOutChans {
			close(out)
		}
	}()
}

type chanMerger struct {
	mergeInChans []chan data.JSON
	mergeWait    sync.WaitGroup
}

// mergeIn funnels every merge input channel into dp.inputChan, closing
// dp.inputChan once all of the input channels have been drained.
func (dp *dataProcessor) mergeIn() {
	// Start a merge goroutine for each input channel.
	mergeData := func(c chan data.JSON) {
		defer dp.mergeWait.Done()
		for {
			select {
			case d, ok := <-c:
				if !ok {
					return
				}
				select {
				case dp.inputChan <- d:
				case <-dp.ctx.Done():
					return
				}
			case <-dp.ctx.Done():
				return
			}
		}
	}

	dp.mergeWait.Add(len(dp.mergeInChans))
	for _, in := range dp.mergeInChans {
		go mergeData(in)
	}

	go func() {
		dp.mergeWait.Wait()
		close(dp.inputChan)
	}()
}

// Do takes a DataProcessor instance and returns the dataProcessor
// type that will wrap it for internal ratchet processing. The details
// of the dataProcessor wrapper type are abstracted away from the
// implementing end-user code. The "Do" function is named
// succinctly to provide a nicer syntax when creating a PipelineLayout.
// See the ratchet package documentation for code examples of creating
// a new branching pipeline layout.
func Do(processor DataProcessor) *dataProcessor {
	dp := dataProcessor{DataProcessor: processor}
	dp.outputChan = make(chan data.JSON)
	dp.inputChan = make(chan data.JSON)

	if isConcurrent(processor) {
		// Set up the work-throttling fields for processors that
		// declare their own concurrency level.
		dp.concurrency = processor.(ConcurrentDataProcessor).Concurrency()
		dp.workThrottle = make(chan workSignal, dp.concurrency)
		dp.workList = list.New()
		dp.doneChan = make(chan bool)
		dp.inputClosed = false
	}

	return &dp
}

// Outputs should be called to specify which DataProcessor instances the current
// processor should send its output to. See the ratchet package
// documentation for code examples and diagrams.
func (dp *dataProcessor) Outputs(processors ...DataProcessor) *dataProcessor {
	dp.outputs = processors
	return dp
}

// String passes through to the wrapped DataProcessor's string representation.
func (dp *dataProcessor) String() string {
	return fmt.Sprintf("%v", dp.DataProcessor)
}