Skip to content

Commit f815543

Browse files
authored
Merge pull request #1 from jaceksamol/kb-update-fork
Update fork
2 parents f0760c5 + a11bd2c commit f815543

File tree

10 files changed

+129
-36
lines changed

10 files changed

+129
-36
lines changed

.github/FUNDING.yml

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# These are supported funding model platforms
2+
github: breamware
3+
open_collective: sidekiq-batch

.github/workflows/ci.yml

+16-2
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,30 @@
11
name: CI
22

3-
on: [push, pull_request]
3+
on: [pull_request, workflow_dispatch]
44

55
jobs:
66
test:
7+
env:
8+
REDIS_HOST: 'redis'
79

810
runs-on: ubuntu-latest
11+
services:
12+
redis:
13+
image: redis
14+
# Set health checks to wait until redis has started
15+
options: >-
16+
--health-cmd "redis-cli ping"
17+
--health-interval 10s
18+
--health-timeout 5s
19+
--health-retries 5
20+
ports:
21+
# Maps port 6379 on service container to the host
22+
- 6379:6379
923

1024
strategy:
1125
fail-fast: false
1226
matrix:
13-
ruby: ["2.5", "2.6", "2.7", "3.0", "3.1", ruby-head]
27+
ruby: ["3.0", "3.1", "3.2", ruby-head]
1428

1529
steps:
1630
- uses: actions/checkout@v2

README.md

+35-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,41 @@ Or install it yourself as:
3232

3333
## Usage
3434

35-
Sidekiq Batch is drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage.
35+
Sidekiq Batch is MOSTLY a drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage.
36+
37+
## Caveats/Gotchas
38+
39+
Consider the following workflow:
40+
41+
* Batch Z created
42+
* Worker A queued in batch Z
43+
* Worker A starts Worker B in batch Z
44+
* Worker B completes *before* worker A does
45+
* Worker A completes
46+
47+
In the standard configuration, the `on(:success)` and `on(:complete)` callbacks will be triggered when Worker B completes.
48+
This configuration is the default, simply for legacy reasons. This gem adds the following option to the sidekiq.yml options:
49+
50+
```yaml
51+
:batch_push_interval: 0
52+
```
53+
54+
When this value is *absent* (aka legacy), Worker A will only push the increment of batch jobs (aka Worker B) *when it completes*
55+
56+
When this value is set to `0`, Worker A will increment the count as soon as `WorkerB.perform_async` is called
57+
58+
When this value is a positive number, Worker A will wait a maximum of value-seconds before pushing the increment to redis, or until it's done, whichever comes first.
59+
60+
This comes into play if Worker A is queueing thousands of WorkerB jobs, or has some other reason for WorkerB to complete beforehand.
61+
62+
If you are queueing many WorkerB jobs, it is recommended to set this value to something like `3` to avoid thousands of calls to redis, and call WorkerB like so:
63+
```ruby
64+
WorkerB.perform_in(4.seconds, some, args)
65+
```
66+
this will ensure that the batch callback does not get triggered until WorkerA *and* the last WorkerB job complete.
67+
68+
If WorkerA is just slow for whatever reason, setting to `0` will update the batch status immediately so that the callbacks don't fire.
69+
3670

3771
## Contributing
3872

lib/sidekiq/batch.rb

+59-20
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@ def initialize(existing_bid = nil)
2020
@initialized = false
2121
@created_at = Time.now.utc.to_f
2222
@bidkey = "BID-" + @bid.to_s
23-
@ready_to_queue = []
23+
@queued_jids = []
24+
@pending_jids = []
25+
26+
@incremental_push = !Sidekiq.default_configuration[:batch_push_interval].nil?
27+
@batch_push_interval = Sidekiq.default_configuration[:batch_push_interval]
2428
end
2529

