Skip to content

Commit 33d2698

Browse files
committed
Ensure batch callbacks aren't run unexpectedly and fully document new behavior
1 parent 663debd commit 33d2698

File tree

3 files changed

+84
-15
lines changed

3 files changed

+84
-15
lines changed

README.md

+35-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,41 @@ Or install it yourself as:
3232

3333
## Usage
3434

35-
Sidekiq Batch is drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage.
35+
Sidekiq Batch is MOSTLY a drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage.
36+
37+
## Caveats/Gotchas
38+
39+
Consider the following workflow:
40+
41+
* Batch Z created
42+
* Worker A queued in batch Z
43+
* Worker A starts Worker B in batch Z
44+
* Worker B completes *before* worker A does
45+
* Worker A completes
46+
47+
In the standard configuration, the `on(:success)` and `on(:complete)` callbacks will be triggered when Worker B completes.
48+
This configuration is the default, simply for legacy reasons. This gem adds the following option to the sidekiq.yml options:
49+
50+
```yaml
51+
:batch_push_interval: 0
52+
```
53+
54+
When this value is *absent* (aka legacy), Worker A will only push the increment of batch jobs (aka Worker B) *when it completes*
55+
56+
When this value is set to `0`, Worker A will increment the count as soon as `WorkerB.perform_async` is called
57+
58+
When this value is a positive number, Worker A will wait a maximum of value-seconds before pushing the increment to redis, or until it's done, whichever comes first.
59+
60+
This comes into play if Worker A is queueing thousands of WorkerB jobs, or has some other reason for WorkerB to complete beforehand.
61+
62+
If you are queueing many WorkerB jobs, it is recommended to set this value to something like `3` to avoid thousands of calls to redis, and call WorkerB like so:
63+
```ruby
64+
WorkerB.perform_in(4.seconds, some, args)
65+
```
66+
this will ensure that the batch callback does not get triggered until WorkerA *and* the last WorkerB job complete.
67+
68+
If WorkerA is just slow for whatever reason, setting to `0` will update the batch status immediately so that the callbacks don't fire.
69+
3670

3771
## Contributing
3872

lib/sidekiq/batch.rb

+48-13
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ def initialize(existing_bid = nil)
2020
@initialized = false
2121
@created_at = Time.now.utc.to_f
2222
@bidkey = "BID-" + @bid.to_s
23-
@ready_to_queue = []
23+
@queued_jids = []
24+
@pending_jids = []
25+
@incremental_push = Sidekiq.options.keys.include?(:batch_push_interval)
26+
@batch_push_interval = Sidekiq.options[:batch_push_interval]
2427
end
2528

2629
def description=(description)
@@ -59,20 +62,24 @@ def jobs
5962

6063
begin
6164
if !@existing && !@initialized
62-
parent_bid = Thread.current[:batch].bid if Thread.current[:batch]
65+
parent_bid, Thread.current[:parent_id] = Thread.current[:batch].bid if Thread.current[:batch]
6366

6467
Sidekiq.redis do |r|
6568
r.multi do |pipeline|
6669
pipeline.hset(@bidkey, "created_at", @created_at)
67-
pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s) if parent_bid
6870
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
71+
if parent_bid
72+
pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s)
73+
pipeline.hincrby("BID-#{parent_bid}", "children", 1)
74+
end
6975
end
7076
end
7177

7278
@initialized = true
7379
end
7480

75-
@ready_to_queue = []
81+
@queued_jids = []
82+
@pending_jids = []
7683

7784
begin
7885
parent = Thread.current[:batch]
@@ -81,34 +88,62 @@ def jobs
8188
ensure
8289
Thread.current[:batch] = parent
8390
end
84-
85-
return [] if @ready_to_queue.size == 0
91+
conditional_redis_increment!(true)
92+
return [] if @queued_jids.size == 0
8693

8794
Sidekiq.redis do |r|
8895
r.multi do |pipeline|
8996
if parent_bid
90-
pipeline.hincrby("BID-#{parent_bid}", "children", 1)
91-
pipeline.hincrby("BID-#{parent_bid}", "total", @ready_to_queue.size)
9297
pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
9398
end
9499

95-
pipeline.hincrby(@bidkey, "pending", @ready_to_queue.size)
96-
pipeline.hincrby(@bidkey, "total", @ready_to_queue.size)
97100
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
98101

99-
pipeline.sadd(@bidkey + "-jids", [@ready_to_queue])
102+
pipeline.sadd(@bidkey + "-jids", [@queued_jids])
100103
pipeline.expire(@bidkey + "-jids", BID_EXPIRE_TTL)
101104
end
102105
end
103106

104-
@ready_to_queue
107+
@queued_jids
105108
ensure
106109
Thread.current[:bid_data] = bid_data
107110
end
108111
end
109112

110113
def increment_job_queue(jid)
111-
@ready_to_queue << jid
114+
@queued_jids << jid
115+
@pending_jids << jid
116+
conditional_redis_increment!
117+
end
118+
119+
def conditional_redis_increment!(force=false)
120+
if should_increment? || force
121+
parent_bid = Thread.current[:parent_id]
122+
Sidekiq.redis do |r|
123+
r.multi do |pipeline|
124+
if parent_bid
125+
pipeline.hincrby("BID-#{parent_bid}", "total", @pending_jids.length)
126+
pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
127+
end
128+
129+
pipeline.hincrby(@bidkey, "pending", @pending_jids.length)
130+
pipeline.hincrby(@bidkey, "total", @pending_jids.length)
131+
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
132+
end
133+
end
134+
@pending_jids = []
135+
end
136+
end
137+
138+
def should_increment?
139+
return false unless @incremental_push
140+
return true if @batch_push_interval == 0 || @queued_jids.length == 1
141+
142+
@last_increment ||= Time.now.to_f
143+
if @last_increment + @batch_push_interval > Time.now.to_f
144+
@last_increment = Time.now.to_f
145+
return true
146+
end
112147
end
113148

114149
def invalidate_all

sidekiq-batch.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ Gem::Specification.new do |spec|
1919
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
2020
spec.require_paths = ["lib"]
2121

22-
spec.add_dependency "sidekiq", ">= 3"
22+
spec.add_dependency "sidekiq", ">= 3", "<7"
2323

2424
spec.add_development_dependency "bundler", "~> 2.1"
2525
spec.add_development_dependency "rake", "~> 13.0"

0 commit comments

Comments
 (0)