def _runTests(moduleName, fileName, debugMode = False):
if sys.version_info[:2] >= (3,4):
ctx = multiprocessing.get_context("spawn")
else:
ctx = multiprocessing
signalQueue = ctx.Queue()
resultQueue = ctx.Queue()
tester = _Tester(moduleName, fileName, debugMode, signalQueue, resultQueue)
p = ctx.Process(target=tester.run, name="Tester")
p.start()
start = time.time()
isTiming = False
while p.is_alive():
while not signalQueue.empty():
signal = signalQueue.get()
isTiming = signal.isTiming
description = signal.description
timeout = signal.timeout
if signal.resetTimer:
start = time.time()
if isTiming and time.time() - start > timeout:
result = TesterResult()
result.addOutput(printer.displayError("Timeout ({} seconds) reached during: {}".format(timeout, description)))
p.terminate()
p.join()
return result
if not resultQueue.empty():
p.terminate()
p.join()
break
time.sleep(0.1)
if not resultQueue.empty():
return resultQueue.get()
评论列表
文章目录