Skip to content

Commit 3cdd369

Browse files
committed
Initial implementation - missing status
0 parents  commit 3cdd369

20 files changed

+527
-0
lines changed

.gitignore

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/.bundle/
2+
/.yardoc
3+
/Gemfile.lock
4+
/_yardoc/
5+
/coverage/
6+
/doc/
7+
/pkg/
8+
/spec/reports/
9+
/tmp/
10+
dump.rdb

.rspec

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
--format documentation
2+
--color

.travis.yml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
sudo: false
2+
language: ruby
3+
rvm:
4+
- 2.3.1
5+
before_install: gem install bundler -v 1.12.5

Gemfile

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
source 'https://rubygems.org'
2+
3+
# Specify your gem's dependencies in sidekiq-batch.gemspec
4+
gemspec

Guardfile

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
guard 'rspec', cmd: 'rspec --color' do
2+
watch(%r{^lib/(.+)\.rb$}) { |m| "spec/#{m[1]}_spec.rb" }
3+
watch(%r|^spec/(.*)_spec\.rb|)
4+
end

LICENSE.txt

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2016 Breamware
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in
13+
all copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
THE SOFTWARE.

README.md

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Sidekiq::Batch
2+
3+
Simple Sidekiq Batch Job implementation.
4+
5+
## Installation
6+
7+
Add this line to your application's Gemfile:
8+
9+
```ruby
10+
gem 'sidekiq-batch'
11+
```
12+
13+
And then execute:
14+
15+
$ bundle
16+
17+
Or install it yourself as:
18+
19+
$ gem install sidekiq-batch
20+
21+
## Usage
22+
23+
Sidekiq Batch is drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage.
24+
25+
## Contributing
26+
27+
Bug reports and pull requests are welcome on GitHub at https://github.com/breamware/sidekiq-batch.
28+
29+
30+
## License
31+
32+
The gem is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT).

Rakefile

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
require "bundler/gem_tasks"
2+
require "rspec/core/rake_task"
3+
4+
RSpec::Core::RakeTask.new(:spec)
5+
6+
task :default => :spec

lib/sidekiq/batch.rb

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
require 'securerandom'
2+
require 'sidekiq'
3+
4+
require 'sidekiq/batch/callback'
5+
require 'sidekiq/batch/middleware'
6+
require 'sidekiq/batch/status'
7+
require 'sidekiq/batch/version'
8+
9+
module Sidekiq
10+
class Batch
11+
class NoBlockGivenError < StandardError; end
12+
13+
attr_reader :bid, :description, :callback_queue
14+
15+
def initialize(existing_bid = nil)
16+
@bid = existing_bid || SecureRandom.urlsafe_base64(10)
17+
Sidekiq.redis { |r| r.set("#{bid}-to_process", 0) }
18+
end
19+
20+
def description=(description)
21+
@description = description
22+
Sidekiq.redis { |r| r.hset(bid, 'description', description) }
23+
end
24+
25+
def callback_queue=(callback_queue)
26+
@callback_queue = callback_queue
27+
Sidekiq.redis { |r| r.hset(bid, 'callback_queue', callback_queue) }
28+
end
29+
30+
def on(event, callback, options = {})
31+
return unless %w(success complete).include?(event.to_s)
32+
Sidekiq.redis do |r|
33+
r.hset(bid, "callback_#{event}", callback)
34+
r.hset(bid, "callback_#{event}_opts", options.to_json)
35+
end
36+
end
37+
38+
def jobs
39+
raise NoBlockGivenError unless block_given?
40+
41+
Batch.increment_job_queue(bid)
42+
Thread.current[:bid] = bid
43+
yield
44+
Batch.process_successful_job(bid)
45+
end
46+
47+
class << self
48+
def process_failed_job(bid, jid)
49+
to_process = Sidekiq.redis do |r|
50+
r.multi do
51+
r.sadd("#{bid}-failed", jid)
52+
r.scard("#{bid}-failed")
53+
r.get("#{bid}-to_process")
54+
end
55+
end
56+
if to_process[2].to_i == to_process[1].to_i
57+
Callback.call_if_needed(:complete, bid)
58+
end
59+
end
60+
61+
def process_successful_job(bid)
62+
to_process = Sidekiq.redis do |r|
63+
r.multi do
64+
r.decr("#{bid}-to_process")
65+
r.get("#{bid}-to_process")
66+
end
67+
end
68+
if to_process[1].to_i == 0
69+
Callback.call_if_needed(:success, bid)
70+
Callback.call_if_needed(:complete, bid)
71+
end
72+
end
73+
74+
def increment_job_queue(bid)
75+
Sidekiq.redis { |r| r.incr("#{bid}-to_process") }
76+
end
77+
end
78+
end
79+
end