2630
def description=(description)
@@ -43,10 +47,10 @@ def on(event, callback, options = {})
4347
callback_key = "#{@bidkey}-callbacks-#{event}"
4448
Sidekiq.redis do |r|
4549
r.multi do |pipeline|
46-
pipeline.sadd(callback_key, JSON.unparse({
50+
pipeline.sadd(callback_key, [JSON.unparse({
4751
callback: callback,
4852
opts: options
49-
}))
53+
})])
5054
pipeline.expire(callback_key, BID_EXPIRE_TTL)
5155
end
5256
end
@@ -64,51 +68,86 @@ def jobs
6468
Sidekiq.redis do |r|
6569
r.multi do |pipeline|
6670
pipeline.hset(@bidkey, "created_at", @created_at)
67-
pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s) if parent_bid
6871
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
72+
if parent_bid
73+
pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s)
74+
pipeline.hincrby("BID-#{parent_bid}", "children", 1)
75+
end
6976
end
7077
end
7178

7279
@initialized = true
7380
end
7481

75-
@ready_to_queue = []
82+
@queued_jids = []
83+
@pending_jids = []
7684

7785
begin
7886
parent = Thread.current[:batch]
7987
Thread.current[:batch] = self
88+
Thread.current[:parent_bid] = parent_bid
8089
yield
8190
ensure
8291
Thread.current[:batch] = parent
92+
Thread.current[:parent_bid] = nil
8393
end
8494

85-
return [] if @ready_to_queue.size == 0
95+
return [] if @queued_jids.size == 0
96+
conditional_redis_increment!(true)
8697

8798
Sidekiq.redis do |r|
8899
r.multi do |pipeline|
89100
if parent_bid
90-
pipeline.hincrby("BID-#{parent_bid}", "children", 1)
91-
pipeline.hincrby("BID-#{parent_bid}", "total", @ready_to_queue.size)
92101
pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
93102
end
94103

95-
pipeline.hincrby(@bidkey, "pending", @ready_to_queue.size)
96-
pipeline.hincrby(@bidkey, "total", @ready_to_queue.size)
97104
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
98105

99-
pipeline.sadd(@bidkey + "-jids", @ready_to_queue)
106+
pipeline.sadd(@bidkey + "-jids", @queued_jids)
100107
pipeline.expire(@bidkey + "-jids", BID_EXPIRE_TTL)
101108
end
102109
end
103110

104-
@ready_to_queue
111+
@queued_jids
105112
ensure
106113
Thread.current[:bid_data] = bid_data
107114
end
108115
end
109116

110117
def increment_job_queue(jid)
111-
@ready_to_queue << jid
118+
@queued_jids << jid
119+
@pending_jids << jid
120+
conditional_redis_increment!
121+
end
122+
123+
def conditional_redis_increment!(force=false)
124+
if should_increment? || force
125+
parent_bid = Thread.current[:parent_bid]
126+
Sidekiq.redis do |r|
127+
r.multi do |pipeline|
128+
if parent_bid
129+
pipeline.hincrby("BID-#{parent_bid}", "total", @pending_jids.length)
130+
pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
131+
end
132+
133+
pipeline.hincrby(@bidkey, "pending", @pending_jids.length)
134+
pipeline.hincrby(@bidkey, "total", @pending_jids.length)
135+
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
136+
end
137+
end
138+
@pending_jids = []
139+
end
140+
end
141+
142+
def should_increment?
143+
return false unless @incremental_push
144+
return true if @batch_push_interval == 0 || @queued_jids.length == 1
145+
now = Time.now.to_f
146+
@last_increment ||= now
147+
if @last_increment + @batch_push_interval > now
148+
@last_increment = now
149+
return true
150+
end
112151
end
113152

114153
def invalidate_all
@@ -130,7 +169,7 @@ def parent
130169
end
131170

132171
def valid?(batch = self)
133-
valid = !Sidekiq.redis { |r| r.exists("invalidated-bid-#{batch.bid}") }
172+
valid = Sidekiq.redis { |r| r.exists("invalidated-bid-#{batch.bid}") }.zero?
134173
batch.parent ? valid && valid?(batch.parent) : valid
135174
end
136175

