def test_handle_connection_timeout(self):
    """Check the reply produced when opening the destination times out.

    ``create_dest_stream`` on the layer is replaced by a mock whose side
    effect comes from ``create_raise_exception_function(TimeoutError)``.
    The future returned by ``handle_request_and_create_destination`` is
    then expected to raise ``DestNotConnectedError``, and ``self.event``
    is expected to be a ``Response`` with status ``NETWORK_UNREACHABLE``
    carrying the requested IPv4 address and port.
    """
    # NOTE(review): the bare ``yield`` below presumably relies on a
    # coroutine-aware test decorator applied above this method -- confirm.
    target_host = u"1.2.3.4"

    # Everything the layer sends on the SOCKS connection goes through
    # collect_send_event (presumably what populates self.event -- confirm).
    self.layer.socks_conn = Mock(
        send=Mock(side_effect=self.collect_send_event))

    # Replace the destination-stream factory with a mock driven by the
    # TimeoutError side effect built by the test helper.
    timeout_side_effect = self.create_raise_exception_function(TimeoutError)
    self.layer.create_dest_stream = Mock(side_effect=timeout_side_effect)

    connect_request = Request(
        REQ_COMMAND["CONNECT"], ADDR_TYPE["IPV4"], target_host, self.port)
    pending = self.layer.handle_request_and_create_destination(
        connect_request)

    with self.assertRaises(DestNotConnectedError):
        yield pending

    # Inspect the event recorded while the request was being handled.
    reply = self.event
    self.assertIsNotNone(reply)
    self.assertIsInstance(reply, Response)
    self.assertEqual(reply.status, RESP_STATUS["NETWORK_UNREACHABLE"])
    self.assertEqual(reply.atyp, ADDR_TYPE["IPV4"])
    self.assertEqual(reply.addr, IPv4Address(target_host))
    self.assertEqual(reply.port, self.port)
评论列表
文章目录