Skip to content

Commit 83b5a83

Browse files
brycekahlelmb
andcommitted
perf, ringbuf: add Flush for manual Read/ReadInto wakeup
Add a Flush method which interrupts a perf or ringbuf reader and causes it to read all data currently in the ring. This is very similar to the logic we use to check for data in the ring when a deadline has expired. The semantics of the Read function change slightly: once a deadline has expired, a caller is now guaranteed to receive an os.ErrDeadlineExceeded after the pending data has been read, which wasn't the case before when the ring contained data. Signed-off-by: Bryce Kahle <[email protected]> Co-authored-by: Lorenz Bauer <[email protected]>
1 parent fc4f4c5 commit 83b5a83

File tree

7 files changed

+294
-60
lines changed

7 files changed

+294
-60
lines changed

internal/epoll/poller.go

+79-24
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
package epoll
22

33
import (
4+
"errors"
45
"fmt"
56
"math"
67
"os"
78
"runtime"
9+
"slices"
810
"sync"
911
"time"
1012

1113
"github.com/cilium/ebpf/internal"
1214
"github.com/cilium/ebpf/internal/unix"
1315
)
1416

17+
var ErrFlushed = errors.New("data was flushed")
18+
1519
// Poller waits for readiness notifications from multiple file descriptors.
1620
//
1721
// The wait can be interrupted by calling Close.
@@ -21,27 +25,48 @@ type Poller struct {
2125
epollMu sync.Mutex
2226
epollFd int
2327

24-
eventMu sync.Mutex
25-
event *eventFd
28+
eventMu sync.Mutex
29+
closeEvent *eventFd
30+
flushEvent *eventFd
2631
}
2732

