def start(self, socket):
"""
Start the monitoring thread and socket.
:param socket: Socket to monitor.
"""
# Start a thread only if it is not already running.
if self.monitor_listening.is_set():
return
# Setup monitor socket.
monitor_socket = socket.get_monitor_socket(events=self.events)
monitor_socket.setsockopt(zmq.RCVTIMEO, self.receive_timeout)
self.monitor_listening.set()
def event_listener(monitor_listening):
while monitor_listening.is_set():
try:
event = recv_monitor_message(monitor_socket)
# The socket is closed, just stop listening now.
if event["event"] == zmq.EVENT_CLOSED:
monitor_listening.clear()
self._notify_listeners(event)
# In case the receive cannot be completed before the timeout.
except zmq.Again:
# Heartbeat for listeners - we do not need an additional thread for time based listeners.
self._notify_listeners(None)
# Cleanup monitor socket.
socket.disable_monitor()
monitor_socket.close()
self.monitor_thread = threading.Thread(target=event_listener, args=(self.monitor_listening,))
# In case someone does not call disconnect, this will stop the thread anyway.
self.monitor_thread.daemon = True
self.monitor_thread.start()
python类Again()的实例源码
def send(self, message, send_more=False, block=True, as_json=False):
flags = 0
if send_more:
flags = zmq.SNDMORE
if not block:
flags = flags | zmq.NOBLOCK
try:
if as_json:
self.socket.send_json(message, flags)
else:
self.socket.send(message, flags, copy=self.zmq_copy, track=self.zmq_track)
except zmq.Again as e:
if not block:
pass
else:
raise e
except zmq.ZMQError as e:
logger.error(sys.exc_info()[1])
raise e
def forward(self, data):
try:
# self.logger.debug('sending message')
self.list_communication_channel.send(data)
# self.logger.debug('ok with the message')
except zmq.NotDone:
# time.sleep(TRY_TIMEOUT)
self.logger.debug('my recipient is dead, not done')
self.list_communication_channel.close()
except zmq.Again:
self.logger.debug('my recipient is dead')
# self.list_communication_channel.close()
raise zmq.Again
except zmq.ZMQError as a:
self.logger.debug("Error in message forward " + a.strerror)
self.context.destroy()
self.context = zmq.Context()
def test_disconnection(self):
""" Test the disconnection of subscribers. """
from supvisors.utils import InternalEventHeaders
# get the local address
local_address = self.supvisors.address_mapper.local_address
# test remote disconnection
address = next(address
for address in self.supvisors.address_mapper.addresses
if address != local_address)
self.subscriber.disconnect([address])
# send a tick event from the local publisher
payload = {'date': 1000}
self.publisher.send_tick_event(payload)
# check the reception of the tick event
msg = self.receive('Tick')
self.assertTupleEqual((InternalEventHeaders.TICK,
local_address, payload), msg)
# test local disconnection
self.subscriber.disconnect([local_address])
# send a tick event from the local publisher
self.publisher.send_tick_event(payload)
# check the non-reception of the tick event
with self.assertRaises(zmq.Again):
self.subscriber.receive()
def generator_from_zmq_pull(context, host):
socket = context.socket(zmq.PULL)
# TODO: Configure socket with clean properties to avoid message overload.
if host.endswith('/'):
host = host[:-1]
print_item("+", "Binding ZMQ pull socket : " + colorama.Fore.CYAN + "{0}".format(host) + colorama.Style.RESET_ALL)
socket.bind(host)
while True:
try:
message = socket.recv(flags=zmq.NOBLOCK)
except zmq.Again as e:
message = None
if message is None:
yield None # NOTE: We have to make the generator non blocking.
else:
task = json.loads(message)
yield task
def get_messages(self, timeout=0.1, count=1):
started = time()
sleep_time = timeout / 10.0
while count:
try:
msg = self.subscriber.recv_multipart(copy=True, flags=zmq.NOBLOCK)
except zmq.Again:
if time() - started > timeout:
break
sleep(sleep_time)
else:
partition_seqno, global_seqno = unpack(">II", msg[2])
seqno = global_seqno if self.count_global else partition_seqno
if not self.counter:
self.counter = seqno
elif self.counter != seqno:
if self.seq_warnings:
self.logger.warning("Sequence counter mismatch: expected %d, got %d. Check if system "
"isn't missing messages." % (self.counter, seqno))
self.counter = None
yield msg[1]
count -= 1
if self.counter:
self.counter += 1
self.stats[self.stat_key] += 1
def _get_data(self, blocking=True):
"""Get batch of data."""
# TODO complete docstring.
if not blocking:
try:
batch = self.socket.recv(flags=zmq.NOBLOCK)
except zmq.Again:
return None
else:
batch = self.socket.recv()
if batch == TERM_MSG:
raise EOCError()
if self.structure == 'array':
batch = numpy.fromstring(batch, dtype=self.dtype)
batch = numpy.reshape(batch, self.shape)
elif self.structure == 'dict':
batch = json.loads(batch)
elif self.structure == 'boolean':
batch = bool(batch)
return batch
def _receiveFromListener(self, quota) -> int:
"""
Receives messages from listener
:param quota: number of messages to receive
:return: number of received messages
"""
assert quota
i = 0
while i < quota:
try:
ident, msg = self.listener.recv_multipart(flags=zmq.NOBLOCK)
if not msg:
# Router probing sends empty message on connection
continue
i += 1
if self.onlyListener and ident not in self.remotesByKeys:
self.peersWithoutRemotes.add(ident)
self._verifyAndAppend(msg, ident)
except zmq.Again:
break
if i > 0:
logger.trace('{} got {} messages through listener'.
format(self, i))
return i
def test_retry_recv(self):
pull = self.socket(zmq.PULL)
pull.rcvtimeo = self.timeout_ms
self.alarm()
self.assertRaises(zmq.Again, pull.recv)
assert self.timer_fired
def test_retry_send(self):
push = self.socket(zmq.PUSH)
push.sndtimeo = self.timeout_ms
self.alarm()
self.assertRaises(zmq.Again, push.send, b('buf'))
assert self.timer_fired
def test_again(self):
s = self.context.socket(zmq.REP)
self.assertRaises(Again, s.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
s.close()
def start_listener(self):
print('ZMQ listener started')
while True:
try:
self.s.recv(zmq.NOBLOCK) # note NOBLOCK here
except zmq.Again:
# no message to recv, do other things
time.sleep(0.05)
else:
self.on_q.put(ON_SIGNAL)
def client_behavior(settings, logger):
internal_channel = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)
try:
internal_channel.generate_internal_channel_client_side()
except ZMQError as e:
logger.debug(e)
message = Message()
message.priority = ALIVE
message.source_flag = INT
message.source_id = '1'
message.target_id = '1'
message.target_addr = '192.168.1.1'
message.target_key = '{}:{}'.format(0, 19)
internal_channel.send_first_internal_channel_message(dumps(message))
msg = internal_channel.wait_int_message(dont_wait=False)
logger.debug("msg : " + msg)
external_channel = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)
external_channel.generate_external_channel_client_side()
external_channel.external_channel_subscribe()
logger.debug(loads(external_channel.wait_ext_message()).printable_message())
logger.debug("try_to_connect TEST COMPLETED")
stop = False
while not stop:
try:
logger.debug(loads(external_channel.wait_ext_message()).printable_message())
sleep(1)
except Again:
logger.debug("my master is DEAD")
stop = True
def server_behavior(settings, logger):
internal_channel = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)
try:
internal_channel.generate_internal_channel_server_side()
msg = loads(internal_channel.wait_int_message(dont_wait=False))
logger.debug("msg : ")
logger.debug(msg.printable_message())
internal_channel.reply_to_int_message(OK)
except ZMQError as e:
logger.debug(e)
external_channel = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)
external_channel.generate_external_channel_server_side()
external_channel.external_channel_publish()
message = Message()
message.priority = ALIVE
message.source_flag = EXT
message.source_id = '1'
message.target_id = '1'
message.target_addr = '192.168.1.1'
message.target_key = '{}:{}'.format(0, 19)
sleep(1)
external_channel.forward(dumps(message))
logger.debug("try_to_connect TEST COMPLETED")
stop = False
while not stop:
try:
external_channel.forward(dumps(message))
sleep(1)
except zmq.Again:
stop = True
def wait_int_message(self, dont_wait=True):
if dont_wait:
# wait for internal message
try:
msg = self.list_communication_channel.recv(zmq.DONTWAIT)
return msg
except zmq.Again:
raise zmq.Again
else:
self.logger.debug('waiting for a request')
msg = self.list_communication_channel.recv()
return msg
def receive(self, event_type):
""" This method performs a checked reception on the subscriber. """
try:
self.subscriber.socket.poll(1000)
return self.subscriber.receive()
except zmq.Again:
self.fail('Failed to get {} event'.format(event_type))
def receive(self, event_type):
""" This method performs a checked reception on the puller. """
try:
return self.puller.receive()
except zmq.Again:
self.fail('Failed to get {} request'. format(event_type))
def check_reception(self, header=None, data=None):
""" The method tests that the message is received correctly
or not received at all. """
if header and data:
# check that subscriber receives the message
try:
msg = self.subscriber.receive()
except zmq.Again:
self.fail('Failed to get {} status'.format(header))
self.assertTupleEqual((header, data), msg)
else:
# check the non-reception of the Supvisors status
with self.assertRaises(zmq.Again):
self.subscriber.receive()
def _send_and_receive(self, message):
"""Sending payloads to NM and returning Response instances.
Or, if the action failed, an error will be raised during the instantiation
of the Response. Can also timeout if the socket receives no data for some
period.
Args:
message: dict of a message to send to NM
Returns:
Response instance if the request succeeded
Raises:
TimeoutError: if nothing is received for the timeout
"""
# zmq is thread unsafe: if we send a second request before
# we get back the first response, we throw an exception
# fix that -kheimerl
with self.lock:
# Send the message and poll for responses.
self.socket.send(json.dumps(message))
responses = self.socket.poll(timeout=self.socket_timeout * 1000)
if responses:
try:
raw_response_data = self.socket.recv()
return Response(raw_response_data)
except zmq.Again:
pass
# If polling fails or recv failes, we reset the socket or
# it will be left in a bad state, waiting for a response.
self.socket.close()
self.setup_socket()
self.socket.connect(self.address)
raise TimeoutError('did not receive a response')
def handle_in(self):
try:
tmp = self.socket.recv_multipart()
except zmq.Again:
return
if len(tmp) != 2:
self.logger.critical('Received a msg with len != 2, something seriously wrong. ')
return
sender, msg_buf = tmp
msg = msg_factory(msg_buf)
data = self.controllers.get(sender)
if not data:
self.logger.critical('Received a msg from %s - this is an unknown sender' % sender)
return
data['last_seen'] = time.time()
# self.logger.debug('Received from %s' % sender)
# TODO Notify Controllers that we are busy, no more messages to be sent
# The above busy notification is not perfect as other messages might be on their way already
# but for long-running queries it will at least ensure other controllers
# don't try and overuse this node by filling up a queue
busy_msg = BusyMessage()
self.send_to_all(busy_msg)
try:
tmp = self.handle(msg)
except Exception, e:
tmp = ErrorMessage(msg)
tmp['payload'] = traceback.format_exc()
self.logger.exception(tmp['payload'])
if tmp:
self.send(sender, tmp)
self.send_to_all(DoneMessage()) # Send a DoneMessage to all controllers, this flags you as 'Done'. Duh
def run(self):
while True:
try:
message = self.pull.recv(flags=zmq.NOBLOCK)
except zmq.Again as e:
message = None
if message is not None:
task = json.loads(message)
self.redis.setex(
task['transaction'],
self.result_expiration,
json.dumps(task['data'])
)
def test_again(self):
s = self.context.socket(zmq.REP)
self.assertRaises(Again, s.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
s.close()
def test_again(self):
s = self.context.socket(zmq.REP)
self.assertRaises(Again, s.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
s.close()
def test_again(self):
s = self.context.socket(zmq.REP)
self.assertRaises(Again, s.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
s.close()
def test_again(self):
s = self.context.socket(zmq.REP)
self.assertRaises(Again, s.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
s.close()
def test_again(self):
s = self.context.socket(zmq.REP)
self.assertRaises(Again, s.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
s.close()
def send_raw_msg (self, msg):
tries = 0
while True:
try:
self.socket.send(msg)
break
except zmq.Again:
tries += 1
if tries > 5:
self.disconnect()
return RC_ERR("*** [RPC] - Failed to send message to server")
tries = 0
while True:
try:
response = self.socket.recv()
break
except zmq.Again:
tries += 1
if tries > 5:
self.disconnect()
return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport))
return response
# processs a single response from server
def receive(self):
'''
Return the message received.
..note::
In ZMQ we are unable to get the address where we got the message from.
'''
try:
msg = self.sub.recv()
except zmq.Again as error:
log.error('Unable to receive messages: %s', error, exc_info=True)
raise ListenerException(error)
log.debug('[%s] Received %s', time.time(), msg)
return msg, ''
def test_process_does_not_block(self):
mock_socket = Mock()
mock_socket.recv_unicode.side_effect = zmq.Again()
server = ControlServer(None, connection_string='127.0.0.1:10000')
server._socket = mock_socket
assertRaisesNothing(self, server.process)
mock_socket.recv_unicode.assert_has_calls([call(flags=zmq.NOBLOCK)])
def process(self, blocking=False):
"""
Each time this method is called, the socket tries to retrieve data and passes
it to the JSONRPCResponseManager, which in turn passes the RPC to the
ExposedObjectCollection.
In case no data are available, the method does nothing. This behavior is required for
Lewis where everything is running in one thread. The central loop can call process
at some point to process remote calls, so the RPC-server does not introduce its own
infinite processing loop.
If the server has not been started yet (via :meth:`start_server`), a RuntimeError
is raised.
:param blocking: If True, this function will block until it has received data or a timeout
is triggered. Default is False to preserve behavior of prior versions.
"""
if self._socket is None:
raise RuntimeError('The server has not been started yet, use start_server to do so.')
try:
request = self._socket.recv_unicode(flags=zmq.NOBLOCK if not blocking else 0)
self.log.debug('Got request %s', request)
try:
response = JSONRPCResponseManager.handle(request, self._exposed_object)
self._socket.send_unicode(response.json)
self.log.debug('Sent response %s', response.json)
except TypeError as e:
self._socket.send_json(
self._unhandled_exception_response(json.loads(request)['id'], e))
except zmq.Again:
pass