-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.go
171 lines (149 loc) · 4.14 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package asyncigo
import (
	"context"
	"errors"
	"time"
)
// Queue provides a basic asynchronous queue.
// Items are added with Push and retrieved through the [Future] returned by Get.
// Queue is not threadsafe.
type Queue[T any] struct {
	// data holds items that have been pushed but not yet handed to a Get call.
	data []T
	// futs holds the futures returned by Get calls made while no item was
	// available; Push resolves them from the front.
	futs []*Future[T]
}
// Get pops the first item from the Queue.
// If an item is already buffered, the returned [Future] is resolved with that
// item before Get returns. Otherwise the Future is appended to the list of
// waiters and is resolved by a later call to Push.
func (q *Queue[T]) Get() *Future[T] {
	fut := NewFuture[T]()
	if len(q.data) == 0 {
		// Nothing buffered: park the future until Push supplies an item.
		q.futs = append(q.futs, fut)
		return fut
	}

	item := q.data[0]
	// Clear the vacated slot so the backing array does not keep the popped
	// item reachable after it has been handed to the caller.
	var zero T
	q.data[0] = zero
	q.data = q.data[1:]
	fut.SetResult(item, nil)
	return fut
}
// Push adds an item to the Queue.
// After buffering the item, Push hands buffered items to waiting futures
// (those returned by earlier Get calls) in order, for as long as both a
// waiter and an item are available. Waiters that already have a result
// (for example because they were cancelled) are discarded without
// consuming an item.
func (q *Queue[T]) Push(item T) {
	q.data = append(q.data, item)

	var zero T
	for len(q.futs) > 0 && len(q.data) > 0 {
		// Detach the front waiter, clearing its slot so the backing array
		// does not keep the future reachable.
		fut := q.futs[0]
		q.futs[0] = nil
		q.futs = q.futs[1:]

		// skip if cancelled
		if fut.HasResult() {
			continue
		}

		// Detach the front item the same way before delivering it; both
		// slices are updated before SetResult runs, as in the original
		// ordering.
		next := q.data[0]
		q.data[0] = zero
		q.data = q.data[1:]
		fut.SetResult(next, nil)
	}
}
// Mutex provides a simple asynchronous locking mechanism for coroutines.
// The zero value is an unlocked Mutex.
// Mutex is not threadsafe.
type Mutex struct {
	// unlockFut is nil or already resolved while the Mutex is unlocked.
	// Lock replaces it with a fresh pending future, and Unlock resolves it
	// to wake coroutines waiting in Lock.
	unlockFut *Future[any]
}
// Lock acquires the Mutex. While the Mutex is held by someone else,
// the calling coroutine awaits the current unlock future and re-checks
// the lock state each time that await returns without error.
// If awaiting fails, the error is returned and the Mutex is not acquired.
func (m *Mutex) Lock(ctx context.Context) error {
	// Held means: an unlock future exists and has not been resolved yet.
	for m.unlockFut != nil && !m.unlockFut.HasResult() {
		_, err := m.unlockFut.Await(ctx)
		if err != nil {
			return err
		}
	}

	// Free: take ownership by installing a fresh, pending unlock future.
	m.unlockFut = NewFuture[any]()
	return nil
}
// Unlock releases the Mutex by resolving its unlock future.
// Calling Unlock on a Mutex that has never been locked does nothing.
func (m *Mutex) Unlock() {
	if m.unlockFut == nil {
		return
	}
	m.unlockFut.SetResult(nil, nil)
}
// WaitMode modifies the behaviour of [Wait] by selecting the condition
// under which it stops waiting.
type WaitMode int

