def __init__(self, pool_names, max_restarts=0, options=None):
    self.names = pool_names
    self.queue = multiprocessing.Queue()
    self.pool = dict()
    self.max_restarts = max_restarts
    self.options = options or dict()

    self.dog_path = os.curdir
    self.dog_handler = LiveReload(self)
    # self.dog_observer = Observer()
    # self.dog_observer.schedule(self.dog_handler, self.dog_path, recursive=True)

    # When the start method is not 'fork' (e.g. 'spawn' on Windows/macOS),
    # children cannot inherit the parent's log handlers, so funnel their
    # records back through a queue instead.
    if multiprocessing.get_start_method() != 'fork':  # pragma: no cover
        root_logger = logging.getLogger()
        self.log_listener = QueueListener(self.queue, *root_logger.handlers)

    # TODO: Find out how to get the watchdog + livereload working at a later time.
    # self.dog_observer.start()

    self._restarts = dict()
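A minimal, self-contained sketch (standard library only, not part of the snippet above) of the QueueHandler/QueueListener pairing that the guard enables: the child pushes log records into a shared queue, and the parent's listener replays them through its own handlers.

import logging
import multiprocessing
from logging.handlers import QueueHandler, QueueListener

def worker(queue):
    # Under 'spawn'/'forkserver' the child starts with no inherited
    # handlers, so it routes records into the shared queue instead.
    logging.getLogger().addHandler(QueueHandler(queue))
    logging.getLogger().warning('hello from the child')

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    listener = QueueListener(queue, logging.StreamHandler())
    listener.start()
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    listener.stop()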
def default_test_processes():
    """
    Default number of test processes when using the --parallel option.
    """
    # The current implementation of the parallel test runner requires
    # multiprocessing to start subprocesses with fork().
    # On Python 3.4+: if multiprocessing.get_start_method() != 'fork':
    if not hasattr(os, 'fork'):
        return 1
    try:
        return int(os.environ['DJANGO_TEST_PROCESSES'])
    except KeyError:
        return multiprocessing.cpu_count()
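A quick usage sketch for the snippet above; the environment variable is the override it reads, and the value below is purely illustrative.

import os
os.environ['DJANGO_TEST_PROCESSES'] = '4'
print(default_test_processes())  # prints 4 regardless of cpu_count()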
def test_noforkbomb(self):
    sm = multiprocessing.get_start_method()
    name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
    if sm != 'fork':
        rc, out, err = test.script_helper.assert_python_failure(name, sm)
        self.assertEqual(out, b'')
        self.assertIn(b'RuntimeError', err)
    else:
        rc, out, err = test.script_helper.assert_python_ok(name, sm)
        self.assertEqual(out.rstrip(), b'123')
        self.assertEqual(err, b'')
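The mp_fork_bomb.py helper itself is not reproduced on this page; based on the assertions above, a minimal sketch of the anti-pattern it exercises looks like this: a Process started at import time without the __main__ guard, which 'spawn'/'forkserver' reject with RuntimeError while 'fork' simply prints 123.

import multiprocessing

def foo():
    print(123)

# Deliberately no `if __name__ == '__main__':` guard: under 'spawn' the
# child re-imports this module, reaches start() again, and multiprocessing
# raises RuntimeError instead of forking endlessly.
p = multiprocessing.Process(target=foo)
p.start()
p.join()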
#
# Issue #17555: ForkAwareThreadLock
#
def test_closefd(self):
    if not HAS_REDUCTION:
        raise unittest.SkipTest('requires fd pickling')
    reader, writer = multiprocessing.Pipe()
    fd = self.get_high_socket_fd()
    try:
        p = multiprocessing.Process(target=self._test_closefds,
                                    args=(writer, fd))
        p.start()
        writer.close()
        e = reader.recv()
        p.join(timeout=5)
    finally:
        self.close(fd)
        writer.close()
        reader.close()
    if multiprocessing.get_start_method() == 'fork':
        self.assertIs(e, None)
    else:
        WSAENOTSOCK = 10038
        self.assertIsInstance(e, OSError)
        self.assertTrue(e.errno == errno.EBADF or
                        e.winerror == WSAENOTSOCK, e)
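The HAS_REDUCTION check above reflects the key difference: under 'fork' the child inherits open file descriptors, while 'spawn' must transfer them explicitly. A Unix-flavored sketch using the standard multiprocessing.reduction helpers (the wiring is illustrative, not taken from the test):

import multiprocessing
import os
from multiprocessing import reduction

def child(conn):
    fd = reduction.recv_handle(conn)  # receive the duplicated fd
    os.write(fd, b'hi')
    os.close(fd)

if __name__ == '__main__':
    r, w = os.pipe()
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    reduction.send_handle(parent_conn, w, p.pid)  # duplicate fd into child
    os.close(w)
    print(os.read(r, 2))  # b'hi'
    p.join()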
#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#
@classmethod
def _check_context(cls, conn):
    conn.send(multiprocessing.get_start_method())
def check_context(self, ctx):
    r, w = ctx.Pipe(duplex=False)
    p = ctx.Process(target=self._check_context, args=(w,))
    p.start()
    w.close()
    child_method = r.recv()
    r.close()
    p.join()
    self.assertEqual(child_method, ctx.get_start_method())
def test_context(self):
    for method in ('fork', 'spawn', 'forkserver'):
        try:
            ctx = multiprocessing.get_context(method)
        except ValueError:
            continue
        self.assertEqual(ctx.get_start_method(), method)
        self.assertIs(ctx.get_context(), ctx)
        self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
        self.assertRaises(ValueError, ctx.set_start_method, None)
        self.check_context(ctx)
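For completeness, a small sketch of what contexts buy you in application code: a context pins a start method locally, without mutating the process-global default the way set_start_method() does.

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')  # local to this ctx object
    with ctx.Pool(2) as pool:
        print(pool.map(square, range(4)))       # [0, 1, 4, 9]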
def _run(name, queue, options):
    """
    The actual process that runs the separate controller instance.

    :param name: name of the process
    :param queue: Queue of the binding parent.
    :param options: Custom Options
    :type name: str
    """
    from pyplanet.core.instance import Controller
    from pyplanet.utils.log import initiate_logger, QueueHandler
    import logging

    # Tokio Asyncio (EXPERIMENTAL).
    if 'tokio' in options and options['tokio'] is True:
        import tokio
        import asyncio
        policy = tokio.TokioLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        asyncio.set_event_loop(tokio.new_event_loop())
        logging.warning('Using experimental Tokio Asyncio Loop!')

    # Log to the parent's queue: without fork(), the child does not inherit
    # the parent's logging configuration.
    if multiprocessing.get_start_method() != 'fork':  # pragma: no cover
        initiate_logger()
        root_logger = logging.getLogger()
        formatter = ColoredFormatter(
            '%(log_color)s%(levelname)-8s%(reset)s %(yellow)s[%(threadName)s][%(name)s]%(reset)s %(blue)s%(message)s'
        )
        queue_handler = QueueHandler(queue)
        queue_handler.setFormatter(formatter)
        root_logger.addHandler(queue_handler)

    logging.getLogger(__name__).info('Starting pool process for \'{}\'...'.format(name))

    # Set the thread name to our process name.
    threading.main_thread().setName(name)

    # Start the instance.
    instance = Controller.prepare(name).instance
    instance._queue = queue
    instance.start()
def _loop_wrapper_func(func, args, shared_mem_run, shared_mem_pause, interval, sigint, sigterm, name,
                       logging_level, conn_send, func_running, log_queue):
    """
    to be executed as a separate process (that's why this function is declared static)
    """
    prefix = get_identifier(name) + ' '
    global log
    log = logging.getLogger(__name__ + ".log_{}".format(get_identifier(name, bold=False)))
    log.setLevel(logging_level)
    log.addHandler(QueueHandler(log_queue))
    sys.stdout = StdoutPipe(conn_send)

    log.debug("enter wrapper_func")

    SIG_handler_Loop(sigint, sigterm, log, prefix)

    func_running.value = True
    error = False
    while shared_mem_run.value:
        try:
            # in pause mode, simply sleep
            if shared_mem_pause.value:
                quit_loop = False
            else:
                # when not in pause mode -> call func and see what happens
                try:
                    quit_loop = func(*args)
                except LoopInterruptError:
                    raise
                except Exception as e:
                    log.error("error %s occurred in loop calling 'func(*args)'", type(e))
                    log.info("show traceback.format_exc()\n%s", traceback.format_exc())
                    error = True
                    break

            if quit_loop is True:
                log.debug("loop stopped because func returned True")
                break

            time.sleep(interval)
        except LoopInterruptError:
            log.debug("quit wrapper_func due to LoopInterruptError")
            break

    func_running.value = False

    if error:
        sys.exit(-1)
    else:
        log.debug("wrapper_func terminates gracefully")

    # gets rid of the following warnings
    #   Exception ignored in: <_io.FileIO name='/dev/null' mode='rb'>
    #   ResourceWarning: unclosed file <_io.TextIOWrapper name='/dev/null' mode='r' encoding='UTF-8'>
    try:
        # get_start_method() does not exist before Python 3.4, hence the
        # AttributeError guard.
        if mp.get_start_method() == "spawn":
            sys.stdin.close()
    except AttributeError:
        pass