Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lifecycle hooks to Dispatcher and Scheduler #486

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ def on_worker_stop(...)
Worker.on_stop(...)
end

def on_dispatcher_start(...)
Dispatcher.on_start(...)
end

def on_dispatcher_stop(...)
Dispatcher.on_stop(...)
end

def on_scheduler_start(...)
Scheduler.on_start(...)
end

def on_scheduler_stop(...)
Scheduler.on_stop(...)
end

def supervisor?
supervisor
end
Expand Down
3 changes: 3 additions & 0 deletions lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

module SolidQueue
class Dispatcher < Processes::Poller
include LifecycleHooks
attr_accessor :batch_size, :concurrency_maintenance

after_boot :run_start_hooks
after_boot :start_concurrency_maintenance
before_shutdown :stop_concurrency_maintenance
after_shutdown :run_stop_hooks

def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)
Expand Down
3 changes: 3 additions & 0 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
module SolidQueue
class Scheduler < Processes::Base
include Processes::Runnable
include LifecycleHooks

attr_accessor :recurring_schedule

after_boot :run_start_hooks
after_boot :schedule_recurring_tasks
before_shutdown :unschedule_recurring_tasks
before_shutdown :run_stop_hooks

def initialize(recurring_tasks:, **options)
@recurring_schedule = RecurringSchedule.new(recurring_tasks)
Expand Down
20 changes: 15 additions & 5 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@ class LifecycleHooksTest < ActiveSupport::TestCase
SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }

pid = run_supervisor_as_fork(workers: [ { queues: "*" } ])
SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }

SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) }
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }

pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

results = skip_active_record_query_cache do
assert_equal 4, JobResult.count
JobResult.last(4)
assert_equal 8, JobResult.count
JobResult.last(8)
end

assert_equal "hook_called", results.map(&:status).first
assert_equal [ "start", "stop", "worker_start", "worker_stop" ], results.map(&:value).sort
assert_equal({ "hook_called" => 8 }, results.map(&:status).tally)
assert_equal %w[start stop worker_start worker_stop dispatcher_start dispatcher_stop scheduler_start scheduler_stop].sort, results.map(&:value).sort
ensure
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
SolidQueue::Dispatcher.clear_hooks
SolidQueue::Scheduler.clear_hooks
end

test "handle errors on lifecycle hooks" do
Expand All @@ -48,5 +56,7 @@ class LifecycleHooksTest < ActiveSupport::TestCase
SolidQueue.on_thread_error = previous_on_thread_error
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
SolidQueue::Dispatcher.clear_hooks
SolidQueue::Scheduler.clear_hooks
end
end
Loading