def pool_in_process():
    """Run a small ``Pool.map`` job and shut the pool down afterwards.

    Creates a four-process pool and maps ``_afunc`` over the integers 1..7.
    The mapped result is discarded; the function returns ``None``.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Shut the pool down explicitly, even if map() raises, instead of
        # leaving the worker processes behind when this function returns.
        pool.close()
        pool.join()
# Example source code for the Python class Pool()
def test_apply(self):
    """Compare ``pool.apply`` with direct ``sqr`` calls.

    Covers one call with a positional-argument tuple and one call with an
    empty tuple plus a keyword-argument dict.
    """
    apply_in_pool = self.pool.apply

    # Positional form: args given as a one-element tuple.
    positional_result = apply_in_pool(sqr, (5,))
    self.assertEqual(positional_result, sqr(5))

    # Keyword form: no positional args, ``x`` supplied through the kwds dict.
    keyword_result = apply_in_pool(sqr, (), {'x': 3})
    self.assertEqual(keyword_result, sqr(x=3))
def test_map(self):
    """Compare ``pool.map`` with a plain in-process mapping of ``sqr``.

    Checked once with the default chunking and once with ``chunksize=20``.
    """
    run_map = self.pool.map

    default_chunks = run_map(sqr, range(10))
    self.assertEqual(default_chunks, [sqr(value) for value in range(10)])

    explicit_chunks = run_map(sqr, range(100), chunksize=20)
    self.assertEqual(explicit_chunks, [sqr(value) for value in range(100)])
def test_map_unplicklable(self):
    """A pickling failure in ``pool.map`` must raise, not hang.

    Regression test for issue #19425.  Skipped when ``self.TYPE`` is
    ``'threads'``.
    """
    if self.TYPE == 'threads':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    # Helper whose reduction always fails.
    class Unpicklable(object):
        def __reduce__(self):
            raise RuntimeError('cannot pickle')

    # One instance repeated ten times as the iterable handed to map().
    payload = [Unpicklable()] * 10
    with self.assertRaises(RuntimeError):
        self.pool.map(sqr, payload)
def test_map_chunksize(self):
    """``map_async`` over an empty list with a chunksize must not stall.

    The test fails if fetching the result times out after ``TIMEOUT1``.
    """
    try:
        pending = self.pool.map_async(sqr, [], chunksize=1)
        pending.get(timeout=TIMEOUT1)
    except multiprocessing.TimeoutError:
        self.fail("pool.map_async with chunksize stalled on null list")
def test_async(self):
    """Check the value and timing of an ``apply_async`` result.

    ``sqr`` is called with ``(7, TIMEOUT1)``; the fetched value must be 49
    and the time spent in ``get`` must be close to ``TIMEOUT1``.
    """
    # NOTE(review): presumably sqr's second argument is a delay -- confirm
    # against sqr's definition.
    async_result = self.pool.apply_async(sqr, (7, TIMEOUT1,))
    timed_get = TimingWrapper(async_result.get)

    value = timed_get()
    self.assertEqual(value, 49)
    self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)
def test_imap(self):
    """Exercise ``pool.imap`` as a whole list and one item at a time.

    The item-by-item checks run with the default chunking and with
    ``chunksize=100``; each must raise ``StopIteration`` once exhausted.
    """
    squares = self.pool.imap(sqr, range(10))
    self.assertEqual(list(squares), [sqr(value) for value in range(10)])

    # Step through a fresh iterator manually, then confirm it is exhausted.
    squares = self.pool.imap(sqr, range(10))
    for n in range(10):
        self.assertEqual(next(squares), n * n)
    with self.assertRaises(StopIteration):
        next(squares)

    # Same check with explicit chunking over a larger input.
    squares = self.pool.imap(sqr, range(1000), chunksize=100)
    for n in range(1000):
        self.assertEqual(next(squares), n * n)
    with self.assertRaises(StopIteration):
        next(squares)
def test_imap_unordered(self):
    """Sorted ``imap_unordered`` output must equal the in-order squares.

    Checked with the default chunking and with ``chunksize=53``.
    """
    for extra_kwargs in ({}, {'chunksize': 53}):
        unordered = self.pool.imap_unordered(sqr, range(1000), **extra_kwargs)
        # Results may arrive in any order, so sort before comparing.
        self.assertEqual(sorted(unordered),
                         [sqr(value) for value in range(1000)])
def test_unpickleable_result(self):
    """Results that cannot be encoded must raise ``MaybeEncodingError``.

    Submits ``unpickleable_result`` to a private two-process pool twenty
    times and expects ``MaybeEncodingError`` from every ``get``.  The pool is
    closed and joined even when an assertion fails, so a failing run does
    not leave worker processes behind.
    """
    from multiprocessing.pool import MaybeEncodingError
    p = multiprocessing.Pool(2)
    try:
        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):
            res = p.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, res.get)
    finally:
        # Previously skipped when an assertion above failed.
        p.close()
        p.join()
def test_number_of_objects(self):
    """The manager must report exactly one live object.

    On a mismatch, both a fresh debug dump and the one captured alongside
    the count are printed before the assertion fails.
    """
    expected_count = 1      # the pool object is still alive
    multiprocessing.active_children()   # discard dead process objs
    gc.collect()                        # do garbage collection

    live_count = self.manager._number_of_objects()
    snapshot = self.manager._debug_info()

    if live_count != expected_count:
        # Single-argument print(...) emits the same text as the statement form.
        print(self.manager._debug_info())
        print(snapshot)

    self.assertEqual(live_count, expected_count)
#
# Test of creating a customized manager class
#