def test_keypair(self):
    """Test that ``zmq.curve_keypair`` returns a Z85-encoded key pair.

    Both keys must be ``bytes`` of length 40 and must decode, through
    ``z85.decode``, to ``bytes`` of length 32.

    Raises:
        SkipTest: if ``zmq.curve_keypair`` raises ``zmq.ZMQError``
            (reported as "CURVE unsupported").
    """
    try:
        public, secret = zmq.curve_keypair()
    except zmq.ZMQError:
        raise SkipTest("CURVE unsupported")

    self.assertEqual(type(secret), bytes)
    self.assertEqual(type(public), bytes)
    self.assertEqual(len(secret), 40)
    self.assertEqual(len(public), 40)

    # verify that it is indeed Z85
    # Decode in (secret, public) order so that each ``b*`` name holds the
    # decoded form of the key it is named after.
    bsecret, bpublic = [z85.decode(key) for key in (secret, public)]

    self.assertEqual(type(bsecret), bytes)
    self.assertEqual(type(bpublic), bytes)
    self.assertEqual(len(bsecret), 32)
    self.assertEqual(len(bpublic), 32)
# Example source code using the Python class ZMQError()
def get_hwm(self):
    """Return the socket's high water mark.

    When the libzmq major version is 3 or newer, SNDHWM is returned; if
    reading it raises ``zmq.ZMQError``, RCVHWM is returned instead.  For an
    older libzmq the single HWM option is returned.
    """
    libzmq_major = zmq.zmq_version_info()[0]
    if libzmq_major < 3:
        return self.getsockopt(zmq.HWM)

    # prefer sndhwm, fall back on rcvhwm
    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        pass
    return self.getsockopt(zmq.RCVHWM)
def _handle_recv(self):
    """Handle a recv event.

    Does nothing when ``self._flushed`` is set.  Otherwise a non-blocking
    multipart receive is attempted; on success the message is passed to
    ``self._recv_callback`` (if one is set) through ``self._run_callback``.
    An EAGAIN error is ignored and any other ``zmq.ZMQError`` is logged.
    """
    if self._flushed:
        return

    try:
        frames = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
    except zmq.ZMQError as err:
        # EAGAIN means the state changed since the poll event; stay quiet
        if err.errno != zmq.EAGAIN:
            gen_log.error("RECV Error: %s"%zmq.strerror(err.errno))
        return

    if not self._recv_callback:
        return
    handler = self._recv_callback
    self._run_callback(handler, frames)
def run(self):
    """Start the Authentication Agent thread task.

    Starts the authenticator, then polls ``self.pipe`` and the
    authenticator's ZAP socket for input.  The loop ends when polling raises
    ``zmq.ZMQError`` or when ``self._handle_pipe()`` returns a true value.
    Afterwards the pipe is closed and the authenticator is stopped.
    """
    self.authenticator.start()
    zap_sock = self.authenticator.zap_socket

    poller = zmq.Poller()
    for sock in (self.pipe, zap_sock):
        poller.register(sock, zmq.POLLIN)

    while True:
        try:
            ready = dict(poller.poll())
        except zmq.ZMQError:
            break  # interrupted

        # the pipe handler's return value signals a termination request
        if ready.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if ready.get(zap_sock) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_keypair(self):
    """Test that ``zmq.curve_keypair`` returns a Z85-encoded key pair.

    Both keys must be ``bytes`` of length 40 and must decode, through
    ``z85.decode``, to ``bytes`` of length 32.

    Raises:
        SkipTest: if ``zmq.curve_keypair`` raises ``zmq.ZMQError``
            (reported as "CURVE unsupported").
    """
    try:
        public, secret = zmq.curve_keypair()
    except zmq.ZMQError:
        raise SkipTest("CURVE unsupported")

    self.assertEqual(type(secret), bytes)
    self.assertEqual(type(public), bytes)
    self.assertEqual(len(secret), 40)
    self.assertEqual(len(public), 40)

    # verify that it is indeed Z85
    # Decode in (secret, public) order so that each ``b*`` name holds the
    # decoded form of the key it is named after.
    bsecret, bpublic = [z85.decode(key) for key in (secret, public)]

    self.assertEqual(type(bsecret), bytes)
    self.assertEqual(type(bpublic), bytes)
    self.assertEqual(len(bsecret), 32)
    self.assertEqual(len(bpublic), 32)
def get_hwm(self):
    """Return the socket's high water mark.

    When the libzmq major version is 3 or newer, SNDHWM is returned; if
    reading it raises ``zmq.ZMQError``, RCVHWM is returned instead.  For an
    older libzmq the single HWM option is returned.
    """
    libzmq_major = zmq.zmq_version_info()[0]
    if libzmq_major < 3:
        return self.getsockopt(zmq.HWM)

    # prefer sndhwm, fall back on rcvhwm
    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        pass
    return self.getsockopt(zmq.RCVHWM)
