diff --git a/example/http_0mq.rb b/example/http_0mq.rb index c10ceb8..72b462c 100644 --- a/example/http_0mq.rb +++ b/example/http_0mq.rb @@ -39,6 +39,6 @@ def process(request) pull_port = "tcp://127.0.0.1:9999" pub_port = "tcp://127.0.0.1:9998" -handler = Http0MQHandler.for(sender_id, pull_port, pub_port) +handler = Http0MQHandler.new(M2R::ConnectionFactory.new(sender_id, pull_port, pub_port)) handler.listen diff --git a/lib/m2r.rb b/lib/m2r.rb index 4bca27c..a5148bc 100644 --- a/lib/m2r.rb +++ b/lib/m2r.rb @@ -18,4 +18,5 @@ def zmq_context(threads = 1) require 'm2r/request' require 'm2r/response' require 'm2r/connection' +require 'm2r/connection_factory' require 'm2r/handler' diff --git a/lib/m2r/connection.rb b/lib/m2r/connection.rb index a42d0d0..f77c7ce 100644 --- a/lib/m2r/connection.rb +++ b/lib/m2r/connection.rb @@ -3,25 +3,19 @@ module M2R class Connection - def initialize(request_socket, response_socket) + def initialize(request_socket, response_socket, request_parser = Request) @request_socket = request_socket @response_socket = response_socket + @request_parser = request_parser end - def self.for(sender_id, request_addr, response_addr, context = M2R.zmq_context) - request_socket = context.socket(ZMQ::PULL) - request_socket.connect(request_addr) - - response_socket = context.socket(ZMQ::PUB) - response_socket.connect(response_addr) - response_socket.setsockopt(ZMQ::IDENTITY, sender_id) - - new(request_socket, response_socket) + def connection + self end def receive @request_socket.recv_string(msg = "") - Request.parse(msg) + @request_parser.parse(msg) end def reply(request, response_or_string) diff --git a/lib/m2r/connection_factory.rb b/lib/m2r/connection_factory.rb new file mode 100644 index 0000000..41f33e2 --- /dev/null +++ b/lib/m2r/connection_factory.rb @@ -0,0 +1,26 @@ +require 'm2r' + +module M2R + class ConnectionFactory + + def initialize(sender_id, request_addr, response_addr, request_parser = Request, context = M2R.zmq_context) + @sender_id = sender_id.to_s + @request_addr = request_addr.to_s + @response_addr = response_addr.to_s + @request_parser = request_parser + @context = context + end + + def connection + request_socket = @context.socket(ZMQ::PULL) + request_socket.connect(@request_addr) + + response_socket = @context.socket(ZMQ::PUB) + response_socket.connect(@response_addr) + response_socket.setsockopt(ZMQ::IDENTITY, @sender_id) + + Connection.new(request_socket, response_socket, @request_parser) + end + + end +end diff --git a/lib/m2r/handler.rb b/lib/m2r/handler.rb index 5c40553..a5a407f 100644 --- a/lib/m2r/handler.rb +++ b/lib/m2r/handler.rb @@ -4,12 +4,8 @@ module M2R class Handler attr_accessor :connection - def initialize(connection) - @connection = connection - end - - def self.for(sender_uuid, subscribe_address, publish_address) - new(Connection.for(sender_uuid, subscribe_address, publish_address)) + def initialize(connection_factory) + @connection = connection_factory.connection end # Callback for when the handler is waiting for a request diff --git a/lib/m2r/rack_handler.rb b/lib/m2r/rack_handler.rb index 00c3da3..09e8178 100644 --- a/lib/m2r/rack_handler.rb +++ b/lib/m2r/rack_handler.rb @@ -5,17 +5,13 @@ module M2R class RackHandler < Handler attr_accessor :app - def initialize(app, connection) + def initialize(app, connection_factory) @app = app - super(connection) + super(connection_factory) trap('INT') { stop } end - def self.for(app, sender_uuid, subscribe_address, publish_address) - new(app, Connection.for(sender_uuid, subscribe_address, publish_address)) - end - def process(request) script_name = request.pattern.split('(', 2).first.gsub(/\/$/, '') diff --git a/lib/rack/handler/mongrel2.rb b/lib/rack/handler/mongrel2.rb index bc7ca67..2994439 100644 --- a/lib/rack/handler/mongrel2.rb +++ b/lib/rack/handler/mongrel2.rb @@ -1,19 +1,21 @@ require 'rack/handler' require 'm2r/rack_handler' require 'securerandom' +require 'ostruct' module Rack module Handler class Mongrel2 DEFAULT_OPTIONS = { - :recv_addr => 'tcp://127.0.0.1:9997', - :send_addr => 'tcp://127.0.0.1:9996', - :sender_id => SecureRandom.uuid + 'recv_addr' => 'tcp://127.0.0.1:9997', + 'send_addr' => 'tcp://127.0.0.1:9996', + 'sender_id' => SecureRandom.uuid } def self.run(app, options = {}) - options = DEFAULT_OPTIONS.merge(options) - adapter = M2R::RackHandler.for(app, options[:sender_id], options[:recv_addr], options[:send_addr]) + options = OpenStruct.new( DEFAULT_OPTIONS.merge(options) ) + factory = M2R::ConnectionFactory.new(options.sender_id, options.recv_addr, options.send_addr) + adapter = M2R::RackHandler.new(app, factory) adapter.listen end diff --git a/test/connection_factory_test.rb b/test/connection_factory_test.rb new file mode 100644 index 0000000..31d6934 --- /dev/null +++ b/test/connection_factory_test.rb @@ -0,0 +1,29 @@ +require 'test_helper' +require 'securerandom' + +module M2R + class ConnectionFactoryTest < MiniTest::Unit::TestCase + def test_factory + sender_id = "sid" + request_addr = "req" + response_addr = "req" + request_parser = Object.new + + pull = stub(:pull) + pub = stub(:pub) + context = stub(:context) + + context.expects(:socket).with(ZMQ::PULL).returns(pull) + context.expects(:socket).with(ZMQ::PUB).returns(pub) + + pull.expects(:connect).with(request_addr) + + pub.expects(:connect).with(response_addr) + pub.expects(:setsockopt).with(ZMQ::IDENTITY, sender_id) + + Connection.expects(:new).with(pull, pub, request_parser) + cf = ConnectionFactory.new sender_id, request_addr, response_addr, request_parser, context + cf.connection + end + end +end diff --git a/test/connection_test.rb b/test/connection_test.rb index 31d484b..40774d8 100644 --- a/test/connection_test.rb +++ b/test/connection_test.rb @@ -1,22 +1,49 @@ require 'test_helper' +require 'securerandom' module M2R class ConnectionTest < MiniTest::Unit::TestCase - def test_receive_message - request_addr = "inproc://requests" - response_addr = "inproc://responses" - push = M2R.zmq_context.socket(ZMQ::PUSH) - assert_equal 0, push.bind(request_addr), "Could not bind push socket in tests" + def setup + @request_addr = "inproc://requests" + @response_addr = "inproc://responses" + + @push = M2R.zmq_context.socket(ZMQ::PUSH) + assert_equal 0, @push.bind(@request_addr), "Could not bind push socket in tests" + + @sub = M2R.zmq_context.socket(ZMQ::SUB) + assert_equal 0, @sub.bind(@response_addr), "Could not bind sub socket in tests" - sub = M2R.zmq_context.socket(ZMQ::SUB) - assert_equal 0, sub.bind(response_addr), "Could not bind sub socket in tests" - connection = Connection.for("a65c2d89-96ee-4bc9-971e-59b38ae24645", request_addr, response_addr) + @request_socket = M2R.zmq_context.socket(ZMQ::PULL) + @request_socket.connect(@request_addr) + + @response_socket = M2R.zmq_context.socket(ZMQ::PUB) + @response_socket.connect(@response_addr) + @response_socket.setsockopt(ZMQ::IDENTITY, @sender_id = SecureRandom.uuid) + end - push.send_string("1c5fd481-1121-49d8-a706-69127975db1a ebb407b2-49aa-48a5-9f96-9db121051484 / 2:{},0:,", ZMQ::NOBLOCK) + def teardown + @request_socket.close + @response_socket.close + @push.close + @sub.close + end + def test_receive_message + connection = Connection.new(@request_socket, @response_socket) + @push.send_string("1c5fd481-1121-49d8-a706-69127975db1a ebb407b2-49aa-48a5-9f96-9db121051484 / 2:{},0:,", ZMQ::NOBLOCK) assert_instance_of Request, connection.receive end + + def test_different_parser + msg = "1c5fd481-1121-49d8-a706-69127975db1a ebb407b2-49aa-48a5-9f96-9db121051484 / 2:{},0:," + parser = stub(:parser) + parser.expects(:parse).with(msg).returns(request = Object.new) + connection = Connection.new(@request_socket, @response_socket, parser) + @push.send_string(msg = "1c5fd481-1121-49d8-a706-69127975db1a ebb407b2-49aa-48a5-9f96-9db121051484 / 2:{},0:,", ZMQ::NOBLOCK) + assert_equal request, connection.receive + end + end end diff --git a/test/handler_test.rb b/test/handler_test.rb index 20e2e43..33faad9 100644 --- a/test/handler_test.rb +++ b/test/handler_test.rb @@ -4,6 +4,7 @@ module M2R class HandlerTest < MiniTest::Unit::TestCase def test_lifecycle_for_disconnect connection = stub(:receive => disconnect_request) + connection.stubs(:connection).returns(connection) h = TestHandler.new(connection) h.listen assert_equal [:wait, :request, :disconnect], h.called_methods @@ -11,6 +12,7 @@ def test_lifecycle_for_disconnect def test_lifecycle_for_upload_start connection = stub(:receive => upload_start_request) + connection.stubs(:connection).returns(connection) h = TestHandler.new(connection) h.listen assert_equal [:wait, :request, :start], h.called_methods @@ -18,6 +20,7 @@ def test_lifecycle_for_upload_start def test_lifecycle_for_upload_done connection = stub(:receive => upload_done_request, :reply => nil) + connection.stubs(:connection).returns(connection) h = TestHandler.new(connection) h.listen assert_equal [:wait, :request, :done, :process, :after, :reply], h.called_methods diff --git a/test/headers_test.rb b/test/headers_test.rb index ea56224..a86f9b5 100644 --- a/test/headers_test.rb +++ b/test/headers_test.rb @@ -31,5 +31,15 @@ def test_rackify "CONTENT_LENGTH" => "123" }, env) end + + def test_rackify_empty_headers + headers = Headers.new({}) + env = {"rack.something" => "value"} + headers.rackify(env) + assert_equal({ + "rack.something" => "value", + }, env) + end + end end diff --git a/test/rack_handler_test.rb b/test/rack_handler_test.rb index ca0f08d..0a9dc5c 100644 --- a/test/rack_handler_test.rb +++ b/test/rack_handler_test.rb @@ -21,18 +21,19 @@ def test_options require 'rack/handler/mongrel2' handler = ::Rack::Handler::Mongrel2 options = { - recv_addr: recv = 'tcp://1.2.3.4:1234', - send_addr: send = 'tcp://1.2.3.4:4321', - sender_id: id = SecureRandom.uuid + 'recv_addr' => recv = 'tcp://1.2.3.4:1234', + 'send_addr' => send = 'tcp://1.2.3.4:4321', + 'sender_id' => id = SecureRandom.uuid } - Connection.expects(:for).with(id, recv, send) + cf = mock(:connection) + ConnectionFactory.expects(:new).with(id, recv, send).returns(cf) RackHandler.any_instance.stubs(:stop? => true) handler.run(HelloWorld.new, options) end def test_lint_rack_adapter - connection = stub - handler = RackHandler.new(app, connection) + factory = stub(:connection) + handler = RackHandler.new(app, factory) response = handler.process(root_request) assert_equal "Hello world!", response.body