def test_socketsLeftOpen(self):
    """Start a TCP listener on port 0 and never stop it.

    NOTE(review): the port object returned by ``listenTCP`` is discarded,
    so nothing here closes the socket. Judging by the test name this is
    intentional -- confirm before adding any cleanup.
    """
    factory = protocol.Factory()
    factory.protocol = protocol.Protocol
    reactor.listenTCP(0, factory)
# Python: example source code using the Factory() class
def loopbackTCP(server, client, port=0, noisy=True):
    """Run a session between server and client protocol instances over TCP.

    The server side listens on 127.0.0.1 at ``port``; the client connects
    to whichever port number the listening port reports.

    Args:
        server: protocol instance wrapped in ``_FireOnClose`` and returned
            for every incoming connection.
        client: protocol instance handed to ``LoopbackClientFactory``.
        port: port number passed to ``reactor.listenTCP``.
        noisy: value stored on the ``noisy`` attribute of both factories.

    Returns:
        The client factory's ``deferred``, with two callbacks added: the
        first returns ``serverWrapper.deferred``, the second calls
        ``serverPort.stopListening()``.
    """
    from twisted.internet import reactor

    wrappingFactory = policies.WrappingFactory(protocol.Factory())
    serverWrapper = _FireOnClose(wrappingFactory, server)
    wrappingFactory.noisy = noisy

    def buildServerProtocol(addr):
        # Every connection gets the same pre-built wrapper around `server`.
        return serverWrapper

    wrappingFactory.buildProtocol = buildServerProtocol
    serverPort = reactor.listenTCP(
        port, wrappingFactory, interface='127.0.0.1')

    clientFactory = LoopbackClientFactory(client)
    clientFactory.noisy = noisy
    reactor.connectTCP(
        '127.0.0.1', serverPort.getHost().port, clientFactory)

    def waitForServer(ignored):
        return serverWrapper.deferred

    def stopServerPort(ignored):
        return serverPort.stopListening()

    finished = clientFactory.deferred
    finished.addCallback(waitForServer)
    finished.addCallback(stopServerPort)
    return finished
def main():
    """Process command-line arguments, then serve POP3TestServer on PORT.

    With no arguments after the program name, ``printMessage`` is called
    with "POP3 with no messages"; otherwise each argument is passed to
    ``processArg`` in order. In both cases a factory for ``POP3TestServer``
    is then bound to TCP port ``PORT`` and the reactor is started.
    """
    cliArgs = sys.argv[1:]
    if cliArgs:
        for option in cliArgs:
            processArg(option)
    else:
        printMessage("POP3 with no messages")

    serverFactory = Factory()
    serverFactory.protocol = POP3TestServer
    reactor.listenTCP(PORT, serverFactory)
    reactor.run()
def autostart(reason, **kwargs):
    """Start listening on the hotplug UNIX socket when ``reason`` is 0.

    For any other ``reason`` nothing happens. Extra keyword arguments are
    accepted and ignored.
    """
    if reason != 0:
        return

    from twisted.internet import reactor

    socketPath = "/tmp/hotplug.socket"
    try:
        os.remove(socketPath)
    except OSError:
        # Best effort: a failed removal (e.g. no such file) is ignored.
        pass

    hotplugFactory = Factory()
    hotplugFactory.protocol = Hotplug
    reactor.listenUNIX(socketPath, hotplugFactory)
def setUp(self):
    """Create ``self.proto`` (an ``iscp.ISCP`` built by a plain factory)
    and connect it to ``self.tr``, a ``proto_helpers.StringTransport``.

    The transport is cleared after ``makeConnection`` -- presumably so the
    tests start without anything written during connection setup.
    """
    iscpFactory = protocol.Factory()
    iscpFactory.protocol = iscp.ISCP
    self.proto = iscpFactory.buildProtocol('/dev/ttyUSB0')
    self.tr = proto_helpers.StringTransport()
    self.proto.makeConnection(self.tr)
    self.tr.clear()
def __init__(self, eiscp_port=60128):
    """
    Run the base ``protocol.Factory`` initialiser and store the port.

    Args:
        eiscp_port (int): UDP port to listen for discovery requests on.
            Saved as ``self.eiscp_port``.
    """
    protocol.Factory.__init__(self)
    self.eiscp_port = eiscp_port