def _handle_recv(self):
    """Handle a recv event.

    Does nothing when ``self._flushed`` is set.  Otherwise a non-blocking
    multipart receive is attempted; on success the message is passed to
    ``self._recv_callback`` (if one is set) through ``self._run_callback``.
    An EAGAIN error is ignored and any other ``zmq.ZMQError`` is logged.
    """
    if self._flushed:
        return

    try:
        frames = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
    except zmq.ZMQError as err:
        # EAGAIN means the state changed since the poll event; stay quiet
        if err.errno != zmq.EAGAIN:
            gen_log.error("RECV Error: %s"%zmq.strerror(err.errno))
        return

    if not self._recv_callback:
        return
    handler = self._recv_callback
    self._run_callback(handler, frames)
def _recv(self, noError=False):
    """Receive one event from the socket.

    Args:
        noError: when true, skip the check that refuses to receive on a
            watched connection.

    Returns:
        An ``Event`` built from the unpacked topic and message, or ``None``
        if receiving raised ``zmq.ZMQError``.

    Raises:
        RPCError: if the connection is watched and ``noError`` is false, or
            if unpacking the received data raises ``ValueError``.
    """
    if self._watched and not noError:
        raise RPCError('you cannot recieve on a watched connection')

    try:
        self._log.debug('waiting for incoming data')
        raw = self._socket.recv_string()
        topic, body = self._unpack_event(raw)
    except zmq.ZMQError:
        return None
    except ValueError:
        raise RPCError('Malformed message body')

    self._log.debug('recieved event %s on topic:%s'%(self._event_id, topic))
    # NOTE(review): every event appends "<event id> <timestamp>" to test.txt
    # in the current working directory -- looks like leftover debugging;
    # confirm before relying on or removing it.
    with open('test.txt', 'a') as trace_file:
        trace_file.write('%s %s\n'%(self._event_id, time.time()))
    self._event_id += 1
    return Event(topic, body)
def _handle_request(self, request):
    """Handle *request*, return reply.

    The request must be a dictionary with a truthy ``'command'`` entry;
    otherwise the result of ``self.error(...)`` is returned.  Valid requests
    are passed to ``self.handle_request``; a ``None`` reply is treated as
    "not handled" and also answered with ``self.error(...)``.

    Any exception raised while processing is logged.  If it is not a
    ``zmq.ZMQError`` and ``self.socket`` is truthy, an error reply is
    returned; otherwise the process exits with status 1.
    """
    if not isinstance(request, dict):
        return self.error('invalid request: a dictionary is required.')

    if not request.get('command'):
        log.error('invalid request was %r', request)
        return self.error('invalid request: no command.')

    try:
        reply = self.handle_request(request)
        if reply is not None:
            return reply
        log.error('invalid request was %r', request)
        return self.error('invalid request: not handled')
    except Exception as exc:
        log.exception('Error during request processing. Request was %r', request)
        can_reply = not isinstance(exc, zmq.ZMQError) and self.socket
        if can_reply:
            # Probably need to send a reply
            return self.error('Uncaught exception during processing')
        sys.exit(1)
def mainloop_recv(self):
    """Receive messages from ``self._frsock`` and dispatch them.

    Loops until ``self._frsock`` reports itself closed.  Each received frame
    is decoded with ``loadb`` into an ``(identifier, type, payload)`` triple
    and handed to ``self._dispatcher.dispatch``.  ``zmq.ContextTerminated``
    ends the loop silently.

    Raises:
        zmq.ZMQError: any other ZMQ error, unless ``self._tosock`` is
            closed, in which case a warning is logged instead.
    """
    try:
        while not self._frsock.closed:
            frame = self._frsock.recv(copy=False)
            # named ``msg_type`` so the builtin ``type`` is not shadowed
            identifier, msg_type, payload = loadb(frame.bytes)
            self._dispatcher.dispatch(msg_type, self, identifier, payload)
    except zmq.ContextTerminated:
        pass
    except zmq.ZMQError:
        # NOTE(review): this inspects _tosock although the loop reads from
        # _frsock -- confirm that is the intended socket.
        if not self._tosock.closed:
            # bare raise keeps the original traceback intact
            raise
        # ``warning`` replaces the deprecated ``warn`` alias
        logger.warning('Recv socket closed unexpectedly.')