28-
func New() (*Poller, error) {
33+
func New() (_ *Poller, err error) {
34+
closeFDOnError := func(fd int) {
35+
if err != nil {
36+
unix.Close(fd)
37+
}
38+
}
39+
closeEventFDOnError := func(e *eventFd) {
40+
if err != nil {
41+
e.close()
42+
}
43+
}
44+
2945
epollFd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
3046
if err != nil {
3147
return nil, fmt.Errorf("create epoll fd: %v", err)
3248
}
49+
defer closeFDOnError(epollFd)
3350

3451
p := &Poller{epollFd: epollFd}
35-
p.event, err = newEventFd()
52+
p.closeEvent, err = newEventFd()
53+
if err != nil {
54+
return nil, err
55+
}
56+
defer closeEventFDOnError(p.closeEvent)
57+
58+
p.flushEvent, err = newEventFd()
3659
if err != nil {
37-
unix.Close(epollFd)
3860
return nil, err
3961
}
62+
defer closeEventFDOnError(p.flushEvent)
63+
64+
if err := p.Add(p.closeEvent.raw, 0); err != nil {
65+
return nil, fmt.Errorf("add close eventfd: %w", err)
66+
}
4067

41-
if err := p.Add(p.event.raw, 0); err != nil {
42-
unix.Close(epollFd)
43-
p.event.close()
44-
return nil, fmt.Errorf("add eventfd: %w", err)
68+
if err := p.Add(p.flushEvent.raw, 0); err != nil {
69+
return nil, fmt.Errorf("add flush eventfd: %w", err)
4570
}
4671

4772
runtime.SetFinalizer(p, (*Poller).Close)
@@ -55,8 +80,8 @@ func New() (*Poller, error) {
5580
func (p *Poller) Close() error {
5681
runtime.SetFinalizer(p, nil)
5782

58-
// Interrupt Wait() via the event fd if it's currently blocked.
59-
if err := p.wakeWait(); err != nil {
83+
// Interrupt Wait() via the closeEvent fd if it's currently blocked.
84+
if err := p.wakeWaitForClose(); err != nil {
6085
return err
6186
}
6287

@@ -73,9 +98,14 @@ func (p *Poller) Close() error {
7398
p.epollFd = -1
7499
}
75100

76-
if p.event != nil {
77-
p.event.close()
78-
p.event = nil
101+
if p.closeEvent != nil {
102+
p.closeEvent.close()
103+
p.closeEvent = nil
104+
}
105+
106+
if p.flushEvent != nil {
107+
p.flushEvent.close()
108+
p.flushEvent = nil
79109
}
80110

81111
return nil
@@ -118,8 +148,11 @@ func (p *Poller) Add(fd int, id int) error {
118148

119149
// Wait for events.
120150
//
121-
// Returns the number of pending events or an error wrapping os.ErrClosed if
122-
// Close is called, or os.ErrDeadlineExceeded if EpollWait timeout.
151+
// Returns the number of pending events and any errors.
152+
//
153+
// - [os.ErrClosed] if interrupted by [Close].
154+
// - [ErrFlushed] if interrupted by [Flush].
155+
// - [os.ErrDeadlineExceeded] if deadline is reached.
123156
func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error) {
124157
p.epollMu.Lock()
125158
defer p.epollMu.Unlock()
@@ -154,33 +187,55 @@ func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error)
154187
return 0, fmt.Errorf("epoll wait: %w", os.ErrDeadlineExceeded)
155188
}
156189

157-
for _, event := range events[:n] {
158-
if int(event.Fd) == p.event.raw {
159-
// Since we don't read p.event the event is never cleared and
190+
for i := 0; i < n; {
191+
event := events[i]
192+
if int(event.Fd) == p.closeEvent.raw {
193+
// Since we don't read p.closeEvent the event is never cleared and
160194
// we'll keep getting this wakeup until Close() acquires the
161195
// lock and sets p.epollFd = -1.
162196
return 0, fmt.Errorf("epoll wait: %w", os.ErrClosed)
163197
}
198+
if int(event.Fd) == p.flushEvent.raw {
199+
// read event to prevent it from continuing to wake
200+
p.flushEvent.read()
201+
err = ErrFlushed
202+
events = slices.Delete(events, i, i+1)
203+
n -= 1
204+
continue
205+
}
206+
i++
164207
}
165208

166-
return n, nil
209+
return n, err
167210
}
168211
}
169212

170213
type temporaryError interface {
171214
Temporary() bool
172215
}
173216

174-
// wakeWait unblocks Wait if it's epoll_wait.
175-
func (p *Poller) wakeWait() error {
217+
// wakeWaitForClose unblocks Wait if it's blocked in epoll_wait.
218+
func (p *Poller) wakeWaitForClose() error {
219+
p.eventMu.Lock()
220+
defer p.eventMu.Unlock()
221+
222+
if p.closeEvent == nil {
223+
return fmt.Errorf("epoll wake: %w", os.ErrClosed)
224+
}
225+
226+
return p.closeEvent.add(1)
227+
}
228+
229+
// Flush unblocks Wait if it's blocked in epoll_wait, so that the caller can read pending samples.
230+
func (p *Poller) Flush() error {
176231
p.eventMu.Lock()
177232
defer p.eventMu.Unlock()
178233

179-
if p.event == nil {
234+
if p.flushEvent == nil {
180235
return fmt.Errorf("epoll wake: %w", os.ErrClosed)
181236
}
182237

183-
return p.event.add(1)
238+
return p.flushEvent.add(1)
184239
}
185240

186241
// eventFd wraps a Linux eventfd.

internal/epoll/poller_test.go

+23-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/cilium/ebpf/internal/unix"
11+
"github.com/go-quicktest/qt"
1112
)
1213

1314
func TestPoller(t *testing.T) {
@@ -101,12 +102,33 @@ func TestPollerDeadline(t *testing.T) {
101102
}()
102103

103104
// Wait for the goroutine to enter the syscall.
104-
time.Sleep(time.Second)
105+
time.Sleep(500 * time.Microsecond)
105106

106107
poller.Close()
107108
<-done
108109
}
109110

111+
func TestPollerFlush(t *testing.T) {
112+
t.Parallel()
113+
114+
_, poller := mustNewPoller(t)
115+
events := make([]unix.EpollEvent, 1)
116+
117+
done := make(chan struct{})
118+
go func() {
119+
defer close(done)
120+
121+
_, err := poller.Wait(events, time.Time{})
122+
qt.Check(t, qt.ErrorIs(err, ErrFlushed))
123+
}()
124+
125+
// Wait for the goroutine to enter the syscall.
126+
time.Sleep(500 * time.Microsecond)
127+
128+
poller.Flush()
129+
<-done
130+
}
131+
110132
func mustNewPoller(t *testing.T) (*eventFd, *Poller) {
111133
t.Helper()
112134

perf/reader.go

+22-13
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ import (
1818
)
1919

2020
var (
21-
ErrClosed = os.ErrClosed
22-
errEOR = errors.New("end of ring")
21+
ErrClosed = os.ErrClosed
22+
ErrFlushed = epoll.ErrFlushed
23+
errEOR = errors.New("end of ring")
2324
)
2425

2526
var perfEventHeaderSize = binary.Size(perfEventHeader{})
@@ -160,6 +161,8 @@ type Reader struct {
160161
overwritable bool
161162

162163
bufferSize int
164+
165+
pendingErr error
163166
}
164167

165168
// ReaderOptions control the behaviour of the user
@@ -318,18 +321,18 @@ func (pr *Reader) SetDeadline(t time.Time) {
318321

319322
// Read the next record from the perf ring buffer.
320323
//
321-
// The function blocks until there are at least Watermark bytes in one
324+
// The method blocks until there are at least Watermark bytes in one
322325
// of the per CPU buffers. Records from buffers below the Watermark
323326
// are not returned.
324327
//
325328
// Records can contain between 0 and 7 bytes of trailing garbage from the ring
326329
// depending on the input sample's length.
327330
//
328-
// Calling Close interrupts the function.
331+
// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush]
332+
// makes it return all records currently in the ring buffer, followed by [ErrFlushed].
329333
//
330-
// Returns [os.ErrDeadlineExceeded] if a deadline was set and the perf ring buffer
331-
// was empty. Otherwise returns a record and no error, even if the deadline was
332-
// exceeded.
334+
// Returns [os.ErrDeadlineExceeded] if a deadline was set and has expired, once
335+
// all records have been read from the ring.
333336
//
334337
// See [Reader.ReadInto] for a more efficient version of this method.
335338
func (pr *Reader) Read() (Record, error) {
@@ -356,13 +359,13 @@ func (pr *Reader) ReadInto(rec *Record) error {
356359
return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
357360
}
358361

359-
deadlineWasExceeded := false
360362
for {
361363
if len(pr.epollRings) == 0 {
362-
if deadlineWasExceeded {
363-
// All rings were empty when the deadline expired, return
364+
if pe := pr.pendingErr; pe != nil {
365+
// All rings have been emptied since the error occurred, return
364366
// appropriate error.
365-
return os.ErrDeadlineExceeded
367+
pr.pendingErr = nil
368+
return pe
366369
}
367370

368371
// NB: The deferred pauseMu.Unlock will panic if Wait panics, which
@@ -371,10 +374,10 @@ func (pr *Reader) ReadInto(rec *Record) error {
371374
_, err := pr.poller.Wait(pr.epollEvents, pr.deadline)
372375
pr.pauseMu.Lock()
373376

374-
if errors.Is(err, os.ErrDeadlineExceeded) {
377+
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
375378
// We've hit the deadline or were flushed, check whether there is any data in
376379
// the rings that we've not been woken up for.
377-
deadlineWasExceeded = true
380+
pr.pendingErr = err
378381
} else if err != nil {
379382
return err
380383
}
@@ -463,6 +466,12 @@ func (pr *Reader) BufferSize() int {
463466
return pr.bufferSize
464467
}
465468

469+
// Flush unblocks Read/ReadInto. Successive Read/ReadInto calls return the samples pending at this point,
470+
// until an [ErrFlushed] error is returned.
471+
func (pr *Reader) Flush() error {
472+
return pr.poller.Flush()
473+
}
474+
466475
// NB: Has to be preceded by a call to ring.loadHead.
467476
func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
468477
defer ring.writeTail()

perf/reader_test.go

+71
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,77 @@ func TestReaderSetDeadline(t *testing.T) {
6868
if _, err := rd.Read(); !errors.Is(err, os.ErrDeadlineExceeded) {
6969
t.Error("Expected os.ErrDeadlineExceeded from second Read, got:", err)
7070
}
71+
72+
rd.SetDeadline(time.Now().Add(10 * time.Millisecond))
73+
if _, err := rd.Read(); !errors.Is(err, os.ErrDeadlineExceeded) {
74+
t.Error("Expected os.ErrDeadlineExceeded from third Read, got:", err)
75+
}
76+
}
77+
78+
func TestReaderSetDeadlinePendingEvents(t *testing.T) {
79+
events := perfEventArray(t)
80+
81+
rd, err := NewReaderWithOptions(events, 4096, ReaderOptions{WakeupEvents: 2})
82+
if err != nil {
83+
t.Fatal(err)
84+
}
85+
defer rd.Close()
86+
87+
outputSamples(t, events, 5)
88+
89+
rd.SetDeadline(time.Now().Add(-time.Second))
90+
_, rem := checkRecord(t, rd)
91+
qt.Assert(t, qt.Equals(rem, 0), qt.Commentf("expected zero Remaining"))
92+
93+
outputSamples(t, events, 5)
94+
95+
// another sample should not be returned before we get os.ErrDeadlineExceeded to indicate the initial set of samples was read
96+
_, err = rd.Read()
97+
if !errors.Is(err, os.ErrDeadlineExceeded) {
98+
t.Error("Expected os.ErrDeadlineExceeded from second Read, got:", err)
99+
}
100+
101+
// the second sample should now be read
102+
_, _ = checkRecord(t, rd)
103+
}
104+
105+
func TestReaderFlushPendingEvents(t *testing.T) {
106+
testutils.LockOSThreadToSingleCPU(t)
107+
events := perfEventArray(t)
108+
109+
rd, err := NewReaderWithOptions(events, 4096, ReaderOptions{WakeupEvents: 2})
110+
if err != nil {
111+
t.Fatal(err)
112+
}
113+
defer rd.Close()
114+
115+
outputSamples(t, events, 5)
116+
117+
wait := make(chan int)
118+
go func() {
119+
wait <- 0
120+
_, rem := checkRecord(t, rd)
121+
wait <- rem
122+
}()
123+
124+
<-wait
125+
time.Sleep(10 * time.Millisecond)
126+
err = rd.Flush()
127+
qt.Assert(t, qt.IsNil(err))
128+
129+
rem := <-wait
130+
qt.Assert(t, qt.Equals(rem, 0), qt.Commentf("expected zero Remaining"))
131+
132+
outputSamples(t, events, 5)
133+
134+
// another sample should not be returned before we get ErrFlushed to indicate the initial set of samples was read
135+
_, err = rd.Read()
136+
if !errors.Is(err, ErrFlushed) {
137+
t.Error("Expected ErrFlushed from second Read, got:", err)
138+
}
139+
140+
// the second sample should now be read
141+
_, _ = checkRecord(t, rd)
71142
}
72143

73144
func outputSamples(tb testing.TB, events *ebpf.Map, sampleSizes ...byte) {

0 commit comments

Comments
 (0)