forked from taggledevel2/ratchet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipeline_test.go
87 lines (74 loc) · 2.46 KB
/
pipeline_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package ratchet_test
import (
"fmt"
"os"
"strings"
"github.com/rhansen2/ratchet"
"github.com/rhansen2/ratchet/data"
"github.com/rhansen2/ratchet/logger"
"github.com/rhansen2/ratchet/processors"
)
func ExampleNewPipeline() {
logger.LogLevel = logger.LevelSilent
// A basic pipeline is created using one or more DataProcessor instances.
hello := processors.NewIoReader(strings.NewReader("Hello world!"))
stdout := processors.NewIoWriter(os.Stdout)
pipeline := ratchet.NewPipeline(hello, stdout)
err := <-pipeline.Run()
if err != nil {
fmt.Println("An error occurred in the ratchet pipeline:", err.Error())
}
// Output:
// Hello world!
}
func ExampleNewBranchingPipeline() {
logger.LogLevel = logger.LevelSilent
// This example is very contrived, but we'll first create
// DataProcessors that will spit out strings, do some basic
// transformation, and then filter out all the ones that don't
// match "HELLO".
hello := processors.NewIoReader(strings.NewReader("Hello world"))
hola := processors.NewIoReader(strings.NewReader("Hola mundo"))
bonjour := processors.NewIoReader(strings.NewReader("Bonjour monde"))
upperCaser := processors.NewFuncTransformer(func(d data.JSON) data.JSON {
return data.JSON(strings.ToUpper(string(d)))
})
lowerCaser := processors.NewFuncTransformer(func(d data.JSON) data.JSON {
return data.JSON(strings.ToLower(string(d)))
})
helloMatcher := processors.NewRegexpMatcher("HELLO")
stdout := processors.NewIoWriter(os.Stdout)
// Create the PipelineLayout that will run the DataProcessors
layout, err := ratchet.NewPipelineLayout(
// Stage 1 - spits out hello world in a few languages
ratchet.NewPipelineStage(
ratchet.Do(hello).Outputs(upperCaser, lowerCaser),
ratchet.Do(hola).Outputs(upperCaser),
ratchet.Do(bonjour).Outputs(lowerCaser),
),
// Stage 2 - transforms strings to upper and lower case
ratchet.NewPipelineStage(
ratchet.Do(upperCaser).Outputs(helloMatcher),
ratchet.Do(lowerCaser).Outputs(helloMatcher),
),
// Stage 3 - only lets through strings that match "hello"
ratchet.NewPipelineStage(
ratchet.Do(helloMatcher).Outputs(stdout),
),
// Stage 4 - prints to STDOUT
ratchet.NewPipelineStage(
ratchet.Do(stdout),
),
)
if err != nil {
panic(err.Error())
}
// Create and run the Pipeline
pipeline := ratchet.NewBranchingPipeline(layout)
err = <-pipeline.Run()
if err != nil {
fmt.Println("An error occurred in the ratchet pipeline:", err.Error())
}
// Output:
// HELLO WORLD
}