def __init__(self, queue_size, ary_template):
    """
    Build a circular buffer whose elements are shared-memory arrays modelled on a template array.

    :param queue_size: How many arrays to allocate as buffer elements.
    :param ary_template: Array whose shape and data-type every buffer element copies.
    """
    from multiprocessing.sharedctypes import RawArray

    # Access to the buffer is synchronised by passing element indices (tokens)
    # through two queues:
    #   write_queue - indices of elements that are ready to be written to,
    #   read_queue  - indices of elements that are ready to be read.
    # A token is taken out of a queue and is not put back until the operation
    # on that element is complete.
    self.read_queue = SafeQueue(queue_size)
    self.write_queue = SafeQueue(queue_size)

    # Geometry of a single buffer element, taken from the template.
    n_bytes = ary_template.nbytes
    dtype = ary_template.dtype
    n_items = ary_template.size
    shape = ary_template.shape

    self.arys = []
    for index in range(queue_size):
        # Each element is a numpy array pointing into its own block of
        # allocated shared memory.
        shared_block = RawArray('b', n_bytes)
        flat_view = np.frombuffer(shared_block, dtype=dtype, count=n_items)
        self.arys.append(flat_view.reshape(shape))
        # Initially every element is ready to be written to, so its index
        # goes straight into the write queue.
        self.write_queue.put(index)
评论列表
文章目录