def autostart(reason, **kwargs):
    """Start listening on the hotplug UNIX socket when ``reason`` is 0.

    For any other ``reason`` nothing happens. Extra keyword arguments are
    accepted and ignored.
    """
    if reason != 0:
        return

    from twisted.internet import reactor

    socketPath = "/tmp/hotplug.socket"
    try:
        os.remove(socketPath)
    except OSError:
        # Best effort: a failed removal (e.g. no such file) is ignored.
        pass

    hotplugFactory = Factory()
    hotplugFactory.protocol = Hotplug
    reactor.listenUNIX(socketPath, hotplugFactory)
def testDumber(self):
    """Connect a client to a UNIX socket server on a temporary path.

    Returns a deferred that gathers both factories' ``deferred``
    attributes and then passes the listening port and the connector,
    client and server transports to ``self._addPorts``.
    """
    socketPath = self.mktemp()
    serverFactory = Factory(self, socketPath)
    port = reactor.listenUNIX(socketPath, serverFactory)
    clientFactory = TestClientFactory(self, socketPath)
    connector = reactor.connectUNIX(socketPath, clientFactory)

    def handOffPorts(ignored):
        # Transports are looked up only once both deferreds have fired.
        return self._addPorts(
            port,
            connector.transport,
            clientFactory.protocol.transport,
            serverFactory.protocol.transport)

    bothFired = defer.gatherResults(
        [serverFactory.deferred, clientFactory.deferred])
    bothFired.addCallback(handOffPorts)
    return bothFired
def testMode(self):
    """Listening with ``mode=0o600`` yields a socket file with those
    permission bits.

    After the check, a client is connected and the listening port plus the
    connector's transport are passed to ``self._addPorts``.
    """
    socketPath = self.mktemp()
    # `0o600` is accepted by Python 2.6+ and Python 3; the legacy `0600`
    # spelling is a SyntaxError on Python 3.
    expectedMode = 0o600
    serverFactory = Factory(self, socketPath)
    port = reactor.listenUNIX(socketPath, serverFactory, mode=expectedMode)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(
        stat.S_IMODE(os.stat(socketPath).st_mode), expectedMode)
    clientFactory = TestClientFactory(self, socketPath)
    connector = reactor.connectUNIX(socketPath, clientFactory)
    self._addPorts(port, connector.transport)
def testSocketLocking(self):
    """A second ``listenUNIX`` with ``wantPID=True`` on the same path
    raises ``CannotListenError``; after the first port has stopped
    listening, the same path can be listened on again.
    """
    socketPath = self.mktemp()
    serverFactory = Factory(self, socketPath)
    firstPort = reactor.listenUNIX(socketPath, serverFactory, wantPID=True)

    # The path is still held by firstPort, so this attempt must fail.
    self.assertRaises(
        error.CannotListenError,
        reactor.listenUNIX, socketPath, serverFactory, wantPID=True)

    def listenAgain(ignored):
        secondPort = reactor.listenUNIX(
            socketPath, serverFactory, wantPID=True)
        return secondPort.stopListening()

    stopping = firstPort.stopListening()
    return stopping.addCallback(listenAgain)
def testUncleanServerSocketLocking(self):
    """Run ``self._uncleanSocketTest`` with a callback that listens on
    ``self.filename`` with ``wantPID=True`` and then stops listening.
    """
    def ranStupidChild(ignored):
        # If this next call succeeds, our lock handling is correct.
        serverFactory = Factory(self, self.filename)
        port = reactor.listenUNIX(
            self.filename, serverFactory, wantPID=True)
        return port.stopListening()

    return self._uncleanSocketTest(ranStupidChild)
def testRepr(self):
    """``str()`` of a listening UNIX port contains the socket filename,
    and no longer contains it once ``stopListening`` has completed.
    """
    socketPath = self.mktemp()
    serverFactory = Factory(self, socketPath)
    port = reactor.listenUNIX(socketPath, serverFactory)
    self.failIf(socketPath not in str(port))

    def checkAfterStop(ignored):
        self.failIf(socketPath in str(port))

    stopping = defer.maybeDeferred(port.stopListening)
    return stopping.addCallback(checkAfterStop)
