def setUp(self):
    """
    Start an FTP server on 127.0.0.1 and connect a basic FTP client to it.

    Attributes set on the test case: ``directory``, ``factory``, ``port``,
    ``serverProtocol`` (once the server builds its protocol) and ``client``
    (once the client connection succeeds).

    Returns the Deferred produced by ``defer.gatherResults`` over the
    "server protocol built" and "client connected" Deferreds.
    """
    # Fresh directory, created on disk and handed to the FTP realm.
    workDir = self.mktemp()
    self.directory = workDir
    os.mkdir(workDir)

    # Server side: portal with the anonymous-access checker registered,
    # listening on port 0 so the real port number is read back below.
    anonymousPortal = portal.Portal(ftp.FTPRealm(workDir))
    anonymousPortal.registerChecker(
        checkers.AllowAnonymousAccess(), credentials.IAnonymous)
    serverFactory = ftp.FTPFactory(portal=anonymousPortal)
    self.factory = serverFactory
    self.port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1")

    # Wrap the factory's buildProtocol so the server-side protocol
    # instance is exposed to tests as self.serverProtocol.  The hook must
    # be installed before the client connects.
    serverBuilt = defer.Deferred()
    originalBuildProtocol = serverFactory.buildProtocol

    def _captureServerProtocol(addr):
        # The factory hands back a wrapper; the protocol the tests want
        # is its wrappedProtocol attribute.
        wrapper = originalBuildProtocol(addr)
        self.serverProtocol = wrapper.wrappedProtocol
        serverBuilt.callback(None)
        return wrapper

    serverFactory.buildProtocol = _captureServerProtocol

    # Client side: connect to whichever port the server ended up on.
    creator = protocol.ClientCreator(reactor, ftp.FTPClientBasic)
    clientConnected = creator.connectTCP(
        "127.0.0.1", self.port.getHost().port)

    def _storeClient(client):
        self.client = client

    clientConnected.addCallback(_storeClient)
    return defer.gatherResults([serverBuilt, clientConnected])
评论列表
文章目录