def assertRaisesErrno(self, errnos, func, *args):
    """Assert that ``func(*args)`` raises ``zmq.ZMQError`` with an expected errno.

    Args:
        errnos: a single errno, or an iterable of acceptable errnos.
        func: the callable to invoke.
        *args: positional arguments passed to ``func``.

    Raises:
        AssertionError: if the raised ``zmq.ZMQError`` carries an errno that
            is not among ``errnos``.

    If ``func`` raises nothing, ``self.fail`` is called.
    """
    try:
        func(*args)
    except zmq.ZMQError as err:
        # accept a bare errno as well as a collection of them
        expected = errnos if hasattr(errnos, '__iter__') else (errnos,)
        if err.errno in expected:
            return
        wanted = ", ".join("%s" % zmq.ZMQError(errno) for errno in expected)
        raise AssertionError(
            "wrong error raised, expected one of ['%s'], got '%s'" % (
                wanted,
                zmq.ZMQError(err.errno),
            ),
        )
    else:
        self.fail("Function did not raise any error")
async def start(self):
    """Starts receiving messages on the underlying socket and passes them
    to the message router.

    This is a coroutine: the body awaits both the socket receive and the
    router, which is only legal inside ``async def``.

    The loop runs while ``self._is_running`` is true.  A ``DecodeError`` is
    logged and the loop continues; a ``zmq.ZMQError`` is logged and the
    coroutine returns; ``asyncio.CancelledError`` clears ``_is_running`` so
    the loop ends.
    """
    self._is_running = True

    while self._is_running:
        try:
            zmq_msg = await self._socket.recv_multipart()

            # only the last frame is parsed as the message
            message = Message()
            message.ParseFromString(zmq_msg[-1])

            await self._msg_router.route_msg(message)
        except DecodeError as e:
            LOGGER.warning('Unable to decode: %s', e)
        except zmq.ZMQError as e:
            LOGGER.warning('Unable to receive: %s', e)
            return
        except asyncio.CancelledError:
            self._is_running = False
def run(self):
    """Poll the socket and dispatch incoming messages until terminated.

    Each iteration does a non-blocking multipart receive of a
    ``[tag, json_message]`` pair.  A ``b"queryResponse"`` tag is passed to
    ``self._bus.resolve_response``; any other tag starts a
    ``ZMQHandlerThread`` for the decoded message.

    EAGAIN sleeps for one millisecond, ETERM sets ``self._terminate``, and
    any other error is printed before the loop continues.
    """
    while not self._terminate:
        frames = None
        try:
            frames = self._socket.recv_multipart(zmq.NOBLOCK)
            tag, json_message = frames
            message = json.loads(json_message.decode('utf8'))
            if tag != b"queryResponse":
                worker = ZMQHandlerThread(self._bus, tag.decode('utf-8'), message)
                worker.start()
            else:
                self._bus.resolve_response(message)
        except zmq.ZMQError as err:
            if err.errno == zmq.EAGAIN:
                # nothing to read yet; back off briefly before polling again
                time.sleep(.001)
            elif err.errno == zmq.ETERM:
                self._terminate = True
            else:
                print("message zmq exception:", self._address, err, err.errno)
        except Exception as err:
            print("message exception:", self._address, err, frames)
def __state_changed(self, event=None, _evtype=None):
    """Update the readable/writable events from the socket's EVENTS option.

    If the socket is closed, only ``__cleanup_events`` is called.  Otherwise
    the EVENTS option is read through the parent class; ``__writable`` is
    set when POLLOUT is flagged and ``__readable`` when POLLIN is flagged.
    If reading the option raises ``zmq.ZMQError``, that exception is set on
    both events instead.

    Neither ``event`` nor ``_evtype`` is used in the body.
    """
    if self.closed:
        self.__cleanup_events()
        return

    try:
        # avoid triggering __state_changed from inside __state_changed
        pending = super(_Socket, self).getsockopt(zmq.EVENTS)
    except zmq.ZMQError as err:
        self.__writable.set_exception(err)
        self.__readable.set_exception(err)
        return

    if pending & zmq.POLLOUT:
        self.__writable.set()
    if pending & zmq.POLLIN:
        self.__readable.set()
def send(self, data, flags=0, copy=True, track=False):
    """Send ``data``, blocking only the current greenlet.

    With ``zmq.NOBLOCK`` in ``flags`` the parent class's ``send`` is called
    once and any error, EAGAIN included, propagates.  Otherwise NOBLOCK is
    added to the flags and the send is retried, waiting in
    ``self._wait_write()`` after each EAGAIN, until it succeeds or fails
    with a different ``zmq.ZMQError``.

    Unless ``self.__in_send_multipart`` is set, ``__state_changed`` fires
    exactly once (success or fail) at the end of this method.

    Returns:
        Whatever the parent class's ``send`` returns.
    """
    parent_send = super(_Socket, self).send

    # caller asked for NOBLOCK: act as normal and let the EAGAIN get raised
    if flags & zmq.NOBLOCK:
        try:
            return parent_send(data, flags, copy, track)
        finally:
            if not self.__in_send_multipart:
                self.__state_changed()

    # ensure the zmq.NOBLOCK flag is part of flags
    flags |= zmq.NOBLOCK

    # retry indefinitely, blocking only the current greenlet
    while True:
        try:
            result = parent_send(data, flags, copy, track)
        except zmq.ZMQError as err:
            # anything other than EAGAIN is a real failure: notify, reraise
            if err.errno != zmq.EAGAIN:
                if not self.__in_send_multipart:
                    self.__state_changed()
                raise
        else:
            if not self.__in_send_multipart:
                self.__state_changed()
            return result
        # defer to the event loop until we're notified the socket is writable
        self._wait_write()
