def test_event(self):
    """Exercise an interprocess CUDA event shared with a spawned worker.

    The parent builds a tensor of ones on a side stream, queues a sleep
    kernel followed by an in-place add of 1, records an interprocess
    event, and hands ``(event, tensor)`` to a ``cuda_multiply_two``
    subprocess through a queue. After the subprocess signals ``done`` and
    the event has been synchronized, the tensor must read ``[4, 4, 4, 4]``.
    """
    spawn_ctx = mp.get_context('spawn')
    channel = spawn_ctx.Queue()
    worker_ready = spawn_ctx.Event()
    worker_done = spawn_ctx.Event()
    worker = spawn_ctx.Process(
        target=cuda_multiply_two,
        args=(channel, worker_ready, worker_done),
    )
    worker.start()

    # Do not touch CUDA until the ready flag given to the worker is set.
    worker_ready.wait()

    side_stream = torch.cuda.Stream()
    with torch.cuda.stream(side_stream):
        values = torch.cuda.FloatTensor([1, 1, 1, 1])
        # The sleep kernel delays the add on this stream. If the event
        # were ignored, the multiply would run ahead of the add.
        ipc_event = torch.cuda.Event(interprocess=True)
        torch.cuda._sleep(20000000)  # about 30 ms
        values.add_(1)
        ipc_event.record()
        channel.put((ipc_event, values))
        # The subprocess has to record the event before we synchronize.
        worker_done.wait()
        ipc_event.synchronize()
        self.assertEqual(list(values), [4, 4, 4, 4])
    worker.join()
test_multiprocessing.py 文件源码
python
阅读 39
收藏 0
点赞 0
评论 0
评论列表
文章目录