def _run_items(mode, items, session, workers=None):
    """Run the test items using the concurrency strategy named by ``mode``.

    Multiprocess is not compatible with Windows !!!

    :param mode: ``"mproc"`` runs each item in its own process, ``"mthread"``
        uses a thread pool, ``"asyncnet"`` uses a gevent pool; any other
        value runs the items one after another in the calling thread.
    :param items: indexable sequence of test items.
    :param session: forwarded to ``_run_next_item``; in sequential mode its
        ``shouldstop`` attribute is checked after every item.
    :param workers: size limit handed to the executor or gevent pool.
    :raises session.Interrupted: in sequential mode, as soon as
        ``session.shouldstop`` is truthy after an item has run.
    """
    if mode == "mproc":
        # A ThreadPoolExecutor acts as the manager that controls the lifecycle
        # of the processes: each thread spawns one process and terminates
        # when that process joins.
        def _spawn_and_wait(item, index):
            child = multiprocessing.Process(
                target=_run_next_item, args=(session, item, index))
            child.start()
            child.join()

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            for index, item in enumerate(items):
                executor.submit(_spawn_and_wait, item, index)
        return

    if mode == "mthread":
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            for index, item in enumerate(items):
                executor.submit(_run_next_item, session, item, index)
        return

    if mode == "asyncnet":
        import gevent
        import gevent.monkey
        import gevent.pool
        gevent.monkey.patch_all()
        greenlets = gevent.pool.Pool(size=workers)
        for index, item in enumerate(items):
            greenlets.spawn(_run_next_item, session, item, index)
        greenlets.join()
        return

    # Sequential fallback: each item is told which item follows it.
    for position, item in enumerate(items):
        following = position + 1
        if following < len(items):
            nextitem = items[following]
        else:
            nextitem = None
        item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
        if session.shouldstop:
            raise session.Interrupted(session.shouldstop)
# Example source code for Python's monkey() (original heading: python类monkey()的实例源码)
def _cli_patch(args):
    """Apply the monkey patch required by the server named in ``args``.

    ``args`` is parsed with ``_cli_parse``. Nothing happens unless the
    selected server name starts with ``gevent`` or ``eventlet``.
    """
    opts, _, _ = _cli_parse(args)
    server_name = opts.server
    if not server_name:
        return
    if server_name.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif server_name.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
def run(self, handler):
    """Serve ``handler`` with gevent's ``pywsgi.WSGIServer`` until stopped.

    :param handler: WSGI application handed to the server.
    :raises RuntimeError: if ``threading.local`` is not gevent's ``local``
        class (the message asks for ``gevent.monkey.patch_all()``).
    """
    from gevent import pywsgi, local

    if not isinstance(threading.local(), local.local):
        raise RuntimeError(
            "Bottle requires gevent.monkey.patch_all() (before import)")
    if self.quiet:
        self.options['log'] = None

    server = pywsgi.WSGIServer((self.host, self.port), handler, **self.options)

    if 'BOTTLE_CHILD' in os.environ:
        import signal

        def _stop_on_sigint(signum, frame):
            server.stop()

        # In a BOTTLE_CHILD process, SIGINT stops the server instead of
        # raising KeyboardInterrupt.
        signal.signal(signal.SIGINT, _stop_on_sigint)
    server.serve_forever()
def apply_patch(hogging_detection=False, real_threads=1):
    """Apply the gevent monkey patch and update this module's thread bookkeeping.

    Must run before ``easypy.threadtree`` or ``easypy.concurrency`` has been
    imported; an assertion enforces this.

    :param hogging_detection: when true, one extra real thread is requested,
        a greenlet trace function is installed and ``detect_hogging`` is
        deferred to a thread.
    :param real_threads: count passed to ``_RealThreadsPool``; the pool is
        skipped when the (possibly incremented) value is falsy.
    """
    _logger.info('applying gevent patch (%s real threads)', real_threads)
    # real_threads is 1 by default so it will be possible to run watch_threads concurrently
    if hogging_detection:
        real_threads += 1
    if real_threads:
        _RealThreadsPool(real_threads)
    # Must happen before patch_all(); see _patch_module_locks for the reason.
    _patch_module_locks()
    import gevent
    import gevent.monkey
    for m in ["easypy.threadtree", "easypy.concurrency"]:
        assert m not in sys.modules, "Must apply the gevent patch before importing %s" % m
    gevent.monkey.patch_all(Event=True, sys=True)
    _unpatch_logging_handlers_lock()
    # Publish the gevent hub as the module-level HUB.
    global HUB
    HUB = gevent.get_hub()
    # Rebind the module-level ``threading`` name with an import performed
    # after patch_all() (presumably so the module sees the patched version).
    global threading
    import threading
    for thread in threading.enumerate():
        _set_thread_uuid(thread.ident)
    _set_main_uuid() # the patched threading has a new ident for the main thread
    # this will declutter the thread dumps from gevent/greenlet frames
    from .threadtree import _BOOTSTRAPPERS
    import gevent, gevent.threading, gevent.greenlet
    _BOOTSTRAPPERS.update([gevent, gevent.threading, gevent.greenlet])
    if hogging_detection:
        import greenlet
        # The lambda looks up _greenlet_trace_func at call time rather than
        # binding it now.
        greenlet.settrace(lambda *args: _greenlet_trace_func(*args))
        defer_to_thread(detect_hogging, 'detect-hogging')
