forked from ipython/ipyparallel
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcommunicator.py
57 lines (45 loc) · 1.9 KB
/
communicator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python
"""A simple Communicator class that has N,E,S,W neighbors connected via 0MQ PEER sockets"""
import socket
import zmq
from ipyparallel.util import disambiguate_url
class EngineCommunicator:
"""An object that connects Engines to each other.
north and east sockets listen, while south and west sockets connect.
This class is useful in cases where there is a set of nodes that
must communicate only with their nearest neighbors.
"""
def __init__(self, interface='tcp://*', identity=None):
self._ctx = zmq.Context()
self.north = self._ctx.socket(zmq.PAIR)
self.west = self._ctx.socket(zmq.PAIR)
self.south = self._ctx.socket(zmq.PAIR)
self.east = self._ctx.socket(zmq.PAIR)
# bind to ports
northport = self.north.bind_to_random_port(interface)
eastport = self.east.bind_to_random_port(interface)
self.north_url = interface + ":%i" % northport
self.east_url = interface + ":%i" % eastport
# guess first public IP from socket
self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]
def __del__(self):
self.north.close()
self.south.close()
self.east.close()
self.west.close()
self._ctx.term()
@property
def info(self):
"""return the connection info for this object's sockets."""
return (self.location, self.north_url, self.east_url)
def connect(self, south_peer=None, west_peer=None):
"""connect to peers. `peers` will be a 3-tuples, of the form:
(location, north_addr, east_addr)
as produced by
"""
if south_peer is not None:
location, url, _ = south_peer
self.south.connect(disambiguate_url(url, location))
if west_peer is not None:
location, _, url = west_peer
self.west.connect(disambiguate_url(url, location))