def recv(self, flags=0, copy=True, track=False):
    """Receive a message, blocking only the current greenlet.

    With ``zmq.NOBLOCK`` in ``flags`` the parent class's ``recv`` is called
    once and any error, EAGAIN included, propagates.  Otherwise NOBLOCK is
    added to the flags and the receive is retried, waiting in
    ``self._wait_read()`` after each EAGAIN, until it succeeds or fails with
    a different ``zmq.ZMQError``.

    Unless ``self.__in_recv_multipart`` is set, ``__state_changed`` fires
    exactly once (success or fail) at the end of this method.

    Returns:
        Whatever the parent class's ``recv`` returns.
    """
    parent_recv = super(_Socket, self).recv

    # caller asked for NOBLOCK: a single attempt, errors propagate
    if flags & zmq.NOBLOCK:
        try:
            return parent_recv(flags, copy, track)
        finally:
            if not self.__in_recv_multipart:
                self.__state_changed()

    flags |= zmq.NOBLOCK

    while True:
        try:
            result = parent_recv(flags, copy, track)
        except zmq.ZMQError as err:
            # anything other than EAGAIN is a real failure: notify, reraise
            if err.errno != zmq.EAGAIN:
                if not self.__in_recv_multipart:
                    self.__state_changed()
                raise
        else:
            if not self.__in_recv_multipart:
                self.__state_changed()
            return result
        # wait until the socket is reported readable, then try again
        self._wait_read()
def run(self):
    """Run the device, swallowing only ETERM errors.

    Calls ``self.run_device()``.  A ``ZMQError`` whose errno is ``ETERM`` is
    silenced; any other ``ZMQError`` is re-raised.  ``self.done`` is set to
    ``True`` in every case.
    """
    try:
        self.run_device()
    except ZMQError as err:
        # TERM errors are silenced, because this should be a clean shutdown
        if err.errno != ETERM:
            raise
    finally:
        self.done = True
def test_ipc_path_max_length_msg(self):
    """Check that binding an over-long ipc path reports the length limit.

    Skipped when ``zmq.IPC_PATH_MAX_LEN`` is 0.  A PUB socket is bound to an
    ipc path one character longer than the limit; if that raises
    ``zmq.ZMQError``, its ``strerror`` must contain the limit.
    """
    max_len = zmq.IPC_PATH_MAX_LEN
    if max_len == 0:
        raise SkipTest("IPC_PATH_MAX_LEN undefined")

    pub = self.context.socket(zmq.PUB)
    self.sockets.append(pub)

    too_long = 'a' * (max_len + 1)
    try:
        pub.bind('ipc://{0}'.format(too_long))
    except zmq.ZMQError as err:
        self.assertTrue(str(max_len) in err.strerror)
    # NOTE(review): nothing is asserted when bind() does not raise --
    # confirm that is intended.
def test_term_thread(self):
    """ctx.term should not crash active threads (#139)

    A worker thread binds a REP socket and calls ``recv()``; the main thread
    then terminates the context and expects the worker to finish within the
    join timeout, its ``recv()`` having failed with ETERM.
    """
    ctx = self.Context()
    ready = Event()
    ready.clear()

    def block():
        sock = ctx.socket(zmq.REP)
        sock.bind_to_random_port('tcp://127.0.0.1')
        ready.set()
        try:
            sock.recv()
        except zmq.ZMQError as err:
            self.assertEqual(err.errno, zmq.ETERM)
            return
        finally:
            sock.close()
        # only reached when recv() returned without raising
        self.fail("recv should have been interrupted with ETERM")

    worker = Thread(target=block)
    worker.start()

    ready.wait(1)
    self.assertTrue(ready.is_set(), "sync event never fired")
    # brief pause between the worker signalling and terminating the context
    time.sleep(0.01)
    ctx.term()
    worker.join(timeout=1)
    self.assertFalse(worker.is_alive(), "term should have interrupted s.recv()")
def test_double_stop(self):
    """Test error raised on multiple calls to stop.

    After one start/stop cycle, two further ``stop`` calls must each raise
    ``ZMQError``.
    """
    stopwatch = Stopwatch()
    stopwatch.start()
    stopwatch.stop()
    for _ in range(2):
        self.assertRaises(ZMQError, stopwatch.stop)