def _patch_module_locks():
    """Wrap ``_ModuleLock.release`` so pre-patch owners are re-attributed.

    gevent will not patch existing locks (including ModuleLocks) when it's
    not single threaded. Our solution is to monkey patch the release method
    for ModuleLocks objects; we assume that patching is done early enough so
    no other locks are present.
    """
    import importlib

    module_lock_cls = importlib._bootstrap._ModuleLock
    original_release = module_lock_cls.release

    def _release(*args, **kw):
        lock = args[0]
        # A lock still owned by the pre-patch main-thread ident is handed to
        # the current main-thread ident before the real release runs.
        if lock.owner == main_thread_ident_before_patching:
            lock.owner = threading.main_thread().ident
        original_release(*args, **kw)

    module_lock_cls.release = _release
def non_gevent_sleep(timeout):
    """Sleep for ``timeout`` using the original, un-patched ``time.sleep``.

    ``gevent.monkey.saved`` is consulted for the ``time.sleep`` that was
    saved when gevent patched the ``time`` module. When no such entry exists
    the current ``time.sleep`` is used.

    :param timeout: duration passed straight to the sleep function.
    """
    # Only the lookup is guarded: a KeyError raised for any other reason must
    # not be mistaken for "time was never patched" and cause a second sleep.
    try:
        original_sleep = gevent.monkey.saved['time']['sleep']
    except KeyError:
        original_sleep = time.sleep
    original_sleep(timeout)
def graph(request):
    """Build a ``Graph`` from the keyword arguments held in ``request.param``."""
    graph_kwargs = request.param
    return Graph(**graph_kwargs)
# non-loopers are only safe when using gevent.monkey.patch_all().
# FIXME: find out why... OR stop supporting component reactivation (i.e. non-loopers)
def _yield_to_others(sleep):
    """Pause for ``sleep``, via gevent when it has patched socket or subprocess.

    :param sleep: timeout handed to ``gevent.wait`` or ``time.sleep``.
    """
    gevent_patched = any(
        gevent.monkey.is_module_patched(module_name)
        for module_name in ("socket", "subprocess"))
    if not gevent_patched:
        time.sleep(sleep)
        return
    gevent.wait(timeout=sleep)
def test_gevent_monkey(pyi_builder):
    """Hand a two-line gevent ``patch_all`` script to ``pyi_builder.test_source``."""
    script = """
from gevent.monkey import patch_all
patch_all()
"""
    pyi_builder.test_source(script)
def _cli_patch(args):
    """Monkey-patch for a gevent or eventlet server chosen on the command line.

    ``args`` is parsed by ``_cli_parse``; any other server choice (or none)
    leaves the process untouched.
    """
    opts, _, _ = _cli_parse(args)
    chosen = opts.server
    if not chosen:
        return
    wants_gevent = chosen.startswith('gevent')
    if wants_gevent:
        import gevent.monkey
        gevent.monkey.patch_all()
        return
    if chosen.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
