Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: threads scheduler #4047

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions builder/musl.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ var Musl = Library{
"thread/*.c",
"time/*.c",
"unistd/*.c",
"linux/gettid.c",
}
if arch == "arm" {
// These files need to be added to the start for some reason.
Expand Down
3 changes: 3 additions & 0 deletions compileopts/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func (c *Config) GOARM() string {
// BuildTags returns the complete list of build tags used during this build.
func (c *Config) BuildTags() []string {
tags := append(c.Target.BuildTags, []string{"tinygo", "math_big_pure_go", "gc." + c.GC(), "scheduler." + c.Scheduler(), "serial." + c.Serial()}...)
if c.Scheduler() == "threads" {
tags = append(tags, "multicore")
}
for i := 1; i <= c.GoMinorVersion; i++ {
tags = append(tags, fmt.Sprintf("go1.%d", i))
}
Expand Down
2 changes: 1 addition & 1 deletion compileopts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

var (
validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"}
validSchedulerOptions = []string{"none", "tasks", "asyncify"}
validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"}
validSerialOptions = []string{"none", "uart", "usb"}
validPrintSizeOptions = []string{"none", "short", "full"}
validPanicStrategyOptions = []string{"print", "trap"}
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ func main() {
opt := flag.String("opt", "z", "optimization level: 0, 1, 2, s, z")
gc := flag.String("gc", "", "garbage collector to use (none, leaking, conservative)")
panicStrategy := flag.String("panic", "print", "panic strategy (print, trap)")
scheduler := flag.String("scheduler", "", "which scheduler to use (none, tasks, asyncify)")
scheduler := flag.String("scheduler", "", "which scheduler to use (none, tasks, asyncify, threads)")
serial := flag.String("serial", "", "which serial output to use (none, uart, usb)")
work := flag.Bool("work", false, "print the name of the temporary build directory and do not delete this directory on exit")
interpTimeout := flag.Duration("interp-timeout", 180*time.Second, "interp optimization pass timeout")
Expand Down
139 changes: 63 additions & 76 deletions src/internal/task/queue.go
Original file line number Diff line number Diff line change
@@ -1,69 +1,63 @@
package task

import "runtime/interrupt"
import (
"sync/atomic"
"unsafe"
)

const asserts = false

// Queue is a FIFO container of tasks.
// The zero value is an empty queue.
type Queue struct {
head, tail *Task
// in is a stack used to buffer incoming tasks.
in Stack

// out is a singly linked list of tasks in oldest-first order.
// Once empty, it is refilled by dumping and flipping the input stack.
out *Task
}

// Push a task onto the queue.
// This is atomic.
func (q *Queue) Push(t *Task) {
i := interrupt.Disable()
if asserts && t.Next != nil {
interrupt.Restore(i)
panic("runtime: pushing a task to a queue with a non-nil Next pointer")
}
if q.tail != nil {
q.tail.Next = t
}
q.tail = t
t.Next = nil
if q.head == nil {
q.head = t
}
interrupt.Restore(i)
q.in.Push(t)
}

// Pop a task off of the queue.
// This cannot be called concurrently.
func (q *Queue) Pop() *Task {
i := interrupt.Disable()
t := q.head
if t == nil {
interrupt.Restore(i)
return nil
}
q.head = t.Next
if q.tail == t {
q.tail = nil
}
t.Next = nil
interrupt.Restore(i)
return t
}
next := q.out
if next == nil {
// Dump the input stack.
s := q.in.dump()

// Flip it.
var prev *Task
for t := s.top; t != nil; {
next := t.Next
t.Next = prev
prev = t
t = next
}
if prev == nil {
// The queue is empty.
return nil
}

// Append pops the contents of another queue and pushes them onto the end of this queue.
func (q *Queue) Append(other *Queue) {
i := interrupt.Disable()
if q.head == nil {
q.head = other.head
} else {
q.tail.Next = other.head
// Save it in the output list.
next = prev
}
q.tail = other.tail
other.head, other.tail = nil, nil
interrupt.Restore(i)

q.out = next.Next
next.Next = nil
return next
}

// Empty checks if the queue is empty.
// This cannot be called concurrently with Pop.
func (q *Queue) Empty() bool {
i := interrupt.Disable()
empty := q.head == nil
interrupt.Restore(i)
return empty
return q.out == nil && atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.in.top))) == nil
}

// Stack is a LIFO container of tasks.
Expand All @@ -74,51 +68,44 @@ type Stack struct {
}

// Push a task onto the stack.
// This is atomic.
func (s *Stack) Push(t *Task) {
i := interrupt.Disable()
if asserts && t.Next != nil {
interrupt.Restore(i)
panic("runtime: pushing a task to a stack with a non-nil Next pointer")
}
s.top, t.Next = t, s.top
interrupt.Restore(i)
topPtr := (*unsafe.Pointer)(unsafe.Pointer(&s.top))
doPush:
top := atomic.LoadPointer(topPtr)
t.Next = (*Task)(top)
if !atomic.CompareAndSwapPointer(topPtr, top, unsafe.Pointer(t)) {
goto doPush
}
}

// Pop a task off of the stack.
// This is atomic.
func (s *Stack) Pop() *Task {
i := interrupt.Disable()
t := s.top
if t != nil {
s.top = t.Next
t.Next = nil
}
interrupt.Restore(i)
return t
}

// tail follows the chain of tasks.
// If t is nil, returns nil.
// Otherwise, returns the task in the chain where the Next field is nil.
func (t *Task) tail() *Task {
if t == nil {
topPtr := (*unsafe.Pointer)(unsafe.Pointer(&s.top))
doPop:
top := atomic.LoadPointer(topPtr)
if top == nil {
return nil
}
for t.Next != nil {
t = t.Next
t := (*Task)(top)
next := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&t.Next)))
if !atomic.CompareAndSwapPointer(topPtr, top, next) {
goto doPop
}
t.Next = nil
return t
}

// Queue moves the contents of the stack into a queue.
// Elements can be popped from the queue in the same order that they would be popped from the stack.
func (s *Stack) Queue() Queue {
i := interrupt.Disable()
head := s.top
s.top = nil
q := Queue{
head: head,
tail: head.tail(),
// dump the contents of the stack to another stack.
func (s *Stack) dump() Stack {
return Stack{
top: (*Task)(atomic.SwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&s.top)),
nil,
)),
}
interrupt.Restore(i)
return q
}
7 changes: 7 additions & 0 deletions src/internal/task/task_handover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package task

//export tinygo_handoverTask
func handoverTask(fn uintptr, args uintptr)

//go:extern tinygo_callMachineEntry
var callMachineEntryFn [0]uint8
26 changes: 26 additions & 0 deletions src/internal/task/task_stack_amd64.S
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,32 @@ tinygo_swapTask:
// Return into the new task, as if tinygo_swapTask was a regular call.
ret

#ifdef __MACH__ // Darwin
.global _tinygo_handoverTask
_tinygo_handoverTask:
#else // Linux etc
.section .text.tinygo_handoverTask
.global tinygo_handoverTask
tinygo_handoverTask:
#endif
.cfi_startproc
movq %rdi, %rax
movq %rsi, %rdi
jmp *%rax
.cfi_endproc

#ifdef __MACH__ // Darwin
.global _tinygo_callMachineEntry
_tinygo_callMachineEntry:
#else // Linux etc
.section .text.tinygo_callMachineEntry
.global tinygo_callMachineEntry
tinygo_callMachineEntry:
#endif
.cfi_startproc
jmp tinygo_machineEntry
.cfi_endproc

#ifdef __MACH__ // Darwin
// allow these symbols to stripped as dead code
.subsections_via_symbols
Expand Down
101 changes: 101 additions & 0 deletions src/internal/task/task_threads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//go:build scheduler.threads

package task

import (
"runtime/spinlock"
"sync/atomic"
"unsafe"
)

//go:linkname runtimePanic runtime.runtimePanic
func runtimePanic(str string)

//go:linkname currentMachineId runtime.currentMachineId
func currentMachineId() int

type state struct {
tid int

paused int32
}

var lock spinlock.SpinLock
var tasks map[int]*Task = map[int]*Task{}

// Current returns the current active task.
func Current() *Task {
tid := currentMachineId()
lock.Lock()
task := tasks[tid]
lock.Unlock()
return task
}

// Pause suspends the current task and returns to the scheduler.
// This function may only be called when running on a goroutine stack, not when running on the system stack or in an interrupt.
func Pause() {
task := Current()
atomic.StoreInt32(&task.state.paused, 1)
for atomic.LoadInt32(&task.state.paused) == 1 {
// wait for notify
}
}

// Resume the task until it pauses or completes.
// This may only be called from the scheduler.
func (t *Task) Resume() {
atomic.StoreInt32(&t.state.paused, 0)
}

// OnSystemStack returns whether the caller is running on the system stack.
func OnSystemStack() bool {
// If there is not an active goroutine, then this must be running on the system stack.
return Current() == nil
}

func SystemStack() uintptr {
return 0
}

type taskHandoverParam struct {
t *Task
fn uintptr
args uintptr
}

var retainObjects = map[unsafe.Pointer]any{}

// start creates and starts a new goroutine with the given function and arguments.
// The new goroutine is scheduled to run later.
func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) {
t := &Task{}
h := &taskHandoverParam{
t: t,
fn: fn,
args: uintptr(args),
}

lock.Lock()
retainObjects[unsafe.Pointer(h)] = h
lock.Unlock()

createThread(uintptr(unsafe.Pointer(&callMachineEntryFn)), uintptr(unsafe.Pointer(h)), stackSize)
}

//export tinygo_machineEntry
func machineEntry(param *taskHandoverParam) {
tid := currentMachineId()
param.t.state.tid = tid

lock.Lock()
delete(retainObjects, unsafe.Pointer(param))
tasks[tid] = param.t
lock.Unlock()

handoverTask(param.fn, param.args)

lock.Lock()
delete(tasks, tid)
lock.Unlock()
}
26 changes: 26 additions & 0 deletions src/internal/task/task_threads_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package task

import (
"strconv"
"unsafe"
)

//go:extern errno
var libcErrno uintptr

//export pthread_create
func pthread_create(pthread uintptr, arg2 uintptr, fn uintptr, param uintptr) int

type Errno uintptr

func (e Errno) Error() string {
return "system error: " + strconv.Itoa(int(e))
}

func createThread(fn uintptr, param uintptr, stackSize uintptr) (uintptr, error) {
var pthread uintptr
if pthread_create(uintptr(unsafe.Pointer(&pthread)), 0, fn, param) < 0 {
return pthread, Errno(libcErrno)
}
return pthread, nil
}
3 changes: 3 additions & 0 deletions src/runtime/chan.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build !multicore
// +build !multicore

package runtime

// This file implements the 'chan' type and send/receive/select operations.
Expand Down
Loading