def receive_message(socket, blocking=True):
    """Read one two-frame message from ``socket``.

    :param socket: object exposing ``recv_multipart(flags=...)``
    :param blocking: when false, the receive is issued with ``zmq.NOBLOCK``
    :return: ``(cmd, data)`` on success; ``(None, None)`` when the receive
        raised ``zmq.Again``, ``zmq.ContextTerminated`` or
        ``KeyboardInterrupt``
    """
    recv_flags = zmq.NOBLOCK if not blocking else 0
    try:
        # Exactly two frames are expected; any other count raises ValueError
        command, payload = socket.recv_multipart(flags=recv_flags)
    except zmq.ContextTerminated:
        print("Context terminated ..")
        return None, None
    except (zmq.Again, KeyboardInterrupt):
        return None, None
    return command, payload
# Example source code for the Python class Again()
def _receiveFromRemotes(self, quotaPerRemote) -> int:
    """
    Drain pending messages from every remote that has a socket.

    :param quotaPerRemote: upper bound on non-empty messages taken from a
        single remote in one call; must be truthy
    :return: total number of non-empty messages passed to
        ``_verifyAndAppend``
    """
    assert quotaPerRemote
    total = 0
    for ident, remote in self.remotesByKeys.items():
        sock = remote.socket
        if not sock:
            continue
        taken = 0
        while taken < quotaPerRemote:
            try:
                msg, = sock.recv_multipart(flags=zmq.NOBLOCK)
                # Router probing sends empty message on connection;
                # such messages are neither counted nor processed
                if msg:
                    taken += 1
                    self._verifyAndAppend(msg, ident)
            except zmq.Again:
                # Nothing more can be read from this socket right now
                break
        if taken:
            logger.trace('{} got {} messages through remote {}'.
                         format(self, taken, remote))
            total += taken
    return total
def transmit(self, msg, uid, timeout=None, serialized=False):
    """
    Attempt a non-blocking send of ``msg`` to the remote stored under ``uid``.

    :param msg: message to send; passed through ``prepare_to_send`` unless
        ``serialized`` is true
    :param uid: key into ``self.remotes``
    :param timeout: accepted but not used by this method
    :param serialized: true when ``msg`` needs no further preparation
    :return: ``(True, None)`` once the message was handed to the socket,
        otherwise ``(False, err_str)``; ``err_str`` is only set when
        ``InvalidMessageExceedingSizeException`` was raised
    """
    remote = self.remotes.get(uid)
    if not remote:
        logger.debug("Remote {} does not exist!".format(uid))
        return False, None

    sock = remote.socket
    if not sock:
        logger.debug('{} has uninitialised socket '
                     'for remote {}'.format(self, uid))
        return False, None

    try:
        payload = msg if serialized else self.prepare_to_send(msg)
        sock.send(payload, flags=zmq.NOBLOCK)
        logger.debug('{} transmitting message {} to {}'
                     .format(self, payload, uid))
        if not remote.isConnected and payload not in self.healthMessages:
            logger.debug('Remote {} is not connected - '
                         'message will not be sent immediately.'
                         'If this problem does not resolve itself - '
                         'check your firewall settings'.format(uid))
        return True, None
    except zmq.Again:
        # The non-blocking send could not be completed
        logger.debug(
            '{} could not transmit message to {}'.format(self, uid))
        return False, None
    except InvalidMessageExceedingSizeException as ex:
        err_str = '{}Cannot transmit message. Error {}'.format(
            CONNECTION_PREFIX, ex)
        logger.error(err_str)
        return False, err_str
def transmitThroughListener(self, msg, ident) -> Tuple[bool, Optional[str]]:
    """
    Send ``msg`` to the peer ``ident`` through the listener socket.

    :param msg: message to send; it is passed through ``prepare_to_send``
    :param ident: peer identity; a ``str`` is encoded to bytes first
    :return: ``(True, None)`` when the message was handed to the listener,
        ``(False, None)`` when the peer is not in ``peersWithoutRemotes`` or
        the non-blocking send raised ``zmq.Again``, and ``(False, err_str)``
        for any other error
    """
    peer = ident.encode() if isinstance(ident, str) else ident

    if peer not in self.peersWithoutRemotes:
        logger.debug('{} not sending message {} to {}'.
                     format(self, msg, peer))
        logger.debug("This is a temporary workaround for not being able to "
                     "disconnect a ROUTER's remote")
        return False, None

    try:
        msg = self.prepare_to_send(msg)
        logger.trace('{} transmitting {} to {} through listener socket'.
                     format(self, msg, peer))
        self.listener.send_multipart([peer, msg], flags=zmq.NOBLOCK)
    except zmq.Again:
        return False, None
    except InvalidMessageExceedingSizeException as ex:
        err_str = '{}Cannot transmit message. Error {}'.format(
            CONNECTION_PREFIX, ex)
        logger.error(err_str)
        return False, err_str
    except Exception as e:
        err_str = '{}{} got error {} while sending through listener to {}'\
            .format(CONNECTION_PREFIX, self, e, peer)
        logger.error(err_str)
        return False, err_str
    else:
        return True, None
