Python 类 Namespace() 的实例源码

backend.py 文件源码 项目:parsec-cloud 作者: Scille 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, url, watchdog=None, loop=None):
        """Set up the initial, not-yet-connected state of this object.

        Args:
            url: Websocket URL; must start with ``ws://`` or ``wss://``.
            watchdog: Stored unchanged as ``watchdog_time`` (presumably a
                watchdog interval/timeout -- confirm against the code that
                starts ``_watchdog_task``).
            loop: Event loop stored on ``self.loop``; it is also handed to the
                response queue on interpreters that still accept it.

        Raises:
            AssertionError: If ``url`` has neither a ``ws://`` nor a ``wss://``
                prefix (only while assertions are enabled, i.e. not under -O).
        """
        # Local import: only part of the file is visible from this block.
        import sys

        # asyncio.Queue lost its ``loop`` argument in Python 3.10 and raises
        # TypeError when one is passed (even ``None``). Forward an explicit
        # loop only on older interpreters, where it is still honoured;
        # ``Queue(loop=None)`` and ``Queue()`` are equivalent there.
        if loop is None or sys.version_info >= (3, 10):
            self._resp_queue = asyncio.Queue()
        else:
            self._resp_queue = asyncio.Queue(loop=loop)
        self.loop = loop
        # str.startswith accepts a tuple of prefixes: one call, same check.
        assert url.startswith(('ws://', 'wss://'))
        self.url = url
        # Connection-related attributes start empty; nothing is opened here.
        self._websocket = None
        self._ws_recv_handler_task = None
        self._watchdog_task = None
        self.watchdog_time = watchdog
        # Private blinker namespace created per instance.
        self._signal_ns = blinker.Namespace()
test_receivers.py 文件源码 项目:invenio-stats 作者: inveniosoftware 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_register_receivers(base_app, event_entrypoints):
    """Check that builders configured for a signal yield one event per send."""

    def event_builder1(event, sender_app, signal_param, *args, **kwargs):
        # First stage: copy the signal payload into the event.
        event.update(event_param1=signal_param)
        return event

    def event_builder2(event, sender_app, signal_param, *args, **kwargs):
        # Second stage: derive a value from what the first stage stored.
        event.update(event_param2=event['event_param1'] + 1)
        return event

    try:
        namespace = Namespace()
        trigger = namespace.signal('my-signal')

        stats_events = {
            'event_0': {
                'signal': trigger,
                'event_builders': [event_builder1, event_builder2],
            },
        }
        base_app.config.update({'STATS_EVENTS': stats_events})

        InvenioStats(base_app)
        current_queues.declare()

        # Emit the signal twice with the same payload.
        for _ in range(2):
            trigger.send(base_app, signal_param=42)

        consumed = list(current_stats.consume('event_0'))

        # Each send must have produced one event, and every event must have
        # gone through both builders in order.
        expected = [
            {'event_param1': 42, 'event_param2': 43},
            {'event_param1': 42, 'event_param2': 43},
        ]
        assert consumed == expected
    finally:
        current_queues.delete()


问题


面经


文章

微信
公众号

扫码关注公众号