@@ -16,15 +16,11 @@ class NoBlockGivenError < StandardError; end
16
16
17
17
def initialize ( existing_bid = nil )
18
18
@bid = existing_bid || SecureRandom . urlsafe_base64 ( 10 )
19
- @existing = !( !existing_bid || existing_bid . empty? ) # Basically existing_bid.present?
19
+ @existing = !( !existing_bid || existing_bid . empty? ) # Basically existing_bid.present?
20
20
@initialized = false
21
21
@created_at = Time . now . utc . to_f
22
- @bidkey = "BID-" + @bid . to_s
23
- @queued_jids = [ ]
24
- @pending_jids = [ ]
25
- sidekiq_config = Sidekiq . const_defined? ( 'MAJOR' ) && Sidekiq ::MAJOR >= 7 ? ( Thread . current [ :sidekiq_capsule ] &.config || Sidekiq . default_configuration ) : Sidekiq . options
26
- @incremental_push = sidekiq_config [ :batch_push_interval ] &.present?
27
- @batch_push_interval = sidekiq_config [ :batch_push_interval ]
22
+ @bidkey = 'BID-' + @bid . to_s
23
+ @ready_to_queue = [ ]
28
24
end
29
25
30
26
def description = ( description )
@@ -43,14 +39,13 @@ def callback_batch=(callback_batch)
43
39
end
44
40
45
41
def on ( event , callback , options = { } )
46
- return unless %w( success complete ) . include? ( event . to_s )
42
+ return unless %w[ success complete ] . include? ( event . to_s )
43
+
47
44
callback_key = "#{ @bidkey } -callbacks-#{ event } "
48
45
Sidekiq . redis do |r |
49
46
r . multi do |pipeline |
50
- pipeline . sadd ( callback_key , [ JSON . unparse ( {
51
- callback : callback ,
52
- opts : options
53
- } ) ] )
47
+ json_params = JSON . unparse ( { callback :, opts : options } )
48
+ pipeline . sadd ( callback_key , json_params )
54
49
pipeline . expire ( callback_key , BID_EXPIRE_TTL )
55
50
end
56
51
end
@@ -59,95 +54,64 @@ def on(event, callback, options = {})
59
54
def jobs
60
55
raise NoBlockGivenError unless block_given?
61
56
62
- bid_data , Thread . current [ :bid_data ] = Thread . current [ :bid_data ] , [ ]
57
+ bid_data = Thread . current [ :bid_data ]
58
+ Thread . current [ :bid_data ] = [ ]
63
59
64
60
begin
65
61
if !@existing && !@initialized
66
62
parent_bid = Thread . current [ :batch ] . bid if Thread . current [ :batch ]
67
63
68
64
Sidekiq . redis do |r |
69
65
r . multi do |pipeline |
70
- pipeline . hset ( @bidkey , "created_at" , @created_at )
66
+ pipeline . hset ( @bidkey , 'created_at' , @created_at )
67
+ pipeline . hset ( @bidkey , 'parent_bid' , parent_bid . to_s ) if parent_bid
71
68
pipeline . expire ( @bidkey , BID_EXPIRE_TTL )
72
- if parent_bid
73
- pipeline . hset ( @bidkey , "parent_bid" , parent_bid . to_s )
74
- pipeline . hincrby ( "BID-#{ parent_bid } " , "children" , 1 )
75
- end
76
69
end
77
70
end
78
71
79
72
@initialized = true
80
73
end
81
74
82
- @queued_jids = [ ]
83
- @pending_jids = [ ]
75
+ @ready_to_queue = [ ]
84
76
85
77
begin
86
78
parent = Thread . current [ :batch ]
87
79
Thread . current [ :batch ] = self
88
- Thread . current [ :parent_bid ] = parent_bid
89
80
yield
90
81
ensure
91
82
Thread . current [ :batch ] = parent
92
- Thread . current [ :parent_bid ] = nil
93
83
end
94
84
95
- return [ ] if @queued_jids . size == 0
96
- conditional_redis_increment! ( true )
85
+ return [ ] if @ready_to_queue . size == 0
97
86
98
87
Sidekiq . redis do |r |
99
88
r . multi do |pipeline |
100
89
if parent_bid
90
+ pipeline . hincrby ( "BID-#{ parent_bid } " , 'children' , 1 )
91
+ pipeline . hincrby ( "BID-#{ parent_bid } " , 'total' , @ready_to_queue . size )
101
92
pipeline . expire ( "BID-#{ parent_bid } " , BID_EXPIRE_TTL )
102
93
end
103
94
95
+ pipeline . hincrby ( @bidkey , 'pending' , @ready_to_queue . size )
96
+ pipeline . hincrby ( @bidkey , 'total' , @ready_to_queue . size )
104
97
pipeline . expire ( @bidkey , BID_EXPIRE_TTL )
105
98
106
- pipeline . sadd ( @bidkey + "-jids" , [ @queued_jids . flatten ] )
107
- pipeline . expire ( @bidkey + "-jids" , BID_EXPIRE_TTL )
99
+ # binding.break
100
+ @ready_to_queue . each do |ready_to_queue |
101
+ pipeline . sadd ( @bidkey + '-jids' , ready_to_queue )
102
+ end
103
+ pipeline . expire ( @bidkey + '-jids' , BID_EXPIRE_TTL )
108
104
end
109
105
end
110
106
111
- @queued_jids
107
+ @ready_to_queue
112
108
ensure
113
109
Thread . current [ :bid_data ] = bid_data
114
110
end
115
111
end
116
112
117
113
def increment_job_queue ( jid )
118
- @queued_jids << jid
119
- @pending_jids << jid
120
- conditional_redis_increment!
121
- end
122
-
123
- def conditional_redis_increment! ( force = false )
124
- if should_increment? || force
125
- parent_bid = Thread . current [ :parent_bid ]
126
- Sidekiq . redis do |r |
127
- r . multi do |pipeline |
128
- if parent_bid
129
- pipeline . hincrby ( "BID-#{ parent_bid } " , "total" , @pending_jids . length )
130
- pipeline . expire ( "BID-#{ parent_bid } " , BID_EXPIRE_TTL )
131
- end
132
-
133
- pipeline . hincrby ( @bidkey , "pending" , @pending_jids . length )
134
- pipeline . hincrby ( @bidkey , "total" , @pending_jids . length )
135
- pipeline . expire ( @bidkey , BID_EXPIRE_TTL )
136
- end
137
- end
138
- @pending_jids = [ ]
139
- end
140
- end
141
-
142
- def should_increment?
143
- return false unless @incremental_push
144
- return true if @batch_push_interval == 0 || @queued_jids . length == 1
145
- now = Time . now . to_f
146
- @last_increment ||= now
147
- if @last_increment + @batch_push_interval > now
148
- @last_increment = now
149
- return true
150
- end
114
+ @ready_to_queue << jid
151
115
end
152
116
153
117
def invalidate_all
@@ -158,14 +122,14 @@ def invalidate_all
158
122
159
123
def parent_bid
160
124
Sidekiq . redis do |r |
161
- r . hget ( @bidkey , " parent_bid" )
125
+ r . hget ( @bidkey , ' parent_bid' )
162
126
end
163
127
end
164
128
165
129
def parent
166
- if parent_bid
167
- Sidekiq :: Batch . new ( parent_bid )
168
- end
130
+ return unless parent_bid
131
+
132
+ Sidekiq :: Batch . new ( parent_bid )
169
133
end
170
134
171
135
def valid? ( batch = self )
@@ -178,6 +142,7 @@ def valid?(batch = self)
178
142
def persist_bid_attr ( attribute , value )
179
143
Sidekiq . redis do |r |
180
144
r . multi do |pipeline |
145
+ value = value . to_s if value . in? [ true , false ]
181
146
pipeline . hset ( @bidkey , attribute , value )
182
147
pipeline . expire ( @bidkey , BID_EXPIRE_TTL )
183
148
end
@@ -188,13 +153,14 @@ class << self
188
153
def process_failed_job ( bid , jid )
189
154
_ , pending , failed , children , complete , parent_bid = Sidekiq . redis do |r |
190
155
r . multi do |pipeline |
191
- pipeline . sadd ( "BID-#{ bid } -failed" , [ jid ] )
156
+ jids = [ jid ]
157
+ jids . each { |jid | pipeline . sadd ( "BID-#{ bid } -failed" , jid ) }
192
158
193
- pipeline . hincrby ( "BID-#{ bid } " , " pending" , 0 )
159
+ pipeline . hincrby ( "BID-#{ bid } " , ' pending' , 0 )
194
160
pipeline . scard ( "BID-#{ bid } -failed" )
195
- pipeline . hincrby ( "BID-#{ bid } " , " children" , 0 )
161
+ pipeline . hincrby ( "BID-#{ bid } " , ' children' , 0 )
196
162
pipeline . scard ( "BID-#{ bid } -complete" )
197
- pipeline . hget ( "BID-#{ bid } " , " parent_bid" )
163
+ pipeline . hget ( "BID-#{ bid } " , ' parent_bid' )
198
164
199
165
pipeline . expire ( "BID-#{ bid } -failed" , BID_EXPIRE_TTL )
200
166
end
@@ -204,41 +170,44 @@ def process_failed_job(bid, jid)
204
170
if parent_bid
205
171
Sidekiq . redis do |r |
206
172
r . multi do |pipeline |
207
- pipeline . hincrby ( "BID-#{ parent_bid } " , "pending" , 1 )
208
- pipeline . sadd ( "BID-#{ parent_bid } -failed" , [ jid ] )
173
+ pipeline . hincrby ( "BID-#{ parent_bid } " , 'pending' , 1 )
174
+ jids = [ jid ]
175
+ jids . each { |jid | pipeline . sadd ( "BID-#{ parent_bid } -failed" , jid ) }
209
176
pipeline . expire ( "BID-#{ parent_bid } -failed" , BID_EXPIRE_TTL )
210
177
end
211
178
end
212
179
end
213
180
214
- if pending . to_i == failed . to_i && children == complete
215
- enqueue_callbacks ( :complete , bid )
216
- end
181
+ return unless pending . to_i == failed . to_i && children == complete
182
+
183
+ enqueue_callbacks ( :complete , bid )
217
184
end
218
185
219
186
def process_successful_job ( bid , jid )
220
187
failed , pending , children , complete , success , total , parent_bid = Sidekiq . redis do |r |
221
188
r . multi do |pipeline |
222
189
pipeline . scard ( "BID-#{ bid } -failed" )
223
- pipeline . hincrby ( "BID-#{ bid } " , " pending" , -1 )
224
- pipeline . hincrby ( "BID-#{ bid } " , " children" , 0 )
190
+ pipeline . hincrby ( "BID-#{ bid } " , ' pending' , -1 )
191
+ pipeline . hincrby ( "BID-#{ bid } " , ' children' , 0 )
225
192
pipeline . scard ( "BID-#{ bid } -complete" )
226
193
pipeline . scard ( "BID-#{ bid } -success" )
227
- pipeline . hget ( "BID-#{ bid } " , "total" )
228
- pipeline . hget ( "BID-#{ bid } " , "parent_bid" )
194
+ pipeline . hget ( "BID-#{ bid } " , 'total' )
195
+ pipeline . hget ( "BID-#{ bid } " , 'parent_bid' )
196
+
197
+ jids = [ jid ]
198
+ jids . each { |jid | pipeline . srem ( "BID-#{ bid } -failed" , jid ) }
199
+ jids . each { |jid | pipeline . srem ( "BID-#{ bid } -jids" , jid ) }
229
200
230
- pipeline . srem ( "BID-#{ bid } -failed" , [ jid ] )
231
- pipeline . srem ( "BID-#{ bid } -jids" , [ jid ] )
232
201
pipeline . expire ( "BID-#{ bid } " , BID_EXPIRE_TTL )
233
202
end
234
203
end
235
204
236
205
all_success = pending . to_i . zero? && children == success
237
206
# if complete or successfull call complete callback (the complete callback may then call successful)
238
- if ( pending . to_i == failed . to_i && children == complete ) || all_success
239
- enqueue_callbacks ( :complete , bid )
240
- enqueue_callbacks ( :success , bid ) if all_success
241
- end
207
+ return unless ( pending . to_i == failed . to_i && children == complete ) || all_success
208
+
209
+ enqueue_callbacks ( :complete , bid )
210
+ enqueue_callbacks ( :success , bid ) if all_success
242
211
end
243
212
244
213
def enqueue_callbacks ( event , bid )
@@ -250,30 +219,30 @@ def enqueue_callbacks(event, bid)
250
219
pipeline . hget ( batch_key , event_name )
251
220
pipeline . hset ( batch_key , event_name , 'true' )
252
221
pipeline . smembers ( callback_key )
253
- pipeline . hget ( batch_key , " callback_queue" )
254
- pipeline . hget ( batch_key , " parent_bid" )
255
- pipeline . hget ( batch_key , " callback_batch" )
222
+ pipeline . hget ( batch_key , ' callback_queue' )
223
+ pipeline . hget ( batch_key , ' parent_bid' )
224
+ pipeline . hget ( batch_key , ' callback_batch' )
256
225
end
257
226
end
258
227
259
228
return if already_processed == 'true'
260
229
261
- queue ||= " default"
262
- parent_bid = !parent_bid || parent_bid . empty? ? nil : parent_bid # Basically parent_bid.blank?
230
+ queue ||= ' default'
231
+ parent_bid = nil if !parent_bid || parent_bid . empty? # Basically parent_bid.blank?
263
232
callback_args = callbacks . reduce ( [ ] ) do |memo , jcb |
264
233
cb = Sidekiq . load_json ( jcb )
265
234
memo << [ cb [ 'callback' ] , event_name , cb [ 'opts' ] , bid , parent_bid ]
266
235
end
267
236
268
- opts = { " bid" => bid , " event" => event_name }
237
+ opts = { ' bid' => bid , ' event' => event_name }
269
238
270
239
# Run callback batch finalize synchronously
271
240
if callback_batch
272
241
# Extract opts from cb_args or use current
273
242
# Pass in stored event as callback finalize is processed on complete event
274
243
cb_opts = callback_args . first &.at ( 2 ) || opts
275
244
276
- Sidekiq . logger . debug { "Run callback batch bid: #{ bid } event: #{ event_name } args: #{ callback_args . inspect } " }
245
+ Sidekiq . logger . debug { "Run callback batch bid: #{ bid } event: #{ event_name } args: #{ callback_args . inspect } " }
277
246
# Finalize now
278
247
finalizer = Sidekiq ::Batch ::Callback ::Finalize . new
279
248
status = Status . new bid
@@ -282,7 +251,7 @@ def enqueue_callbacks(event, bid)
282
251
return
283
252
end
284
253
285
- Sidekiq . logger . debug { "Enqueue callback bid: #{ bid } event: #{ event_name } args: #{ callback_args . inspect } " }
254
+ Sidekiq . logger . debug { "Enqueue callback bid: #{ bid } event: #{ event_name } args: #{ callback_args . inspect } " }
286
255
287
256
if callback_args . empty?
288
257
# Finalize now
@@ -291,40 +260,41 @@ def enqueue_callbacks(event, bid)
291
260
finalizer . dispatch ( status , opts )
292
261
else
293
262
# Otherwise finalize in sub batch complete callback
294
- cb_batch = self . new
263
+ cb_batch = new
295
264
cb_batch . callback_batch = 'true'
296
- Sidekiq . logger . debug { "Adding callback batch: #{ cb_batch . bid } for batch: #{ bid } " }
297
- cb_batch . on ( :complete , " Sidekiq::Batch::Callback::Finalize#dispatch" , opts )
265
+ Sidekiq . logger . debug { "Adding callback batch: #{ cb_batch . bid } for batch: #{ bid } " }
266
+ cb_batch . on ( :complete , ' Sidekiq::Batch::Callback::Finalize#dispatch' , opts )
298
267
cb_batch . jobs do
299
268
push_callbacks callback_args , queue
300
269
end
301
270
end
302
271
end
303
272
304
273
def cleanup_redis ( bid )
305
- Sidekiq . logger . debug { "Cleaning redis of batch #{ bid } " }
274
+ Sidekiq . logger . debug { "Cleaning redis of batch #{ bid } " }
306
275
Sidekiq . redis do |r |
307
276
r . del (
308
277
"BID-#{ bid } " ,
309
278
"BID-#{ bid } -callbacks-complete" ,
310
279
"BID-#{ bid } -callbacks-success" ,
311
280
"BID-#{ bid } -failed" ,
312
-
313
281
"BID-#{ bid } -success" ,
314
282
"BID-#{ bid } -complete" ,
315
- "BID-#{ bid } -jids" ,
283
+ "BID-#{ bid } -jids"
316
284
)
317
285
end
318
286
end
319
287
320
- private
288
+ private
289
+
290
+ def push_callbacks ( args , queue )
291
+ return if args . empty?
321
292
322
- def push_callbacks args , queue
323
293
Sidekiq ::Client . push_bulk (
324
294
'class' => Sidekiq ::Batch ::Callback ::Worker ,
325
295
'args' => args ,
326
296
'queue' => queue
327
- ) unless args . empty?
297
+ )
328
298
end
329
299
end
330
300
end
0 commit comments