def test_partial_send(self, *args):
c = self.make_connection()
# only write the first four bytes of the OptionsMessage
write_size = 4
c.socket.send.side_effect = None
c.socket.send.return_value = write_size
c.handle_write()
msg_size = 9 # v3+ frame header
expected_writes = int(math.ceil(float(msg_size) / write_size))
size_mod = msg_size % write_size
last_write_size = size_mod if size_mod else write_size
self.assertFalse(c.is_defunct)
self.assertEqual(expected_writes, c.socket.send.call_count)
self.assertEqual(last_write_size, len(c.socket.send.call_args[0][0]))
test_asyncorereactor.py 文件源码
python
阅读 23
收藏 0
点赞 0
评论 0
评论列表
文章目录