def receive(self, handler=None, block=True):
    """
    Receive one message through ``self.receiver`` and decode it.

    :param handler: Reference to a specific message handler function to use for interpreting
                    the message to be received. When omitted, the handler is looked up in
                    ``receive_handlers`` under the ``htype`` field of the message header.
    :param block: Blocking receive call
    :return: ``Message`` built from the receiver statistics and the decoded data, or ``None``
             when the header could not be read, the ``htype`` is not supported, the handler
             returned nothing, or decoding failed

    Only ``Exception`` subclasses are treated as a failed receive; ``KeyboardInterrupt``
    and ``SystemExit`` propagate to the caller.
    """
    message = None
    # Set blocking flag in receiver
    self.receiver.block = block
    receive_is_successful = False

    if not handler:
        try:
            # Dynamically select handler
            htype = self.receiver.header()["htype"]
        except zmq.Again:
            # not clear if this is needed
            self.receiver.flush(receive_is_successful)
            return message
        except Exception:
            logger.exception('Unable to read header - skipping')
            # Clear remaining sub-messages if exist
            self.receiver.flush(receive_is_successful)
            return message

        try:
            handler = receive_handlers[htype]
        except Exception as exc:
            # Broad on purpose: the type of receive_handlers is not visible here
            logger.debug(exc)
            # Lazy %-formatting so a non-string htype cannot raise while logging
            logger.warning('htype - %s - not supported', htype)
            # Without a handler there is nothing to decode: skip the message
            # instead of calling None and logging a misleading decode error
            self.receiver.flush(receive_is_successful)
            return message

    try:
        data = handler(self.receiver)
        # as an extra safety margin
        if data:
            receive_is_successful = True
            message = Message(self.receiver.statistics, data)
    except Exception:
        logger.exception('Unable to decode message - skipping')

    # Clear remaining sub-messages if exist
    self.receiver.flush(receive_is_successful)
    return message
def _run (self):
    """
    Receive loop: read JSON strings from ``self.socket`` and dispatch them.

    Configures and connects ``self.socket``, then, while ``self.active`` is
    set, receives one string per iteration, reports it to ``self.monitor``,
    stores its ``data`` in ``self.raw_snapshot`` under its ``name`` and
    dispatches its ``name``/``type``/``data``/``baseline`` fields.
    ``on_async_alive`` / ``on_async_dead`` are each signalled once per
    transition between receiving data and timing out.

    The socket is closed in a ``finally`` clause so that an unexpected error
    (for example malformed JSON or a missing key) cannot leave it open; a
    socket left open can keep its context from terminating.
    """
    try:
        # socket must be created on the same thread
        self.socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.socket.setsockopt(zmq.RCVTIMEO, 5000)
        self.socket.connect(self.tr)

        got_data = False

        self.monitor.reset()

        while self.active:
            try:
                with self.monitor:
                    line = self.socket.recv_string()

                self.monitor.on_recv_msg(line)

                self.last_data_recv_ts = time.time()

                # signal once
                if not got_data:
                    self.event_handler.on_async_alive()
                    got_data = True

            # got a timeout - mark as not alive and retry
            except zmq.Again:
                # signal once
                if got_data:
                    self.event_handler.on_async_dead()
                    got_data = False

                continue

            except zmq.ContextTerminated:
                # outside thread signaled us to exit
                assert(not self.active)
                break

            msg = json.loads(line)

            name = msg['name']
            data = msg['data']
            # named msg_type so the builtin ``type`` is not shadowed
            msg_type = msg['type']
            baseline = msg.get('baseline', False)

            self.raw_snapshot[name] = data

            self.__dispatch(name, msg_type, data, baseline)
    finally:
        # closing of socket must be from the same thread
        self.socket.close(linger = 0)
