def test_socks_request_ipv4(self):
    """Check that an IPv4 CONNECT request is answered with SUCCESS.

    ``socks_conn`` is replaced by a mock whose ``send`` is routed through
    ``self.collect_send_event``; the assertions then inspect ``self.event``
    (presumably populated by that collector -- confirm against its
    definition).  The test also verifies that the layer hands back an open
    destination stream together with the requested host and port.

    This is a generator: it yields the future returned by
    ``handle_request_and_create_destination``, so it must be driven by a
    coroutine-aware test runner.  NOTE(review): no decorator is visible in
    this excerpt -- confirm one is applied.
    """
    self.layer.socks_conn = Mock()
    # Route everything the layer sends to the collector callback.
    self.layer.socks_conn.send = Mock(side_effect=self.collect_send_event)

    request = Request(
        REQ_COMMAND["CONNECT"], ADDR_TYPE["IPV4"], u"127.0.0.1", self.port)
    addr_future = self.layer.handle_request_and_create_destination(request)
    dest_stream, host, port = yield addr_future

    # The stream must be closed even when an assertion fails; otherwise a
    # failing run leaves the destination connection open.
    try:
        self.assertIsNotNone(self.event)
        self.assertIsInstance(self.event, Response)
        self.assertEqual(self.event.status, RESP_STATUS["SUCCESS"])
        self.assertEqual(self.event.atyp, ADDR_TYPE["IPV4"])
        self.assertIsNotNone(dest_stream)
        self.assertFalse(dest_stream.closed())
        self.assertEqual(host, IPv4Address(u"127.0.0.1"))
        self.assertEqual(port, self.port)
    finally:
        # Guard against None so a failed assertIsNotNone is not masked by
        # an AttributeError raised during cleanup.
        if dest_stream is not None:
            dest_stream.close()
评论列表
文章目录