From 8593219ce8a0c8a496566e0c63411f6f5bfdc83d Mon Sep 17 00:00:00 2001
From: Nia Waldvogel
Date: Tue, 28 Dec 2021 17:20:43 -0500
Subject: [PATCH 1/3] internal/task: use a lock-free queue

This changes task.Stack and task.Queue to use atomic compare-and-swap
operations. With this change, they can also be used safely from multiple
threads/cores. Additionally, the queue is actually FIFO now (not sure what
I was thinking before).
---
 src/internal/task/queue.go | 139 +++++++++++++++++--------------------
 1 file changed, 63 insertions(+), 76 deletions(-)

diff --git a/src/internal/task/queue.go b/src/internal/task/queue.go
index 5a00d95a6f..0c6041d3e1 100644
--- a/src/internal/task/queue.go
+++ b/src/internal/task/queue.go
@@ -1,69 +1,63 @@
 package task
 
-import "runtime/interrupt"
+import (
+	"sync/atomic"
+	"unsafe"
+)
 
 const asserts = false
 
 // Queue is a FIFO container of tasks.
 // The zero value is an empty queue.
 type Queue struct {
-	head, tail *Task
+	// in is a stack used to buffer incoming tasks.
+	in Stack
+
+	// out is a singly linked list of tasks in oldest-first order.
+	// Once empty, it is refilled by dumping and flipping the input stack.
+	out *Task
 }
 
 // Push a task onto the queue.
+// This is atomic.
 func (q *Queue) Push(t *Task) {
-	i := interrupt.Disable()
-	if asserts && t.Next != nil {
-		interrupt.Restore(i)
-		panic("runtime: pushing a task to a queue with a non-nil Next pointer")
-	}
-	if q.tail != nil {
-		q.tail.Next = t
-	}
-	q.tail = t
-	t.Next = nil
-	if q.head == nil {
-		q.head = t
-	}
-	interrupt.Restore(i)
+	q.in.Push(t)
 }
 
 // Pop a task off of the queue.
+// This cannot be called concurrently.
 func (q *Queue) Pop() *Task {
-	i := interrupt.Disable()
-	t := q.head
-	if t == nil {
-		interrupt.Restore(i)
-		return nil
-	}
-	q.head = t.Next
-	if q.tail == t {
-		q.tail = nil
-	}
-	t.Next = nil
-	interrupt.Restore(i)
-	return t
-}
+	next := q.out
+	if next == nil {
+		// Dump the input stack.
+		s := q.in.dump()
+
+		// Flip it.
+		var prev *Task
+		for t := s.top; t != nil; {
+			next := t.Next
+			t.Next = prev
+			prev = t
+			t = next
+		}
+		if prev == nil {
+			// The queue is empty.
+			return nil
+		}
 
-// Append pops the contents of another queue and pushes them onto the end of this queue.
-func (q *Queue) Append(other *Queue) {
-	i := interrupt.Disable()
-	if q.head == nil {
-		q.head = other.head
-	} else {
-		q.tail.Next = other.head
+		// Save it in the output list.
+		next = prev
 	}
-	q.tail = other.tail
-	other.head, other.tail = nil, nil
-	interrupt.Restore(i)
+
+	q.out = next.Next
+	next.Next = nil
+	return next
 }
 
 // Empty checks if the queue is empty.
+// This cannot be called concurrently with Pop.
 func (q *Queue) Empty() bool {
-	i := interrupt.Disable()
-	empty := q.head == nil
-	interrupt.Restore(i)
-	return empty
+	return q.out == nil && atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.in.top))) == nil
 }
 
 // Stack is a LIFO container of tasks.
@@ -74,51 +68,44 @@ type Stack struct {
 }
 
 // Push a task onto the stack.
+// This is atomic.
 func (s *Stack) Push(t *Task) {
-	i := interrupt.Disable()
 	if asserts && t.Next != nil {
-		interrupt.Restore(i)
 		panic("runtime: pushing a task to a stack with a non-nil Next pointer")
 	}
-	s.top, t.Next = t, s.top
-	interrupt.Restore(i)
+	topPtr := (*unsafe.Pointer)(unsafe.Pointer(&s.top))
+doPush:
+	top := atomic.LoadPointer(topPtr)
+	t.Next = (*Task)(top)
+	if !atomic.CompareAndSwapPointer(topPtr, top, unsafe.Pointer(t)) {
+		goto doPush
+	}
 }
 
 // Pop a task off of the stack.
+// This is atomic.
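+// It snapshots the top pointer and that task's Next field, then retries the
+// compare-and-swap if another core pushed or popped in the meantime.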
func (s *Stack) Pop() *Task { - i := interrupt.Disable() - t := s.top - if t != nil { - s.top = t.Next - t.Next = nil - } - interrupt.Restore(i) - return t -} - -// tail follows the chain of tasks. -// If t is nil, returns nil. -// Otherwise, returns the task in the chain where the Next field is nil. -func (t *Task) tail() *Task { - if t == nil { + topPtr := (*unsafe.Pointer)(unsafe.Pointer(&s.top)) +doPop: + top := atomic.LoadPointer(topPtr) + if top == nil { return nil } - for t.Next != nil { - t = t.Next + t := (*Task)(top) + next := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&t.Next))) + if !atomic.CompareAndSwapPointer(topPtr, top, next) { + goto doPop } + t.Next = nil return t } -// Queue moves the contents of the stack into a queue. -// Elements can be popped from the queue in the same order that they would be popped from the stack. -func (s *Stack) Queue() Queue { - i := interrupt.Disable() - head := s.top - s.top = nil - q := Queue{ - head: head, - tail: head.tail(), +// dump the contents of the stack to another stack. +func (s *Stack) dump() Stack { + return Stack{ + top: (*Task)(atomic.SwapPointer( + (*unsafe.Pointer)(unsafe.Pointer(&s.top)), + nil, + )), } - interrupt.Restore(i) - return q } From 1a43b7276981a09dad1f2919efd05b5a52ab6d62 Mon Sep 17 00:00:00 2001 From: Joseph Lee Date: Mon, 18 Dec 2023 11:08:23 +0900 Subject: [PATCH 2/3] feat: threads scheduler --- builder/musl.go | 1 + compileopts/config.go | 3 + compileopts/options.go | 2 +- main.go | 2 +- src/internal/task/task_handover.go | 7 + src/internal/task/task_stack_amd64.S | 26 + src/internal/task/task_threads.go | 101 ++++ src/internal/task/task_threads_linux.go | 26 + src/runtime/chan.go | 3 + src/runtime/chan_multicore.go | 668 ++++++++++++++++++++++++ src/runtime/nestedspinlock.go | 35 ++ src/runtime/runtime_unix.go | 7 + src/runtime/scheduler_threads.go | 11 + src/runtime/spinlock/spinlock.go | 26 + src/runtime/spinlock/spinlock_test.go | 47 ++ src/runtime/wait_other.go | 2 +- 16 files changed, 964 insertions(+), 3 deletions(-) create mode 100644 src/internal/task/task_handover.go create mode 100644 src/internal/task/task_threads.go create mode 100644 src/internal/task/task_threads_linux.go create mode 100644 src/runtime/chan_multicore.go create mode 100644 src/runtime/nestedspinlock.go create mode 100644 src/runtime/scheduler_threads.go create mode 100644 src/runtime/spinlock/spinlock.go create mode 100644 src/runtime/spinlock/spinlock_test.go diff --git a/builder/musl.go b/builder/musl.go index 6ae1fda065..ad70f1662b 100644 --- a/builder/musl.go +++ b/builder/musl.go @@ -130,6 +130,7 @@ var Musl = Library{ "thread/*.c", "time/*.c", "unistd/*.c", + "linux/gettid.c", } if arch == "arm" { // These files need to be added to the start for some reason. diff --git a/compileopts/config.go b/compileopts/config.go index 2260de2d2c..bae62d2059 100644 --- a/compileopts/config.go +++ b/compileopts/config.go @@ -75,6 +75,9 @@ func (c *Config) GOARM() string { // BuildTags returns the complete list of build tags used during this build. func (c *Config) BuildTags() []string { tags := append(c.Target.BuildTags, []string{"tinygo", "math_big_pure_go", "gc." + c.GC(), "scheduler." + c.Scheduler(), "serial." + c.Serial()}...) 
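+	// The threads scheduler relies on the lock-guarded multicore channel
+	// implementation, selected via the "multicore" build tag added below.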
+ if c.Scheduler() == "threads" { + tags = append(tags, "multicore") + } for i := 1; i <= c.GoMinorVersion; i++ { tags = append(tags, fmt.Sprintf("go1.%d", i)) } diff --git a/compileopts/options.go b/compileopts/options.go index 4440f4cf62..e6bfd46a0e 100644 --- a/compileopts/options.go +++ b/compileopts/options.go @@ -9,7 +9,7 @@ import ( var ( validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"} - validSchedulerOptions = []string{"none", "tasks", "asyncify"} + validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"} validSerialOptions = []string{"none", "uart", "usb"} validPrintSizeOptions = []string{"none", "short", "full"} validPanicStrategyOptions = []string{"print", "trap"} diff --git a/main.go b/main.go index f9eb4eff47..8301b60a53 100644 --- a/main.go +++ b/main.go @@ -1409,7 +1409,7 @@ func main() { opt := flag.String("opt", "z", "optimization level: 0, 1, 2, s, z") gc := flag.String("gc", "", "garbage collector to use (none, leaking, conservative)") panicStrategy := flag.String("panic", "print", "panic strategy (print, trap)") - scheduler := flag.String("scheduler", "", "which scheduler to use (none, tasks, asyncify)") + scheduler := flag.String("scheduler", "", "which scheduler to use (none, tasks, asyncify, threads)") serial := flag.String("serial", "", "which serial output to use (none, uart, usb)") work := flag.Bool("work", false, "print the name of the temporary build directory and do not delete this directory on exit") interpTimeout := flag.Duration("interp-timeout", 180*time.Second, "interp optimization pass timeout") diff --git a/src/internal/task/task_handover.go b/src/internal/task/task_handover.go new file mode 100644 index 0000000000..b66136aabd --- /dev/null +++ b/src/internal/task/task_handover.go @@ -0,0 +1,7 @@ +package task + +//export tinygo_handoverTask +func handoverTask(fn uintptr, args uintptr) + +//go:extern tinygo_callMachineEntry +var callMachineEntryFn [0]uint8 diff --git a/src/internal/task/task_stack_amd64.S b/src/internal/task/task_stack_amd64.S index f9182d49ff..94a627125a 100644 --- a/src/internal/task/task_stack_amd64.S +++ b/src/internal/task/task_stack_amd64.S @@ -73,6 +73,32 @@ tinygo_swapTask: // Return into the new task, as if tinygo_swapTask was a regular call. 
ret +#ifdef __MACH__ // Darwin +.global _tinygo_handoverTask +_tinygo_handoverTask: +#else // Linux etc +.section .text.tinygo_handoverTask +.global tinygo_handoverTask +tinygo_handoverTask: +#endif + .cfi_startproc + movq %rdi, %rax + movq %rsi, %rdi + jmp *%rax + .cfi_endproc + +#ifdef __MACH__ // Darwin +.global _tinygo_callMachineEntry +_tinygo_callMachineEntry: +#else // Linux etc +.section .text.tinygo_callMachineEntry +.global tinygo_callMachineEntry +tinygo_callMachineEntry: +#endif + .cfi_startproc + jmp tinygo_machineEntry + .cfi_endproc + #ifdef __MACH__ // Darwin // allow these symbols to stripped as dead code .subsections_via_symbols diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go new file mode 100644 index 0000000000..c58a03b7da --- /dev/null +++ b/src/internal/task/task_threads.go @@ -0,0 +1,101 @@ +//go:build scheduler.threads + +package task + +import ( + "runtime/spinlock" + "sync/atomic" + "unsafe" +) + +//go:linkname runtimePanic runtime.runtimePanic +func runtimePanic(str string) + +//go:linkname currentMachineId runtime.currentMachineId +func currentMachineId() int + +type state struct { + tid int + + paused int32 +} + +var lock spinlock.SpinLock +var tasks map[int]*Task = map[int]*Task{} + +// Current returns the current active task. +func Current() *Task { + tid := currentMachineId() + lock.Lock() + task := tasks[tid] + lock.Unlock() + return task +} + +// Pause suspends the current task and returns to the scheduler. +// This function may only be called when running on a goroutine stack, not when running on the system stack or in an interrupt. +func Pause() { + task := Current() + atomic.StoreInt32(&task.state.paused, 1) + for atomic.LoadInt32(&task.state.paused) == 1 { + // wait for notify + } +} + +// Resume the task until it pauses or completes. +// This may only be called from the scheduler. +func (t *Task) Resume() { + atomic.StoreInt32(&t.state.paused, 0) +} + +// OnSystemStack returns whether the caller is running on the system stack. +func OnSystemStack() bool { + // If there is not an active goroutine, then this must be running on the system stack. + return Current() == nil +} + +func SystemStack() uintptr { + return 0 +} + +type taskHandoverParam struct { + t *Task + fn uintptr + args uintptr +} + +var retainObjects = map[unsafe.Pointer]any{} + +// start creates and starts a new goroutine with the given function and arguments. +// The new goroutine is scheduled to run later. 
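+// The handover parameter is kept alive in retainObjects so that the GC
+// cannot reclaim it before the new thread's entry point has consumed it.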
+func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) {
+	t := &Task{}
+	h := &taskHandoverParam{
+		t:    t,
+		fn:   fn,
+		args: uintptr(args),
+	}
+
+	lock.Lock()
+	retainObjects[unsafe.Pointer(h)] = h
+	lock.Unlock()
+
+	createThread(uintptr(unsafe.Pointer(&callMachineEntryFn)), uintptr(unsafe.Pointer(h)), stackSize)
+}
+
+//export tinygo_machineEntry
+func machineEntry(param *taskHandoverParam) {
+	tid := currentMachineId()
+	param.t.state.tid = tid
+
+	lock.Lock()
+	delete(retainObjects, unsafe.Pointer(param))
+	tasks[tid] = param.t
+	lock.Unlock()
+
+	handoverTask(param.fn, param.args)
+
+	lock.Lock()
+	delete(tasks, tid)
+	lock.Unlock()
+}
diff --git a/src/internal/task/task_threads_linux.go b/src/internal/task/task_threads_linux.go
new file mode 100644
index 0000000000..61766c6173
--- /dev/null
+++ b/src/internal/task/task_threads_linux.go
@@ -0,0 +1,26 @@
+package task
+
+import (
+	"strconv"
+	"unsafe"
+)
+
+//go:extern errno
+var libcErrno uintptr
+
+//export pthread_create
+func pthread_create(pthread uintptr, arg2 uintptr, fn uintptr, param uintptr) int
+
+type Errno uintptr
+
+func (e Errno) Error() string {
+	return "system error: " + strconv.Itoa(int(e))
+}
+
+func createThread(fn uintptr, param uintptr, stackSize uintptr) (uintptr, error) {
+	var pthread uintptr
+	// pthread_create returns 0 on success or an error number directly;
+	// it does not set errno, so report the returned code.
+	if code := pthread_create(uintptr(unsafe.Pointer(&pthread)), 0, fn, param); code != 0 {
+		return pthread, Errno(code)
+	}
+	return pthread, nil
+}
diff --git a/src/runtime/chan.go b/src/runtime/chan.go
index 6253722467..4063dea240 100644
--- a/src/runtime/chan.go
+++ b/src/runtime/chan.go
@@ -1,3 +1,6 @@
+//go:build !multicore
+// +build !multicore
+
 package runtime
 
 // This file implements the 'chan' type and send/receive/select operations.
diff --git a/src/runtime/chan_multicore.go b/src/runtime/chan_multicore.go
new file mode 100644
index 0000000000..734c34a322
--- /dev/null
+++ b/src/runtime/chan_multicore.go
@@ -0,0 +1,668 @@
+//go:build multicore
+// +build multicore
+
+package runtime
+
+// This file implements the 'chan' type and send/receive/select operations.
+
+// A channel can be in one of the following states:
+// empty:
+//   No goroutine is waiting on a send or receive operation. The 'blocked'
+//   member is nil.
+// recv:
+//   A goroutine tries to receive from the channel. This goroutine is stored
+//   in the 'blocked' member.
+// send:
+//   The reverse of recv. A goroutine tries to send to the channel. This
+//   goroutine is stored in the 'blocked' member.
+// closed:
+//   The channel is closed. Sends will panic, receives will get a zero value
+//   plus optionally the indication that the channel is closed (with the
+//   comma-ok value in the task).
+//
+// A send/recv transmission is completed by copying from the data element of the
+// sending task to the data element of the receiving task, and setting
+// the 'comma-ok' value to true.
+// A receive operation on a closed channel is completed by zeroing the data
+// element of the receiving task and setting the 'comma-ok' value to false.
+
+import (
+	"internal/task"
+	"runtime/interrupt"
+	"unsafe"
+)
+
+var chanLock NestedSpinLock
+
+func chanDebug(ch *channel) {
+	if schedulerDebug {
+		if ch.bufSize > 0 {
+			println("--- channel update:", ch, ch.state.String(), ch.bufSize, ch.bufUsed)
+		} else {
+			println("--- channel update:", ch, ch.state.String())
+		}
+	}
+}
+
+// channelBlockedList is a list of channel operations on a specific channel which are currently blocked.
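+// Entries are pushed onto the front of the list, so it is in newest-first
+// order; resumeRX and resumeTX pop from the front.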
+type channelBlockedList struct {
+	// next is a pointer to the next blocked channel operation on the same channel.
+	next *channelBlockedList
+
+	// t is the task associated with this channel operation.
+	// If this channel operation is not part of a select, then the pointer field of the state holds the data buffer.
+	// If this channel operation is part of a select, then the pointer field of the state holds the receive buffer.
+	// If this channel operation is a receive, then the data field should be set to zero when resuming due to channel closure.
+	t *task.Task
+
+	// s is a pointer to the channel select state corresponding to this operation.
+	// This will be nil if and only if this channel operation is not part of a select statement.
+	// If this is a send operation, then the send buffer can be found in this select state.
+	s *chanSelectState
+
+	// allSelectOps is a slice containing all of the channel operations involved with this select statement.
+	// Before resuming the task, all other channel operations on this select statement should be canceled by removing them from their corresponding lists.
+	allSelectOps []channelBlockedList
+}
+
+// remove takes the current list of blocked channel operations and removes the specified operation.
+// This returns the resulting list, or nil if the resulting list is empty.
+// A nil receiver is treated as an empty list.
+func (b *channelBlockedList) remove(old *channelBlockedList) *channelBlockedList {
+	if b == old {
+		return b.next
+	}
+	c := b
+	for ; c != nil && c.next != old; c = c.next {
+	}
+	if c != nil {
+		c.next = old.next
+	}
+	return b
+}
+
+// detach removes all other channel operations that are part of the same select statement.
+// If the input is not part of a select statement, this is a no-op.
+// This must be called before resuming any task blocked on a channel operation in order to ensure that it is not placed on the runqueue twice.
+func (b *channelBlockedList) detach() {
+	if b.allSelectOps == nil {
+		// nothing to do
+		return
+	}
+	for i, v := range b.allSelectOps {
+		// cancel all other channel operations that are part of this select statement
+		switch {
+		case &b.allSelectOps[i] == b:
+			// This entry is the one that was already detached.
+			continue
+		case v.t == nil:
+			// This entry is not used (nil channel).
+			continue
+		}
+		v.s.ch.blocked = v.s.ch.blocked.remove(&b.allSelectOps[i])
+		if v.s.ch.blocked == nil {
+			if v.s.value == nil {
+				// recv operation
+				if v.s.ch.state != chanStateClosed {
+					v.s.ch.state = chanStateEmpty
+				}
+			} else {
+				// send operation
+				if v.s.ch.bufUsed == 0 {
+					// unbuffered channel
+					v.s.ch.state = chanStateEmpty
+				} else {
+					// buffered channel
+					v.s.ch.state = chanStateBuf
+				}
+			}
+		}
+		chanDebug(v.s.ch)
+	}
+}
+
+type channel struct {
+	elementSize uintptr // the size of one value in this channel
+	bufSize     uintptr // size of buffer (in elements)
+	state       chanState
+	blocked     *channelBlockedList
+	bufHead     uintptr        // head index of buffer (next push index)
+	bufTail     uintptr        // tail index of buffer (next pop index)
+	bufUsed     uintptr        // number of elements currently in buffer
+	buf         unsafe.Pointer // pointer to first element of buffer
+}
+
+// chanMake creates a new channel with the given element size and buffer length in number of elements.
+// This is a compiler intrinsic.
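+// For example, make(chan int, 4) compiles to a call equivalent to
+// chanMake(unsafe.Sizeof(int(0)), 4).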
+func chanMake(elementSize uintptr, bufSize uintptr) *channel { + return &channel{ + elementSize: elementSize, + bufSize: bufSize, + buf: alloc(elementSize*bufSize, nil), + } +} + +// Return the number of entries in this chan, called from the len builtin. +// A nil chan is defined as having length 0. +// +//go:inline +func chanLen(c *channel) int { + if c == nil { + return 0 + } + return int(c.bufUsed) +} + +// wrapper for use in reflect +func chanLenUnsafePointer(p unsafe.Pointer) int { + c := (*channel)(p) + return chanLen(c) +} + +// Return the capacity of this chan, called from the cap builtin. +// A nil chan is defined as having capacity 0. +// +//go:inline +func chanCap(c *channel) int { + if c == nil { + return 0 + } + return int(c.bufSize) +} + +// wrapper for use in reflect +func chanCapUnsafePointer(p unsafe.Pointer) int { + c := (*channel)(p) + return chanCap(c) +} + +// resumeRX resumes the next receiver and returns the destination pointer. +// If the ok value is true, then the caller is expected to store a value into this pointer. +func (ch *channel) resumeRX(ok bool) unsafe.Pointer { + // pop a blocked goroutine off the stack + var b *channelBlockedList + b, ch.blocked = ch.blocked, ch.blocked.next + + // get destination pointer + dst := b.t.Ptr + + if !ok { + // the result value is zero + memzero(dst, ch.elementSize) + b.t.Data = 0 + } + + if b.s != nil { + // tell the select op which case resumed + b.t.Ptr = unsafe.Pointer(b.s) + + // detach associated operations + b.detach() + } + + // push task onto runqueue + runqueue.Push(b.t) + + return dst +} + +// resumeTX resumes the next sender and returns the source pointer. +// The caller is expected to read from the value in this pointer before yielding. +func (ch *channel) resumeTX() unsafe.Pointer { + // pop a blocked goroutine off the stack + var b *channelBlockedList + b, ch.blocked = ch.blocked, ch.blocked.next + + // get source pointer + src := b.t.Ptr + + if b.s != nil { + // use state's source pointer + src = b.s.value + + // tell the select op which case resumed + b.t.Ptr = unsafe.Pointer(b.s) + + // detach associated operations + b.detach() + } + + // push task onto runqueue + runqueue.Push(b.t) + + return src +} + +// push value to end of channel if space is available +// returns whether there was space for the value in the buffer +func (ch *channel) push(value unsafe.Pointer) bool { + // immediately return false if the channel is not buffered + if ch.bufSize == 0 { + return false + } + + // ensure space is available + if ch.bufUsed == ch.bufSize { + return false + } + + // copy value to buffer + memcpy( + unsafe.Add(ch.buf, // pointer to the base of the buffer + offset = pointer to destination element + ch.elementSize*ch.bufHead), // element size * equivalent slice index = offset + value, + ch.elementSize, + ) + + // update buffer state + ch.bufUsed++ + ch.bufHead++ + if ch.bufHead == ch.bufSize { + ch.bufHead = 0 + } + + return true +} + +// pop value from channel buffer if one is available +// returns whether a value was popped or not +// result is stored into value pointer +func (ch *channel) pop(value unsafe.Pointer) bool { + // channel is empty + if ch.bufUsed == 0 { + return false + } + + // compute address of source + addr := unsafe.Add(ch.buf, (ch.elementSize * ch.bufTail)) + + // copy value from buffer + memcpy( + value, + addr, + ch.elementSize, + ) + + // zero buffer element to allow garbage collection of value + memzero( + addr, + ch.elementSize, + ) + + // update buffer state + ch.bufUsed-- + + // move 
tail up + ch.bufTail++ + if ch.bufTail == ch.bufSize { + ch.bufTail = 0 + } + + return true +} + +// try to send a value to a channel, without actually blocking +// returns whether the value was sent +// will panic if channel is closed +func (ch *channel) trySend(value unsafe.Pointer) bool { + if ch == nil { + // send to nil channel blocks forever + // this is non-blocking, so just say no + return false + } + + chanLock.Lock() + + switch ch.state { + case chanStateEmpty, chanStateBuf: + // try to dump the value directly into the buffer + if ch.push(value) { + ch.state = chanStateBuf + chanLock.Unlock() + return true + } + chanLock.Unlock() + return false + case chanStateRecv: + // unblock receiver + dst := ch.resumeRX(true) + + // copy value to receiver + memcpy(dst, value, ch.elementSize) + + // change state to empty if there are no more receivers + if ch.blocked == nil { + ch.state = chanStateEmpty + } + + chanLock.Unlock() + return true + case chanStateSend: + // something else is already waiting to send + chanLock.Unlock() + return false + case chanStateClosed: + chanLock.Unlock() + runtimePanic("send on closed channel") + default: + chanLock.Unlock() + runtimePanic("invalid channel state") + } + + chanLock.Unlock() + return false +} + +// try to receive a value from a channel, without really blocking +// returns whether a value was received +// second return is the comma-ok value +func (ch *channel) tryRecv(value unsafe.Pointer) (bool, bool) { + if ch == nil { + // receive from nil channel blocks forever + // this is non-blocking, so just say no + return false, false + } + + chanLock.Lock() + + switch ch.state { + case chanStateBuf, chanStateSend: + // try to pop the value directly from the buffer + if ch.pop(value) { + // unblock next sender if applicable + if ch.blocked != nil { + src := ch.resumeTX() + + // push sender's value into buffer + ch.push(src) + + if ch.blocked == nil { + // last sender unblocked - update state + ch.state = chanStateBuf + } + } + + if ch.bufUsed == 0 { + // channel empty - update state + ch.state = chanStateEmpty + } + + chanLock.Unlock() + return true, true + } else if ch.blocked != nil { + // unblock next sender if applicable + src := ch.resumeTX() + + // copy sender's value + memcpy(value, src, ch.elementSize) + + if ch.blocked == nil { + // last sender unblocked - update state + ch.state = chanStateEmpty + } + + chanLock.Unlock() + return true, true + } + chanLock.Unlock() + return false, false + case chanStateRecv, chanStateEmpty: + // something else is already waiting to receive + chanLock.Unlock() + return false, false + case chanStateClosed: + if ch.pop(value) { + chanLock.Unlock() + return true, true + } + + // channel closed - nothing to receive + memzero(value, ch.elementSize) + chanLock.Unlock() + return true, false + default: + runtimePanic("invalid channel state") + } + + runtimePanic("unreachable") + return false, false +} + +type chanState uint8 + +const ( + chanStateEmpty chanState = iota // nothing in channel, no senders/receivers + chanStateRecv // nothing in channel, receivers waiting + chanStateSend // senders waiting, buffer full if present + chanStateBuf // buffer not empty, no senders waiting + chanStateClosed // channel closed +) + +func (s chanState) String() string { + switch s { + case chanStateEmpty: + return "empty" + case chanStateRecv: + return "recv" + case chanStateSend: + return "send" + case chanStateBuf: + return "buffered" + case chanStateClosed: + return "closed" + default: + return "invalid" + } +} + +// 
chanSelectState is a single channel operation (send/recv) in a select
+// statement. The value pointer is either nil (for receives) or points to the
+// value to send (for sends).
+type chanSelectState struct {
+	ch    *channel
+	value unsafe.Pointer
+}
+
+// chanSend sends a single value over the channel.
+// This operation will block unless the send can complete immediately.
+// May panic if the channel is closed.
+func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) {
+	chanLock.Lock()
+
+	if ch.trySend(value) {
+		// value immediately sent
+		chanDebug(ch)
+		chanLock.Unlock()
+		return
+	}
+
+	if ch == nil {
+		// A nil channel blocks forever. Do not schedule this goroutine again.
+		chanLock.Unlock()
+		deadlock()
+	}
+
+	// wait for receiver
+	sender := task.Current()
+	ch.state = chanStateSend
+	sender.Ptr = value
+	*blockedlist = channelBlockedList{
+		next: ch.blocked,
+		t:    sender,
+	}
+	ch.blocked = blockedlist
+	chanDebug(ch)
+	chanLock.Unlock()
+	task.Pause()
+	sender.Ptr = nil
+}
+
+// chanRecv receives a single value over a channel.
+// It blocks if there is no available value to receive.
+// The received value is copied into the value pointer.
+// Returns the comma-ok value.
+func chanRecv(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) bool {
+	chanLock.Lock()
+
+	if rx, ok := ch.tryRecv(value); rx {
+		// value immediately available
+		chanDebug(ch)
+		chanLock.Unlock()
+		return ok
+	}
+
+	if ch == nil {
+		// A nil channel blocks forever. Do not schedule this goroutine again.
+		chanLock.Unlock()
+		deadlock()
+	}
+
+	// wait for a value
+	receiver := task.Current()
+	ch.state = chanStateRecv
+	receiver.Ptr, receiver.Data = value, 1
+	*blockedlist = channelBlockedList{
+		next: ch.blocked,
+		t:    receiver,
+	}
+	ch.blocked = blockedlist
+	chanDebug(ch)
+	chanLock.Unlock()
+	task.Pause()
+	ok := receiver.Data == 1
+	receiver.Ptr, receiver.Data = nil, 0
+	return ok
+}
+
+// chanClose closes the given channel. Blocked receivers are unblocked with
+// the zero value. Closing a nil or already-closed channel, or closing while
+// a sender is blocked, panics.
+func chanClose(ch *channel) {
+	if ch == nil {
+		// Not allowed by the language spec.
+		runtimePanic("close of nil channel")
+	}
+	chanLock.Lock()
+	switch ch.state {
+	case chanStateClosed:
+		// Not allowed by the language spec.
+		chanLock.Unlock()
+		runtimePanic("close of closed channel")
+	case chanStateSend:
+		// This panic should ideally be on the sending side, not in this goroutine.
+		// But when a goroutine tries to send while the channel is being closed,
+		// that is clearly invalid: the send should have been completed already
+		// before the close.
+		chanLock.Unlock()
+		runtimePanic("close channel during send")
+	case chanStateRecv:
+		// unblock all receivers with the zero value
+		ch.state = chanStateClosed
+		for ch.blocked != nil {
+			ch.resumeRX(false)
+		}
+	case chanStateEmpty, chanStateBuf:
+		// Easy case. No available sender or receiver.
+	}
+	ch.state = chanStateClosed
+	chanLock.Unlock()
+	chanDebug(ch)
+}
+
+// chanSelect is the runtime implementation of the select statement. This is
+// perhaps the most complicated statement in the Go spec. It returns the
+// selected index and the 'comma-ok' value.
+//
+// TODO: do this in a round-robin fashion (as specified in the Go spec) instead
+// of picking the first one that can proceed.
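+//
+// It first tries a non-blocking pass over all cases (tryChanSelect). If none
+// is ready, it registers a blocked operation on every non-nil channel,
+// pauses, and recovers the fired case index from the chanSelectState pointer
+// that resumeRX/resumeTX store back into the task.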
+func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) {
+	istate := interrupt.Disable()
+
+	if selected, ok := tryChanSelect(recvbuf, states); selected != ^uintptr(0) {
+		// one channel was immediately ready
+		interrupt.Restore(istate)
+		return selected, ok
+	}
+
+	// construct blocked operations
+	for i, v := range states {
+		if v.ch == nil {
+			// A nil channel receive will never complete.
+			// A nil channel send would have panicked during tryChanSelect.
+			ops[i] = channelBlockedList{}
+			continue
+		}
+
+		ops[i] = channelBlockedList{
+			next:         v.ch.blocked,
+			t:            task.Current(),
+			s:            &states[i],
+			allSelectOps: ops,
+		}
+		v.ch.blocked = &ops[i]
+		if v.value == nil {
+			// recv
+			switch v.ch.state {
+			case chanStateEmpty:
+				v.ch.state = chanStateRecv
+			case chanStateRecv:
+				// already in correct state
+			default:
+				interrupt.Restore(istate)
+				runtimePanic("invalid channel state")
+			}
+		} else {
+			// send
+			switch v.ch.state {
+			case chanStateEmpty:
+				v.ch.state = chanStateSend
+			case chanStateSend:
+				// already in correct state
+			case chanStateBuf:
+				// already in correct state
+			default:
+				interrupt.Restore(istate)
+				runtimePanic("invalid channel state")
+			}
+		}
+		chanDebug(v.ch)
+	}
+
+	// expose rx buffer
+	t := task.Current()
+	t.Ptr = recvbuf
+	t.Data = 1
+
+	// wait for one case to fire
+	interrupt.Restore(istate)
+	task.Pause()
+
+	// figure out which one fired and return the ok value
+	return (uintptr(t.Ptr) - uintptr(unsafe.Pointer(&states[0]))) / unsafe.Sizeof(chanSelectState{}), t.Data != 0
+}
+
+// tryChanSelect is like chanSelect, but it does a non-blocking select operation.
+func tryChanSelect(recvbuf unsafe.Pointer, states []chanSelectState) (uintptr, bool) {
+	istate := interrupt.Disable()
+
+	// See whether we can receive from one of the channels.
+	for i, state := range states {
+		if state.value == nil {
+			// A receive operation.
+			if rx, ok := state.ch.tryRecv(recvbuf); rx {
+				chanDebug(state.ch)
+				interrupt.Restore(istate)
+				return uintptr(i), ok
+			}
+		} else {
+			// A send operation: state.value is not nil.
+			if state.ch.trySend(state.value) {
+				chanDebug(state.ch)
+				interrupt.Restore(istate)
+				return uintptr(i), true
+			}
+		}
+	}
+
+	interrupt.Restore(istate)
+	return ^uintptr(0), false
+}
diff --git a/src/runtime/nestedspinlock.go b/src/runtime/nestedspinlock.go
new file mode 100644
index 0000000000..83625ff897
--- /dev/null
+++ b/src/runtime/nestedspinlock.go
@@ -0,0 +1,35 @@
+package runtime
+
+import (
+	"sync/atomic"
+)
+
+// NestedSpinLock is a reentrant spinlock: the thread that currently holds it
+// may acquire it again without deadlocking.
+//
+// A NestedSpinLock must not be copied after first use.
+type NestedSpinLock struct {
+	owner       uintptr
+	nestedCount int
+}
+
+// Lock locks l.
+// If the lock is already held by another thread, the calling goroutine
+// spins until the lock is available.
+func (l *NestedSpinLock) Lock() {
+	tid := uintptr(currentMachineId())
+	for !atomic.CompareAndSwapUintptr(&l.owner, 0, tid) {
+		if atomic.LoadUintptr(&l.owner) == tid {
+			break
+		}
+		// waiting for unlock
+	}
+	l.nestedCount++
+}
+
+// Unlock unlocks l.
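+// Each Unlock pairs with one prior Lock on the same thread; the lock is
+// released only when the outermost Unlock runs.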
+func (l *NestedSpinLock) Unlock() { + l.nestedCount-- + if l.nestedCount == 0 { + atomic.StoreUintptr(&l.owner, 0) + } +} diff --git a/src/runtime/runtime_unix.go b/src/runtime/runtime_unix.go index 8af3d673c4..94c6e665ce 100644 --- a/src/runtime/runtime_unix.go +++ b/src/runtime/runtime_unix.go @@ -253,3 +253,10 @@ func growHeap() bool { setHeapEnd(heapStart + heapSize) return true } + +//export gettid +func gettid() int + +func currentMachineId() int { + return gettid() +} diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go new file mode 100644 index 0000000000..4111c866d7 --- /dev/null +++ b/src/runtime/scheduler_threads.go @@ -0,0 +1,11 @@ +//go:build scheduler.threads + +package runtime + +func getSystemStackPointer() uintptr { + return getCurrentStackPointer() +} + +func waitForEvents() { + sleepTicks(nanosecondsToTicks(1000000)) +} diff --git a/src/runtime/spinlock/spinlock.go b/src/runtime/spinlock/spinlock.go new file mode 100644 index 0000000000..ba1a167ff2 --- /dev/null +++ b/src/runtime/spinlock/spinlock.go @@ -0,0 +1,26 @@ +package spinlock + +import ( + "sync/atomic" +) + +// SpinLock is a spinlock implementation. +// +// A SpinLock must not be copied after first use. +type SpinLock struct { + lock uintptr +} + +// Lock locks l. +// If the lock is already in use, the calling goroutine +// blocks until the locker is available. +func (l *SpinLock) Lock() { + for !atomic.CompareAndSwapUintptr(&l.lock, 0, 1) { + // waiting for unlock + } +} + +// Unlock unlocks l. +func (l *SpinLock) Unlock() { + atomic.StoreUintptr(&l.lock, 0) +} diff --git a/src/runtime/spinlock/spinlock_test.go b/src/runtime/spinlock/spinlock_test.go new file mode 100644 index 0000000000..d0fbda2385 --- /dev/null +++ b/src/runtime/spinlock/spinlock_test.go @@ -0,0 +1,47 @@ +package spinlock + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func testLock(threads, n int, l sync.Locker) time.Duration { + var wg sync.WaitGroup + wg.Add(threads) + + var count1 int + var count2 int + + start := time.Now() + for i := 0; i < threads; i++ { + go func() { + for i := 0; i < n; i++ { + l.Lock() + count1++ + count2 += 2 + l.Unlock() + } + wg.Done() + }() + } + wg.Wait() + dur := time.Since(start) + if count1 != threads*n { + panic("mismatch") + } + if count2 != threads*n*2 { + panic("mismatch") + } + return dur +} + +func TestSpinLock(t *testing.T) { + fmt.Printf("[1] spinlock %4.0fms\n", testLock(1, 1000000, &SpinLock{}).Seconds()*1000) + fmt.Printf("[1] mutex %4.0fms\n", testLock(1, 1000000, &sync.Mutex{}).Seconds()*1000) + fmt.Printf("[4] spinlock %4.0fms\n", testLock(4, 1000000, &SpinLock{}).Seconds()*1000) + fmt.Printf("[4] mutex %4.0fms\n", testLock(4, 1000000, &sync.Mutex{}).Seconds()*1000) + fmt.Printf("[8] spinlock %4.0fms\n", testLock(8, 1000000, &SpinLock{}).Seconds()*1000) + fmt.Printf("[8] mutex %4.0fms\n", testLock(8, 1000000, &sync.Mutex{}).Seconds()*1000) +} diff --git a/src/runtime/wait_other.go b/src/runtime/wait_other.go index b51d4b64b6..759f2b5334 100644 --- a/src/runtime/wait_other.go +++ b/src/runtime/wait_other.go @@ -1,4 +1,4 @@ -//go:build !tinygo.riscv && !cortexm +//go:build !tinygo.riscv && !cortexm && !scheduler.threads package runtime From 04fbf6535846a26f933d6bf34a399730a054f5eb Mon Sep 17 00:00:00 2001 From: Joseph Lee Date: Tue, 19 Dec 2023 18:43:57 +0900 Subject: [PATCH 3/3] fix: build tag --- src/runtime/nestedspinlock.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/runtime/nestedspinlock.go b/src/runtime/nestedspinlock.go index 
83625ff897..209b7f60aa 100644 --- a/src/runtime/nestedspinlock.go +++ b/src/runtime/nestedspinlock.go @@ -1,3 +1,6 @@ +//go:build multicore +// +build multicore + package runtime import (
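
Note: the following standalone sketch is not part of the patches above. It
illustrates the dump-and-flip technique that Queue.Pop in patch 1 uses to get
FIFO order out of an input stack, with the atomics omitted (single-threaded)
and a hypothetical node type standing in for internal/task.Task.

package main

import "fmt"

type node struct {
	val  int
	next *node
}

// queue buffers pushes on an input stack (newest-first) and serves pops from
// an output list (oldest-first), refilled by reversing the input stack.
type queue struct {
	in  *node // incoming stack; the real patch swaps this pointer atomically
	out *node // flipped, oldest-first list consumed by pop
}

func (q *queue) push(v int) {
	q.in = &node{val: v, next: q.in}
}

func (q *queue) pop() (int, bool) {
	if q.out == nil {
		// Dump the input stack and flip it into oldest-first order.
		var prev *node
		for t := q.in; t != nil; {
			next := t.next
			t.next = prev
			prev = t
			t = next
		}
		q.in = nil
		q.out = prev
	}
	if q.out == nil {
		return 0, false
	}
	v := q.out.val
	q.out = q.out.next
	return v, true
}

func main() {
	var q queue
	q.push(1)
	q.push(2)
	q.push(3)
	for v, ok := q.pop(); ok; v, ok = q.pop() {
		fmt.Println(v) // prints 1, 2, 3: FIFO despite the LIFO input stack
	}
}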