def run(self, handler):
    """Start a gevent ``pywsgi.WSGIServer`` for ``handler`` and block on it.

    :param handler: WSGI application to serve.
    :raises RuntimeError: when ``threading.local()`` is not an instance of
        gevent's ``local.local``.
    """
    from gevent import pywsgi, local

    thread_local = threading.local()
    if not isinstance(thread_local, local.local):
        raise RuntimeError(
            "Bottle requires gevent.monkey.patch_all() (before import)")

    if self.quiet:
        # Quiet mode switches the server's log option off.
        self.options['log'] = None

    bind_to = (self.host, self.port)
    server = pywsgi.WSGIServer(bind_to, handler, **self.options)
    if 'BOTTLE_CHILD' in os.environ:
        import signal
        signal.signal(signal.SIGINT, lambda signum, frame: server.stop())
    server.serve_forever()
def _cli_patch(args):
    """Apply gevent or eventlet monkey patching based on the parsed server option.

    The options come from ``_cli_parse(args)``. A missing server option, or
    one naming neither library, is a no-op.
    """
    opts, _, _ = _cli_parse(args)
    server_name = opts.server
    if server_name:
        if server_name.startswith('gevent'):
            import gevent.monkey
            gevent.monkey.patch_all()
        elif server_name.startswith('eventlet'):
            import eventlet
            eventlet.monkey_patch()
def run(self, handler):
    """Serve ``handler`` with a gevent WSGI server until it is stopped.

    The ``fast`` entry is always removed from ``self.options``. When it was
    truthy, ``gevent.wsgi`` is used if it can be imported; in every other
    case the server comes from ``gevent.pywsgi``.

    :param handler: WSGI application handed to the server.
    :raises RuntimeError: if ``threading.local`` is not gevent's ``local``
        class (the message asks for ``gevent.monkey.patch_all()``).
    """
    from gevent import pywsgi, local
    if not isinstance(threading.local(), local.local):
        msg = "Bottle requires gevent.monkey.patch_all() (before import)"
        raise RuntimeError(msg)

    server_module = pywsgi
    if self.options.pop('fast', None):
        # gevent.wsgi was a deprecated module that newer gevent releases no
        # longer ship. Import it only on request and fall back to pywsgi so
        # the adapter keeps working either way.
        try:
            from gevent import wsgi as server_module
        except ImportError:
            server_module = pywsgi

    self.options['log'] = None if self.quiet else 'default'
    address = (self.host, self.port)
    server = server_module.WSGIServer(address, handler, **self.options)
    if 'BOTTLE_CHILD' in os.environ:
        import signal
        # In a BOTTLE_CHILD process, SIGINT stops the server.
        signal.signal(signal.SIGINT, lambda s, f: server.stop())
    server.serve_forever()
def _cli_patch(cli_args):  # pragma: no coverage
    """Monkey-patch for the gevent/eventlet server requested in ``cli_args``.

    ``_cli_parse`` supplies the parsed options; only a server name starting
    with ``gevent`` or ``eventlet`` triggers patching.
    """
    opts, _ = _cli_parse(cli_args)
    server_name = opts.server
    if not server_name:
        return
    if server_name.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif server_name.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
def run(self, handler):
    """Run ``handler`` under gevent's ``pywsgi.WSGIServer`` and serve forever.

    :param handler: WSGI application to serve on ``(self.host, self.port)``.
    :raises RuntimeError: if ``threading.local`` has not been replaced by
        gevent's ``local.local``.
    """
    from gevent import pywsgi, local

    gevent_ready = isinstance(threading.local(), local.local)
    if not gevent_ready:
        raise RuntimeError(
            "Bottle requires gevent.monkey.patch_all() (before import)")
    if self.quiet:
        self.options['log'] = None

    listen_on = (self.host, self.port)
    server = pywsgi.WSGIServer(listen_on, handler, **self.options)

    if 'BOTTLE_CHILD' in os.environ:
        import signal

        def _shutdown(signum, frame):
            server.stop()

        signal.signal(signal.SIGINT, _shutdown)
    server.serve_forever()
