def testchannel(self):
i, a, c = self.setup(True)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def icond(hook, msg):
if hook == 'admin' and msg['MsgType'] == 'Logon':
sock.connect(('127.0.0.1', CHANNEL_PORT))
sock.send(json.dumps({'MsgType': 'NewOrderSingle'}))
def acond(hook, msg):
if hook == 'app' and msg['MsgType'] == 'NewOrderSingle':
a.close()
i.close()
c.close()
self.loop(i, a, icond, acond, False)
self.assertEqual(json.loads(sock.recv(8192)), {'result': 'done'})
评论列表
文章目录