def testTcpNoDelay(self):
f = MyServerFactory()
port = reactor.listenTCP(0, f, interface="127.0.0.1")
self.n = port.getHost().port
self.ports.append(port)
clientF = MyClientFactory()
reactor.connectTCP("127.0.0.1", self.n, clientF)
d = loopUntil(lambda: (f.called > 0 and
getattr(clientF, 'protocol', None) is not None))
def check(x):
for p in clientF.protocol, f.protocol:
transport = p.transport
self.assertEquals(transport.getTcpNoDelay(), 0)
transport.setTcpNoDelay(1)
self.assertEquals(transport.getTcpNoDelay(), 1)
transport.setTcpNoDelay(0)
self.assertEquals(transport.getTcpNoDelay(), 0)
d.addCallback(check)
d.addBoth(lambda _: self.cleanPorts(clientF.protocol.transport, port))
return d
python类listenTCP()的实例源码
def testTcpKeepAlive(self):
f = MyServerFactory()
port = reactor.listenTCP(0, f, interface="127.0.0.1")
self.n = port.getHost().port
self.ports.append(port)
clientF = MyClientFactory()
reactor.connectTCP("127.0.0.1", self.n, clientF)
d = loopUntil(lambda :(f.called > 0 and
getattr(clientF, 'protocol', None) is not None))
def check(x):
for p in clientF.protocol, f.protocol:
transport = p.transport
self.assertEquals(transport.getTcpKeepAlive(), 0)
transport.setTcpKeepAlive(1)
self.assertEquals(transport.getTcpKeepAlive(), 1)
transport.setTcpKeepAlive(0)
self.assertEquals(transport.getTcpKeepAlive(), 0)
d.addCallback(check)
d.addBoth(lambda _:self.cleanPorts(clientF.protocol.transport, port))
return d
def testClientStartStop(self):
f = ClosingFactory()
p = reactor.listenTCP(0, f, interface="127.0.0.1")
self.n = p.getHost().port
self.ports.append(p)
f.port = p
d = loopUntil(lambda :p.connected)
def check(ignored):
factory = ClientStartStopFactory()
reactor.connectTCP("127.0.0.1", self.n, factory)
self.assert_(factory.started)
return loopUntil(lambda :factory.stopped)
d.addCallback(check)
d.addBoth(lambda _: self.cleanPorts(*self.ports))
return d
def testUserFail(self):
f = MyServerFactory()
p = reactor.listenTCP(0, f, interface="127.0.0.1")
n = p.getHost().port
self.ports.append(p)
def startedConnecting(connector):
connector.stopConnecting()
factory = ClientStartStopFactory()
factory.startedConnecting = startedConnecting
reactor.connectTCP("127.0.0.1", n, factory)
d = loopUntil(lambda :factory.stopped)
def check(ignored):
self.assertEquals(factory.failed, 1)
factory.reason.trap(error.UserError)
return self.cleanPorts(*self.ports)
return d.addCallback(check)
def testCannotBind(self):
f = MyServerFactory()
p1 = reactor.listenTCP(0, f, interface='127.0.0.1')
n = p1.getHost().port
self.ports.append(p1)
dest = p1.getHost()
self.assertEquals(dest.type, "TCP")
self.assertEquals(dest.host, "127.0.0.1")
self.assertEquals(dest.port, n)
# make sure new listen raises error
self.assertRaises(error.CannotListenError,
reactor.listenTCP, n, f, interface='127.0.0.1')
return self.cleanPorts(*self.ports)
def testHostAddress(self):
f1 = MyServerFactory()
p1 = reactor.listenTCP(0, f1, interface='127.0.0.1')
n = p1.getHost().port
self.ports.append(p1)
f2 = MyOtherClientFactory()
p2 = reactor.connectTCP('127.0.0.1', n, f2)
d = loopUntil(lambda :p2.state == "connected")
def check(ignored):
self.assertEquals(p1.getHost(), f2.address)
self.assertEquals(p1.getHost(), f2.protocol.transport.getPeer())
return p1.stopListening()
def cleanup(ignored):
self.ports.append(p2.transport)
return self.cleanPorts(*self.ports)
return d.addCallback(check).addCallback(cleanup)
def testWriter(self):
f = protocol.Factory()
f.protocol = WriterProtocol
f.done = 0
f.problem = 0
wrappedF = WiredFactory(f)
p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1")
n = p.getHost().port
self.ports.append(p)
clientF = WriterClientFactory()
wrappedClientF = WiredFactory(clientF)
reactor.connectTCP("127.0.0.1", n, wrappedClientF)
def check(ignored):
self.failUnless(f.done, "writer didn't finish, it probably died")
self.failUnless(f.problem == 0, "writer indicated an error")
self.failUnless(clientF.done,
"client didn't see connection dropped")
expected = "".join(["Hello Cleveland!\n",
"Goodbye", " cruel", " world", "\n"])
self.failUnless(clientF.data == expected,
"client didn't receive all the data it expected")
d = defer.gatherResults([wrappedF.onDisconnect,
wrappedClientF.onDisconnect])
return d.addCallback(check)
def testWriter(self):
f = protocol.Factory()
f.protocol = LargeBufferWriterProtocol
f.done = 0
f.problem = 0
f.len = self.datalen
wrappedF = FireOnCloseFactory(f)
p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1")
n = p.getHost().port
self.ports.append(p)
clientF = LargeBufferReaderClientFactory()
wrappedClientF = FireOnCloseFactory(clientF)
reactor.connectTCP("127.0.0.1", n, wrappedClientF)
d = defer.gatherResults([wrappedF.deferred, wrappedClientF.deferred])
def check(ignored):
self.failUnless(f.done, "writer didn't finish, it probably died")
self.failUnless(clientF.len == self.datalen,
"client didn't receive all the data it expected "
"(%d != %d)" % (clientF.len, self.datalen))
self.failUnless(clientF.done,
"client didn't see connection dropped")
return d.addCallback(check)
def setUp(self):
from twisted.internet import reactor
self.serverFactory = protocol.ServerFactory()
self.serverFactory.protocol = self.serverProto
self.clientFactory = protocol.ClientFactory()
self.clientFactory.protocol = self.clientProto
self.clientFactory.onMade = defer.Deferred()
self.serverFactory.onMade = defer.Deferred()
self.serverPort = reactor.listenTCP(0, self.serverFactory)
self.clientConn = reactor.connectTCP(
'127.0.0.1', self.serverPort.getHost().port,
self.clientFactory)
def getProtos(rlst):
self.cli = self.clientFactory.theProto
self.svr = self.serverFactory.theProto
dl = defer.DeferredList([self.clientFactory.onMade,
self.serverFactory.onMade])
return dl.addCallback(getProtos)
def testStopTrying(self):
f = Factory()
f.protocol = In
f.connections = 0
f.allMessages = []
f.goal = 2
f.d = defer.Deferred()
c = ReconnectingClientFactory()
c.initialDelay = c.delay = 0.2
c.protocol = Out
c.howManyTimes = 2
port = self.port = reactor.listenTCP(0, f)
PORT = port.getHost().port
reactor.connectTCP('127.0.0.1', PORT, c)
f.d.addCallback(self._testStopTrying_1, f, c)
return f.d
def ftp_PASV(self):
"""Request for a passive connection
from the rfc::
This command requests the server-DTP to \"listen\" on a data port
(which is not its default data port) and to wait for a connection
rather than initiate one upon receipt of a transfer command. The
response to this command includes the host and port address this
server is listening on.
"""
# if we have a DTP port set up, lose it.
if self.dtpFactory is not None:
# cleanupDTP sets dtpFactory to none. Later we'll do
# cleanup here or something.
self.cleanupDTP()
self.dtpFactory = DTPFactory(pi=self)
self.dtpFactory.setTimeout(self.dtpTimeout)
self.dtpPort = reactor.listenTCP(0, self.dtpFactory)
host = self.transport.getHost().host
port = self.dtpPort.getHost().port
self.reply(ENTERING_PASV_MODE, encodeHostPort(host, port))
return self.dtpFactory.deferred.addCallback(lambda ign: None)
def setUp(self):
self.factory = server.DNSServerFactory([
test_domain_com, reverse_domain, my_domain_com
], verbose=2)
p = dns.DNSDatagramProtocol(self.factory)
while 1:
self.listenerTCP = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
port = self.listenerTCP.getHost().port
try:
self.listenerUDP = reactor.listenUDP(port, p, interface="127.0.0.1")
except error.CannotListenError:
self.listenerTCP.stopListening()
else:
break
self.resolver = client.Resolver(servers=[('127.0.0.1', port)])
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
if not reactor.running:
reactor.run()
def _doStart(self, tc, port):
"""Starts the factory"""
self.factory = ChatFactory(tc, self._doOnConnectionMade, self._doOnLine)
self.port = reactor.listenTCP(int(port), self.factory)
def listen_ip(factory, config, ip_local_default=True, ip_version_default=4):
'''
Internal implementation that opens IP listeners for factory based on
config. See listen() for details.
'''
# upgrade things to lists if they have been passed as single items
config = _listify(config)
ip_version_default = _listify(ip_version_default)
# now process the list
listeners = []
for item in config:
if not validate_connection_config(item):
# warn but skip invalid configs
continue
if 'port' in item:
port = int(item['port'])
if 'ip' in item:
ip = item['ip']
listeners.append(reactor.listenTCP(port, factory,
interface=ip))
else:
for ip_version in ip_version_default:
ip = IP_LISTEN_DEFAULT[ip_version][ip_local_default]
listeners.append(reactor.listenTCP(port, factory,
interface=ip))
return _unlistify(listeners)
def starttcpproxy():
reactor.listenTCP(localport, portforward.ProxyFactory(desthost, destport))
reactor.run()
def add_factory(port_map_desc):
match = port_map_pattern.match(port_map_desc)
if not match: die_usage("malformed port map description: %s" % port_map_desc)
listen_port = int(match.group(1))
remote_host = match.group(2)
remote_port = int(match.group(3))
factory = HexdumpServerFactory(listen_port, remote_host, remote_port)
reactor.listenTCP(factory.listen_port, factory)
def setUp(self):
BaseTestCase.setUp(self)
self.gw = twisted.TwistedGateway(expose_request=False)
root = resource.Resource()
root.putChild('', self.gw)
self.p = reactor.listenTCP(0, server.Site(root), interface="127.0.0.1")
self.port = self.p.getHost().port