def test_single_socket_forwarder_connect(self):
if zmq.zmq_version() in ('4.1.1', '4.0.6'):
raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
python类zmq_version()的实例源码
def test_bad_send_recv(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
if zmq.zmq_version() != '2.1.8':
# this doesn't work on 2.1.8
for copy in (True,False):
self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
# I have to have this or we die on an Abort trap.
msg1 = b'asdf'
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def test_single_socket_forwarder_bind(self):
if zmq.zmq_version() in ('4.1.1', '4.0.6'):
raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
# select random port:
binder = self.context.socket(zmq.REQ)
port = binder.bind_to_random_port('tcp://127.0.0.1')
binder.close()
time.sleep(0.1)
req = self.context.socket(zmq.REQ)
req.connect('tcp://127.0.0.1:%i'%port)
dev.bind_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
# select random port:
binder = self.context.socket(zmq.REQ)
port = binder.bind_to_random_port('tcp://127.0.0.1')
binder.close()
time.sleep(0.1)
req = self.context.socket(zmq.REQ)
req.connect('tcp://127.0.0.1:%i'%port)
dev.bind_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_zmq_version(self):
v = zmq.zmq_version()
self.assertTrue(isinstance(v, str))
def get_monitor_socket(self, events=None, addr=None):
"""Return a connected PAIR socket ready to receive the event notifications.
.. versionadded:: libzmq-4.0
.. versionadded:: 14.0
Parameters
----------
events : bitfield (int) [default: ZMQ_EVENTS_ALL]
The bitmask defining which events are wanted.
addr : string [default: None]
The optional endpoint for the monitoring sockets.
Returns
-------
socket : (PAIR)
The socket is already connected and ready to receive messages.
"""
# safe-guard, method only available on libzmq >= 4
if zmq.zmq_version_info() < (4,):
raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
if addr is None:
# create endpoint name from internal fd
addr = "inproc://monitor.s-%d" % self.FD
if events is None:
# use all events
events = zmq.EVENT_ALL
# attach monitoring socket
self.monitor(addr, events)
# create new PAIR socket and connect it
ret = self.context.socket(zmq.PAIR)
ret.connect(addr)
return ret
def test_bad_send_recv(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
if zmq.zmq_version() != '2.1.8':
# this doesn't work on 2.1.8
for copy in (True,False):
self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
# I have to have this or we die on an Abort trap.
msg1 = b'asdf'
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def test_zmq_version(self):
v = zmq.zmq_version()
self.assertTrue(isinstance(v, str))
def get_monitor_socket(self, events=None, addr=None):
"""Return a connected PAIR socket ready to receive the event notifications.
.. versionadded:: libzmq-4.0
.. versionadded:: 14.0
Parameters
----------
events : bitfield (int) [default: ZMQ_EVENTS_ALL]
The bitmask defining which events are wanted.
addr : string [default: None]
The optional endpoint for the monitoring sockets.
Returns
-------
socket : (PAIR)
The socket is already connected and ready to receive messages.
"""
# safe-guard, method only available on libzmq >= 4
if zmq.zmq_version_info() < (4,):
raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
if addr is None:
# create endpoint name from internal fd
addr = "inproc://monitor.s-%d" % self.FD
if events is None:
# use all events
events = zmq.EVENT_ALL
# attach monitoring socket
self.monitor(addr, events)
# create new PAIR socket and connect it
ret = self.context.socket(zmq.PAIR)
ret.connect(addr)
return ret
def __init__(self, min_version, msg='Feature'):
global _zmq_version
if _zmq_version is None:
from zmq import zmq_version
_zmq_version = zmq_version()
self.msg = msg
self.min_version = min_version
self.version = _zmq_version
def test_bad_send_recv(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
if zmq.zmq_version() != '2.1.8':
# this doesn't work on 2.1.8
for copy in (True,False):
self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
# I have to have this or we die on an Abort trap.
msg1 = b'asdf'
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def get_monitor_socket(self, events=None, addr=None):
"""Return a connected PAIR socket ready to receive the event notifications.
.. versionadded:: libzmq-4.0
.. versionadded:: 14.0
Parameters
----------
events : bitfield (int) [default: ZMQ_EVENTS_ALL]
The bitmask defining which events are wanted.
addr : string [default: None]
The optional endpoint for the monitoring sockets.
Returns
-------
socket : (PAIR)
The socket is already connected and ready to receive messages.
"""
# safe-guard, method only available on libzmq >= 4
if zmq.zmq_version_info() < (4,):
raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
if addr is None:
# create endpoint name from internal fd
addr = "inproc://monitor.s-%d" % self.FD
if events is None:
# use all events
events = zmq.EVENT_ALL
# attach monitoring socket
self.monitor(addr, events)
# create new PAIR socket and connect it
ret = self.context.socket(zmq.PAIR)
ret.connect(addr)
return ret
def __init__(self, min_version, msg='Feature'):
global _zmq_version
if _zmq_version is None:
from zmq import zmq_version
_zmq_version = zmq_version()
self.msg = msg
self.min_version = min_version
self.version = _zmq_version
def test_bad_send_recv(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
if zmq.zmq_version() != '2.1.8':
# this doesn't work on 2.1.8
for copy in (True,False):
self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
# I have to have this or we die on an Abort trap.
msg1 = b'asdf'
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def test_zmq_version(self):
v = zmq.zmq_version()
self.assertTrue(isinstance(v, str))
def get_monitor_socket(self, events=None, addr=None):
"""Return a connected PAIR socket ready to receive the event notifications.
.. versionadded:: libzmq-4.0
.. versionadded:: 14.0
Parameters
----------
events : bitfield (int) [default: ZMQ_EVENTS_ALL]
The bitmask defining which events are wanted.
addr : string [default: None]
The optional endpoint for the monitoring sockets.
Returns
-------
socket : (PAIR)
The socket is already connected and ready to receive messages.
"""
# safe-guard, method only available on libzmq >= 4
if zmq.zmq_version_info() < (4,):
raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
if addr is None:
# create endpoint name from internal fd
addr = "inproc://monitor.s-%d" % self.FD
if events is None:
# use all events
events = zmq.EVENT_ALL
# attach monitoring socket
self.monitor(addr, events)
# create new PAIR socket and connect it
ret = self.context.socket(zmq.PAIR)
ret.connect(addr)
return ret
def test_bad_send_recv(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
if zmq.zmq_version() != '2.1.8':
# this doesn't work on 2.1.8
for copy in (True,False):
self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
# I have to have this or we die on an Abort trap.
msg1 = b'asdf'
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def test_zmq_version(self):
v = zmq.zmq_version()
self.assertTrue(isinstance(v, str))
def get_monitor_socket(self, events=None, addr=None):
"""Return a connected PAIR socket ready to receive the event notifications.
.. versionadded:: libzmq-4.0
.. versionadded:: 14.0
Parameters
----------
events : bitfield (int) [default: ZMQ_EVENTS_ALL]
The bitmask defining which events are wanted.
addr : string [default: None]
The optional endpoint for the monitoring sockets.
Returns
-------
socket : (PAIR)
The socket is already connected and ready to receive messages.
"""
# safe-guard, method only available on libzmq >= 4
if zmq.zmq_version_info() < (4,):
raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
if addr is None:
# create endpoint name from internal fd
addr = "inproc://monitor.s-%d" % self.FD
if events is None:
# use all events
events = zmq.EVENT_ALL
# attach monitoring socket
self.monitor(addr, events)
# create new PAIR socket and connect it
ret = self.context.socket(zmq.PAIR)
ret.connect(addr)
return ret
def __init__(self, min_version, msg='Feature'):
global _zmq_version
if _zmq_version is None:
from zmq import zmq_version
_zmq_version = zmq_version()
self.msg = msg
self.min_version = min_version
self.version = _zmq_version
def test_bad_send_recv(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
if zmq.zmq_version() != '2.1.8':
# this doesn't work on 2.1.8
for copy in (True,False):
self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
# I have to have this or we die on an Abort trap.
msg1 = b'asdf'
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def get_monitor_socket(self, events=None, addr=None):
"""Return a connected PAIR socket ready to receive the event notifications.
.. versionadded:: libzmq-4.0
.. versionadded:: 14.0
Parameters
----------
events : bitfield (int) [default: ZMQ_EVENTS_ALL]
The bitmask defining which events are wanted.
addr : string [default: None]
The optional endpoint for the monitoring sockets.
Returns
-------
socket : (PAIR)
The socket is already connected and ready to receive messages.
"""
# safe-guard, method only available on libzmq >= 4
if zmq.zmq_version_info() < (4,):
raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
if addr is None:
# create endpoint name from internal fd
addr = "inproc://monitor.s-%d" % self.FD
if events is None:
# use all events
events = zmq.EVENT_ALL
# attach monitoring socket
self.monitor(addr, events)
# create new PAIR socket and connect it
ret = self.context.socket(zmq.PAIR)
ret.connect(addr)
return ret
def __init__(self, min_version, msg='Feature'):
global _zmq_version
if _zmq_version is None:
from zmq import zmq_version
_zmq_version = zmq_version()
self.msg = msg
self.min_version = min_version
self.version = _zmq_version
def test_bad_send_recv(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
if zmq.zmq_version() != '2.1.8':
# this doesn't work on 2.1.8
for copy in (True,False):
self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
# I have to have this or we die on an Abort trap.
msg1 = b'asdf'
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def test_zmq_version(self):
v = zmq.zmq_version()
self.assertTrue(isinstance(v, str))
def get_monitor_socket(self, events=None, addr=None):
"""Return a connected PAIR socket ready to receive the event notifications.
.. versionadded:: libzmq-4.0
.. versionadded:: 14.0
Parameters
----------
events : bitfield (int) [default: ZMQ_EVENTS_ALL]
The bitmask defining which events are wanted.
addr : string [default: None]
The optional endpoint for the monitoring sockets.
Returns
-------
socket : (PAIR)
The socket is already connected and ready to receive messages.
"""
# safe-guard, method only available on libzmq >= 4
if zmq.zmq_version_info() < (4,):
raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
if addr is None:
# create endpoint name from internal fd
addr = "inproc://monitor.s-%d" % self.FD
if events is None:
# use all events
events = zmq.EVENT_ALL
# attach monitoring socket
self.monitor(addr, events)
# create new PAIR socket and connect it
ret = self.context.socket(zmq.PAIR)
ret.connect(addr)
return ret
def test_zmq(pyi_builder):
pyi_builder.test_source(
"""
import zmq
print(zmq.__version__)
print(zmq.zmq_version())
# This is a problematic module and might cause some issues.
import zmq.utils.strtypes
""")
def test_zmq(pyi_builder):
pyi_builder.test_source(
"""
import zmq
print(zmq.__version__)
print(zmq.zmq_version())
# This is a problematic module and might cause some issues.
import zmq.utils.strtypes
""")