python类sharedctypes()的实例源码

test_multiprocessing.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")
test_multiprocessing.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")
test_multiprocessing.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )

#
# Quick test that logging works -- does not test logging output
#
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )

#
# Quick test that logging works -- does not test logging output
#
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_import(self):
        modules = self.get_module_names()
        if sys.platform == 'win32':
            modules.remove('multiprocessing.popen_fork')
            modules.remove('multiprocessing.popen_forkserver')
            modules.remove('multiprocessing.popen_spawn_posix')
        else:
            modules.remove('multiprocessing.popen_spawn_win32')
            if not HAS_REDUCTION:
                modules.remove('multiprocessing.popen_forkserver')

        if c_int is None:
            # This module requires _ctypes
            modules.remove('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            self.assertTrue(hasattr(mod, '__all__'), name)

            for attr in mod.__all__:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )

#
# Quick test that logging works -- does not test logging output
#
multitables.py 文件源码 项目:multitables 作者: ghcollin 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self):
        """
        Create an ordered barrier. When processes wait on this barrier, they are let through one at a time based
        on the provided index. The first process to be let through should provide an index of zero. Each subsequent
        process to be let through should provide an index equal to the current value of the internal counter.
        """
        import multiprocessing.sharedctypes
        self.cvar = multiprocessing.Condition()
        self.sval = multiprocessing.sharedctypes.RawValue('L')
        self.sval.value = 0
multitables.py 文件源码 项目:multitables 作者: ghcollin 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, size):
        # The size of the queue is increased by one to give space for a QueueClosed signal.
        size += 1
        import multiprocessing.sharedctypes
        # The condition variable is used to both lock access to the internal resources and signal new items are ready.
        self.cvar = multiprocessing.Condition()
        # A shared array is used to store items in the queue
        sary = multiprocessing.sharedctypes.RawArray('b', 8*size)
        self.vals = np.frombuffer(sary, dtype=np.int64, count=size)
        self.vals[:] = -1
        # tail is the next item to be read from the queue
        self.tail = multiprocessing.sharedctypes.RawValue('l', 0)
        # size is the current number of items in the queue. head = tail + size
        self.size = multiprocessing.sharedctypes.RawValue('l', 0)
multitables.py 文件源码 项目:multitables 作者: ghcollin 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, queue_size, ary_template):
        """
        Create the circular buffer. An array template must be passed to determine the size of the buffer elements.

        :param queue_size: Number of arrays to use as buffer elements.
        :param ary_template: Buffer elements match this array in shape and data-type.
        """
        import multiprocessing.sharedctypes
        # The buffer uses two queues to synchonise access to the buffer.
        # Element indices are put and fetched from these queues.
        # Elements that are ready to be written to go into the write_queue.
        # Elements that are ready to be read go into the read_queue.
        # This is essentially a token passing process. Tokens are taken out of queues and are not put back until
        # operations are complete.
        self.read_queue = SafeQueue(queue_size)
        self.write_queue = SafeQueue(queue_size)

        elem_n_bytes = ary_template.nbytes
        elem_dtype = ary_template.dtype
        elem_size = ary_template.size
        elem_shape = ary_template.shape
        self.arys = []
        for i in range(queue_size):
            sarray = multiprocessing.sharedctypes.RawArray('b', elem_n_bytes)
            # Elements are numpy arrays that point into allocated shared memory.
            self.arys.append(np.frombuffer(sarray, dtype=elem_dtype, count=elem_size).reshape(elem_shape))
            # The queue of elements ready to be written to is initially populated with all elements.
            self.write_queue.put(i)


问题


面经


文章

微信
公众号

扫码关注公众号