-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
92 lines (74 loc) · 2.43 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package komi
import (
"os"
"sync"
"sync/atomic"
"time"
"github.com/charmbracelet/log"
)
// New builds a pool for the given work without any caller-supplied settings.
//
// It is shorthand for calling NewWithSettings with a nil *Settings; that
// constructor substitutes an empty Settings value and runs it through
// verifySettings (presumably where the defaults are filled in).
func New[I, O any](optionWork poolWork[I, O]) *Pool[I, O] {
	// No custom tunings: hand NewWithSettings a nil settings pointer.
	var noSettings *Settings
	return NewWithSettings(optionWork, noSettings)
}
// NewWithSettings builds a pool for the given work using caller-supplied
// tuning settings.
//
// optionWork is invoked on the freshly allocated pool and is expected to
// install the work performer; if the pool has no work afterwards the
// constructor panics with "pool didn't receive any work". A nil settings
// pointer is replaced by an empty Settings value, and the settings are then
// passed through verifySettings. A non-nil settings value is stored as-is and
// updated in place: unless a size override is set, Size is recomputed as
// Laborers * Ratio.
//
// Before returning, the constructor starts the laborers and launches the
// closure request listener in its own goroutine.
func NewWithSettings[I, O any](optionWork poolWork[I, O], settings *Settings) *Pool[I, O] {
	// The pool logs to stderr with timestamps and without caller information.
	logger := log.NewWithOptions(os.Stderr, log.Options{
		TimeFormat:      time.DateTime,
		ReportTimestamp: true,
		ReportCaller:    false,
	})

	pool := &Pool[I, O]{
		settings:                     settings,
		log:                          logger,
		jobsWaiting:                  new(atomic.Int64),
		jobsCompleted:                new(atomic.Int64),
		jobsSucceeded:                new(atomic.Int64),
		currentlyWaitingForJobs:      new(atomic.Bool),
		closureInternalWait:          new(sync.WaitGroup),
		tellChildrenToClose:          make(chan Signal),
		closedSignal:                 make(chan Signal, 1),
		closureRequest:               make(chan bool),
		noJobsCurrentlyWaitingSignal: make(chan Signal),
	}

	// Let the supplied option install the work performer on the pool.
	optionWork(pool)

	// A pool without usable work is a programming error, so fail immediately.
	if !(pool.hasWork() && pool.workPerformer != nil) {
		panic("pool didn't receive any work")
	}

	// With work confirmed, fall back to empty settings if none were given and
	// have them verified.
	if pool.settings == nil {
		pool.settings = new(Settings)
	}
	cfg := pool.settings
	verifySettings(cfg)

	// Apply the logging level and prefix from the settings.
	pool.log.SetLevel(cfg.LogLevel)
	pool.log.SetPrefix(cfg.Name)

	// If the user has not provided a manual size, derive it:
	// `size = laborers * ratio`.
	if !cfg.sizeOverride {
		cfg.Size = cfg.Laborers * cfg.Ratio
	}

	pool.log.Debug("Pool settings initialized")

	// The inputs channel always exists; the outputs and errors channels are
	// only allocated when the given work actually produces them.
	pool.inputs = make(chan I, cfg.Size)
	if pool.producesOutputs() {
		pool.outputs = make(chan O, cfg.Size)
	}
	if pool.producesErrors() {
		pool.errors = make(chan PoolError[I], cfg.Size)
	}

	// Start all the laborers, then listen for closure requests in the
	// background.
	pool.startLaborers()
	go pool.closureRequestListener()

	return pool
}
// SetLevel sets the logging level of the pool's logger to the given level.
func (p *Pool[_, _]) SetLevel(level log.Level) {
	logger := p.log
	logger.SetLevel(level)
}
// Debug switches the pool's logger to the debug logging level.
func (p *Pool[_, _]) Debug() {
	p.SetLevel(log.DebugLevel)
}