Python gevent.monkey 模块的实例源码

main.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main(args=None, workers=None, client=EchoHubClient, worker_kwargs=None):
    """Start the hub in server or client mode and wait for it to finish.

    Args:
        args: Already-parsed command line namespace. When falsy, the
            arguments come from ``prepare_argparse().parse_args()``.
        workers: Forwarded to ``HubServer`` in server mode.
        client: Callable that builds the hub in client mode.
        worker_kwargs: Forwarded to ``client`` in client mode.

    Raises:
        Exception: If ``args.mode`` is neither ``'server'`` nor ``'client'``.
    """
    gevent.monkey.patch_all()
    args = args if args else prepare_argparse().parse_args()
    prepare_logging(args.verbose or 1)

    if args.mode == 'server':
        hub = HubServer(workers=workers)
    elif args.mode == 'client':
        hub = client(worker_kwargs=worker_kwargs)
    else:
        raise Exception("Unknown mode '%s'." % args.mode)

    def sig_handler(sig=None, frame=None):
        log.warning("Hub process received SIGTERM/SIGINT")
        hub.stop()
        log.info("Sig handler completed.")

    # Calling ``gevent.signal`` directly relies on a deprecated alias; in
    # newer gevent releases ``gevent.signal`` is a plain module and calling
    # it fails. Prefer ``gevent.signal_handler`` and only fall back to the
    # old alias when the installed gevent does not provide it.
    register_handler = getattr(gevent, 'signal_handler', None) or gevent.signal
    register_handler(signal.SIGTERM, sig_handler)
    register_handler(signal.SIGINT, sig_handler)  # KeyboardInterrupt also

    hub.start()
    gevent.wait()
gevent.py 文件源码 项目:easypy 作者: weka-io 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _unpatch_logging_handlers_lock():
    """Swap logging handler locks for ones built from gevent's saved ``_CRLock``.

    Handlers already registered in ``logging._handlers`` that have a lock get
    a new one, and ``logging.Handler.createLock`` is replaced so handlers
    created afterwards receive the same kind of lock.
    """
    # Handler locks are used by both real threads and gevent greenlets;
    # switching from one to the other makes the gevent hub throw an
    # exception, so the logger's own locks must not be used.
    import logging

    saved_rlock = gevent.monkey.saved['threading']['_CRLock']

    def create_unpatched_lock_for_handler(handler):
        handler.lock = saved_rlock()

    # handlers that exist right now
    for existing in logging._handlers.values():
        if not existing.lock:
            continue
        existing.lock = saved_rlock()

    # handlers created from now on
    logging.Handler.createLock = create_unpatched_lock_for_handler
chorus.py 文件源码 项目:wade 作者: chartbeat-labs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _patch_client_for_gevent(self):
        """Pick the timeout, sleep and value-event primitives for this client.

        gevent counts as enabled when it can be imported and
        ``gevent.monkey.saved`` is non-empty. In that case the gevent
        primitives are installed on ``self``; otherwise ``ValueEventTimeout``,
        a no-op sleep and ``self._ensure_value_event`` are used.
        """
        try:
            import gevent
            # Imported explicitly because the event factory below reads
            # ``gevent.event``; a submodule attribute is only guaranteed to
            # exist once that submodule itself has been imported.
            import gevent.event
            import gevent.monkey
        except ImportError:
            gevent_enabled = False
        else:
            gevent_enabled = bool(gevent.monkey.saved)

        if gevent_enabled:
            self._Timeout = gevent.Timeout
            self._sleep = gevent.sleep
            self._get_value_event = lambda: gevent.event.AsyncResult()
        else:
            self._Timeout = ValueEventTimeout
            self._sleep = lambda _: None
            self._get_value_event = self._ensure_value_event
