def test_use_after_fork(self):
c = Connection.establish(self.broker)
pid = os.fork()
if pid: # Parent
self.assertEqual((pid, 0), os.waitpid(pid, 0))
self.assertEqual("child", c.session().receiver("child;{create:always}").fetch().content)
else: # Child
try:
# Can establish new connections
s = Connection.establish(self.broker).session().sender("child;{create:always}")
self.assertRaises(SelectorStopped, c.session) # But can't use parent connection
s.send("child")
os._exit(0)
except Exception, e:
print >>sys.stderr, "test child process error: %s" % e
os.exit(1)
finally:
os._exit(1) # Hard exit from child to stop remaining tests running twice
评论列表
文章目录