def main():
process_pool_context = multiprocessing.get_context('spawn')
pool = multiprocessing.pool.Pool(
processes=4,
context=process_pool_context,
)
pool.apply_async(
func=zmq_streamer,
)
multiprocessing_manager = multiprocessing.Manager()
multiprocessing_queue = multiprocessing_manager.Queue(
maxsize=test_queue_size,
)
for i in range(test_queue_size):
multiprocessing_queue.put(b'1')
res = pool.apply_async(
func=consume_queue,
args=(multiprocessing_queue,),
)
res.get()
context = zmq.Context()
socket = context.socket(zmq.PAIR)
res = pool.apply_async(
func=consume_zmq_pair,
)
time.sleep(1)
socket.connect("tcp://localhost:%s" % zmq_port)
for i in range(test_queue_size):
socket.send(b'1')
res.get()
socket.close()
context = zmq.Context()
socket = context.socket(zmq.PUSH)
res = pool.apply_async(
func=consume_zmq_streamer,
)
time.sleep(1)
socket.connect("tcp://localhost:%s" % zmq_queue_port_pull)
for i in range(test_queue_size):
socket.send(b'1')
res.wait()
socket.close()
评论列表
文章目录