def test_onion_datagram_proxy():
received_buffer = []
received_d = defer.Deferred()
def received(data):
received_buffer.append(data)
received_d.callback(None)
received_size = 10
proxy_factory = OnionDatagramProxyFactory(received)
protocol = proxy_factory.buildProtocol(123)
packet = b"A" * received_size
protocol.stringReceived(packet)
assert received_buffer[0] == packet
service_port = yield txtorcon.util.available_tcp_port(reactor)
service_endpoint_desc = "tcp:interface=127.0.0.1:%s" % service_port
service_endpoint = endpoints.serverFromString(reactor, service_endpoint_desc)
yield service_endpoint.listen(proxy_factory)
client_endpoint_desc = "tcp:127.0.0.1:%s" % service_port
client_endpoint = endpoints.clientFromString(reactor, client_endpoint_desc)
client_protocol = Int32StringReceiver()
yield endpoints.connectProtocol(client_endpoint, client_protocol)
client_protocol.sendString(packet)
print "BEFORE CLOSE"
client_protocol.transport.loseConnection()
yield received_d
assert received_buffer[0] == packet
评论列表
文章目录