def _run_items(mode, items, session, workers=None):
    """Run every test item using the execution strategy named by ``mode``.

    Supported strategies:

    * ``"mproc"``   -- one ``multiprocessing.Process`` per item, each process
      supervised by a thread of a ``ThreadPoolExecutor``.  Multiprocess mode
      is not compatible with Windows.
    * ``"mthread"`` -- each item is run by ``_run_next_item`` on a thread of a
      ``ThreadPoolExecutor``.
    * ``"asyncnet"`` -- each item is run by ``_run_next_item`` in a gevent
      greenlet, after ``gevent.monkey.patch_all()`` has been applied.
    * any other value -- items are run one after another in the calling
      thread through the ``pytest_runtest_protocol`` hook.

    Args:
        mode: Name of the execution strategy (see above).
        items: Sequence of test items; it is indexed and measured with
            ``len`` in the sequential strategy.
        session: Session object handed to ``_run_next_item``; the sequential
            strategy also reads ``session.shouldstop`` and
            ``session.Interrupted``.
        workers: Maximum number of concurrent workers, passed to the thread
            pool (``max_workers``) or to the gevent pool (``size``).
            ``None`` leaves the limit to the respective pool.

    Raises:
        session.Interrupted: In the sequential strategy, as soon as
            ``session.shouldstop`` is truthy after an item has run.
        Exception: In the ``"mproc"`` and ``"mthread"`` strategies, the first
            exception raised by a worker task is re-raised once all tasks
            have finished, instead of being silently dropped.
    """
    if mode == "mproc":
        # The thread pool only manages process lifecycles: every thread
        # spawns one process and finishes when that process has been joined.
        def run_task_in_proc(item, index):
            proc = multiprocessing.Process(
                target=_run_next_item, args=(session, item, index)
            )
            proc.start()
            # NOTE: join() does not report how the child ended; the exit code
            # is ignored here, exactly as before.
            proc.join()

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(run_task_in_proc, item, index)
                for index, item in enumerate(items)
            ]
        # Leaving the ``with`` block waited for every task, so result()
        # returns immediately and re-raises e.g. a failed process start.
        for future in futures:
            future.result()
    elif mode == "mthread":
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(_run_next_item, session, item, index)
                for index, item in enumerate(items)
            ]
        # Surface the first exception raised inside a worker thread; without
        # this the futures would swallow it and the run would look clean.
        for future in futures:
            future.result()
    elif mode == "asyncnet":
        # gevent is imported lazily so it is only required for this strategy.
        import gevent
        import gevent.monkey
        import gevent.pool

        gevent.monkey.patch_all()
        pool = gevent.pool.Pool(size=workers)
        for index, item in enumerate(items):
            pool.spawn(_run_next_item, session, item, index)
        pool.join()
    else:
        # Sequential fallback: run the items in order in the calling thread.
        for i, item in enumerate(items):
            nextitem = items[i + 1] if i + 1 < len(items) else None
            item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
            if session.shouldstop:
                raise session.Interrupted(session.shouldstop)
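# (Web page navigation text, not part of the code) Comment list
# (Web page navigation text, not part of the code) Table of contents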