mflow.py source file

python

Project: mflow  Author: datastreaming
def receive(self, handler=None, block=True):
        """
        :param handler:     Reference to a specific message handler function to use for interpreting
                            the message to be received
        :param block:       Blocking receive call
        :return:            Message holding the receiver statistics and the decoded data,
                            or None if nothing could be received
        """

        message = None
        # Set blocking flag in receiver
        self.receiver.block = block
        receive_is_successful = False

        if not handler:
            try:
                # Dynamically select handler
                htype = self.receiver.header()["htype"]
            except zmq.Again:
                # not clear if this is needed
                self.receiver.flush(receive_is_successful)
                return message
            except KeyboardInterrupt:
                raise
            except Exception:
                logger.exception('Unable to read header - skipping')
                # Clear remaining sub-messages if exist
                self.receiver.flush(receive_is_successful)
                return message

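            try:
                handler = receive_handlers[htype]
            except KeyError:
                logger.debug(sys.exc_info()[1])
                logger.warning('htype - %s - not supported', htype)
                # No handler for this htype: clear remaining sub-messages and give up
                self.receiver.flush(receive_is_successful)
                return message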

        try:
            data = handler(self.receiver)
            # Only count the receive as successful if the handler returned data
            if data:
                receive_is_successful = True
                message = Message(self.receiver.statistics, data)
        except KeyboardInterrupt:
            raise
        except Exception:
            logger.exception('Unable to decode message - skipping')

        # Clear remaining sub-messages if exist
        self.receiver.flush(receive_is_successful)

        return message
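
The control flow of receive() can be tried out without a ZeroMQ socket. The following is a minimal sketch, not the mflow implementation: FakeReceiver, Again, handle_array and the "array-1.0" key are hypothetical stand-ins introduced only for illustration, and the decoded data is returned directly instead of being wrapped in a Message. It shows the three outcomes seen in the method: a supported htype, an unsupported htype, and no message available.

```python
import logging

logger = logging.getLogger(__name__)


class Again(Exception):
    """Hypothetical stand-in for zmq.Again (no message available)."""


class FakeReceiver:
    """Hypothetical receiver: serves queued headers and records flush calls."""

    def __init__(self, headers):
        self.headers = list(headers)
        self.block = True
        self.flushed = []

    def header(self):
        if not self.headers:
            raise Again()
        return self.headers.pop(0)

    def flush(self, success):
        self.flushed.append(success)


def handle_array(receiver):
    # Hypothetical handler: a real one would read further message parts
    return {"kind": "array"}


# Assumed registry shape: htype string -> handler function
receive_handlers = {"array-1.0": handle_array}


def receive(receiver, handler=None, block=True):
    receiver.block = block
    success = False
    message = None

    if not handler:
        try:
            htype = receiver.header()["htype"]
        except Again:
            # Nothing to read: flush and report no message
            receiver.flush(success)
            return None
        handler = receive_handlers.get(htype)
        if handler is None:
            logger.warning("htype - %s - not supported", htype)
            receiver.flush(success)
            return None

    try:
        data = handler(receiver)
        if data:
            success = True
            message = data
    except Exception:
        logger.exception("Unable to decode message - skipping")

    # Clear remaining sub-messages in every case
    receiver.flush(success)
    return message


receiver = FakeReceiver([{"htype": "array-1.0"}, {"htype": "unknown"}])
first = receive(receiver)                # supported htype
second = receive(receiver)               # unsupported htype
third = receive(receiver, block=False)   # no header left
```

In this sketch every path ends with exactly one flush call, and the flag passed to it is True only when a handler returned data.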