def test_mainline(self, m_select):
    """Read the same status message twice and check both reads succeed.

    ``m_select`` is a mock (presumably a patched ``select.select`` supplied
    by a decorator outside this view -- confirm against the full file).
    It is primed with two results, each listing ``self.sck`` in the first
    position, so that each of the two reads below consumes one result.
    """
    m_select.side_effect = iter([
        ([self.sck], [], []),
        ([self.sck], [], []),
    ])
    exp_msg = {
        MSG_KEY_TYPE: MSG_TYPE_STATUS,
        MSG_KEY_STATUS: STATUS_RESYNC,
    }
    # Every recv() call on the mocked socket returns the same packed message.
    self.sck.recv.return_value = msgpack.dumps(exp_msg)

    # range() rather than xrange(): identical for two iterations, and it
    # also exists on Python 3, where xrange raises NameError.
    for _ in range(2):
        # A fresh generator per read; only its first message is consumed.
        msg_gen = self.reader.new_messages(timeout=1)
        msg_type, msg = next(msg_gen)
        self.assertEqual(msg_type, MSG_TYPE_STATUS)
        self.assertEqual(msg, exp_msg)

    # Checked after the loop: two reads must have produced exactly two
    # recv(16384) calls in total.
    self.assertEqual(
        self.sck.recv.mock_calls,
        [
            call(16384),
            call(16384),
        ]
    )
评论列表
文章目录