python类Pool()的实例源码

test_multiprocessing.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def pool_in_process():
    pool = multiprocessing.Pool(processes=4)
    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 57 收藏 0 点赞 0 评论 0
def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):
            res = p.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, res.get)

        p.close()
        p.join()
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print self.manager._debug_info()
            print debug_info

        self.assertEqual(refs, EXPECTED_NUMBER)

#
# Test of creating a customized manager class
#


问题


面经


文章

微信
公众号

扫码关注公众号