Skip to content

Commit

Permalink
Introduce ConnectionFactory
Browse files Browse the repository at this point in the history
In the future we would like to use multiple threads. Every thread
will obtain requests from separate `ZMQ` connection as it is
recommended in the tutorial. Because of that we will have to
create the sockets for every thread. For that reason `Handler` takes
now in constructor a factory capable of creating new `Connection`
instead of the `Connection` itself.

`Connection#connection` method is introduced so that `Connection` has
interface compatible with `ConnectionFactory` and can be used
instead returning always `self` whenever `connection` is needed. That
should work fine for simplest testing case or in single-threaded
environment.
  • Loading branch information
paneq committed Jul 25, 2012
1 parent d5cf37b commit 592ff0b
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 44 deletions.
2 changes: 1 addition & 1 deletion example/http_0mq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ def process(request)
pull_port = "tcp://127.0.0.1:9999"
pub_port = "tcp://127.0.0.1:9998"

handler = Http0MQHandler.for(sender_id, pull_port, pub_port)
handler = Http0MQHandler.new(M2R::ConnectionFactory.new(sender_id, pull_port, pub_port))
handler.listen

1 change: 1 addition & 0 deletions lib/m2r.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ def zmq_context(threads = 1)
require 'm2r/request'
require 'm2r/response'
require 'm2r/connection'
require 'm2r/connection_factory'
require 'm2r/handler'
16 changes: 5 additions & 11 deletions lib/m2r/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,19 @@
module M2R
class Connection

def initialize(request_socket, response_socket)
def initialize(request_socket, response_socket, request_parser = Request)
@request_socket = request_socket
@response_socket = response_socket
@request_parser = request_parser
end

def self.for(sender_id, request_addr, response_addr, context = M2R.zmq_context)
request_socket = context.socket(ZMQ::PULL)
request_socket.connect(request_addr)

response_socket = context.socket(ZMQ::PUB)
response_socket.connect(response_addr)
response_socket.setsockopt(ZMQ::IDENTITY, sender_id)

new(request_socket, response_socket)
def connection
self
end

def receive
@request_socket.recv_string(msg = "")
Request.parse(msg)
@request_parser.parse(msg)
end

def reply(request, response_or_string)
Expand Down
26 changes: 26 additions & 0 deletions lib/m2r/connection_factory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
require 'm2r'

module M2R
class ConnectionFactory

def initialize(sender_id, request_addr, response_addr, request_parser = Request, context = M2R.zmq_context)
@sender_id = sender_id.to_s
@request_addr = request_addr.to_s
@response_addr = response_addr.to_s
@request_parser = request_parser
@context = context
end

def connection
request_socket = @context.socket(ZMQ::PULL)
request_socket.connect(@request_addr)

response_socket = @context.socket(ZMQ::PUB)
response_socket.connect(@response_addr)
response_socket.setsockopt(ZMQ::IDENTITY, @sender_id)

Connection.new(request_socket, response_socket, @request_parser)
end

end
end
8 changes: 2 additions & 6 deletions lib/m2r/handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@ module M2R
class Handler
attr_accessor :connection

def initialize(connection)
@connection = connection
end

def self.for(sender_uuid, subscribe_address, publish_address)
new(Connection.for(sender_uuid, subscribe_address, publish_address))
def initialize(connection_factory)
@connection = connection_factory.connection
end

# Callback for when the handler is waiting for a request
Expand Down
8 changes: 2 additions & 6 deletions lib/m2r/rack_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@ module M2R
class RackHandler < Handler
attr_accessor :app

def initialize(app, connection)
def initialize(app, connection_factory)
@app = app
super(connection)
super(connection_factory)

trap('INT') { stop }
end

def self.for(app, sender_uuid, subscribe_address, publish_address)
new(app, Connection.for(sender_uuid, subscribe_address, publish_address))
end

def process(request)
script_name = request.pattern.split('(', 2).first.gsub(/\/$/, '')

Expand Down
12 changes: 7 additions & 5 deletions lib/rack/handler/mongrel2.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
require 'rack/handler'
require 'm2r/rack_handler'
require 'securerandom'
require 'ostruct'

module Rack
module Handler
class Mongrel2
DEFAULT_OPTIONS = {
:recv_addr => 'tcp://127.0.0.1:9997',
:send_addr => 'tcp://127.0.0.1:9996',
:sender_id => SecureRandom.uuid
'recv_addr' => 'tcp://127.0.0.1:9997',
'send_addr' => 'tcp://127.0.0.1:9996',
'sender_id' => SecureRandom.uuid
}

def self.run(app, options = {})
options = DEFAULT_OPTIONS.merge(options)
adapter = M2R::RackHandler.for(app, options[:sender_id], options[:recv_addr], options[:send_addr])
options = OpenStruct.new( DEFAULT_OPTIONS.merge(options) )
factory = M2R::ConnectionFactory.new(options.sender_id, options.recv_addr, options.send_addr)
adapter = M2R::RackHandler.new(app, factory)
adapter.listen
end

Expand Down
29 changes: 29 additions & 0 deletions test/connection_factory_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
require 'test_helper'
require 'securerandom'

module M2R
class ConnectionFactoryTest < MiniTest::Unit::TestCase
def test_factory
sender_id = "sid"
request_addr = "req"
response_addr = "req"
request_parser = Object.new

pull = stub(:pull)
pub = stub(:pub)
context = stub(:context)