def recv_loop(self, configfile=None):
"""
This is the main loop receiving data and calling functions. First it calls
the read_config function if not done previously. Afterwards it connects the
ZeroMQ publisher.
The reception is non-blocking. If nothing is received, the JobMonitor sleeps
for a second. This is no problem since ZeroMQ queues the strings.
Each loop checks whether it is time to call the update function.
If the filter applies, it is analyzed for the status attribute and if it exists,
the value is checked whether a function is registered for it and finally calls it.
"""
# NOTE(review): the indentation of this block is not visible in this view;
# the comments below describe individual statements, not their nesting.
# Load the configuration only if none is present yet
if not self.config:
self.read_config(configfile=configfile)
# Connect only if no context exists yet
if not self.context:
self.connect()
# Next point in time at which update() is due: now + self.interval seconds
updatetime = datetime.datetime.now() + datetime.timedelta(seconds=self.interval)
while not self.terminate:
# Reset so that a failed receive leaves no stale message behind
s = None
try:
s = self.socket.recv(flags=zmq.NOBLOCK)
except zmq.Again as e:
# Nothing available on the socket: wait one second before polling again
time.sleep(1)
except KeyboardInterrupt:
# A keyboard interrupt requests the end of the loop
self.terminate = True
pass
if not self.terminate:
# Periodic update, re-armed relative to the time the update finished
if datetime.datetime.now() > updatetime:
logging.debug("Calling update function")
self.update()
updatetime = datetime.datetime.now() + datetime.timedelta(seconds=self.interval)
# Only non-empty messages accepted by self._filter are processed
if s and self._filter(s):
logging.debug("Received string: %s" % s)
m = Measurement(s)
if self.status_attr:
logging.debug("Checking status_attr: %s" % self.status_attr)
stat = m.get_attr(self.status_attr)
if stat:
# Call the function registered under exactly this status value, if any
for key in self.stat_funcs:
if key == stat:
logging.debug("Calling %s function" % key)
self.stat_funcs[key](m)
# NOTE(review): nesting lost in this view - presumably called for every
# message that passed the filter; confirm against the original file
self.get(m)
# presumably runs once after the loop has ended - TODO confirm
self.disconnect()
def testSimpleZStacksMsgs(tdir, looper):
    """
    Exchange messages between two ``SimpleZStack`` instances.

    Alpha connects to Beta and sends a greeting. Beta's handler sends a JSON
    message with a large ``random`` field to the first entry of its
    ``peersWithoutRemotes`` through its listener socket, and Alpha's handler
    asserts that the ``random`` field of what it receives has length ``size``.

    :param tdir: base directory passed to ``genKeys`` and to both stacks
    :param looper: object providing ``add`` and ``runFor``
    """
    genKeys(tdir, ['Alpha', 'Beta'])
    names = ['Alpha', 'Beta']
    aseed, bseed = randomSeed(), randomSeed()

    size = 100000
    msg = json.dumps({'random': randomSeed(size).decode()}).encode()

    def aHandler(m):
        preview = "{}".format(m)[:100]
        print('{} printing... {}'.format(names[0], preview))
        d, _ = m
        random_len = len(d['random'])
        print('Message size is {}'.format(random_len))
        assert random_len == size

    def bHandler(m):
        print(beta.msgHandler)
        peer = list(beta.peersWithoutRemotes)[0]
        try:
            beta.listener.send_multipart([peer, msg], flags=zmq.NOBLOCK)
        except zmq.Again:
            return False
        preview = "{}".format(m)[:100]
        print('{} printing... {}'.format(names[1], preview))

    def stack_params(stack_name):
        # Fresh parameter dict per stack; genHa() is called once per stack
        return {
            "name": stack_name,
            "ha": genHa(),
            "auto": 2,
            "basedirpath": tdir,
        }

    alpha = SimpleZStack(stack_params(names[0]), aHandler, aseed, False)
    beta = SimpleZStack(stack_params(names[1]), bHandler, bseed, True)

    for stack in (alpha, beta):
        looper.add(SMotor(stack))

    alpha.connect(name=beta.name, ha=beta.ha,
                  verKeyRaw=beta.verKeyRaw, publicKeyRaw=beta.publicKeyRaw)
    looper.runFor(0.25)
    alpha.send({'greetings': 'hi'}, beta.name)
    looper.runFor(1)