def cleanup():
    """Tear down the module-level process handler state.

    When multiprocessing is active and a closing flag exists, the flag is set
    so that the other side can notice the shutdown request. If the manager
    exposes a ``shutdown`` attribute, the function waits for a fixed period
    and then terminates the manager process if it is still alive.

    Finally the module globals ``__manager``, ``__closing`` and
    ``__multiprocessing`` are reset to ``None``.
    """
    global __multiprocessing, __manager, __closing
    handler_active = __multiprocessing and __closing
    if handler_active:
        log.debug("Shutting down process handler")
        try:
            __closing.set(True)
        except (IOError, EOFError):
            # Best effort only: a lost connection must not abort the cleanup.
            log.debug("Connection to manager lost during cleanup")
        # Only managers that were started via ".start()" implement a
        # "shutdown". Managers started via ".connect" may skip this.
        started_locally = hasattr(__manager, "shutdown")
        if started_locally:
            # Wait for the spawner and the worker threads to go down.
            time.sleep(2.5)
            # The explicit "__manager.shutdown()" call is disabled here; only
            # the grace periods and the liveness check below remain.
            time.sleep(0.1)
            # Kill the manager process if it is still alive after the wait.
            manager_process = __manager._process
            if manager_process.is_alive():
                manager_process.terminate()
    # NOTE(review): the nesting of these resets could not be recovered from
    # the source formatting; they are assumed to run unconditionally - confirm.
    __manager = __closing = __multiprocessing = None
python类managers()的实例源码
def make_server_manager(port, authorization_key):
    """Build the server-side manager that exposes the job and result queues.

    Two fresh queues are created and registered on a dedicated manager class
    under the names ``JOB_QUEUE_NAME`` and ``RES_QUEUE_NAME``. The manager is
    only constructed here; it is returned without being started.

    Arguments:
    port -- Port to use for communication
    authorization_key -- program secret used to identify correct server

    Returns:
    Manager to process jobs
    """
    jobs = Queue()
    results = Queue()

    class JobQueueManager(multiprocessing.managers.SyncManager):
        # A dedicated subclass keeps the registrations below off SyncManager.
        pass

    # Each registered callable hands out the one shared queue instance
    # created above rather than building a new queue per request.
    JobQueueManager.register(JOB_QUEUE_NAME, callable=lambda: jobs)
    JobQueueManager.register(RES_QUEUE_NAME, callable=lambda: results)

    listen_address = ('', port)
    return JobQueueManager(address=listen_address, authkey=authorization_key)
def test_import(self):
    """Import each multiprocessing submodule and verify its ``__all__``.

    Every name a module lists in ``__all__`` must exist as an attribute of
    that module. Modules without ``__all__`` are imported but not inspected.
    """
    module_names = [
        'multiprocessing', 'multiprocessing.connection',
        'multiprocessing.heap', 'multiprocessing.managers',
        'multiprocessing.pool', 'multiprocessing.process',
        'multiprocessing.synchronize', 'multiprocessing.util'
    ]

    # Optional modules are only checked when their prerequisites exist.
    if HAS_REDUCTION:
        module_names += ['multiprocessing.reduction']
    if c_int is not None:
        # This module requires _ctypes
        module_names += ['multiprocessing.sharedctypes']

    for module_name in module_names:
        __import__(module_name)
        # __import__ returns the top-level package for dotted names, so the
        # submodule itself is looked up in sys.modules.
        module = sys.modules[module_name]
        exported = getattr(module, '__all__', ())

        for attr_name in exported:
            present = hasattr(module, attr_name)
            message = '%r does not have attribute %r' % (module, attr_name)
            self.assertTrue(present, message)
#
# Quick test that logging works -- does not test logging output
#
def test_manager_initializer(self):
    """Check that SyncManager.start() validates and runs its initializer.

    A non-callable initializer must be rejected with ``TypeError``. After a
    start with ``initializer`` and ``(self.ns,)`` as its arguments,
    ``self.ns.test`` is expected to equal 1.
    """
    m = multiprocessing.managers.SyncManager()
    # Passing a non-callable as the initializer must fail before anything
    # is started.
    self.assertRaises(TypeError, m.start, 1)
    m.start(initializer, (self.ns,))
    try:
        self.assertEqual(self.ns.test, 1)
    finally:
        # Stop and reap the manager process even when the assertion fails,
        # so a failing test does not leave a stray process behind.
        m.shutdown()
        m.join()
def test_import(self):
    """Import each multiprocessing submodule and verify its ``__all__``.

    Every name a module lists in ``__all__`` must exist as an attribute of
    that module. Modules without ``__all__`` are imported but not inspected.
    """
    module_names = [
        'multiprocessing', 'multiprocessing.connection',
        'multiprocessing.heap', 'multiprocessing.managers',
        'multiprocessing.pool', 'multiprocessing.process',
        'multiprocessing.synchronize', 'multiprocessing.util'
    ]

    # Optional modules are only checked when their prerequisites exist.
    if HAS_REDUCTION:
        module_names += ['multiprocessing.reduction']
    if c_int is not None:
        # This module requires _ctypes
        module_names += ['multiprocessing.sharedctypes']

    for module_name in module_names:
        __import__(module_name)
        # __import__ returns the top-level package for dotted names, so the
        # submodule itself is looked up in sys.modules.
        module = sys.modules[module_name]
        exported = getattr(module, '__all__', ())

        for attr_name in exported:
            present = hasattr(module, attr_name)
            message = '%r does not have attribute %r' % (module, attr_name)
            self.assertTrue(present, message)
#
# Quick test that logging works -- does not test logging output
#
def test_manager_initializer(self):
    """Check that SyncManager.start() validates and runs its initializer.

    A non-callable initializer must be rejected with ``TypeError``. After a
    start with ``initializer`` and ``(self.ns,)`` as its arguments,
    ``self.ns.test`` is expected to equal 1.
    """
    m = multiprocessing.managers.SyncManager()
    # Passing a non-callable as the initializer must fail before anything
    # is started.
    self.assertRaises(TypeError, m.start, 1)
    m.start(initializer, (self.ns,))
    try:
        self.assertEqual(self.ns.test, 1)
    finally:
        # Stop and reap the manager process even when the assertion fails,
        # so a failing test does not leave a stray process behind.
        m.shutdown()
        m.join()
def process_manager(self):
    """Return the cached SyncManager, creating and starting it on first use.

    The manager is stored on ``self._process_manager`` only after ``start()``
    has returned. If start-up fails, nothing is cached and the next call
    tries again instead of handing out a manager that was never started.

    Returns:
        The started ``multiprocessing.managers.SyncManager`` instance.
    """
    if self._process_manager is None:
        manager = multiprocessing.managers.SyncManager()
        # _manager_initializer is handed to start() as the initializer
        # callable for the manager's process.
        manager.start(_manager_initializer)
        # Publish only a fully started manager.
        self._process_manager = manager
    return self._process_manager
def test_manager_initializer(self):
    """Check that SyncManager.start() validates and runs its initializer.

    A non-callable initializer must be rejected with ``TypeError``. After a
    start with ``initializer`` and ``(self.ns,)`` as its arguments,
    ``self.ns.test`` is expected to equal 1.
    """
    m = multiprocessing.managers.SyncManager()
    # Passing a non-callable as the initializer must fail before anything
    # is started.
    self.assertRaises(TypeError, m.start, 1)
    m.start(initializer, (self.ns,))
    try:
        self.assertEqual(self.ns.test, 1)
    finally:
        # Stop and reap the manager process even when the assertion fails,
        # so a failing test does not leave a stray process behind.
        m.shutdown()
        m.join()