def test_socketsLeftOpen(self):
    """Start a TCP listener on port 0 and never stop it.

    NOTE(review): the port object returned by ``listenTCP`` is discarded,
    so nothing here closes the socket. Judging by the test name this is
    intentional -- confirm before adding any cleanup.
    """
    factory = protocol.Factory()
    factory.protocol = protocol.Protocol
    reactor.listenTCP(0, factory)
def loopbackTCP(server, client, port=0, noisy=True):
    """Run a session between server and client protocol instances over TCP.

    The server side listens on 127.0.0.1 at ``port``; the client connects
    to whichever port number the listening port reports.

    Args:
        server: protocol instance wrapped in ``_FireOnClose`` and returned
            for every incoming connection.
        client: protocol instance handed to ``LoopbackClientFactory``.
        port: port number passed to ``reactor.listenTCP``.
        noisy: value stored on the ``noisy`` attribute of both factories.

    Returns:
        The client factory's ``deferred``, with two callbacks added: the
        first returns ``serverWrapper.deferred``, the second calls
        ``serverPort.stopListening()``.
    """
    from twisted.internet import reactor

    wrappingFactory = policies.WrappingFactory(protocol.Factory())
    serverWrapper = _FireOnClose(wrappingFactory, server)
    wrappingFactory.noisy = noisy

    def buildServerProtocol(addr):
        # Every connection gets the same pre-built wrapper around `server`.
        return serverWrapper

    wrappingFactory.buildProtocol = buildServerProtocol
    serverPort = reactor.listenTCP(
        port, wrappingFactory, interface='127.0.0.1')

    clientFactory = LoopbackClientFactory(client)
    clientFactory.noisy = noisy
    reactor.connectTCP(
        '127.0.0.1', serverPort.getHost().port, clientFactory)

    def waitForServer(ignored):
        return serverWrapper.deferred

    def stopServerPort(ignored):
        return serverPort.stopListening()

    finished = clientFactory.deferred
    finished.addCallback(waitForServer)
    finished.addCallback(stopServerPort)
    return finished
def main():
    """Process command-line arguments, then serve POP3TestServer on PORT.

    With no arguments after the program name, ``printMessage`` is called
    with "POP3 with no messages"; otherwise each argument is passed to
    ``processArg`` in order. In both cases a factory for ``POP3TestServer``
    is then bound to TCP port ``PORT`` and the reactor is started.
    """
    cliArgs = sys.argv[1:]
    if cliArgs:
        for option in cliArgs:
            processArg(option)
    else:
        printMessage("POP3 with no messages")

    serverFactory = Factory()
    serverFactory.protocol = POP3TestServer
    reactor.listenTCP(PORT, serverFactory)
    reactor.run()
def main():
    """Configure logging to ``rsa.log`` at DEBUG level, bind a factory for
    ``Notary`` to TCP port 1977 and start the reactor.
    """
    logging.basicConfig(filename='rsa.log', level=logging.DEBUG)

    # Create the server
    notaryFactory = Factory()
    notaryFactory.protocol = Notary
    reactor.listenTCP(1977, notaryFactory)
    reactor.run()
def autostart(reason, **kwargs):
    """Start listening on the hotplug UNIX socket when ``reason`` is 0.

    Prints a start-up message, removes any existing file at
    ``/tmp/hotplug.socket`` and binds a factory for ``Hotplug`` to that
    path. For any other ``reason`` nothing happens. Extra keyword
    arguments are accepted and ignored.
    """
    if reason == 0:
        # Parenthesised single-argument form prints the same text on
        # Python 2 and is also valid on Python 3.
        print("[Hotplug] starting hotplug handler")
        from twisted.internet import reactor
        import os
        try:
            os.remove("/tmp/hotplug.socket")
        except OSError:
            # Best effort: a failed removal (e.g. no such file) is ignored.
            pass
        factory = Factory()
        factory.protocol = Hotplug
        reactor.listenUNIX("/tmp/hotplug.socket", factory)
def startedConnecting(self, connector):  # test
    """Log 'Factory - Connecting' at info level.

    ``connector`` is accepted but not used.
    """
    logger.info('Factory - Connecting')