def main():
    """Run two reader processes and one writer process over one ring buffer.

    Creates a ``ringbuffer.RingBuffer`` with 10 slots of 50000 bytes each,
    registers a writer on it, then launches the worker processes as daemons.
    Each process is joined for at most 20 seconds and must have terminated
    with exit code 0 by then.

    Raises:
        AssertionError: If a process is still alive after its join timeout,
            or if it finished with a non-zero exit code.
    """
    ring = ringbuffer.RingBuffer(slot_bytes=50000, slot_count=10)
    # The return value is intentionally unused here; the call is made for
    # its effect on the ring before any child process is created.
    ring.new_writer()

    # Each reader process receives its own handle from ring.new_reader().
    # The handles are created here, in the parent, before the children start.
    workers = [
        multiprocessing.Process(target=reader, args=(ring, ring.new_reader()))
        for _ in range(2)
    ]
    # NOTE(review): the meaning of the (1, 1000) arguments is defined by
    # writer(), which is outside this block -- confirm against its signature.
    workers.append(
        multiprocessing.Process(target=writer, args=(ring, 1, 1000))
    )

    for worker in workers:
        # daemon must be set before start().
        worker.daemon = True
        worker.start()

    for worker in workers:
        # join() returns after at most 20 seconds even if the process is
        # still running, so liveness has to be checked explicitly.
        worker.join(timeout=20)
        assert not worker.is_alive()
        assert worker.exitcode == 0
评论列表
文章目录