@@ -149,7 +188,7 @@ class << self
149188
def process_failed_job(bid, jid)
150189
_, pending, failed, children, complete, parent_bid = Sidekiq.redis do |r|
151190
r.multi do |pipeline|
152-
pipeline.sadd("BID-#{bid}-failed", jid)
191+
pipeline.sadd("BID-#{bid}-failed", [jid])
153192

154193
pipeline.hincrby("BID-#{bid}", "pending", 0)
155194
pipeline.scard("BID-#{bid}-failed")
@@ -166,7 +205,7 @@ def process_failed_job(bid, jid)
166205
Sidekiq.redis do |r|
167206
r.multi do |pipeline|
168207
pipeline.hincrby("BID-#{parent_bid}", "pending", 1)
169-
pipeline.sadd("BID-#{parent_bid}-failed", jid)
208+
pipeline.sadd("BID-#{parent_bid}-failed", [jid])
170209
pipeline.expire("BID-#{parent_bid}-failed", BID_EXPIRE_TTL)
171210
end
172211
end
@@ -188,8 +227,8 @@ def process_successful_job(bid, jid)
188227
pipeline.hget("BID-#{bid}", "total")
189228
pipeline.hget("BID-#{bid}", "parent_bid")
190229

191-
pipeline.srem("BID-#{bid}-failed", jid)
192-
pipeline.srem("BID-#{bid}-jids", jid)
230+
pipeline.srem("BID-#{bid}-failed", [jid])
231+
pipeline.srem("BID-#{bid}-jids", [jid])
193232
pipeline.expire("BID-#{bid}", BID_EXPIRE_TTL)
194233
end
195234
end
@@ -209,7 +248,7 @@ def enqueue_callbacks(event, bid)
209248
already_processed, _, callbacks, queue, parent_bid, callback_batch = Sidekiq.redis do |r|
210249
r.multi do |pipeline|
211250
pipeline.hget(batch_key, event_name)
212-
pipeline.hset(batch_key, event_name, true)
251+
pipeline.hset(batch_key, event_name, 'true')
213252
pipeline.smembers(callback_key)
214253
pipeline.hget(batch_key, "callback_queue")
215254
pipeline.hget(batch_key, "parent_bid")
@@ -253,7 +292,7 @@ def enqueue_callbacks(event, bid)
253292
else
254293
# Otherwise finalize in sub batch complete callback
255294
cb_batch = self.new
256-
cb_batch.callback_batch = true
295+
cb_batch.callback_batch = 'true'
257296
Sidekiq.logger.debug {"Adding callback batch: #{cb_batch.bid} for batch: #{bid}"}
258297
cb_batch.on(:complete, "Sidekiq::Batch::Callback::Finalize#dispatch", opts)
259298
cb_batch.jobs do

lib/sidekiq/batch/callback.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ def success(bid, status, parent_bid)
4040

4141
_, _, success, _, complete, pending, children, failure = Sidekiq.redis do |r|
4242
r.multi do |pipeline|
43-
pipeline.sadd("BID-#{parent_bid}-success", bid)
43+
pipeline.sadd("BID-#{parent_bid}-success", [bid])
4444
pipeline.expire("BID-#{parent_bid}-success", Sidekiq::Batch::BID_EXPIRE_TTL)
4545
pipeline.scard("BID-#{parent_bid}-success")
46-
pipeline.sadd("BID-#{parent_bid}-complete", bid)
46+
pipeline.sadd("BID-#{parent_bid}-complete", [bid])
4747
pipeline.scard("BID-#{parent_bid}-complete")
4848
pipeline.hincrby("BID-#{parent_bid}", "pending", 0)
4949
pipeline.hincrby("BID-#{parent_bid}", "children", 0)
@@ -81,7 +81,7 @@ def complete(bid, status, parent_bid)
8181
Sidekiq.logger.debug {"Finalize parent complete bid: #{parent_bid}"}
8282
_, complete, pending, children, failure = Sidekiq.redis do |r|
8383
r.multi do |pipeline|
84-
pipeline.sadd("BID-#{parent_bid}-complete", bid)
84+
pipeline.sadd("BID-#{parent_bid}-complete", [bid])
8585
pipeline.scard("BID-#{parent_bid}-complete")
8686
pipeline.hincrby("BID-#{parent_bid}", "pending", 0)
8787
pipeline.hincrby("BID-#{parent_bid}", "children", 0)

