diff --git a/bgrun/.github/FUNDING.yml b/bgrun/.github/FUNDING.yml new file mode 100644 index 000000000..bb69c2ad6 --- /dev/null +++ b/bgrun/.github/FUNDING.yml @@ -0,0 +1 @@ +github: arp242 diff --git a/bgrun/LICENSE b/bgrun/LICENSE new file mode 100644 index 000000000..699f18ba6 --- /dev/null +++ b/bgrun/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright © Martin Tournoij + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to +deal in the Software without restriction, including without limitation the +rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +sell copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +The software is provided "as is", without warranty of any kind, express or +implied, including but not limited to the warranties of merchantability, +fitness for a particular purpose and noninfringement. In no event shall the +authors or copyright holders be liable for any claim, damages or other +liability, whether in an action of contract, tort or otherwise, arising +from, out of or in connection with the software or the use or other dealings +in the software. diff --git a/bgrun/README.md b/bgrun/README.md new file mode 100644 index 000000000..ed592be28 --- /dev/null +++ b/bgrun/README.md @@ -0,0 +1,2 @@ +Need to put this in its own repo; just need to finish some things and write +docs. diff --git a/bgrun/bgrun.go b/bgrun/bgrun.go index 71227d15c..e4c7f4d20 100644 --- a/bgrun/bgrun.go +++ b/bgrun/bgrun.go @@ -1,240 +1,332 @@ -// Copyright © Martin Tournoij – This file is part of GoatCounter and published -// under the terms of a slightly modified EUPL v1.2 license, which can be found -// in the LICENSE file or at https://license.goatcounter.com - // Package bgrun runs jobs in the background. -// -// This is mostly intended for "fire and forget" type of goroutines like sending -// an email. They typically don't really need any synchronisation as such but -// you do want to wait for them to finish before the program exits, or you want -// to wait for them in tests. package bgrun import ( "context" + "errors" "fmt" - "os" + "runtime" "sort" - "strings" "sync" "time" +) - "zgo.at/errors" - "zgo.at/zli" - "zgo.at/zlog" - "zgo.at/zstd/zdebug" - "zgo.at/zstd/zsync" +type ( + task struct { + name string + maxPar int + fun func(context.Context) error + } + job struct { + task task + wg sync.WaitGroup + num int + instances []*jobInstance + } + jobInstance struct { + from string + started time.Time + } + Job struct { + Task string // Task name + Started time.Time // When the job was started. + Took time.Duration // How long the job took to run. + From string // Location where the job was started from. + } + Runner struct { + ctx context.Context + cancel context.CancelFunc + maxHist int + depth int + mu sync.Mutex + tasks map[string]task + jobs map[string]*job + hist []Job + logger func(task string, err error) + } ) -type Job struct { - Name string - From string - NoDuplicates bool - Started time.Time - Finished time.Time +type ErrTooManyJobs struct { + Task string + Num int } -var ( - wg = new(sync.WaitGroup) +func (e ErrTooManyJobs) Error() string { + return fmt.Sprintf("bgrun.Run: task %q has %d jobs already", e.Task, e.Num) +} - working struct { - sync.Mutex - m map[string]Job +func NewRunner(logErr func(task string, err error)) *Runner { + ctx, cancel := context.WithCancel(context.Background()) + return &Runner{ + ctx: ctx, + cancel: cancel, + maxHist: 100, + depth: 2, + tasks: make(map[string]task), + jobs: make(map[string]*job), + hist: make([]Job, 0, 100), + logger: logErr, } +} - hist struct { - sync.Mutex - l []Job +// NewTask registers a new task. +func (r *Runner) NewTask(name string, maxPar int, f func(context.Context) error) { + if maxPar < 1 { + maxPar = 1 + } + if name == "" { + panic("bgrun.New: name cannot be an empty string") + } + if f == nil { + panic("bgrun.New: function cannot be nil") } -) - -const maxHist = 1_000 -// Wait for all goroutines to finish for a maximum of maxWait. -func Wait(ctx context.Context) error { - // TODO: this won't actually kill the goroutines that are still running. - return errors.Wrap(zsync.Wait(ctx, wg), "bgrun.Wait") + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.tasks[name]; ok { + panic(fmt.Sprintf("bgrun.New: task %q already exists", name)) + } + r.tasks[name] = task{ + name: name, + maxPar: maxPar, + fun: f, + } } -// WaitProgress calls Wait() and prints which tasks it's waiting for. -func WaitProgress(ctx context.Context) error { - term := zli.IsTerminal(os.Stdout.Fd()) - - go func() { - func() { - working.Lock() - defer working.Unlock() - if len(working.m) == 0 { - return - } - }() +func (r *Runner) Reset() { + r.mu.Lock() + defer r.mu.Unlock() - for { - if term { - zli.EraseLine() - } + r.maxHist = 100 + r.tasks = make(map[string]task) + r.jobs = make(map[string]*job) + r.hist = make([]Job, 0, r.maxHist) +} - func() { - working.Lock() - defer working.Unlock() - if len(working.m) == 0 { - if term { - fmt.Println() - } - return - } +// Run a new job. +func (r *Runner) Run(name string, fun func(context.Context) error) error { + return r.run(name, fun) +} - if term { - fmt.Printf("%d tasks: ", len(working.m)) - } - l := make([]string, 0, len(working.m)) - for k := range working.m { - l = append(l, k) - } - sort.Strings(l) - if term { - fmt.Print(strings.Join(l, ", "), " ") - } - }() +// MustRun behaves like [Run], but will panic on errors. +func (r *Runner) MustRun(name string, fun func(context.Context) error) { + if err := r.run(name, fun); err != nil { + panic(err) + } +} - time.Sleep(100 * time.Millisecond) - func() { - working.Lock() - defer working.Unlock() - if len(working.m) == 0 { - if term { - fmt.Println() - } - return - } - }() - } - }() +// RunFunction is like [Run], but the function doesn't support cancellation +// through context or error logging. +func (r *Runner) RunFunction(name string, fun func()) error { + return r.Run(name, func(context.Context) error { fun(); return nil }) +} - err := Wait(ctx) - if term { - zli.EraseLine() - fmt.Print(" done \n") +// MustRunFunction is like [RunFunction], but will panic on errors. +func (r *Runner) MustRunFunction(name string, fun func()) { + if err := r.RunFunction(name, fun); err != nil { + panic(err) } - return err } -// WaitAndLog calls Wait() and logs any errors. -func WaitAndLog(ctx context.Context) { - err := Wait(ctx) - if err != nil { - zlog.Error(err) +// MustRunTask behaves like [RunTask], but will panic on errors. +func (r *Runner) MustRunTask(name string) { + if err := r.run(name, nil); err != nil { + panic(err) } } -// WaitProgressAndLog calls Wait(), prints which tasks it's waiting for, and -// logs any errors. -func WaitProgressAndLog(ctx context.Context) { - err := WaitProgress(ctx) - if err != nil { - zlog.Error(err) - } +// RunTask runs a registered task. +func (r *Runner) RunTask(name string) error { + return r.run(name, nil) } -// Run the function in a goroutine. -// -// bgrun.Run(func() { -// // Do work... -// }) -func Run(name string, f func()) { - done := add(name, false) +// Always call this function from both RunTask() and MustRunTask() so the stack +// trace is always identical. +func (r *Runner) run(name string, fun func(context.Context) error) error { + isTask := fun == nil + r.mu.Lock() + defer r.mu.Unlock() + + t, ok := r.tasks[name] + if !ok && isTask { + return fmt.Errorf("bgrun.Run: no task %q", name) + } + + j, ok := r.jobs[name] + if isTask { + if ok && j.num >= t.maxPar { + return &ErrTooManyJobs{Task: name, Num: j.num} + } + fun = t.fun + } else { + t = task{name: name} + } + if !ok { + j = &job{task: t} + r.jobs[name] = j + } + + i := len(j.instances) + inst := jobInstance{ + started: time.Now(), + from: loc(r.depth), + } + j.instances = append(j.instances, &inst) + + j.wg.Add(1) go func() { - defer zlog.Recover() - defer done() - f() + defer func() { + rec := recover() + + r.mu.Lock() + r.hist = append(r.hist, Job{ + Task: j.task.name, + From: inst.from, + Started: inst.started, + Took: time.Now().Sub(inst.started), + }) + if len(r.hist) > r.maxHist { + r.hist = r.hist[len(r.hist)-r.maxHist:] + } + + j.instances[i] = nil + j.num-- + r.mu.Unlock() + j.wg.Done() + + if rec != nil { + switch rr := rec.(type) { + case error: + r.logger(name, rr) + case string: + r.logger(name, errors.New(rr)) + default: + r.logger(name, fmt.Errorf("%s", rr)) + } + } + }() + + err := fun(r.ctx) + if err != nil { + r.logger(name, err) + } }() + return nil } -// RunNoDuplicates is like Run(), but only allows one instance of this name. +// Wait for all running jobs for the task to finish. // -// It will do nothing if there's already something running with this name. -func RunNoDuplicates(name string, f func()) { - if Running(name) { +// If name is an empty string it will wait for jobs for all tasks. +func (r *Runner) Wait(name string) { + if name == "" { + r.mu.Lock() + var wg sync.WaitGroup + wg.Add(len(r.jobs)) + for _, j := range r.jobs { + j := j + go func() { + defer wg.Done() + j.wg.Wait() + }() + } + r.mu.Unlock() + wg.Wait() return } - done := add(name, true) + r.mu.Lock() + j, ok := r.jobs[name] + r.mu.Unlock() + if ok { + j.wg.Wait() + } +} + +func (r *Runner) WaitFor(d time.Duration, name string) error { + var ( + t = time.NewTimer(d) + done = make(chan struct{}) + ) go func() { - defer zlog.Recover() - defer done() - f() + r.Wait(name) + t.Stop() + close(done) }() + + select { + case <-t.C: + r.cancel() + return fmt.Errorf("bgrun.WaitFor: %w", context.DeadlineExceeded) + case <-done: + return nil + } } -// Add a new function to the waitgroup and return the done. +// History gets the history. Only jobs that are finished running are added to +// the history. // -// done := bgrun.Add() -// go func() { -// defer done() -// defer zlog.Recover() -// }() -func add(name string, nodup bool) func() { - wg.Add(1) - func() { - working.Lock() - defer working.Unlock() - if working.m == nil { - working.m = make(map[string]Job) - } - working.m[name] = Job{ - Name: name, - Started: time.Now(), - From: zdebug.Loc(3), - NoDuplicates: nodup, - } - }() +// If newSize is >0 then it also sets the new history size (the default is 100). +// if newSize <0 history will be disabled. +func (r *Runner) History(newSize int) []Job { + r.mu.Lock() + defer r.mu.Unlock() - return func() { - wg.Done() - func() { - working.Lock() - defer working.Unlock() - hist.Lock() - defer hist.Unlock() - - hist.l = append(hist.l, working.m[name]) - hist.l[len(hist.l)-1].Finished = time.Now() - if len(hist.l) > maxHist { - hist.l = hist.l[len(hist.l)-maxHist:] - } + cpy := make([]Job, len(r.hist)) + copy(cpy, r.hist) - delete(working.m, name) - }() + if newSize > 0 { + r.maxHist = newSize + if newSize < len(r.hist) { + r.hist = make([]Job, newSize) + copy(r.hist, cpy) + } + } + if newSize < 0 { + r.maxHist = 0 + r.hist = nil } -} -// Running reports if a function by this name is already running. -func Running(name string) bool { - working.Lock() - defer working.Unlock() - _, ok := working.m[name] - return ok + return cpy } -// List returns all running functions. -func List() []Job { - working.Lock() - defer working.Unlock() +// Running returns all running jobs. +func (r *Runner) Running() []Job { + r.mu.Lock() + defer r.mu.Unlock() - l := make([]Job, 0, len(working.m)) - for _, j := range working.m { - l = append(l, j) + l := make([]Job, 0, len(r.jobs)) + for _, j := range r.jobs { + for _, inst := range j.instances { + if inst != nil { + l = append(l, Job{ + Task: j.task.name, + Started: inst.started, + From: inst.from, + }) + } + } } - sort.Slice(l, func(i, j int) bool { return l[i].Name < l[j].Name }) + sort.Slice(l, func(i, j int) bool { return l[i].Started.Before(l[j].Started) }) return l } -// History gets the last 1,000 jobs that ran. -func History() []Job { - hist.Lock() - defer hist.Unlock() +// loc gets a location in the stack trace. Use 0 for the current location; 1 for +// one up, etc. +func loc(n int) string { + _, file, line, ok := runtime.Caller(n + 1) + if !ok { + file = "???" + line = 0 + } - cpy := make([]Job, len(hist.l)) - copy(cpy, hist.l) - return cpy + short := file + for i := len(file) - 1; i > 0; i-- { + if file[i] == '/' { + short = file[i+1:] + break + } + } + file = short + + return fmt.Sprintf("%v:%v", file, line) } diff --git a/bgrun/bgrun_test.go b/bgrun/bgrun_test.go index f04583657..b9c259e01 100644 --- a/bgrun/bgrun_test.go +++ b/bgrun/bgrun_test.go @@ -1,44 +1,220 @@ -// Copyright © Martin Tournoij – This file is part of GoatCounter and published -// under the terms of a slightly modified EUPL v1.2 license, which can be found -// in the LICENSE file or at https://license.goatcounter.com - package bgrun import ( + "bytes" "context" + "errors" + "fmt" + "os" + "reflect" + "sort" + "strings" "testing" "time" - "zgo.at/errors" + "sync/atomic" +) + +var ( + i atomic.Int32 + testFunc = func(context.Context) error { time.Sleep(50 * time.Millisecond); i.Add(1); return nil } ) +func init() { + NewTask("test", 1, testFunc) +} + +func reset() { + Reset() + i = atomic.Int32{} + NewTask("test", 1, testFunc) +} + func TestRun(t *testing.T) { - i := 0 - Run("test 1", func() { - time.Sleep(200 * time.Millisecond) - i = 1 + defer reset() + + MustRunTask("test") + Wait("test") + if i.Load() != 1 { + t.Fatalf("i is %d, not 1", i.Load()) + } + + MustRunTask("test") + Wait("test") + if i.Load() != 2 { + t.Fatalf("i is %d, not 2", i.Load()) + } +} + +func TestMultiple(t *testing.T) { + defer reset() + + MustRunTask("test") + MustRunTask("test") + Wait("test") + if i.Load() != 2 { + t.Fatalf("i is %d, not 2", i.Load()) + } +} + +func TestWaitAll(t *testing.T) { + defer reset() + + NewTask("test2", 2, testFunc) + + MustRunTask("test") + MustRunTask("test") + MustRunTask("test2") + MustRunTask("test2") + Wait("") + if i.Load() != 4 { + t.Fatalf("i is %d, not 4", i.Load()) + } +} + +func TestWaitFor(t *testing.T) { + defer reset() + + NewTask("test2", 1, func(ctx context.Context) error { + select { + case <-ctx.Done(): + case <-time.After(50 * time.Millisecond): + t.Error("not cancelled") + } + return nil }) - err := Wait(context.Background()) + MustRunTask("test2") + MustRunTask("test") + + err := WaitFor(2*time.Millisecond, "") + if !errors.Is(err, context.DeadlineExceeded) { + t.Error(err) + } + + err = WaitFor(50*time.Millisecond, "") if err != nil { - t.Fatal(err) + t.Error(err) + } +} + +func jobList(h []Job) []string { + s := make([]string, 0, len(h)) + for _, hh := range h { + s = append(s, fmt.Sprintf("%s %s %s %s", hh.Task, + hh.Started.Format("2006-01-02"), + hh.Took.Truncate(time.Millisecond*10), + hh.From[:strings.Index(hh.From, ":")])) + } + sort.Strings(s) + return s +} + +func TestHistory(t *testing.T) { + defer reset() + + NewTask("test2", 2, testFunc) + NewTask("test3", 2, testFunc) + NewTask("test4", 2, testFunc) + + MustRunTask("test") + MustRunTask("test") + MustRunTask("test2") + MustRunTask("test2") + MustRunTask("test3") + MustRunTask("test3") + Wait("") + + want := []string{ + "test 2022-11-13 50ms bgrun_test.go", + "test 2022-11-13 50ms bgrun_test.go", + "test2 2022-11-13 50ms bgrun_test.go", + "test2 2022-11-13 50ms bgrun_test.go", + "test3 2022-11-13 50ms bgrun_test.go", + "test3 2022-11-13 50ms bgrun_test.go", + } + if h := jobList(History(0)); !reflect.DeepEqual(h, want) { + t.Errorf("\nhave: %#v\nwant: %#v", h, want) } - if i != 1 { - t.Fatal("i not set") + + if h := jobList(History(2)); !reflect.DeepEqual(h, want) { + t.Errorf("\nhave: %#v\nwant: %#v", h, want) + } + if l := len(History(0)); l != 2 { + t.Error(l) + } + + if l := len(History(-1)); l != 2 { + t.Error(l) + } + if l := len(History(0)); l != 0 { + t.Error(l) + } + + History(2) + MustRunTask("test") + if l := len(History(0)); l != 0 { + t.Error(l) + } + Wait("") + want = []string{ + "test 2022-11-13 50ms bgrun_test.go", + } + if h := jobList(History(0)); !reflect.DeepEqual(h, want) { + t.Errorf("\nhave: %#v\nwant: %#v", h, want) + } + + MustRunTask("test") + MustRunTask("test") + MustRunTask("test") + Wait("") + want = []string{ + "test 2022-11-13 50ms bgrun_test.go", + "test 2022-11-13 50ms bgrun_test.go", + } + if h := jobList(History(0)); !reflect.DeepEqual(h, want) { + t.Errorf("\nhave: %#v\nwant: %#v", h, want) } } -func TestWait(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() +func TestRunning(t *testing.T) { + defer reset() + + NewTask("test2", 2, testFunc) + MustRunTask("test") + MustRunTask("test2") + RunTask("test") - Run("test wait", func() { time.Sleep(2 * time.Second) }) - err := Wait(ctx) - if err == nil { - t.Fatal("error is nil") + want := []string{ + "test 2022-11-13 0s bgrun_test.go", + "test 2022-11-13 0s bgrun_test.go", + "test2 2022-11-13 0s bgrun_test.go", } - if !errors.Is(err, context.DeadlineExceeded) { - t.Fatalf("wrong error; %#v", err) + if h := jobList(Running()); !reflect.DeepEqual(h, want) { + t.Errorf("\nhave: %#v\nwant: %#v", h, want) } + Wait("") - time.Sleep(2 * time.Second) + if h := jobList(Running()); !reflect.DeepEqual(h, []string{}) { + t.Errorf("\nhave: %#v\nwant: %#v", h, want) + } +} + +func TestLog(t *testing.T) { + defer reset() + + buf := new(bytes.Buffer) + stderr = buf + defer func() { stderr = os.Stderr }() + NewTask("error", 1, func(context.Context) error { return errors.New("oh noes") }) + NewTask("panic", 1, func(context.Context) error { panic("FIRE!") }) + + MustRunTask("error") + Wait("") + MustRunTask("panic") + Wait("") + + want := "bgrun: error running task \"error\": oh noes\nbgrun: error running task \"panic\": FIRE!\n" + if buf.String() != want { + t.Errorf("\nwant: %q\nhave: %q", want, buf.String()) + } } diff --git a/bgrun/default.go b/bgrun/default.go new file mode 100644 index 000000000..7a8eb4a6c --- /dev/null +++ b/bgrun/default.go @@ -0,0 +1,37 @@ +package bgrun + +import ( + "context" + "fmt" + "io" + "os" + "time" +) + +// TODO: probably want to get rid of this; just easier for now to migrate things +// from the old bgrun to this. + +var stderr io.Writer = os.Stderr + +var defaultRunner = func() *Runner { + r := NewRunner(func(t string, err error) { + fmt.Fprintf(stderr, "bgrun: error running task %q: %s\n", t, err.Error()) + }) + r.depth = 3 + return r +}() + +func NewTask(name string, maxPar int, f func(context.Context) error) { + defaultRunner.NewTask(name, maxPar, f) +} +func Reset() { defaultRunner.Reset() } +func Run(name string, fun func(context.Context) error) error { return defaultRunner.Run(name, fun) } +func MustRun(name string, fun func(context.Context) error) { defaultRunner.MustRun(name, fun) } +func RunFunction(name string, fun func()) error { return defaultRunner.RunFunction(name, fun) } +func MustRunFunction(name string, fun func()) { defaultRunner.MustRunFunction(name, fun) } +func MustRunTask(name string) { defaultRunner.MustRunTask(name) } +func RunTask(name string) error { return defaultRunner.RunTask(name) } +func Wait(name string) { defaultRunner.Wait(name) } +func WaitFor(d time.Duration, name string) error { return defaultRunner.WaitFor(d, name) } +func History(newSize int) []Job { return defaultRunner.History(newSize) } +func Running() []Job { return defaultRunner.Running() } diff --git a/bgrun/go.mod b/bgrun/go.mod new file mode 100644 index 000000000..a0f182132 --- /dev/null +++ b/bgrun/go.mod @@ -0,0 +1,3 @@ +module zgo.at/bgrun + +go 1.19 diff --git a/bgrun/go.sum b/bgrun/go.sum new file mode 100644 index 000000000..e69de29bb diff --git a/cmd/goatcounter/buffer_test.go b/cmd/goatcounter/buffer_test.go index c80a9b8c1..96827340b 100644 --- a/cmd/goatcounter/buffer_test.go +++ b/cmd/goatcounter/buffer_test.go @@ -54,6 +54,7 @@ func TestBuffer(t *testing.T) { i := zsync.NewAtomicInt(0) handle := handlers.NewBackend(zdb.MustGetDB(ctx), nil, false, false, false, "", 15) goatcounter.Memstore.TestInit(zdb.MustGetDB(ctx)) + cron.Start(ctx) s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ii := i.Add(1) @@ -72,6 +73,11 @@ func TestBuffer(t *testing.T) { *r = *r.WithContext(ctx) handle.ServeHTTP(w, r) + err := cron.TaskPersistAndStat() + if err != nil { + t.Error(err) + } + cron.WaitPersistAndStat() })) backend = s.URL } @@ -121,7 +127,6 @@ func TestBuffer(t *testing.T) { } time.Sleep(bufSendTime * 2) - cron.PersistAndStat(ctx) { var got int diff --git a/cmd/goatcounter/db_test.go b/cmd/goatcounter/db_test.go index 4e60c1124..28e8953bc 100644 --- a/cmd/goatcounter/db_test.go +++ b/cmd/goatcounter/db_test.go @@ -328,11 +328,11 @@ func TestDBUser(t *testing.T) { "-db="+dbc, "-find=1", "-find=new@new.new") wantExit(t, exit, out, 0) - if !grep(out.String(), `user_id +1`) { - t.Error(out.String()) + if r := `user_id +1`; !grep(out.String(), r) { + t.Errorf("user 1 not found in output (via regexp %q):\n%s", r, out.String()) } - if !grep(out.String(), `user_id +2`) { - t.Error(out.String()) + if r := `user_id +2`; !grep(out.String(), r) { + t.Errorf("user 2 not found in output (via regexp %q):\n%s", r, out.String()) } out.Reset() } diff --git a/cmd/goatcounter/import_test.go b/cmd/goatcounter/import_test.go index e9c7d319b..24a3ef535 100644 --- a/cmd/goatcounter/import_test.go +++ b/cmd/goatcounter/import_test.go @@ -63,7 +63,7 @@ func runImport(ctx context.Context, t *testing.T, exit *zli.TestExit, args ...st "-site=http://test.localhost:9876", "-debug=all"}, args...)...) - err := cron.PersistAndStat(ctx) + err := cron.TaskPersistAndStat() if err != nil { t.Fatal(err) } @@ -188,7 +188,7 @@ func TestImport(t *testing.T) { 4)...) stop <- struct{}{} time.Sleep(1000 * time.Millisecond) - err := cron.PersistAndStat(ctx) + err := cron.TaskPersistAndStat() if err != nil { t.Fatal(err) } @@ -220,7 +220,7 @@ func TestImport(t *testing.T) { 100)...) stop <- struct{}{} time.Sleep(1000 * time.Millisecond) - err := cron.PersistAndStat(ctx) + err := cron.TaskPersistAndStat() if err != nil { t.Fatal(err) } diff --git a/cmd/goatcounter/main_test.go b/cmd/goatcounter/main_test.go index ebca1491f..b032e879b 100644 --- a/cmd/goatcounter/main_test.go +++ b/cmd/goatcounter/main_test.go @@ -45,12 +45,9 @@ func startTest(t *testing.T) ( // TODO: should really have helper function in zlog. mu.Lock() + logout := zli.Stdout zlog.Config.SetOutputs(func(l zlog.Log) { - out := zli.Stdout - if l.Level == zlog.LevelErr { - out = zli.Stderr - } - fmt.Fprintln(out, zlog.Config.Format(l)) + fmt.Fprintln(logout, zlog.Config.Format(l)) }) mu.Unlock() @@ -64,7 +61,7 @@ func startTest(t *testing.T) ( func runCmdStop(t *testing.T, exit *zli.TestExit, ready chan<- struct{}, stop chan struct{}, cmd string, args ...string) { defer exit.Recover() - defer cron.Stop(context.Background()) + defer cron.Stop() cmdMain(zli.NewFlags(append([]string{"goatcounter", cmd}, args...)), ready, stop) } diff --git a/cmd/goatcounter/saas_test.go b/cmd/goatcounter/saas_test.go index 18ae6b6d7..aa4d3ba25 100644 --- a/cmd/goatcounter/saas_test.go +++ b/cmd/goatcounter/saas_test.go @@ -5,7 +5,6 @@ package main import ( - "bytes" "io" "net/http" "testing" @@ -36,7 +35,7 @@ func TestSaas(t *testing.T) { if resp.StatusCode != 200 { t.Errorf("status %d: %s", resp.StatusCode, b) } - if !bytes.Contains(b, []byte("last_persisted_at")) { + if len(b) < 100 { t.Errorf("%s", b) } diff --git a/cmd/goatcounter/serve.go b/cmd/goatcounter/serve.go index c314182dc..7e5afb825 100644 --- a/cmd/goatcounter/serve.go +++ b/cmd/goatcounter/serve.go @@ -20,11 +20,11 @@ import ( "github.com/go-chi/chi/v5" "github.com/teamwork/reload" "golang.org/x/text/language" + "zgo.at/bgrun" "zgo.at/blackmail" "zgo.at/errors" "zgo.at/goatcounter/v2" "zgo.at/goatcounter/v2/acme" - "zgo.at/goatcounter/v2/bgrun" "zgo.at/goatcounter/v2/cron" "zgo.at/goatcounter/v2/handlers" "zgo.at/z18n" @@ -258,16 +258,33 @@ func doServe(ctx context.Context, db zdb.DB, os.Exit(99) // TODO: zli.Exit? }() - bgrun.Run("shutdown", func() { - err := cron.PersistAndStat(goatcounter.CopyContextValues(ctx)) + bgrun.RunFunction("shutdown", func() { + err := cron.TaskPersistAndStat() if err != nil { zlog.Error(err) } goatcounter.Memstore.StoreSessions(db) }) - zlog.Print("Waiting for background tasks to finish; send HUP, TERM, or INT twice to force kill (may lose data!)") - time.Sleep(10 * time.Millisecond) - bgrun.WaitProgressAndLog(context.Background()) + + time.Sleep(200 * time.Millisecond) // Only show message if it doesn't exit in 200ms. + + first := true + for r := bgrun.Running(); len(r) > 0; r = bgrun.Running() { + if first { + zlog.Print("Waiting for background tasks; send HUP, TERM, or INT twice to force kill") + first = false + } + time.Sleep(100 * time.Millisecond) + + zli.EraseLine() + fmt.Fprintf(zli.Stdout, "%d tasks: ", len(r)) + for i, t := range r { + if i > 0 { + fmt.Fprint(zli.Stdout, ", ") + } + fmt.Fprintf(zli.Stdout, "%s (%s)", t.Task, time.Now().Sub(t.Started).Round(time.Second)) + } + } db.Close() return nil @@ -327,7 +344,7 @@ func flagsServe(f zli.Flags, v *zvalidate.Validator) (string, string, bool, bool blackmail.DefaultMailer = blackmail.NewMailer(*smtp) v.Range("-store-every", int64(*storeEvery), 1, 0) - cron.PersistInterval(time.Duration(*storeEvery) * time.Second) + cron.SetPersistInterval(time.Duration(*storeEvery) * time.Second) goatcounter.InitGeoDB(*geodb) @@ -390,7 +407,7 @@ func setupServe(dbConnect, dbConn string, dev bool, flagTLS string, automigrate return nil, nil, nil, nil, 0, err } - cron.RunBackground(goatcounter.CopyContextValues(ctx)) + cron.Start(goatcounter.CopyContextValues(ctx)) return db, ctx, tlsc, acmeh, listenTLS, nil } @@ -433,7 +450,7 @@ func flagErrors(errors string, v *zvalidate.Validator) { return } - bgrun.Run("email:error", func() { + bgrun.RunFunction("email:error", func() { msg := zlog.Config.Format(l) subject := zstring.GetLine(msg, 1) if i := strings.Index(subject, "ERROR: "); i > -1 { diff --git a/cmd/goatcounter/serve_test.go b/cmd/goatcounter/serve_test.go index ad706859d..a75a253ef 100644 --- a/cmd/goatcounter/serve_test.go +++ b/cmd/goatcounter/serve_test.go @@ -5,7 +5,6 @@ package main import ( - "bytes" "io" "net/http" "testing" @@ -33,7 +32,7 @@ func TestServe(t *testing.T) { if resp.StatusCode != 200 { t.Errorf("status %d: %s", resp.StatusCode, b) } - if !bytes.Contains(b, []byte("last_persisted_at")) { + if len(b) < 100 { t.Errorf("%s", b) } diff --git a/cron/cron.go b/cron/cron.go index 1717bebb4..c41303401 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -8,10 +8,10 @@ package cron import ( "context" "strings" + "sync/atomic" "time" - "zgo.at/goatcounter/v2" - "zgo.at/goatcounter/v2/bgrun" + "zgo.at/bgrun" "zgo.at/zlog" "zgo.at/zstd/zruntime" "zgo.at/zstd/zsync" @@ -28,95 +28,93 @@ func (t Task) ID() string { } var Tasks = []Task{ - {"vacuum pageviews (data retention)", DataRetention, 1 * time.Hour}, + {"vacuum pageviews (data retention)", dataRetention, 1 * time.Hour}, {"renew ACME certs", renewACME, 2 * time.Hour}, {"vacuum soft-deleted sites", vacuumDeleted, 12 * time.Hour}, {"rm old exports", oldExports, 1 * time.Hour}, {"cycle sessions", sessions, 1 * time.Minute}, - {"send email reports", EmailReports, 1 * time.Hour}, + {"send email reports", emailReports, 1 * time.Hour}, + {"persist hits", persistAndStat, time.Duration(persistInterval.Load())}, } var ( stopped = zsync.NewAtomicInt(0) started = zsync.NewAtomicInt(0) - persistInterval = 10 * time.Second + persistInterval = func() atomic.Int64 { + var d atomic.Int64 + d.Store(int64(10 * time.Second)) + return d + }() ) -func PersistInterval(d time.Duration) { - if started.Value() == 1 { - panic("cron.PersistInterval: cron already started") - } - - persistInterval = d - Tasks = append(Tasks, Task{ - Desc: "persist hits", - Fun: PersistAndStat, - Period: d, - }) +func SetPersistInterval(d time.Duration) { + persistInterval.Store(int64(d)) } -// RunBackground runs tasks in the background according to the given schedule. -func RunBackground(ctx context.Context) { +// Start running tasks in the background. +func Start(ctx context.Context) { + if started.Value() == 1 { + return + } started.Set(1) l := zlog.Module("cron") - // TODO: should rewrite cron to always respond to channels, and then have - // the cron package send those periodically. - go func() { - for { - <-goatcounter.PersistRunner.Run - bgrun.Run("cron:PersistAndStat", func() { - done := timeout("PersistAndStat", persistInterval) - err := PersistAndStat(ctx) - if err != nil { - l.Error(err) - } - done <- struct{}{} - }) - } - }() + for _, t := range Tasks { + t := t + f := t.ID() + bgrun.NewTask("cron:"+f, 1, func(context.Context) error { + err := t.Fun(ctx) + if err != nil { + l.Error(err) + } + return nil + }) + } for _, t := range Tasks { go func(t Task) { defer zlog.Recover() + id := t.ID() for { - time.Sleep(t.Period) + if id == "persistAndStat" { + time.Sleep(time.Duration(persistInterval.Load())) + } else { + time.Sleep(t.Period) + } if stopped.Value() == 1 { return } - f := t.ID() - bgrun.Run("cron:"+f, func() { - done := timeout(f, persistInterval) - err := t.Fun(ctx) - if err != nil { - l.Error(err) - } - done <- struct{}{} - }) + err := bgrun.RunTask("cron:" + id) + if err != nil { + zlog.Error(err) + } } }(t) } } -func Stop(ctx context.Context) error { +func Stop() error { stopped.Set(1) started.Set(0) - return bgrun.Wait(ctx) + bgrun.Wait("") + bgrun.Reset() + return nil } -func timeout(f string, d time.Duration) chan struct{} { - done := make(chan struct{}) - go func() { - t := time.NewTimer(d) - select { - case <-t.C: - zlog.Errorf("cron task %s is taking longer than %s", f, d) - case <-done: - t.Stop() - } - }() - return done -} +func TaskOldExports() error { return bgrun.RunTask("cron:oldExports") } +func TaskDataRetention() error { return bgrun.RunTask("cron:dataRetention") } +func TaskVacuumOldSites() error { return bgrun.RunTask("cron:vacuumDeleted") } +func TaskACME() error { return bgrun.RunTask("cron:renewACME") } +func TaskSessions() error { return bgrun.RunTask("cron:sessions") } +func TaskEmailReports() error { return bgrun.RunTask("cron:emailReports") } +func TaskPersistAndStat() error { return bgrun.RunTask("cron:persistAndStat") } +func WaitOldExports() { bgrun.Wait("cron:oldExports") } +func WaitDataRetention() { bgrun.Wait("cron:dataRetention") } +func WaitVacuumOldSites() { bgrun.Wait("cron:vacuumDeleted") } +func WaitACME() { bgrun.Wait("cron:renewACME") } +func WaitSessions() { bgrun.Wait("cron:sessions") } +func WaitEmailReports() { bgrun.Wait("cron:emailReports") } +func WaitPersistAndStat() { bgrun.Wait("cron:persistAndStat") } diff --git a/cron/email_reports.go b/cron/email_reports.go index ae15e2820..60a7d587d 100644 --- a/cron/email_reports.go +++ b/cron/email_reports.go @@ -25,10 +25,10 @@ import ( var el = zlog.Module("email-report") // EmailReports sends email reports for sites that have this configured. -func EmailReports(ctx context.Context) error { +func emailReports(ctx context.Context) error { users, err := reportUsers(ctx) if err != nil { - return errors.Errorf("cron.EmailReports: %w", err) + return errors.Errorf("cron.emailReports: %w", err) } now := ztime.Now().UTC() @@ -41,10 +41,10 @@ func EmailReports(ctx context.Context) error { } if user.LastReportAt.IsZero() { - return fmt.Errorf("cron.EmailReports: user=%d: LastReportAt is zero; this should never happen", user.ID) + return fmt.Errorf("cron.emailReports: user=%d: LastReportAt is zero; this should never happen", user.ID) } if user.LastReportAt.After(now) { - return fmt.Errorf("cron.EmailReports: user=%d: LastReportAt is after the current time; this should never happen", user.ID) + return fmt.Errorf("cron.emailReports: user=%d: LastReportAt is after the current time; this should never happen", user.ID) } rng := user.EmailReportRange().UTC() @@ -54,7 +54,7 @@ func EmailReports(ctx context.Context) error { text, html, subject, err := reportText(ctx, site, user) if err != nil { - return fmt.Errorf("cron.EmailReports: user=%d: %w", user.ID, err) + return fmt.Errorf("cron.emailReports: user=%d: %w", user.ID, err) } if text == nil { el.Debug("no text: bailing") diff --git a/cron/email_reports_test.go b/cron/email_reports_test.go index 4e12d10e0..ebaf05235 100644 --- a/cron/email_reports_test.go +++ b/cron/email_reports_test.go @@ -145,10 +145,11 @@ func TestEmailReports(t *testing.T) { buf := new(bytes.Buffer) blackmail.DefaultMailer = blackmail.NewMailer(blackmail.ConnectWriter, blackmail.MailerOut(buf)) - err := cron.EmailReports(ctx) + err := cron.TaskEmailReports() if err != nil { t.Fatal(err) } + cron.WaitEmailReports() if tt.want == "" { if buf.String() != "" { diff --git a/cron/tasks.go b/cron/tasks.go index 8fe483222..c5f1963d9 100644 --- a/cron/tasks.go +++ b/cron/tasks.go @@ -9,13 +9,11 @@ import ( "fmt" "os" "strings" - "sync" "time" "zgo.at/errors" "zgo.at/goatcounter/v2" "zgo.at/goatcounter/v2/acme" - "zgo.at/goatcounter/v2/bgrun" "zgo.at/zdb" "zgo.at/zlog" "zgo.at/zstd/ztime" @@ -57,7 +55,7 @@ func oldExports(ctx context.Context) error { return nil } -func DataRetention(ctx context.Context) error { +func dataRetention(ctx context.Context) error { var sites goatcounter.Sites err := sites.UnscopedList(ctx) if err != nil { @@ -78,32 +76,9 @@ func DataRetention(ctx context.Context) error { return nil } -type lastMemstore struct { - mu sync.Mutex - t time.Time -} - -func (l *lastMemstore) Get() time.Time { - l.mu.Lock() - defer l.mu.Unlock() - return l.t -} - -func (l *lastMemstore) Set(t time.Time) { - l.mu.Lock() - defer l.mu.Unlock() - l.t = t -} - -var LastMemstore = func() *lastMemstore { - l := &lastMemstore{} - l.Set(ztime.Now()) - return l -}() - -func PersistAndStat(ctx context.Context) error { +func persistAndStat(ctx context.Context) error { l := zlog.Module("cron") - l.Debug("PersistAndStat started") + l.Debug("persistAndStat started") hits, err := goatcounter.Memstore.Persist(ctx) if err != nil { @@ -133,10 +108,12 @@ func PersistAndStat(ctx context.Context) error { if len(hits) > 0 { l.Since("stats").FieldsSince().Debugf("persisted %d hits", len(hits)) } - LastMemstore.Set(ztime.Now()) return err } +// UpdateStats updates all the stats tables. +// +// Exported for tests. func UpdateStats(ctx context.Context, site *goatcounter.Site, siteID int64, hits []goatcounter.Hit) error { if site == nil { site = new(goatcounter.Site) @@ -192,20 +169,17 @@ func renewACME(ctx context.Context) error { } for _, s := range sites { - func(ctx context.Context, s goatcounter.Site) { - bgrun.Run("renewACME:"+*s.Cname, func() { - err := acme.Make(ctx, *s.Cname) - if err != nil { - zlog.Module("cron-acme").Error(err) - return - } + err := acme.Make(ctx, *s.Cname) + if err != nil { + zlog.Module("cron-acme").Field("cname", *s.Cname).Error(err) + continue + } - err = s.UpdateCnameSetupAt(ctx) - if err != nil { - zlog.Module("cron-acme").Error(err) - } - }) - }(ctx, s) + err = s.UpdateCnameSetupAt(ctx) + if err != nil { + zlog.Module("cron-acme").Field("cname", *s.Cname).Error(err) + continue + } } return nil diff --git a/cron/tasks_test.go b/cron/tasks_test.go index fe70191c7..ed1e89af0 100644 --- a/cron/tasks_test.go +++ b/cron/tasks_test.go @@ -36,10 +36,11 @@ func TestDataRetention(t *testing.T) { {Site: site.ID, CreatedAt: past, Path: "/a", FirstVisit: zbool.Bool(false)}, }...) - err = cron.DataRetention(ctx) + err = cron.TaskDataRetention() if err != nil { t.Fatal(err) } + cron.WaitDataRetention() var hits goatcounter.Hits err = hits.TestList(ctx, false) diff --git a/export_test.go b/export_test.go index 1bc2320e0..671d60fe0 100644 --- a/export_test.go +++ b/export_test.go @@ -13,7 +13,6 @@ import ( "zgo.at/blackmail" "zgo.at/goatcounter/v2" - "zgo.at/goatcounter/v2/cron" "zgo.at/goatcounter/v2/gctest" "zgo.at/zdb" "zgo.at/zstd/zjson" @@ -129,7 +128,6 @@ func TestExport(t *testing.T) { } defer gzfp.Close() - cron.RunBackground(ctx) goatcounter.Import(ctx, gzfp, true, false, func(hit goatcounter.Hit, final bool) { if !final { goatcounter.Memstore.Append(hit) diff --git a/gctest/gctest.go b/gctest/gctest.go index 4482270c0..a85bc8274 100644 --- a/gctest/gctest.go +++ b/gctest/gctest.go @@ -95,9 +95,11 @@ func db(t testing.TB, storeFile bool) context.Context { ctx := Context(db) goatcounter.Memstore.TestInit(db) ctx = initData(ctx, db, t) + cron.Start(ctx) t.Cleanup(func() { goatcounter.Memstore.Reset() + cron.Stop() db.Close() if db.SQLDialect() == zdb.DialectPostgreSQL { exec.Command("dropdb", dbname).Run() diff --git a/go.mod b/go.mod index 42a56872c..feacd7520 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module zgo.at/goatcounter/v2 go 1.19 -replace zgo.at/bgrun => ../Golib/bgrun - require ( code.soquee.net/otp v0.0.4 github.com/BurntSushi/toml v1.2.1 @@ -24,6 +22,7 @@ require ( golang.org/x/net v0.2.0 golang.org/x/sync v0.1.0 golang.org/x/text v0.4.0 + zgo.at/bgrun v0.0.0-00010101000000-000000000000 zgo.at/blackmail v0.0.0-20221021025740-b3fdfc32a1aa zgo.at/errors v1.1.0 zgo.at/follow v0.0.0-20221021024812-dd647d64b369 @@ -39,13 +38,16 @@ require ( zgo.at/zdb v0.0.0-20221105060706-d862504dc723 zgo.at/zhttp v0.0.0-20221020124111-c3011d53484c zgo.at/zli v0.0.0-20221012220610-d6a5a841b943 - zgo.at/zlog v0.0.0-20211008102840-46c1167bf2a9 + zgo.at/zlog v0.0.0-20211017235224-dd4772ddf860 zgo.at/zprof v0.0.0-20211217104121-c3c12596d8f0 zgo.at/zstd v0.0.0-20221107175747-9855306d942d zgo.at/ztpl v0.0.0-20221020022020-7d727686f6a2 zgo.at/zvalidate v0.0.0-20221021025449-cb54fa8efade ) +// Need to finish this and put it in its own repo. +replace zgo.at/bgrun => ./bgrun + // https://github.com/oschwald/maxminddb-golang/pull/75 replace github.com/oschwald/maxminddb-golang => github.com/arp242/maxminddb-golang v1.8.1-0.20221021031716-eb1bbbb3fc5d diff --git a/go.sum b/go.sum index 21b76e7c1..9042f9abf 100644 --- a/go.sum +++ b/go.sum @@ -120,8 +120,8 @@ zgo.at/zhttp v0.0.0-20221020124111-c3011d53484c h1:1vPtmRRqkrqLGmjiCKm42rLnpQAdX zgo.at/zhttp v0.0.0-20221020124111-c3011d53484c/go.mod h1:ZAV61eMVPCk9Dk9w+k+OS4gqjlCZEdHqOJeSewLzpd0= zgo.at/zli v0.0.0-20221012220610-d6a5a841b943 h1:PeTHjJK0YmNdkwfglLXufJWEwjy2rGujj5DRWtDw0u8= zgo.at/zli v0.0.0-20221012220610-d6a5a841b943/go.mod h1:6nYv5thziHp9CMVzGbQV4qeA9hMJSj7uJ8WLgD4BtVs= -zgo.at/zlog v0.0.0-20211008102840-46c1167bf2a9 h1:4Ot1ETCZQjEjISFiE+SjhwykNvUTLJfyK/BCoJJidnk= -zgo.at/zlog v0.0.0-20211008102840-46c1167bf2a9/go.mod h1:2Pa5cqKA/6S9FrMFeNowtHPd7JEzqwooLXcxU6Wk7Hc= +zgo.at/zlog v0.0.0-20211017235224-dd4772ddf860 h1:7n74jp98CwBdqGCqJoVv2+XygJ3yD43GPUivnj/RPwo= +zgo.at/zlog v0.0.0-20211017235224-dd4772ddf860/go.mod h1:2Pa5cqKA/6S9FrMFeNowtHPd7JEzqwooLXcxU6Wk7Hc= zgo.at/zprof v0.0.0-20211217104121-c3c12596d8f0 h1:nUshmGDnI3+N1OeU265xaqR6weTN2xgrqls8nrkGzbA= zgo.at/zprof v0.0.0-20211217104121-c3c12596d8f0/go.mod h1:JqClLxeT9Uui3apqyN3U6KryFanocqM7E3X4Gle2FAU= zgo.at/zstd v0.0.0-20221013104704-16fa49fadc62/go.mod h1:o/Q8+EtSahHnfkbB3t8wXE0FnoDTmJ0sBDlzezv9XeM= diff --git a/handlers/api.go b/handlers/api.go index acf619471..e4d0be951 100644 --- a/handlers/api.go +++ b/handlers/api.go @@ -20,9 +20,9 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "golang.org/x/exp/slices" + "zgo.at/bgrun" "zgo.at/errors" "zgo.at/goatcounter/v2" - "zgo.at/goatcounter/v2/bgrun" "zgo.at/goatcounter/v2/cron" "zgo.at/goatcounter/v2/metrics" "zgo.at/guru" @@ -69,11 +69,6 @@ func (h api) mount(r chi.Router, db zdb.DB) { case "/api/v0/export": return rateLimits.export(r) case "/api/v0/count": - // Memstore is taking a while to persist; don't add to it. - l := ztime.Now().Sub(cron.LastMemstore.Get()) - if l > 20*time.Second { - return 0, 5 - } return rateLimits.apiCount(r) } }, @@ -319,7 +314,7 @@ func (h api) export(w http.ResponseWriter, r *http.Request) error { } ctx := goatcounter.CopyContextValues(r.Context()) - bgrun.Run(fmt.Sprintf("export api:%d", export.SiteID), func() { export.Run(ctx, fp, false) }) + bgrun.MustRunFunction(fmt.Sprintf("export api:%d", export.SiteID), func() { export.Run(ctx, fp, false) }) w.WriteHeader(http.StatusAccepted) return zhttp.JSON(w, export) @@ -613,7 +608,11 @@ func (h api) count(w http.ResponseWriter, r *http.Request) error { } if goatcounter.Memstore.Len() >= 5000 { - goatcounter.PersistRunner.Run <- struct{}{} + err := cron.TaskPersistAndStat() + if err != nil { + zlog.Error(err) + } + cron.WaitPersistAndStat() } if firstHitAt != nil { diff --git a/handlers/bosmang.go b/handlers/bosmang.go index 52971c654..9a11b8473 100644 --- a/handlers/bosmang.go +++ b/handlers/bosmang.go @@ -9,8 +9,8 @@ import ( "time" "github.com/go-chi/chi/v5" + "zgo.at/bgrun" "zgo.at/goatcounter/v2" - "zgo.at/goatcounter/v2/bgrun" "zgo.at/goatcounter/v2/cron" "zgo.at/goatcounter/v2/metrics" "zgo.at/guru" @@ -18,6 +18,7 @@ import ( "zgo.at/zhttp" "zgo.at/zhttp/auth" "zgo.at/zhttp/mware" + "zgo.at/zlog" "zgo.at/zprof" "zgo.at/zstd/zcontext" "zgo.at/zstd/znet" @@ -56,17 +57,17 @@ func (h bosmang) cache(w http.ResponseWriter, r *http.Request) error { } func (h bosmang) bgrun(w http.ResponseWriter, r *http.Request) error { - hist := bgrun.History() + hist := bgrun.History(0) metrics := make(map[string]ztime.Durations) for _, h := range hist { - x, ok := metrics[h.Name] + x, ok := metrics[h.Task] if !ok { x = ztime.NewDurations(0) x.Grow(32) } - x.Append(h.Finished.Sub(h.Started)) - metrics[h.Name] = x + x.Append(h.Took) + metrics[h.Task] = x } return zhttp.Template(w, "bosmang_bgrun.gohtml", struct { @@ -75,7 +76,7 @@ func (h bosmang) bgrun(w http.ResponseWriter, r *http.Request) error { Jobs []bgrun.Job History []bgrun.Job Metrics map[string]ztime.Durations - }{newGlobals(w, r), cron.Tasks, bgrun.List(), hist, metrics}) + }{newGlobals(w, r), cron.Tasks, bgrun.Running(), hist, metrics}) } func (h bosmang) runTask(w http.ResponseWriter, r *http.Request) error { @@ -88,8 +89,11 @@ func (h bosmang) runTask(w http.ResponseWriter, r *http.Request) error { t := cron.Tasks[taskID] id := t.ID() - bgrun.Run("manual:"+id, func() { - t.Fun(zcontext.WithoutTimeout(r.Context())) + bgrun.RunFunction("manual:"+id, func() { + err := t.Fun(zcontext.WithoutTimeout(r.Context())) + if err != nil { + zlog.Error(err) + } }) zhttp.Flash(w, "Task %q started", id) diff --git a/handlers/mw.go b/handlers/mw.go index 779dfac2b..29d47faaf 100644 --- a/handlers/mw.go +++ b/handlers/mw.go @@ -14,7 +14,6 @@ import ( "time" "zgo.at/goatcounter/v2" - "zgo.at/goatcounter/v2/cron" "zgo.at/guru" "zgo.at/json" "zgo.at/termtext" @@ -104,15 +103,14 @@ func addctx(db zdb.DB, loadSite bool, dashTimeout int) func(http.Handler) http.H if r.URL.Path == "/status" { info, _ := zdb.Info(ctx) j, err := json.Marshal(map[string]interface{}{ - "uptime": ztime.Now().Sub(Started).Round(time.Second).String(), - "version": goatcounter.Version, - "last_persisted_at": cron.LastMemstore.Get().Round(time.Second).Format(time.RFC3339), - "database": zdb.SQLDialect(ctx).String() + " " + string(info.Version), - "go": runtime.Version(), - "GOOS": runtime.GOOS, - "GOARCH": runtime.GOARCH, - "race": zruntime.Race, - "cgo": zruntime.CGO, + "uptime": ztime.Now().Sub(Started).Round(time.Second).String(), + "version": goatcounter.Version, + "database": zdb.SQLDialect(ctx).String() + " " + string(info.Version), + "go": runtime.Version(), + "GOOS": runtime.GOOS, + "GOARCH": runtime.GOARCH, + "race": zruntime.Race, + "cgo": zruntime.CGO, }) if err != nil { http.Error(w, err.Error(), 500) diff --git a/handlers/settings.go b/handlers/settings.go index 8ab3bc73f..e342a17a9 100644 --- a/handlers/settings.go +++ b/handlers/settings.go @@ -19,11 +19,11 @@ import ( "github.com/go-chi/chi/v5" "github.com/monoculum/formam/v3" "golang.org/x/exp/slices" + "zgo.at/bgrun" "zgo.at/blackmail" "zgo.at/errors" "zgo.at/goatcounter/v2" "zgo.at/goatcounter/v2/acme" - "zgo.at/goatcounter/v2/bgrun" "zgo.at/goatcounter/v2/cron" "zgo.at/guru" "zgo.at/zdb" @@ -194,7 +194,7 @@ func (h settings) mainSave(w http.ResponseWriter, r *http.Request) error { if makecert { ctx := goatcounter.CopyContextValues(r.Context()) - bgrun.Run(fmt.Sprintf("acme.Make:%s", args.Cname), func() { + bgrun.RunFunction(fmt.Sprintf("acme.Make:%s", args.Cname), func() { err := acme.Make(ctx, args.Cname) if err != nil { zlog.Field("domain", args.Cname).Error(err) @@ -477,7 +477,7 @@ func (h settings) purgeDo(w http.ResponseWriter, r *http.Request) error { } ctx := goatcounter.CopyContextValues(r.Context()) - bgrun.Run(fmt.Sprintf("purge:%d", Site(ctx).ID), func() { + bgrun.RunFunction(fmt.Sprintf("purge:%d", Site(ctx).ID), func() { var list goatcounter.Hits err := list.Purge(ctx, paths) if err != nil { @@ -502,7 +502,7 @@ func (h settings) merge(w http.ResponseWriter, r *http.Request) error { } ctx := goatcounter.CopyContextValues(r.Context()) - bgrun.Run(fmt.Sprintf("merge:%d", Site(ctx).ID), func() { + bgrun.RunFunction(fmt.Sprintf("merge:%d", Site(ctx).ID), func() { var list goatcounter.Hits err := list.Merge(ctx, dst, paths) if err != nil { @@ -591,7 +591,7 @@ func (h settings) exportImport(w http.ResponseWriter, r *http.Request) error { user := User(r.Context()) ctx := goatcounter.CopyContextValues(r.Context()) n := 0 - bgrun.Run(fmt.Sprintf("import:%d", Site(ctx).ID), func() { + bgrun.RunFunction(fmt.Sprintf("import:%d", Site(ctx).ID), func() { firstHitAt, err := goatcounter.Import(ctx, fp, replace, true, func(hit goatcounter.Hit, final bool) { if final { return @@ -602,10 +602,11 @@ func (h settings) exportImport(w http.ResponseWriter, r *http.Request) error { // Spread out the load a bit. if n%5000 == 0 { - goatcounter.PersistRunner.Run <- struct{}{} - for bgrun.Running("cron:PersistAndStat") { - time.Sleep(250 * time.Millisecond) + err := cron.TaskPersistAndStat() + if err != nil { + zlog.Error(err) } + cron.WaitPersistAndStat() } }) if err != nil { @@ -650,7 +651,7 @@ func (h settings) exportStart(w http.ResponseWriter, r *http.Request) error { } ctx := goatcounter.CopyContextValues(r.Context()) - bgrun.Run(fmt.Sprintf("export web:%d", Site(ctx).ID), + bgrun.RunFunction(fmt.Sprintf("export web:%d", Site(ctx).ID), func() { export.Run(ctx, fp, true) }) zhttp.Flash(w, T(r.Context(), "notify/export-started-in-background|Export started in the background; you’ll get an email with a download link when it’s done.")) @@ -692,7 +693,7 @@ func (h settings) deleteDo(w http.ResponseWriter, r *http.Request) error { account := Account(r.Context()) if args.Reason != "" { - bgrun.Run("email:deletion", func() { + bgrun.RunFunction("email:deletion", func() { contact := "false" if args.ContactMe { u := goatcounter.GetUser(r.Context()) @@ -815,7 +816,7 @@ func (h settings) usersAdd(w http.ResponseWriter, r *http.Request) error { } ctx := goatcounter.CopyContextValues(r.Context()) - bgrun.Run(fmt.Sprintf("adduser:%d", newUser.ID), func() { + bgrun.RunFunction(fmt.Sprintf("adduser:%d", newUser.ID), func() { err := blackmail.Send(fmt.Sprintf("A GoatCounter account was created for you at %s", account.Display(ctx)), blackmail.From("GoatCounter", goatcounter.Config(r.Context()).EmailFrom), blackmail.To(newUser.Email), @@ -920,19 +921,17 @@ func (h settings) bosmang(w http.ResponseWriter, r *http.Request) error { info, _ := zdb.Info(r.Context()) return zhttp.Template(w, "settings_server.gohtml", struct { Globals - Uptime string - Version string - LastPersistedAt string - Database string - Go string - GOOS string - GOARCH string - Race bool - Cgo bool + Uptime string + Version string + Database string + Go string + GOOS string + GOARCH string + Race bool + Cgo bool }{newGlobals(w, r), ztime.Now().Sub(Started).Round(time.Second).String(), goatcounter.Version, - cron.LastMemstore.Get().Round(time.Second).Format(time.RFC3339), zdb.SQLDialect(r.Context()).String() + " " + string(info.Version), runtime.Version(), runtime.GOOS, diff --git a/handlers/settings_test.go b/handlers/settings_test.go index 3340bedd7..c9785cfdc 100644 --- a/handlers/settings_test.go +++ b/handlers/settings_test.go @@ -11,8 +11,8 @@ import ( "testing" "time" + "zgo.at/bgrun" "zgo.at/goatcounter/v2" - "zgo.at/goatcounter/v2/bgrun" "zgo.at/goatcounter/v2/gctest" "zgo.at/zdb" "zgo.at/zstd/ztype" @@ -83,7 +83,7 @@ func TestSettingsPurge(t *testing.T) { for _, tt := range tests { runTest(t, tt, func(t *testing.T, rr *httptest.ResponseRecorder, r *http.Request) { - bgrun.Wait(context.Background()) + bgrun.Wait("") var hits goatcounter.Hits err := hits.TestList(r.Context(), false) diff --git a/handlers/user.go b/handlers/user.go index 1816de470..221fafcba 100644 --- a/handlers/user.go +++ b/handlers/user.go @@ -19,9 +19,9 @@ import ( "github.com/go-chi/chi/v5" "golang.org/x/crypto/bcrypt" "golang.org/x/net/xsrftoken" + "zgo.at/bgrun" "zgo.at/blackmail" "zgo.at/goatcounter/v2" - "zgo.at/goatcounter/v2/bgrun" "zgo.at/guru" "zgo.at/zdb" "zgo.at/zhttp" @@ -130,7 +130,7 @@ func (h user) requestReset(w http.ResponseWriter, r *http.Request) error { site := Site(r.Context()) ctx := goatcounter.CopyContextValues(r.Context()) - bgrun.Run("email:password", func() { + bgrun.RunFunction("email:password", func() { err := blackmail.Send( T(ctx, "email/reset-user-email-subject|Password reset for %(domain)", site.Domain(ctx)), blackmail.From("GoatCounter login", goatcounter.Config(ctx).EmailFrom), @@ -497,7 +497,7 @@ func (h user) deleteAPIToken(w http.ResponseWriter, r *http.Request) error { func sendEmailVerify(ctx context.Context, site *goatcounter.Site, user *goatcounter.User, emailFrom string) { ctx = goatcounter.CopyContextValues(ctx) - bgrun.Run("email:verify", func() { + bgrun.RunFunction("email:verify", func() { err := blackmail.Send("Verify your email", mail.Address{Name: "GoatCounter", Address: emailFrom}, blackmail.To(user.Email), diff --git a/handlers/website.go b/handlers/website.go index c050207c3..1d89c9cf3 100644 --- a/handlers/website.go +++ b/handlers/website.go @@ -18,10 +18,10 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" + "zgo.at/bgrun" "zgo.at/blackmail" "zgo.at/errors" "zgo.at/goatcounter/v2" - "zgo.at/goatcounter/v2/bgrun" "zgo.at/guru" "zgo.at/tz" "zgo.at/zdb" @@ -383,7 +383,7 @@ func (h website) doSignup(w http.ResponseWriter, r *http.Request) error { } ctx := goatcounter.CopyContextValues(r.Context()) - bgrun.Run("welcome email", func() { + bgrun.RunFunction("welcome email", func() { err := blackmail.Send("Welcome to GoatCounter!", blackmail.From("GoatCounter", goatcounter.Config(r.Context()).EmailFrom), blackmail.To(user.Email), @@ -458,7 +458,7 @@ func (h website) doForgot(w http.ResponseWriter, r *http.Request) error { } ctx := goatcounter.CopyContextValues(r.Context()) - bgrun.Run("email:sites", func() { + bgrun.RunFunction("email:sites", func() { defer zlog.Recover() err := blackmail.Send("Your GoatCounter sites", mail.Address{Name: "GoatCounter", Address: goatcounter.Config(ctx).EmailFrom}, diff --git a/memstore.go b/memstore.go index 5e29142b1..ebadbbd51 100644 --- a/memstore.go +++ b/memstore.go @@ -41,13 +41,6 @@ var ( _ encoding.TextUnmarshaler = &hash{} ) -// PersistRunner can be used to signal the cron package to run the -// PeristAndStat() function. We can't use a direct function call due to circular -// imports. -var PersistRunner = struct { - Run chan struct{} -}{make(chan struct{})} - // MarshalText converts the data to a human readable representation. func (h hash) MarshalText() ([]byte, error) { b := base64.StdEncoding.EncodeToString([]byte(h.v)) diff --git a/tpl/bosmang_bgrun.gohtml b/tpl/bosmang_bgrun.gohtml index 2895c17cc..aa15096b7 100644 --- a/tpl/bosmang_bgrun.gohtml +++ b/tpl/bosmang_bgrun.gohtml @@ -30,15 +30,13 @@ Job Started from - No duplicates Started at {{range $j := .Jobs}} - {{$j.Name}} + {{$j.Task}} {{$j.From}} - {{$j.NoDuplicates}} {{$j.Started | ago}} ago {{else}} @@ -74,11 +72,10 @@ {{range $i, $_ := .History}} {{$j := index $.History (sub (len $.History) $i 1)}} - {{$j.Name}} + {{$j.Task}} {{$j.From}} - {{$j.NoDuplicates}} {{$j.Started.Format "2006-01-02 15:04:05"}} - {{round_duration ($j.Finished.Sub $j.Started)}} + {{round_duration $j.Took}} {{else}} No history. diff --git a/tpl/settings_server.gohtml b/tpl/settings_server.gohtml index a52da4dc3..e7ca60772 100644 --- a/tpl/settings_server.gohtml +++ b/tpl/settings_server.gohtml @@ -4,11 +4,10 @@

Server management

-Version:            {{.Version}}
-Go:                 {{.Go}} {{.GOOS}}/{{.GOARCH}} (race={{.Race}} cgo={{.Cgo}})
-Database:           {{.Database}}
-Uptime:             {{.Uptime}}
-Last persisted at:  {{.LastPersistedAt}}
+Version:   {{.Version}}
+Go:        {{.Go}} {{.GOOS}}/{{.GOARCH}} (race={{.Race}} cgo={{.Cgo}})
+Database:  {{.Database}}
+Uptime:    {{.Uptime}}