test_recorder.py 文件源码 项目:pytest-vts 作者: bhodorog 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_catch_all_gevented_requests(vts_rec_on, movie_server):
    """Spawn ten pooled ``http_get`` calls and expect ten cassette entries.

    Keep this test at the very end: it monkey patches the network related
    operations, which would otherwise mess with the rest of the tests.
    A custom pytest order enforcer could guarantee that later.
    """
    from gevent.pool import Pool
    import gevent.monkey

    gevent.monkey.patch_socket(dns=True)

    request_count = 10
    workers = Pool()

    def _fetch():
        return http_get(movie_server.url)

    for _ in range(request_count):
        workers.spawn(_fetch)
    workers.join()

    assert len(vts_rec_on.cassette) == request_count
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _cli_patch(cli_args):  # pragma: no coverage
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``cli_args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _ = _cli_parse(cli_args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with gevent's ``pywsgi.WSGIServer``.

        The server binds to ``(self.host, self.port)``, receives
        ``self.options`` as keyword arguments and blocks in
        ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local

        probe = threading.local()
        if not isinstance(probe, local.local):
            raise RuntimeError(
                "Bottle requires gevent.monkey.patch_all() (before import)")

        if self.quiet:
            self.options['log'] = None

        server = pywsgi.WSGIServer((self.host, self.port), handler,
                                   **self.options)

        if 'BOTTLE_CHILD' in os.environ:
            import signal

            def _stop_on_sigint(signum, frame):
                server.stop()

            signal.signal(signal.SIGINT, _stop_on_sigint)

        server.serve_forever()
test_geventreactor.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUpClass(cls):
        """Monkey patch with gevent and initialize the gevent reactor.

        Does nothing when ``MONKEY_PATCH_LOOP`` is falsy.
        """
        # setUpClass still runs when the class is skipped, so nothing may be
        # monkey patched unless the loop was explicitly requested.
        if MONKEY_PATCH_LOOP:
            gevent.monkey.patch_all()
            cls.connection_class = GeventConnection
            GeventConnection.initialize_reactor()

    # There is no unpatching because there is not a clear way
    # of doing it reliably
__init__.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def is_gevent_monkey_patched():
    """Return True when ``socket.socket`` is gevent's socket class.

    gevent is only imported if ``gevent.monkey`` is already present in
    ``sys.modules``; otherwise the answer is False right away.
    """
    if 'gevent.monkey' in sys.modules:
        import gevent.socket
        return socket.socket is gevent.socket.socket
    return False
__init__.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def is_gevent_time_monkey_patched():
    """Return True when gevent's monkey patching has recorded the ``time`` module.

    Returns:
        bool: Whether ``"time"`` is a key of ``gevent.monkey.saved``. False
        when ``gevent.monkey`` has not been imported at all.
    """
    import sys

    # Mirror is_gevent_monkey_patched(): if gevent.monkey was never imported,
    # nothing can have been recorded in ``saved``. Answering here avoids
    # importing gevent as a side effect, and avoids an ImportError when
    # gevent is not installed.
    if 'gevent.monkey' not in sys.modules:
        return False
    import gevent.monkey
    return "time" in gevent.monkey.saved
bottle.py 文件源码 项目:warriorframework 作者: warriorframework 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _cli_patch(args):
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _, _ = _cli_parse(args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:warriorframework 作者: warriorframework 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with gevent's ``pywsgi.WSGIServer``.

        The server binds to ``(self.host, self.port)``, receives
        ``self.options`` as keyword arguments and blocks in
        ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local

        probe = threading.local()
        if not isinstance(probe, local.local):
            raise RuntimeError(
                "Bottle requires gevent.monkey.patch_all() (before import)")

        if self.quiet:
            self.options['log'] = None

        server = pywsgi.WSGIServer((self.host, self.port), handler,
                                   **self.options)

        if 'BOTTLE_CHILD' in os.environ:
            import signal

            def _stop_on_sigint(signum, frame):
                server.stop()

            signal.signal(signal.SIGINT, _stop_on_sigint)

        server.serve_forever()
bottle.py 文件源码 项目:NebulaSolarDash 作者: toddlerya 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _cli_patch(args):
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _, _ = _cli_parse(args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:NebulaSolarDash 作者: toddlerya 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with a gevent WSGI server.

        The ``fast`` entry is removed from ``self.options``; when truthy,
        ``gevent.wsgi`` is used if the installed gevent still provides it,
        otherwise ``gevent.pywsgi``. The server binds to
        ``(self.host, self.port)`` and blocks in ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local
        if not isinstance(threading.local(), local.local):
            msg = "Bottle requires gevent.monkey.patch_all() (before import)"
            raise RuntimeError(msg)
        # ``gevent.wsgi`` is only needed for the ``fast`` option and is gone
        # from newer gevent releases. Import it lazily and fall back to
        # pywsgi, so a missing module no longer breaks every caller.
        want_fast = self.options.pop('fast', None)
        wsgi = pywsgi
        if want_fast:
            try:
                from gevent import wsgi
            except ImportError:
                wsgi = pywsgi
        self.options['log'] = None if self.quiet else 'default'
        address = (self.host, self.port)
        server = wsgi.WSGIServer(address, handler, **self.options)
        if 'BOTTLE_CHILD' in os.environ:
            import signal
            signal.signal(signal.SIGINT, lambda s, f: server.stop())
        server.serve_forever()
bottle.py 文件源码 项目:bottle_beginner 作者: denzow 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _cli_patch(cli_args):  # pragma: no coverage
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``cli_args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _ = _cli_parse(cli_args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:bottle_beginner 作者: denzow 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with gevent's ``pywsgi.WSGIServer``.

        The server binds to ``(self.host, self.port)``, receives
        ``self.options`` as keyword arguments and blocks in
        ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local

        probe = threading.local()
        if not isinstance(probe, local.local):
            raise RuntimeError(
                "Bottle requires gevent.monkey.patch_all() (before import)")

        if self.quiet:
            self.options['log'] = None

        server = pywsgi.WSGIServer((self.host, self.port), handler,
                                   **self.options)

        if 'BOTTLE_CHILD' in os.environ:
            import signal

            def _stop_on_sigint(signum, frame):
                server.stop()

            signal.signal(signal.SIGINT, _stop_on_sigint)

        server.serve_forever()
bottle.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _cli_patch(cli_args):  # pragma: no coverage
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``cli_args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _ = _cli_parse(cli_args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with gevent's ``pywsgi.WSGIServer``.

        The server binds to ``(self.host, self.port)``, receives
        ``self.options`` as keyword arguments and blocks in
        ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local

        probe = threading.local()
        if not isinstance(probe, local.local):
            raise RuntimeError(
                "Bottle requires gevent.monkey.patch_all() (before import)")

        if self.quiet:
            self.options['log'] = None

        server = pywsgi.WSGIServer((self.host, self.port), handler,
                                   **self.options)

        if 'BOTTLE_CHILD' in os.environ:
            import signal

            def _stop_on_sigint(signum, frame):
                server.stop()

            signal.signal(signal.SIGINT, _stop_on_sigint)

        server.serve_forever()
test_libraries.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_gevent_monkey(pyi_builder):
    """Hand ``pyi_builder`` a script that imports and calls gevent's ``patch_all``."""
    script = """
        from gevent.monkey import patch_all
        patch_all()
        """
    pyi_builder.test_source(script)
bottle.py 文件源码 项目:autoinjection 作者: ChengWiLL 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _cli_patch(args):
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _, _ = _cli_parse(args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:autoinjection 作者: ChengWiLL 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with a gevent WSGI server.

        The ``fast`` entry is removed from ``self.options``; when truthy,
        ``gevent.wsgi`` is used if the installed gevent still provides it,
        otherwise ``gevent.pywsgi``. The server binds to
        ``(self.host, self.port)`` and blocks in ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local
        if not isinstance(threading.local(), local.local):
            msg = "Bottle requires gevent.monkey.patch_all() (before import)"
            raise RuntimeError(msg)
        # ``gevent.wsgi`` is only needed for the ``fast`` option and is gone
        # from newer gevent releases. Import it lazily and fall back to
        # pywsgi, so a missing module no longer breaks every caller.
        want_fast = self.options.pop('fast', None)
        wsgi = pywsgi
        if want_fast:
            try:
                from gevent import wsgi
            except ImportError:
                wsgi = pywsgi
        self.options['log'] = None if self.quiet else 'default'
        address = (self.host, self.port)
        server = wsgi.WSGIServer(address, handler, **self.options)
        if 'BOTTLE_CHILD' in os.environ:
            import signal
            signal.signal(signal.SIGINT, lambda s, f: server.stop())
        server.serve_forever()
bottle.py 文件源码 项目:Knjiznica 作者: TilenNoc 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _cli_patch(cli_args):  # pragma: no coverage
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``cli_args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _ = _cli_parse(cli_args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:Knjiznica 作者: TilenNoc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with gevent's ``pywsgi.WSGIServer``.

        The server binds to ``(self.host, self.port)``, receives
        ``self.options`` as keyword arguments and blocks in
        ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local

        probe = threading.local()
        if not isinstance(probe, local.local):
            raise RuntimeError(
                "Bottle requires gevent.monkey.patch_all() (before import)")

        if self.quiet:
            self.options['log'] = None

        server = pywsgi.WSGIServer((self.host, self.port), handler,
                                   **self.options)

        if 'BOTTLE_CHILD' in os.environ:
            import signal

            def _stop_on_sigint(signum, frame):
                server.stop()

            signal.signal(signal.SIGINT, _stop_on_sigint)

        server.serve_forever()
bottle.py 文件源码 项目:python-course 作者: juancarlospaco 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _cli_patch(args):
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _, _ = _cli_parse(args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:python-course 作者: juancarlospaco 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with a gevent WSGI server.

        The ``fast`` entry is removed from ``self.options``; when truthy,
        ``gevent.wsgi`` is used if the installed gevent still provides it,
        otherwise ``gevent.pywsgi``. The server binds to
        ``(self.host, self.port)`` and blocks in ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local
        if not isinstance(threading.local(), local.local):
            msg = "Bottle requires gevent.monkey.patch_all() (before import)"
            raise RuntimeError(msg)
        # ``gevent.wsgi`` is only needed for the ``fast`` option and is gone
        # from newer gevent releases. Import it lazily and fall back to
        # pywsgi, so a missing module no longer breaks every caller.
        want_fast = self.options.pop('fast', None)
        wsgi = pywsgi
        if want_fast:
            try:
                from gevent import wsgi
            except ImportError:
                wsgi = pywsgi
        self.options['log'] = None if self.quiet else 'default'
        address = (self.host, self.port)
        server = wsgi.WSGIServer(address, handler, **self.options)
        if 'BOTTLE_CHILD' in os.environ:
            import signal
            signal.signal(signal.SIGINT, lambda s, f: server.stop())
        server.serve_forever()
bottle.py 文件源码 项目:base1k 作者: gumblex 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _cli_patch(cli_args):  # pragma: no coverage
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``cli_args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _ = _cli_parse(cli_args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:base1k 作者: gumblex 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with gevent's ``pywsgi.WSGIServer``.

        The server binds to ``(self.host, self.port)``, receives
        ``self.options`` as keyword arguments and blocks in
        ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local

        probe = threading.local()
        if not isinstance(probe, local.local):
            raise RuntimeError(
                "Bottle requires gevent.monkey.patch_all() (before import)")

        if self.quiet:
            self.options['log'] = None

        server = pywsgi.WSGIServer((self.host, self.port), handler,
                                   **self.options)

        if 'BOTTLE_CHILD' in os.environ:
            import signal

            def _stop_on_sigint(signum, frame):
                server.stop()

            signal.signal(signal.SIGINT, _stop_on_sigint)

        server.serve_forever()
bottle.py 文件源码 项目:Helix 作者: 3lackrush 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _cli_patch(args):
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _, _ = _cli_parse(args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:Helix 作者: 3lackrush 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with a gevent WSGI server.

        The ``fast`` entry is removed from ``self.options``; when truthy,
        ``gevent.wsgi`` is used if the installed gevent still provides it,
        otherwise ``gevent.pywsgi``. The server binds to
        ``(self.host, self.port)`` and blocks in ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local
        if not isinstance(threading.local(), local.local):
            msg = "Bottle requires gevent.monkey.patch_all() (before import)"
            raise RuntimeError(msg)
        # ``gevent.wsgi`` is only needed for the ``fast`` option and is gone
        # from newer gevent releases. Import it lazily and fall back to
        # pywsgi, so a missing module no longer breaks every caller.
        want_fast = self.options.pop('fast', None)
        wsgi = pywsgi
        if want_fast:
            try:
                from gevent import wsgi
            except ImportError:
                wsgi = pywsgi
        self.options['log'] = None if self.quiet else 'default'
        address = (self.host, self.port)
        server = wsgi.WSGIServer(address, handler, **self.options)
        if 'BOTTLE_CHILD' in os.environ:
            import signal
            signal.signal(signal.SIGINT, lambda s, f: server.stop())
        server.serve_forever()
bottle.py 文件源码 项目:bigbottle 作者: opendiploma 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _cli_patch(args):
    """Apply gevent or eventlet monkey patching based on the ``server`` option.

    ``args`` is parsed with ``_cli_parse``. A server name starting with
    ``'gevent'`` triggers ``gevent.monkey.patch_all()``; one starting with
    ``'eventlet'`` triggers ``eventlet.monkey_patch()``. Any other value, or
    no server at all, leaves the process untouched.
    """
    options, _, _ = _cli_parse(args)
    backend = options.server
    if not backend:
        return
    if backend.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif backend.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
bottle.py 文件源码 项目:bigbottle 作者: opendiploma 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with a gevent WSGI server.

        The ``fast`` entry is removed from ``self.options``; when truthy,
        ``gevent.wsgi`` is used if the installed gevent still provides it,
        otherwise ``gevent.pywsgi``. The server binds to
        ``(self.host, self.port)`` and blocks in ``serve_forever()``.

        Raises:
            RuntimeError: If ``threading.local()`` is not an instance of
                gevent's ``local.local``.
        """
        from gevent import pywsgi, local
        if not isinstance(threading.local(), local.local):
            msg = "Bottle requires gevent.monkey.patch_all() (before import)"
            raise RuntimeError(msg)
        # ``gevent.wsgi`` is only needed for the ``fast`` option and is gone
        # from newer gevent releases. Import it lazily and fall back to
        # pywsgi, so a missing module no longer breaks every caller.
        want_fast = self.options.pop('fast', None)
        wsgi = pywsgi
        if want_fast:
            try:
                from gevent import wsgi
            except ImportError:
                wsgi = pywsgi
        self.options['log'] = None if self.quiet else 'default'
        address = (self.host, self.port)
        server = wsgi.WSGIServer(address, handler, **self.options)
        if 'BOTTLE_CHILD' in os.environ:
            import signal
            signal.signal(signal.SIGINT, lambda s, f: server.stop())
        server.serve_forever()


问题


面经


文章

微信
公众号

扫码关注公众号