lib/sidekiq/batch/version.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
module Sidekiq
22
class Batch
3-
VERSION = '0.1.8'.freeze
3+
VERSION = '0.1.9'.freeze
44
end
55
end

sidekiq-batch.gemspec

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@ Gem::Specification.new do |spec|
1919
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
2020
spec.require_paths = ["lib"]
2121

22-
spec.add_dependency "sidekiq", ">= 3"
22+
spec.add_dependency "sidekiq", ">= 7", "<8"
2323

2424
spec.add_development_dependency "bundler", "~> 2.1"
2525
spec.add_development_dependency "rake", "~> 13.0"
2626
spec.add_development_dependency "rspec", "~> 3.0"
27-
spec.add_development_dependency "fakeredis", "~> 0.8.0"
2827
end

spec/sidekiq/batch/middleware_spec.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
end
8080

8181
describe Sidekiq::Batch::Middleware do
82-
let(:config) { class_double(Sidekiq) }
82+
let(:config) { instance_double(Sidekiq::Config) }
8383
let(:client_middleware) { double(Sidekiq::Middleware::Chain) }
8484

8585
context 'client' do

spec/sidekiq/batch_spec.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def was_performed; end
170170
Sidekiq::Batch.process_failed_job(bid, 'failed-job-id')
171171
Sidekiq::Batch.process_failed_job(bid, failed_jid)
172172
failed = Sidekiq.redis { |r| r.smembers("BID-#{bid}-failed") }
173-
expect(failed).to eq(['xxx', 'failed-job-id'])
173+
expect(failed).to eq(['failed-job-id', 'xxx'])
174174
end
175175
end
176176
end
@@ -245,7 +245,7 @@ def was_performed; end
245245
it 'returns and does not enqueue callbacks' do
246246
batch = Sidekiq::Batch.new
247247
batch.on(event, SampleCallback)
248-
Sidekiq.redis { |r| r.hset("BID-#{batch.bid}", event, true) }
248+
Sidekiq.redis { |r| r.hset("BID-#{batch.bid}", event, 'true') }
249249

250250
expect(Sidekiq::Client).not_to receive(:push)
251251
Sidekiq::Batch.enqueue_callbacks(event, batch.bid)
@@ -290,8 +290,8 @@ def was_performed; end
290290
expect(Sidekiq::Client).to receive(:push_bulk).with(
291291
'class' => Sidekiq::Batch::Callback::Worker,
292292
'args' => [
293-
['SampleCallback2', event.to_s, opts2, batch.bid, nil],
294-
['SampleCallback', event.to_s, opts, batch.bid, nil]
293+
['SampleCallback', event.to_s, opts, batch.bid, nil],
294+
['SampleCallback2', event.to_s, opts2, batch.bid, nil]
295295
],
296296
'queue' => 'default'
297297
)

spec/spec_helper.rb

+6-2
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22
SimpleCov.start
33

44
$LOAD_PATH.unshift File.expand_path('../../lib', __FILE__)
5-
require 'fakeredis/rspec'
65
require 'sidekiq/batch'
76

87
redis_opts = { url: "redis://127.0.0.1:6379/1" }
9-
redis_opts[:driver] = Redis::Connection::Memory if defined?(Redis::Connection::Memory)
108

119
Sidekiq.configure_client do |config|
1210
config.redis = redis_opts
@@ -19,6 +17,12 @@
1917
RSpec.configure do |config|
2018
config.filter_run focus: true
2119
config.run_all_when_everything_filtered = true
20+
21+
config.before do
22+
Sidekiq.redis do |redis|
23+
redis.flushdb
24+
end
25+
end
2226
end
2327

2428
Dir[File.dirname(__FILE__) + "/support/**/*.rb"].each {|f| require f }

0 commit comments

Comments
 (0)