def cmd_daemon(args):
    """Entry point for starting a TCP git server.

    Parses ``args`` for the listen address, port and Swift configuration
    path, then serves a ``TCPGitServer`` backed by ``SwiftSystemBackend``
    until interrupted.

    :param args: command-line arguments handed to the option parser.

    Exits with status 1 when gevent or geventhttpclient cannot be imported.
    """
    import optparse
    parser = optparse.OptionParser()
    parser.add_option("-l", "--listen_address", dest="listen_address",
                      default="127.0.0.1",
                      help="Binding IP address.")
    parser.add_option("-p", "--port", dest="port", type=int,
                      default=TCP_GIT_PORT,
                      help="Binding TCP port.")
    parser.add_option("-c", "--swift_config", dest="swift_config",
                      default="",
                      help="Path to the configuration file for Swift backend.")
    options, args = parser.parse_args(args)

    try:
        import gevent
        import geventhttpclient
    except ImportError:
        # Report the missing dependency on stderr; the old message had a
        # doubled space and read "for use the Swift backend".
        sys.stderr.write("gevent and geventhttpclient libraries are "
                         "mandatory to use the Swift backend.\n")
        sys.exit(1)

    import gevent.monkey
    # Only the socket module is patched here.
    gevent.monkey.patch_socket()
    from dulwich.contrib.swift import load_conf
    from dulwich import log_utils
    logger = log_utils.getLogger(__name__)
    conf = load_conf(options.swift_config)
    backend = SwiftSystemBackend(logger, conf)

    log_utils.default_logging_config()
    server = TCPGitServer(backend, options.listen_address,
                          port=options.port)
    server.serve_forever()
def _cli_patch(args):
    """Apply the monkey patch matching the server option parsed from ``args``.

    Server names starting with ``gevent`` trigger ``gevent.monkey.patch_all``;
    names starting with ``eventlet`` trigger ``eventlet.monkey_patch``.
    Anything else, including no server at all, does nothing.
    """
    opts, _, _ = _cli_parse(args)
    requested = opts.server
    if not requested:
        return
    if requested.startswith('gevent'):
        import gevent.monkey
        gevent.monkey.patch_all()
    elif requested.startswith('eventlet'):
        import eventlet
        eventlet.monkey_patch()
def run(self, handler):
    """Serve ``handler`` with a gevent WSGI server until it is stopped.

    The ``fast`` entry is always removed from ``self.options``. When it was
    truthy, ``gevent.wsgi`` is used if it can be imported; in every other
    case the server comes from ``gevent.pywsgi``.

    :param handler: WSGI application handed to the server.
    :raises RuntimeError: if ``threading.local`` is not gevent's ``local``
        class (the message asks for ``gevent.monkey.patch_all()``).
    """
    from gevent import pywsgi, local
    if not isinstance(threading.local(), local.local):
        msg = "Bottle requires gevent.monkey.patch_all() (before import)"
        raise RuntimeError(msg)

    use_fast = self.options.pop('fast', None)
    server_module = pywsgi
    if use_fast:
        # gevent.wsgi was a deprecated module that newer gevent releases no
        # longer ship. Import it lazily and fall back to pywsgi so a missing
        # module never prevents the server from starting.
        try:
            from gevent import wsgi as server_module
        except ImportError:
            server_module = pywsgi

    self.options['log'] = None if self.quiet else 'default'
    address = (self.host, self.port)
    server = server_module.WSGIServer(address, handler, **self.options)
    if 'BOTTLE_CHILD' in os.environ:
        import signal
        # In a BOTTLE_CHILD process, SIGINT stops the server.
        signal.signal(signal.SIGINT, lambda s, f: server.stop())
    server.serve_forever()
def _cli_patch(cli_args):  # pragma: no coverage
    """Apply gevent or eventlet monkey patching for the requested server.

    The options are taken from ``_cli_parse(cli_args)``. Nothing is patched
    unless the server name starts with ``gevent`` or ``eventlet``.
    """
    opts, _ = _cli_parse(cli_args)
    chosen = opts.server
    if chosen:
        if chosen.startswith('gevent'):
            import gevent.monkey
            gevent.monkey.patch_all()
        elif chosen.startswith('eventlet'):
            import eventlet
            eventlet.monkey_patch()
def run(self, handler):
    """Serve ``handler`` through gevent's ``pywsgi.WSGIServer``; blocks until stopped.

    :param handler: WSGI application passed to the server.
    :raises RuntimeError: when ``threading.local()`` does not produce a
        gevent ``local.local`` instance.
    """
    from gevent import pywsgi, local

    probe = threading.local()
    if not isinstance(probe, local.local):
        raise RuntimeError(
            "Bottle requires gevent.monkey.patch_all() (before import)")

    if self.quiet:
        self.options['log'] = None

    server = pywsgi.WSGIServer(
        (self.host, self.port), handler, **self.options)
    if 'BOTTLE_CHILD' in os.environ:
        import signal
        # In a BOTTLE_CHILD process, SIGINT stops the server.
        signal.signal(signal.SIGINT, lambda signum, frame: server.stop())
    server.serve_forever()