def run(self):
""" Start the Authentication Agent thread task """
self.authenticator.start()
zap = self.authenticator.zap_socket
poller = zmq.Poller()
poller.register(self.pipe, zmq.POLLIN)
poller.register(zap, zmq.POLLIN)
while True:
try:
socks = dict(poller.poll())
except zmq.ZMQError:
break # interrupted
if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
terminate = self._handle_pipe()
if terminate:
break
if zap in socks and socks[zap] == zmq.POLLIN:
self._handle_zap()
self.pipe.close()
self.authenticator.stop()
python类ZMQError()的实例源码
def test_keypair(self):
"""test curve_keypair"""
try:
public, secret = zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("CURVE unsupported")
self.assertEqual(type(secret), bytes)
self.assertEqual(type(public), bytes)
self.assertEqual(len(secret), 40)
self.assertEqual(len(public), 40)
# verify that it is indeed Z85
bsecret, bpublic = [ z85.decode(key) for key in (public, secret) ]
self.assertEqual(type(bsecret), bytes)
self.assertEqual(type(bpublic), bytes)
self.assertEqual(len(bsecret), 32)
self.assertEqual(len(bpublic), 32)
def get_hwm(self):
"""get the High Water Mark
On libzmq ? 3, this gets SNDHWM if available, otherwise RCVHWM
"""
major = zmq.zmq_version_info()[0]
if major >= 3:
# return sndhwm, fallback on rcvhwm
try:
return self.getsockopt(zmq.SNDHWM)
except zmq.ZMQError as e:
pass
return self.getsockopt(zmq.RCVHWM)
else:
return self.getsockopt(zmq.HWM)
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def poll(self, timeout=None, flags=_zmq.POLLIN):
"""poll the socket for events
returns a Future for the poll results.
"""
if self.closed:
raise _zmq.ZMQError(_zmq.ENOTSUP)
p = self._poller_class()
p.register(self, flags)
f = p.poll(timeout)
future = self._Future()
def unwrap_result(f):
if future.done():
return
if f.exception():
future.set_exception(f.exeception())
else:
evts = dict(f.result())
future.set_result(evts.get(self, 0))
f.add_done_callback(unwrap_result)
return future
def send(self, message, send_more=False, block=True, as_json=False):
flags = 0
if send_more:
flags = zmq.SNDMORE
if not block:
flags = flags | zmq.NOBLOCK
try:
if as_json:
self.socket.send_json(message, flags)
else:
self.socket.send(message, flags, copy=self.zmq_copy, track=self.zmq_track)
except zmq.Again as e:
if not block:
pass
else:
raise e
except zmq.ZMQError as e:
logger.error(sys.exc_info()[1])
raise e
test_zmq_pub_sub.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_pub(self):
"""Publish log messages. bind() to PUB socket."""
# pylint: disable=E1101
context = zmq.Context()
pub = context.socket(zmq.PUB)
try:
pub.bind('tcp://*:{}'.format(self.sub_port))
except zmq.ZMQError as error:
print(error)
time.sleep(0.1)
send_count = self.send_count
for i in range(send_count):
pub.send_string('hi there {}'.format(i))
time.sleep(1e-5)
sys.stdout.flush()
# Wait for the watcher thread to exit.
while self.watcher.isAlive():
self.watcher.join(timeout=1e-5)
pub.close()
context.term()
def forward(self, data):
try:
# self.logger.debug('sending message')
self.list_communication_channel.send(data)
# self.logger.debug('ok with the message')
except zmq.NotDone:
# time.sleep(TRY_TIMEOUT)
self.logger.debug('my recipient is dead, not done')
self.list_communication_channel.close()
except zmq.Again:
self.logger.debug('my recipient is dead')
# self.list_communication_channel.close()
raise zmq.Again
except zmq.ZMQError as a:
self.logger.debug("Error in message forward " + a.strerror)
self.context.destroy()
self.context = zmq.Context()
def send_int_message(self, msg=b'ALIVE', timeout=TRACKER_INFINITE_TIMEOUT):
try:
self.logger.debug('sending message to {}'.format(self.sync_address))
tracker_object = self.list_communication_channel.send(msg, track=True, copy=False)
# wait forever
tracker_object.wait(timeout)
# self.logger.debug('ok with the message')
except zmq.NotDone:
self.logger.debug('Something went wrong with that message')
time.sleep(TRY_TIMEOUT)
# self.logger.debug('Sleep finished')
# self.list_communication_channel.close()
except zmq.ZMQError as a:
self.logger.debug(a.strerror)
self.context.destroy()
self.context = zmq.Context()
self.generate_internal_channel_client_side()
# used when it's the first time to sync
def get_command(self):
"""Attempt to return a unicode object from the command socket
If no message is available without blocking (as opposed to a blank
message), return None
"""
try:
message_bytes = self.socket.recv(zmq.NOBLOCK)
log.debug("Received message: %r", message_bytes)
except zmq.ZMQError as exc:
if exc.errno == zmq.EAGAIN:
return None
else:
raise
else:
return message_bytes.decode(config.CODEC)
def _bind_with_timeout(bind_function, args, n_tries=3, retry_interval_s=0.5):
"""Attempt to bind a socket a number of times with a short interval in between
Especially on Linux, crashing out of a networkzero process can leave the sockets
lingering and unable to re-bind on startup. We give it a few goes here to see if
we can bind within a couple of seconds.
"""
n_tries_left = n_tries
while n_tries_left > 0:
try:
return bind_function(*args)
except zmq.error.ZMQError as exc:
_logger.warn("%s; %d tries remaining", exc, n_tries_left)
n_tries_left -= 1
except OSError as exc:
if exc.errno == errno.EADDRINUSE:
_logger.warn("%s; %d tries remaining", exc, n_tries_left)
n_tries_left -= 1
else:
raise
else:
raise core.SocketAlreadyExistsError("Failed to bind after %s tries" % n_tries)
def poll_command_request(self):
"""If the command RPC socket has an incoming request,
separate it into its action and its params and put it
on the command request queue.
"""
try:
message = self.rpc.recv(zmq.NOBLOCK)
except zmq.ZMQError as exc:
if exc.errno == zmq.EAGAIN:
return
else:
raise
_logger.debug("Received command %s", message)
segments = _unpack(message)
action, params = segments[0], segments[1:]
_logger.debug("Adding %s, %s to the request queue", action, params)
self._command = _Command(action, params)
def bind_to_random_port(socket, addr, min_port=49152, max_port=65536, max_tries=100):
"We can't just use the zmq.Socket.bind_to_random_port, as we wan't to set the identity before binding"
for i in range(max_tries):
try:
port = random.randrange(min_port, max_port)
socket.identity = '%s:%s' % (addr, port)
socket.bind('tcp://*:%s' % port)
#socket.bind('%s:%s' % (addr, port))
except zmq.ZMQError as exception:
en = exception.errno
if en == zmq.EADDRINUSE:
continue
else:
raise
else:
return socket.identity
raise zmq.ZMQBindError("Could not bind socket to random port.")
def run(self):
""" Start the Authentication Agent thread task """
self.authenticator.start()
zap = self.authenticator.zap_socket
poller = zmq.Poller()
poller.register(self.pipe, zmq.POLLIN)
poller.register(zap, zmq.POLLIN)
while True:
try:
socks = dict(poller.poll())
except zmq.ZMQError:
break # interrupted
if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
terminate = self._handle_pipe()
if terminate:
break
if zap in socks and socks[zap] == zmq.POLLIN:
self._handle_zap()
self.pipe.close()
self.authenticator.stop()
def test_keypair(self):
"""test curve_keypair"""
try:
public, secret = zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("CURVE unsupported")
self.assertEqual(type(secret), bytes)
self.assertEqual(type(public), bytes)
self.assertEqual(len(secret), 40)
self.assertEqual(len(public), 40)
# verify that it is indeed Z85
bsecret, bpublic = [ z85.decode(key) for key in (public, secret) ]
self.assertEqual(type(bsecret), bytes)
self.assertEqual(type(bpublic), bytes)
self.assertEqual(len(bsecret), 32)
self.assertEqual(len(bpublic), 32)
def get_hwm(self):
"""get the High Water Mark
On libzmq ? 3, this gets SNDHWM if available, otherwise RCVHWM
"""
major = zmq.zmq_version_info()[0]
if major >= 3:
# return sndhwm, fallback on rcvhwm
try:
return self.getsockopt(zmq.SNDHWM)
except zmq.ZMQError as e:
pass
return self.getsockopt(zmq.RCVHWM)
else:
return self.getsockopt(zmq.HWM)
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def run(self):
""" Start the Authentication Agent thread task """
self.authenticator.start()
zap = self.authenticator.zap_socket
poller = zmq.Poller()
poller.register(self.pipe, zmq.POLLIN)
poller.register(zap, zmq.POLLIN)
while True:
try:
socks = dict(poller.poll())
except zmq.ZMQError:
break # interrupted
if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
terminate = self._handle_pipe()
if terminate:
break
if zap in socks and socks[zap] == zmq.POLLIN:
self._handle_zap()
self.pipe.close()
self.authenticator.stop()
def test_keypair(self):
"""test curve_keypair"""
try:
public, secret = zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("CURVE unsupported")
self.assertEqual(type(secret), bytes)
self.assertEqual(type(public), bytes)
self.assertEqual(len(secret), 40)
self.assertEqual(len(public), 40)
# verify that it is indeed Z85
bsecret, bpublic = [ z85.decode(key) for key in (public, secret) ]
self.assertEqual(type(bsecret), bytes)
self.assertEqual(type(bpublic), bytes)
self.assertEqual(len(bsecret), 32)
self.assertEqual(len(bpublic), 32)
def get_hwm(self):
"""get the High Water Mark
On libzmq ? 3, this gets SNDHWM if available, otherwise RCVHWM
"""
major = zmq.zmq_version_info()[0]
if major >= 3:
# return sndhwm, fallback on rcvhwm
try:
return self.getsockopt(zmq.SNDHWM)
except zmq.ZMQError as e:
pass
return self.getsockopt(zmq.RCVHWM)
else:
return self.getsockopt(zmq.HWM)
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def run(self):
""" Start the Authentication Agent thread task """
self.authenticator.start()
zap = self.authenticator.zap_socket
poller = zmq.Poller()
poller.register(self.pipe, zmq.POLLIN)
poller.register(zap, zmq.POLLIN)
while True:
try:
socks = dict(poller.poll())
except zmq.ZMQError:
break # interrupted
if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
terminate = self._handle_pipe()
if terminate:
break
if zap in socks and socks[zap] == zmq.POLLIN:
self._handle_zap()
self.pipe.close()
self.authenticator.stop()
def test_keypair(self):
"""test curve_keypair"""
try:
public, secret = zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("CURVE unsupported")
self.assertEqual(type(secret), bytes)
self.assertEqual(type(public), bytes)
self.assertEqual(len(secret), 40)
self.assertEqual(len(public), 40)
# verify that it is indeed Z85
bsecret, bpublic = [ z85.decode(key) for key in (public, secret) ]
self.assertEqual(type(bsecret), bytes)
self.assertEqual(type(bpublic), bytes)
self.assertEqual(len(bsecret), 32)
self.assertEqual(len(bpublic), 32)
def get_hwm(self):
"""get the High Water Mark
On libzmq ? 3, this gets SNDHWM if available, otherwise RCVHWM
"""
major = zmq.zmq_version_info()[0]
if major >= 3:
# return sndhwm, fallback on rcvhwm
try:
return self.getsockopt(zmq.SNDHWM)
except zmq.ZMQError as e:
pass
return self.getsockopt(zmq.RCVHWM)
else:
return self.getsockopt(zmq.HWM)
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def run(self):
""" Start the Authentication Agent thread task """
self.authenticator.start()
zap = self.authenticator.zap_socket
poller = zmq.Poller()
poller.register(self.pipe, zmq.POLLIN)
poller.register(zap, zmq.POLLIN)
while True:
try:
socks = dict(poller.poll())
except zmq.ZMQError:
break # interrupted
if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
terminate = self._handle_pipe()
if terminate:
break
if zap in socks and socks[zap] == zmq.POLLIN:
self._handle_zap()
self.pipe.close()
self.authenticator.stop()
def test_keypair(self):
"""test curve_keypair"""
try:
public, secret = zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("CURVE unsupported")
self.assertEqual(type(secret), bytes)
self.assertEqual(type(public), bytes)
self.assertEqual(len(secret), 40)
self.assertEqual(len(public), 40)
# verify that it is indeed Z85
bsecret, bpublic = [ z85.decode(key) for key in (public, secret) ]
self.assertEqual(type(bsecret), bytes)
self.assertEqual(type(bpublic), bytes)
self.assertEqual(len(bsecret), 32)
self.assertEqual(len(bpublic), 32)
def get_hwm(self):
"""get the High Water Mark
On libzmq ? 3, this gets SNDHWM if available, otherwise RCVHWM
"""
major = zmq.zmq_version_info()[0]
if major >= 3:
# return sndhwm, fallback on rcvhwm
try:
return self.getsockopt(zmq.SNDHWM)
except zmq.ZMQError as e:
pass
return self.getsockopt(zmq.RCVHWM)
else:
return self.getsockopt(zmq.HWM)
def _handle_recv(self):
"""Handle a recv event."""
if self._flushed:
return
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
else:
if self._recv_callback:
callback = self._recv_callback
# self._recv_callback = None
self._run_callback(callback, msg)
# self.update_state()
def run(self):
""" Start the Authentication Agent thread task """
self.authenticator.start()
zap = self.authenticator.zap_socket
poller = zmq.Poller()
poller.register(self.pipe, zmq.POLLIN)
poller.register(zap, zmq.POLLIN)
while True:
try:
socks = dict(poller.poll())
except zmq.ZMQError:
break # interrupted
if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
terminate = self._handle_pipe()
if terminate:
break
if zap in socks and socks[zap] == zmq.POLLIN:
self._handle_zap()
self.pipe.close()
self.authenticator.stop()