def serviceA(context=None):
#reuse context if it exists, otherwise make a new one
context = context or zmq.Context.instance()
service = context.socket(zmq.DEALER)
#identify worker
service.setsockopt(zmq.IDENTITY,b'A')
service.connect("tcp://localhost:5560")
while True:
message = service.recv()
with myLock:
print "Service A got:"
print message
if message == "Service A":
#do some work
time.sleep(random.uniform(0,0.5))
service.send(b"Service A did your laundry")
elif message == "END":
break
else:
with myLock:
print "the server has the wrong identities!"
break
python类DEALER的实例源码
def _execute_command(self, command):
if len(self.job_servers) == 0:
app_log.error('there is no job server')
return
server = self.job_servers[self.job_server_index]
self.job_server_index = (self.job_server_index + 1) % len(self.job_servers)
context = zmq.Context.instance()
zmq_sock = context.socket(zmq.DEALER)
zmq_sock.linger = 1000
zmq_sock.identity = bytes(str(os.getpid()), 'ascii')
ip = server['ip']
if ip == '*':
ip = 'localhost'
url = 'tcp://{0}:{1}'.format(ip, server['zmq_port'])
app_log.info('connect %s', url)
zmq_sock.connect(url)
command = json_encode({'command': command})
app_log.info('command: %s', command)
zmq_sock.send_multipart([b'0', bytes(command, 'ascii')])
stream = ZMQStream(zmq_sock)
stream.on_recv(self.response_handler)
def serviceB(context=None):
#reuse context if it exists, otherwise make a new one
context = context or zmq.Context.instance()
service = context.socket(zmq.DEALER)
#identify worker
service.setsockopt(zmq.IDENTITY,b'B')
service.connect("tcp://localhost:5560")
while True:
message = service.recv()
with myLock:
print "Service B got:"
print message
if message == "Service B":
#do some work
time.sleep(random.uniform(0,0.5))
service.send(b"Service B cleaned your room")
elif message == "END":
break
else:
with myLock:
print "the server has the wrong identities!"
break
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def bounce(self, server, client, test_metadata=True):
msg = [os.urandom(64), os.urandom(64)]
client.send_multipart(msg)
frames = self.recv_multipart(server, copy=False)
recvd = list(map(lambda x: x.bytes, frames))
try:
if test_metadata and not PYPY:
for frame in frames:
self.assertEqual(frame.get('User-Id'), 'anonymous')
self.assertEqual(frame.get('Hello'), 'World')
self.assertEqual(frame['Socket-Type'], 'DEALER')
except zmq.ZMQVersionError:
pass
self.assertEqual(recvd, msg)
server.send_multipart(recvd)
msg2 = self.recv_multipart(client)
self.assertEqual(msg2, msg)
def skip_plain_inauth(self):
"""test PLAIN failed authentication"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
client.plain_username = USER
client.plain_password = b'incorrect'
server.plain_server = True
self.assertEqual(server.mechanism, zmq.PLAIN)
self.assertEqual(client.mechanism, zmq.PLAIN)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
client.send(b'ping')
server.rcvtimeo = 250
self.assertRaisesErrno(zmq.EAGAIN, server.recv)
self.stop_zap()
def flash(self):
if self.pid != str(os.getpid()):
# reset process pid
self.pid = str(os.getpid())
# update zmq sockets
# (couldnt share socket in differenet process)
self.zmq_socket = zmq.Context().socket(zmq.REQ)
self.zmq_file_socket = zmq.Context().socket(zmq.DEALER)
# update context
ctx = main_context(self.main_file, self.main_folder)
if self.main_param is not None:
main_config_path = os.path.join(self.main_folder, self.main_param)
params = yaml.load(open(main_config_path, 'r'))
ctx.params = params
self.context = ctx
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def bounce(self, server, client, test_metadata=True):
msg = [os.urandom(64), os.urandom(64)]
client.send_multipart(msg)
frames = self.recv_multipart(server, copy=False)
recvd = list(map(lambda x: x.bytes, frames))
try:
if test_metadata and not PYPY:
for frame in frames:
self.assertEqual(frame.get('User-Id'), 'anonymous')
self.assertEqual(frame.get('Hello'), 'World')
self.assertEqual(frame['Socket-Type'], 'DEALER')
except zmq.ZMQVersionError:
pass
self.assertEqual(recvd, msg)
server.send_multipart(recvd)
msg2 = self.recv_multipart(client)
self.assertEqual(msg2, msg)
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def bounce(self, server, client, test_metadata=True):
msg = [os.urandom(64), os.urandom(64)]
client.send_multipart(msg)
frames = self.recv_multipart(server, copy=False)
recvd = list(map(lambda x: x.bytes, frames))
try:
if test_metadata and not PYPY:
for frame in frames:
self.assertEqual(frame.get('User-Id'), 'anonymous')
self.assertEqual(frame.get('Hello'), 'World')
self.assertEqual(frame['Socket-Type'], 'DEALER')
except zmq.ZMQVersionError:
pass
self.assertEqual(recvd, msg)
server.send_multipart(recvd)
msg2 = self.recv_multipart(client)
self.assertEqual(msg2, msg)
def skip_plain_inauth(self):
"""test PLAIN failed authentication"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
client.plain_username = USER
client.plain_password = b'incorrect'
server.plain_server = True
self.assertEqual(server.mechanism, zmq.PLAIN)
self.assertEqual(client.mechanism, zmq.PLAIN)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
client.send(b'ping')
server.rcvtimeo = 250
self.assertRaisesErrno(zmq.EAGAIN, server.recv)
self.stop_zap()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def bounce(self, server, client, test_metadata=True):
msg = [os.urandom(64), os.urandom(64)]
client.send_multipart(msg)
frames = self.recv_multipart(server, copy=False)
recvd = list(map(lambda x: x.bytes, frames))
try:
if test_metadata and not PYPY:
for frame in frames:
self.assertEqual(frame.get('User-Id'), 'anonymous')
self.assertEqual(frame.get('Hello'), 'World')
self.assertEqual(frame['Socket-Type'], 'DEALER')
except zmq.ZMQVersionError:
pass
self.assertEqual(recvd, msg)
server.send_multipart(recvd)
msg2 = self.recv_multipart(client)
self.assertEqual(msg2, msg)
def skip_plain_inauth(self):
"""test PLAIN failed authentication"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
client.plain_username = USER
client.plain_password = b'incorrect'
server.plain_server = True
self.assertEqual(server.mechanism, zmq.PLAIN)
self.assertEqual(client.mechanism, zmq.PLAIN)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
client.send(b'ping')
server.rcvtimeo = 250
self.assertRaisesErrno(zmq.EAGAIN, server.recv)
self.stop_zap()
def skip_plain_inauth(self):
"""test PLAIN failed authentication"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
client.plain_username = USER
client.plain_password = b'incorrect'
server.plain_server = True
self.assertEqual(server.mechanism, zmq.PLAIN)
self.assertEqual(client.mechanism, zmq.PLAIN)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
client.send(b'ping')
server.rcvtimeo = 250
self.assertRaisesErrno(zmq.EAGAIN, server.recv)
self.stop_zap()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def bounce(self, server, client, test_metadata=True):
msg = [os.urandom(64), os.urandom(64)]
client.send_multipart(msg)
frames = self.recv_multipart(server, copy=False)
recvd = list(map(lambda x: x.bytes, frames))
try:
if test_metadata and not PYPY:
for frame in frames:
self.assertEqual(frame.get('User-Id'), 'anonymous')
self.assertEqual(frame.get('Hello'), 'World')
self.assertEqual(frame['Socket-Type'], 'DEALER')
except zmq.ZMQVersionError:
pass
self.assertEqual(recvd, msg)
server.send_multipart(recvd)
msg2 = self.recv_multipart(client)
self.assertEqual(msg2, msg)
def skip_plain_inauth(self):
"""test PLAIN failed authentication"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
client.plain_username = USER
client.plain_password = b'incorrect'
server.plain_server = True
self.assertEqual(server.mechanism, zmq.PLAIN)
self.assertEqual(client.mechanism, zmq.PLAIN)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
client.send(b'ping')
server.rcvtimeo = 250
self.assertRaisesErrno(zmq.EAGAIN, server.recv)
self.stop_zap()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def bounce(self, server, client, test_metadata=True):
msg = [os.urandom(64), os.urandom(64)]
client.send_multipart(msg)
frames = self.recv_multipart(server, copy=False)
recvd = list(map(lambda x: x.bytes, frames))
try:
if test_metadata and not PYPY:
for frame in frames:
self.assertEqual(frame.get('User-Id'), 'anonymous')
self.assertEqual(frame.get('Hello'), 'World')
self.assertEqual(frame['Socket-Type'], 'DEALER')
except zmq.ZMQVersionError:
pass
self.assertEqual(recvd, msg)
server.send_multipart(recvd)
msg2 = self.recv_multipart(client)
self.assertEqual(msg2, msg)
def skip_plain_inauth(self):
"""test PLAIN failed authentication"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
client.plain_username = USER
client.plain_password = b'incorrect'
server.plain_server = True
self.assertEqual(server.mechanism, zmq.PLAIN)
self.assertEqual(client.mechanism, zmq.PLAIN)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
client.send(b'ping')
server.rcvtimeo = 250
self.assertRaisesErrno(zmq.EAGAIN, server.recv)
self.stop_zap()
def __init__(self, port, pipeline=100, host='localhost', log_file=None):
"""Create a new ZMQDealer object.
"""
context = zmq.Context.instance()
# noinspection PyUnresolvedReferences
self.socket = context.socket(zmq.DEALER)
self.socket.hwm = pipeline
self.socket.connect('tcp://%s:%d' % (host, port))
self._log_file = log_file
self.poller = zmq.Poller()
# noinspection PyUnresolvedReferences
self.poller.register(self.socket, zmq.POLLIN)
if self._log_file is not None:
self._log_file = os.path.abspath(self._log_file)
# If log file directory does not exists, create it
log_dir = os.path.dirname(self._log_file)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# clears any existing log
if os.path.exists(self._log_file):
os.remove(self._log_file)
def __init__(self, targname, cfg, isServer=False):
self.targname = targname
self.cfg = cfg
self.isServer = isServer
self.fnCallName = ''
self.ctx = zmq.Context()
self.ctx.linger = 100
if not self.isServer:
self.sock = self.ctx.socket(zmq.DEALER)
self.sock.linger = 100
self.sock.connect('tcp://%s:%s' % (self.cfg['server'],self.cfg.get('port',7677))) # this times out with EINVAL when no internet
self.poller = zmq.Poller()
self.poller.register(self.sock, zmq.POLLIN)
else:
self.sock = self.ctx.socket(zmq.ROUTER)
self.sock.linger = 100
self.sock.bind('tcp://*:%s' % (self.cfg.get('port',7677)))
self.poller = zmq.Poller()
self.poller.register(self.sock, zmq.POLLIN)
self.be = GetBackend(self.cfg['backend'])(self.targname, self.cfg)
self.inTime = time.time()
self.inactiveLimit = int(self.cfg.get('inactivelimit',0))
print 'inactivelimit ',self.inactiveLimit
def do_send(self, filename):
"""
If a build succeeds and generates files (detailed in a "BUILT"
message), the master will reply with "SEND" *filename* indicating we
should transfer the specified file (this is done on a separate socket
with a different protocol; see :meth:`builder.PiWheelsPackage.transfer`
for more details). Once the transfers concludes, reply to the master
with "SENT".
"""
assert self.slave_id is not None, 'Send before hello'
assert self.builder, 'Send before build / after failed build'
assert self.builder.status, 'Send after failed build'
pkg = [f for f in self.builder.files if f.filename == filename][0]
self.logger.info('Sending %s to master on localhost', pkg.filename)
ctx = zmq.Context.instance()
queue = ctx.socket(zmq.DEALER)
queue.ipv6 = True
queue.hwm = 10
queue.connect('tcp://{master}:5556'.format(master=self.config.master))
try:
pkg.transfer(queue, self.slave_id)
finally:
queue.close()
return ['SENT']
def do_send(builder, filename):
"""
Handles sending files when requested by :func:`do_import`.
"""
logging.info('Sending %s to master', filename)
pkg = [f for f in builder.files if f.filename == filename][0]
ctx = zmq.Context.instance()
queue = ctx.socket(zmq.DEALER)
queue.ipv6 = True
queue.hwm = 10
# NOTE: The following assumes that we're running on the master; this
# *should* be the case (it's risky to run the importer on a tcp queue)
# but there's no guarantee of this.
queue.connect('tcp://localhost:5556')
try:
pkg.transfer(queue, 0)
finally:
queue.close()
def test_getsockopt_events(self):
sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
eventlet.sleep()
poll_out = zmq.Poller()
poll_out.register(sock1, zmq.POLLOUT)
sock_map = poll_out.poll(100)
self.assertEqual(len(sock_map), 1)
events = sock1.getsockopt(zmq.EVENTS)
self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT)
sock1.send(b'')
poll_in = zmq.Poller()
poll_in.register(sock2, zmq.POLLIN)
sock_map = poll_in.poll(100)
self.assertEqual(len(sock_map), 1)
events = sock2.getsockopt(zmq.EVENTS)
self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
def test_cpu_usage_after_pub_send_or_dealer_recv(self):
"""zmq eats CPU after PUB send or DEALER recv.
Same https://bitbucket.org/eventlet/eventlet/issue/128
"""
pub, sub, _port = self.create_bound_pair(zmq.PUB, zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b"")
eventlet.sleep()
pub.send(b'test_send')
tests.check_idle_cpu_usage(0.2, 0.1)
sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
eventlet.sleep()
sender.send(b'test_recv')
msg = receiver.recv()
self.assertEqual(msg, b'test_recv')
tests.check_idle_cpu_usage(0.2, 0.1)
def _send(self, ident, message):
"""
(asyncio coroutine) Send the message and wait for a response.
:param message (sawtooth_sdk.protobuf.Message)
:param ident (str) the identity of the zmq.DEALER to send to
"""
LOGGER.debug(
"Sending %s(%s) to %s",
str(to_protobuf_class(message.message_type).__name__),
str(message.message_type),
str(ident)
)
return await self._socket.send_multipart([
ident,
message.SerializeToString()
])
def __init__(self, url):
self._url = url
self._ctx = Context.instance()
self._socket = self._ctx.socket(zmq.DEALER)
self._socket.identity = uuid.uuid4().hex.encode()[0:16]
self._msg_router = _MessageRouter()
self._receiver = _Receiver(self._socket, self._msg_router)
self._sender = _Sender(self._socket, self._msg_router)
self._connection_state_listeners = {}
self._recv_task = None
# Monitoring properties
self._monitor_sock = None
self._monitor_fd = None
self._monitor_task = None