// Modes accepted by [Wait].
const (
	WaitFirstResult WaitMode = iota // wait until any future has a result or an error
	WaitFirstError                  // wait until any future has an error or until all futures have completed
	WaitAll                         // wait until all futures have completed or errored
)
// Wait will wait for any or all of the given Futures to complete
// depending on the [WaitMode] passed:
//
//   - WaitFirstResult: stop as soon as any future completes.
//   - WaitFirstError: stop as soon as any future completes with an error,
//     or once all futures have completed.
//   - WaitAll: stop once all futures have completed.
//
// If any of the futures fail, the most recent error will be returned.
// If awaiting itself fails, that error is returned instead.
// When called with no futures, Wait returns nil immediately.
// Wait will not cancel any futures.
func Wait(ctx context.Context, mode WaitMode, futs ...Futurer) error {
	// With nothing to wait on, no done-callback would ever fire, so the
	// internal future would never resolve and Await would block until ctx
	// is cancelled. There is nothing to wait for, so report success.
	if len(futs) == 0 {
		return nil
	}

	var (
		completed int   // number of futures whose done-callback has run
		lastErr   error // most recent non-nil error reported by a future
	)
	waitFut := NewFuture[any]()

	// The callback does not depend on which future invoked it, so a single
	// closure is shared by all of them.
	onDone := func(err error) {
		completed++
		if err != nil {
			lastErr = err
		}
		if completed >= len(futs) ||
			mode == WaitFirstResult ||
			(lastErr != nil && mode == WaitFirstError) {
			waitFut.SetResult(nil, lastErr)
		}
	}
	for _, fut := range futs {
		fut.AddDoneCallback(onDone)
	}

	_, err := waitFut.Await(ctx)
	return err
}
// GetFirstResult returns the result of the first successful coroutine.
// Each coroutine is spawned as a task under a context derived from ctx.
// Once a coroutine succeeds, all unfinished tasks will be cancelled.
// If no coroutine succeeds, the last error is returned.
// If coros is empty, GetFirstResult returns the zero value of T and an
// error immediately instead of waiting for a result that can never arrive.
func GetFirstResult[T any](ctx context.Context, coros ...Coroutine2[T]) (T, error) {
	if len(coros) == 0 {
		// No task would ever resolve the wait future, so awaiting it would
		// block until ctx is cancelled.
		var zero T
		return zero, errors.New("asyncigo: GetFirstResult called with no coroutines")
	}

	taskCtx, cancel := context.WithCancel(ctx)
	// Release the derived context on every return path, including when
	// awaiting fails before the wait future has been resolved.
	defer cancel()

	tasks := make([]*Task[T], 0, len(coros))
	var done int // number of tasks that have reported a result or an error

	waitFut := NewFuture[T]()
	waitFut.AddResultCallback(func(_ T, err error) {
		// prevent new tasks from spawning
		cancel()
		// cancel any already started tasks
		for _, t := range tasks {
			t.Cancel(nil)
		}
	})

	for _, coro := range coros {
		task := SpawnTask(taskCtx, coro)
		tasks = append(tasks, task)
		task.AddResultCallback(func(result T, err error) {
			done++
			if err == nil {
				// Success: publish the result.
				waitFut.SetResult(result, nil)
			} else if done >= len(coros) {
				// Every task has reported and this one failed:
				// surface its error through the wait future.
				waitFut.Cancel(err)
			}
		})
	}

	return waitFut.Await(ctx)
}
// Sleep suspends the current coroutine for the given duration.
// It schedules a callback on the loop obtained from ctx that resolves an
// internal future after duration, then awaits that future. The error from
// awaiting is returned. Once the future is done, the scheduled callback's
// handle is cancelled.
func Sleep(ctx context.Context, duration time.Duration) error {
	wake := NewFuture[any]()

	// Timer that resolves the future once the duration has elapsed.
	timer := RunningLoop(ctx).ScheduleCallback(duration, func() {
		wake.SetResult(nil, nil)
	})

	// Cancel the timer handle whenever the future is done.
	wake.AddDoneCallback(func(error) {
		timer.Cancel()
	})

	_, err := wake.Await(ctx)
	return err
}
// Go runs f in a new goroutine and returns a [Future] that is resolved with
// f's result and error once f returns. The result is delivered through the
// loop obtained from ctx via RunCallbackThreadsafe.
// f receives a context derived from ctx in which the runningLoop key is set
// to nil.
func Go[T any](ctx context.Context, f func(ctx context.Context) (T, error)) *Future[T] {
	var (
		loop = RunningLoop(ctx)
		fut  = NewFuture[T]()
		// NOTE(review): presumably this hides the loop from code running in
		// the goroutine so it cannot use the loop directly - confirm against
		// RunningLoop.
		goroCtx = context.WithValue(ctx, runningLoop{}, nil)
	)

	// deliver hands the goroutine's outcome back to the loop, which sets
	// the future's result from its callback.
	deliver := func(result T, err error) {
		loop.RunCallbackThreadsafe(goroCtx, func() {
			fut.SetResult(result, err)
		})
	}

	go func() {
		deliver(f(goroCtx))
	}()
	return fut
}