context.expects(:socket).with(ZMQ::PULL).returns(pull)
context.expects(:socket).with(ZMQ::PUB).returns(pub)

pull.expects(:connect).with(request_addr)

pub.expects(:connect).with(response_addr)
pub.expects(:setsockopt).with(ZMQ::IDENTITY, sender_id)

Connection.expects(:new).with(pull, pub, request_parser)
cf = ConnectionFactory.new sender_id, request_addr, response_addr, request_parser, context
cf.connection
end
end
end
45 changes: 36 additions & 9 deletions test/connection_test.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,49 @@
require 'test_helper'
require 'securerandom'

module M2R
class ConnectionTest < MiniTest::Unit::TestCase
def test_receive_message
request_addr = "inproc://requests"
response_addr = "inproc://responses"

push = M2R.zmq_context.socket(ZMQ::PUSH)
assert_equal 0, push.bind(request_addr), "Could not bind push socket in tests"
def setup
@request_addr = "inproc://requests"
@response_addr = "inproc://responses"

@push = M2R.zmq_context.socket(ZMQ::PUSH)
assert_equal 0, @push.bind(@request_addr), "Could not bind push socket in tests"

@sub = M2R.zmq_context.socket(ZMQ::SUB)
assert_equal 0, @sub.bind(@response_addr), "Could not bind sub socket in tests"

sub = M2R.zmq_context.socket(ZMQ::SUB)
assert_equal 0, sub.bind(response_addr), "Could not bind sub socket in tests"

connection = Connection.for("a65c2d89-96ee-4bc9-971e-59b38ae24645", request_addr, response_addr)
@request_socket = M2R.zmq_context.socket(ZMQ::PULL)
@request_socket.connect(@request_addr)

@response_socket = M2R.zmq_context.socket(ZMQ::PUB)
@response_socket.connect(@response_addr)
@response_socket.setsockopt(ZMQ::IDENTITY, @sender_id = SecureRandom.uuid)
end

push.send_string("1c5fd481-1121-49d8-a706-69127975db1a ebb407b2-49aa-48a5-9f96-9db121051484 / 2:{},0:,", ZMQ::NOBLOCK)
def teardown
@request_socket.close
@response_socket.close
@push.close
@sub.close
end

def test_receive_message
connection = Connection.new(@request_socket, @response_socket)
@push.send_string("1c5fd481-1121-49d8-a706-69127975db1a ebb407b2-49aa-48a5-9f96-9db121051484 / 2:{},0:,", ZMQ::NOBLOCK)
assert_instance_of Request, connection.receive
end

def test_different_parser
msg = "1c5fd481-1121-49d8-a706-69127975db1a ebb407b2-49aa-48a5-9f96-9db121051484 / 2:{},0:,"
parser = stub(:parser)
parser.expects(:parse).with(msg).returns(request = Object.new)
connection = Connection.new(@request_socket, @response_socket, parser)
@push.send_string(msg = "1c5fd481-1121-49d8-a706-69127975db1a ebb407b2-49aa-48a5-9f96-9db121051484 / 2:{},0:,", ZMQ::NOBLOCK)
assert_equal request, connection.receive
end

end
end
3 changes: 3 additions & 0 deletions test/handler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@ module M2R
class HandlerTest < MiniTest::Unit::TestCase
def test_lifecycle_for_disconnect
connection = stub(:receive => disconnect_request)
connection.stubs(:connection).returns(connection)
h = TestHandler.new(connection)
h.listen
assert_equal [:wait, :request, :disconnect], h.called_methods
end

def test_lifecycle_for_upload_start
connection = stub(:receive => upload_start_request)
connection.stubs(:connection).returns(connection)
h = TestHandler.new(connection)
h.listen
assert_equal [:wait, :request, :start], h.called_methods
end

def test_lifecycle_for_upload_done
connection = stub(:receive => upload_done_request, :reply => nil)
connection.stubs(:connection).returns(connection)
h = TestHandler.new(connection)
h.listen
assert_equal [:wait, :request, :done, :process, :after, :reply], h.called_methods
Expand Down
10 changes: 10 additions & 0 deletions test/headers_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,15 @@ def test_rackify
"CONTENT_LENGTH" => "123"
}, env)
end

def test_rackify_empty_headers
headers = Headers.new({})
env = {"rack.something" => "value"}
headers.rackify(env)
assert_equal({
"rack.something" => "value",
}, env)
end

end
end
13 changes: 7 additions & 6 deletions test/rack_handler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ def test_options
require 'rack/handler/mongrel2'
handler = ::Rack::Handler::Mongrel2
options = {
recv_addr: recv = 'tcp://1.2.3.4:1234',
send_addr: send = 'tcp://1.2.3.4:4321',
sender_id: id = SecureRandom.uuid
'recv_addr' => recv = 'tcp://1.2.3.4:1234',
'send_addr' => send = 'tcp://1.2.3.4:4321',
'sender_id' => id = SecureRandom.uuid
}
Connection.expects(:for).with(id, recv, send)
cf = mock(:connection)
ConnectionFactory.expects(:new).with(id, recv, send).returns(cf)
RackHandler.any_instance.stubs(:stop? => true)
handler.run(HelloWorld.new, options)
end

def test_lint_rack_adapter
connection = stub
handler = RackHandler.new(app, connection)
factory = stub(:connection)
handler = RackHandler.new(app, factory)
response = handler.process(root_request)

assert_equal "Hello world!", response.body
Expand Down

0 comments on commit 592ff0b

Please sign in to comment.