-
Notifications
You must be signed in to change notification settings - Fork 79
/
Copy pathdata_processor.go
125 lines (110 loc) · 3.81 KB
/
data_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package ratchet
import (
"container/list"
"fmt"
"sync"
"github.com/dailyburn/ratchet/data"
)
// DataProcessor is the interface that should be implemented to perform data-related
// tasks within a Pipeline. DataProcessors are responsible for receiving, processing,
// and then sending data on to the next stage of processing.
type DataProcessor interface {
// ProcessData will be called for each data sent from the previous stage.
// ProcessData is called with a data.JSON instance, which is the data being received,
// an outputChan, which is the channel to send data to, and a killChan,
// which is a channel to send unexpected errors to (halting execution of the Pipeline).
ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
// Finish will be called after the previous stage has finished sending data,
// and no more data will be received by this DataProcessor. Often times
// Finish can be an empty function implementation, but sometimes it is
// necessary to perform final data processing.
Finish(outputChan chan data.JSON, killChan chan error)
}
// dataProcessor is a type used internally to the Pipeline management
// code, and wraps a DataProcessor instance. DataProcessor is the main
// interface that should be implemented to perform work within the data
// pipeline, and this dataProcessor type simply embeds it and adds some
// helpful channel management and other attributes.
type dataProcessor struct {
DataProcessor
executionStat
concurrentDataProcessor
chanBrancher
chanMerger
outputs []DataProcessor
inputChan chan data.JSON
outputChan chan data.JSON
}
type chanBrancher struct {
branchOutChans []chan data.JSON
}
func (dp *dataProcessor) branchOut() {
go func() {
for d := range dp.outputChan {
for _, out := range dp.branchOutChans {
// Make a copy to ensure concurrent stages
// can alter data as needed.
dc := make(data.JSON, len(d))
copy(dc, d)
out <- dc
}
dp.recordDataSent(d)
}
// Once all data is received, also close all the outputs
for _, out := range dp.branchOutChans {
close(out)
}
}()
}
type chanMerger struct {
mergeInChans []chan data.JSON
mergeWait sync.WaitGroup
}
func (dp *dataProcessor) mergeIn() {
// Start a merge goroutine for each input channel.
mergeData := func(c chan data.JSON) {
for d := range c {
dp.inputChan <- d
}
dp.mergeWait.Done()
}
dp.mergeWait.Add(len(dp.mergeInChans))
for _, in := range dp.mergeInChans {
go mergeData(in)
}
go func() {
dp.mergeWait.Wait()
close(dp.inputChan)
}()
}
// Do takes a DataProcessor instance and returns the dataProcessor
// type that will wrap it for internal ratchet processing. The details
// of the dataProcessor wrapper type are abstracted away from the
// implementing end-user code. The "Do" function is named
// succinctly to provide a nicer syntax when creating a PipelineLayout.
// See the ratchet package documentation for code examples of creating
// a new branching pipeline layout.
func Do(processor DataProcessor) *dataProcessor {
dp := dataProcessor{DataProcessor: processor}
dp.outputChan = make(chan data.JSON)
dp.inputChan = make(chan data.JSON)
if isConcurrent(processor) {
dp.concurrency = processor.(ConcurrentDataProcessor).Concurrency()
dp.workThrottle = make(chan workSignal, dp.concurrency)
dp.workList = list.New()
dp.doneChan = make(chan bool)
dp.inputClosed = false
}
return &dp
}
// Outputs should be called to specify which DataProcessor instances the current
// processor should send it's output to. See the ratchet package
// documentation for code examples and diagrams.
func (dp *dataProcessor) Outputs(processors ...DataProcessor) *dataProcessor {
dp.outputs = processors
return dp
}
// pass through String output to the DataProcessor
func (dp *dataProcessor) String() string {
return fmt.Sprintf("%v", dp.DataProcessor)
}