lib/sidekiq/batch/callback.rb

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
module Sidekiq
2+
class Batch
3+
module Callback
4+
class Worker
5+
include Sidekiq::Worker
6+
7+
def perform(clazz, event, opts, bid)
8+
return unless %w(success complete).include?(event)
9+
instance = clazz.constantize.send(:new) rescue nil
10+
return unless instance
11+
instance.send("on_#{event}", Status.new(bid), opts) rescue nil
12+
end
13+
end
14+
15+
class << self
16+
def call_if_needed(event, bid)
17+
needed = Sidekiq.redis do |r|
18+
r.multi do
19+
r.hget(bid, event)
20+
r.hset(bid, event, true)
21+
end
22+
end
23+
return if 'true' == needed[0]
24+
callback, opts, queue = Sidekiq.redis do |r|
25+
r.hmget(bid,
26+
"callback_#{event}", "callback_#{event}_opts",
27+
'callback_queue')
28+
end
29+
return unless callback
30+
opts = JSON.parse(opts) if opts
31+
opts ||= {}
32+
queue ||= 'default'
33+
Sidekiq::Client.push('class' => Sidekiq::Batch::Callback::Worker,
34+
'args' => [callback, event, opts, bid],
35+
'queue' => queue)
36+
end
37+
end
38+
end
39+
end
40+
end

lib/sidekiq/batch/middleware.rb

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
module Sidekiq
2+
class Batch
3+
module Middleware
4+
def self.extended(base)
5+
base.class_eval do
6+
register_middleware
7+
end
8+
end
9+
10+
def register_middleware
11+
Sidekiq.configure_server do |config|
12+
config.client_middleware do |chain|
13+
chain.add ClientMiddleware
14+
end
15+
config.server_middleware do |chain|
16+
chain.add ClientMiddleware
17+
chain.add ServerMiddleware
18+
end
19+
end
20+
end
21+
22+
class ClientMiddleware
23+
def call(_worker, msg, _queue, _redis_pool = nil)
24+
if (bid = Thread.current[:bid])
25+
Batch.increment_job_queue(bid) if
26+
msg[:bid] = bid
27+
end
28+
yield
29+
end
30+
end
31+
32+
class ServerMiddleware
33+
def call(_worker, msg, _queue)
34+
if (bid = msg['bid'])
35+
begin
36+
yield
37+
Batch.process_successful_job(bid)
38+
rescue
39+
Batch.process_failed_job(bid, msg['jid'])
40+
raise
41+
end
42+
else
43+
yield
44+
end
45+
end
46+
end
47+
end
48+
end
49+
end
50+
51+
Sidekiq::Batch::Middleware.send(:extend, Sidekiq::Batch::Middleware)

lib/sidekiq/batch/status.rb

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
module Sidekiq
2+
class Batch
3+
class Status
4+
attr_reader :bid, :total, :failures, :created_at, :failure_info
5+
6+
def initialize(bid)
7+
@bid = bid
8+
end
9+
10+
def join
11+
raise "Not supported"
12+
end
13+
14+
def pending
15+
Sidekiq.redis { |r| r.get("#{bid}-to_process") }.to_i
16+
end
17+
18+
def complete?
19+
'true' == Sidekiq.redis { |r| r.hget(bid, 'complete') }
20+
end
21+
22+
def data
23+
{
24+
total: total,
25+
failures: failures,
26+
pending: pending,
27+
created_at: created_at,
28+
complete: complete?,
29+
failure_info: failure_info
30+
}
31+
end
32+
end
33+
end
34+
end

lib/sidekiq/batch/version.rb

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module Sidekiq
2+
class Batch
3+
VERSION = '0.1.0.pre'.freeze
4+
end
5+
end

sidekiq-batch.gemspec

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# coding: utf-8
2+
lib = File.expand_path('../lib', __FILE__)
3+
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
4+
require 'sidekiq/batch/version'
5+
6+
Gem::Specification.new do |spec|
7+
spec.name = "sidekiq-batch"
8+
spec.version = Sidekiq::Batch::VERSION
9+
spec.authors = ["Marcin Naglik"]
10+
spec.email = ["[email protected]"]
11+
12+
spec.summary = "Sidekiq Batch Jobs"
13+
spec.description = "Sidekiq Batch Jobs Implementation"
14+
spec.homepage = "http://github.com/breamware/sidekiq-batch"
15+
spec.license = "MIT"
16+
17+
spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) }
18+
spec.bindir = "exe"
19+
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
20+
spec.require_paths = ["lib"]
21+
22+
spec.add_dependency "sidekiq", "~> 4"
23+
24+
spec.add_development_dependency "bundler", "~> 1.12"
25+
spec.add_development_dependency "rake", "~> 10.0"
26+
spec.add_development_dependency "rspec", "~> 3.0"
27+
end

spec/integration/integration.rb

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
require 'sidekiq/batch'
2+
3+
class TestWorker
4+
include Sidekiq::Worker
5+
6+
def perform
7+
end
8+
end
9+
10+
class MyCallback
11+
def on_success(status, options)
12+
puts "Success #{options} #{status.data}"
13+
end
14+
15+
def on_complete(status, options)
16+
puts "Complete #{options} #{status.data}"
17+
end
18+
end
19+
20+
batch = Sidekiq::Batch.new
21+
batch.description = 'Test batch'
22+
batch.callback_queue = :default
23+
batch.on(:success, MyCallback, to: '[email protected]')
24+
batch.on(:complete, MyCallback, to: '[email protected]')
25+
26+
batch.jobs do
27+
10.times do
28+
TestWorker.perform_async
29+
end
30+
end
31+
puts Sidekiq::Batch::Status.new(batch.bid).data

0 commit comments

Comments
 (0)