Skip to content

Commit 10248b8

Browse files
committed
Add shard_selection_lambda
1 parent 094eba1 commit 10248b8

File tree

3 files changed

+46
-6
lines changed

3 files changed

+46
-6
lines changed

lib/active_job/queue_adapters/solid_queue_adapter.rb

+8-5
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,26 @@ def enqueue_after_transaction_commit?
1717
end
1818

1919
def enqueue(active_job) # :nodoc:
20-
select_shard { SolidQueue::Job.enqueue(active_job) }
20+
select_shard(active_job:) { SolidQueue::Job.enqueue(active_job) }
2121
end
2222

2323
def enqueue_at(active_job, timestamp) # :nodoc:
24-
select_shard do
24+
select_shard(active_job:) do
2525
SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp))
2626
end
2727
end
2828

2929
def enqueue_all(active_jobs) # :nodoc:
30-
select_shard { SolidQueue::Job.enqueue_all(active_jobs) }
30+
select_shard(active_jobs:) { SolidQueue::Job.enqueue_all(active_jobs) }
3131
end
3232

3333
private
3434

35-
def select_shard(&block)
36-
shard = @db_shard || SolidQueue.primary_shard
35+
def select_shard(active_job: nil, active_jobs: nil, &block)
36+
shard =
37+
SolidQueue.shard_selection_lambda&.call(active_job:, active_jobs:) ||
38+
@db_shard ||
39+
SolidQueue.primary_shard
3740

3841
if shard
3942
ActiveRecord::Base.connected_to(shard: shard) { block.call }

lib/solid_queue.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ module SolidQueue
4141
mattr_accessor :clear_finished_jobs_after, default: 1.day
4242
mattr_accessor :default_concurrency_control_period, default: 3.minutes
4343

44-
mattr_accessor :primary_shard, :active_shard
44+
mattr_accessor :primary_shard, :active_shard, :shard_selection_lambda
4545

4646
delegate :on_start, :on_stop, to: Supervisor
4747

test/unit/multisharding_test.rb

+37
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@ class MultishardingTest < ActiveSupport::TestCase
1616
end
1717
end
1818

19+
test "shard_selection_lambda can override which shard is used to enqueue individual jobs" do
20+
shard_selection_lambda = ->(active_job:, active_jobs:) { :queue_shard_two if active_job.arguments.first == "hey!" }
21+
22+
with_shard_selection_lambda(shard_selection_lambda) do
23+
assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 1 do
24+
AddToBufferJob.perform_later "hey!"
25+
end
26+
end
27+
end
28+
1929
test "jobs are enqueued for later in the right shard" do
2030
assert_difference -> { SolidQueue::ScheduledExecution.count }, 1 do
2131
assert_difference -> { connected_to_shard_two { SolidQueue::ScheduledExecution.count } }, 1 do
@@ -39,4 +49,31 @@ class MultishardingTest < ActiveSupport::TestCase
3949
end
4050
end
4151
end
52+
53+
test "shard_selection_lambda can override which shard is used to enqueue jobs in bulk" do
54+
active_jobs = [
55+
AddToBufferJob.new(2),
56+
ShardTwoJob.new(6),
57+
AddToBufferJob.new(3),
58+
ShardTwoJob.new(7)
59+
]
60+
shard_selection_lambda = ->(active_job:, active_jobs:) { :queue_shard_two if active_jobs.size == 2 }
61+
62+
with_shard_selection_lambda(shard_selection_lambda) do
63+
assert_difference -> { SolidQueue::Job.count }, 0 do
64+
assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 4 do
65+
ActiveJob.perform_all_later(active_jobs)
66+
end
67+
end
68+
end
69+
end
70+
71+
private
72+
73+
def with_shard_selection_lambda(lambda, &block)
74+
SolidQueue.shard_selection_lambda = lambda
75+
block.call
76+
ensure
77+
SolidQueue.shard_selection_lambda = nil
78+
end
4279
end

0 commit comments

Comments
 (0)