python类ServerFactory()的实例源码

test_amp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Start a loopback TCP server/client pair for the test.

        Returns a Deferred that fires once both factories' C{onMade}
        Deferreds have fired; at that point C{self.cli} and C{self.svr} are
        set from the factories' C{theProto} attributes.
        """
        from twisted.internet import reactor

        # Server half: factory, protocol class and its notification Deferred.
        srvFactory = self.serverFactory = protocol.ServerFactory()
        srvFactory.protocol = self.serverProto
        srvFactory.onMade = defer.Deferred()

        # Client half, configured the same way.
        cliFactory = self.clientFactory = protocol.ClientFactory()
        cliFactory.protocol = self.clientProto
        cliFactory.onMade = defer.Deferred()

        # Port 0 asks for any free port; the client connects to whatever
        # port number the listening port reports.
        self.serverPort = reactor.listenTCP(0, srvFactory)
        boundPort = self.serverPort.getHost().port
        self.clientConn = reactor.connectTCP('127.0.0.1', boundPort, cliFactory)

        def rememberProtocols(ignored):
            self.cli = self.clientFactory.theProto
            self.svr = self.serverFactory.theProto

        bothMade = defer.DeferredList([cliFactory.onMade, srvFactory.onMade])
        bothMade.addCallback(rememberProtocols)
        return bothMade
test_application.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testPrivileged(self):
        """
        Start a TCPServer flagged as privileged via
        C{privilegedStartService}, connect a TCPClient to the port it bound,
        and expect the client factory's Deferred to fire with C{'lalala'}.

        Afterwards the client and server services are stopped and the test
        waits on C{TestEcho.d}.
        """
        echoFactory = protocol.ServerFactory()
        echoFactory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        server = internet.TCPServer(0, echoFactory)
        server.privileged = 1
        server.privilegedStartService()
        # The port object is read right after privilegedStartService.
        portNumber = server._port.getHost().port

        clientFactory = protocol.ClientFactory()
        clientFactory.d = defer.Deferred()
        clientFactory.protocol = Foo
        clientFactory.line = None
        client = internet.TCPClient('127.0.0.1', portNumber, clientFactory)
        client.startService()

        finished = clientFactory.d
        finished.addCallback(self.assertEqual, 'lalala')
        finished.addCallback(lambda ignored: client.stopService())
        finished.addCallback(lambda ignored: server.stopService())
        finished.addCallback(lambda ignored: TestEcho.d)
        return finished
test_application.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def testUNIX(self):
        """
        Run a UNIXServer and a UNIXClient on the socket path C{'echo.skt'}
        under one L{service.MultiService}.

        The test is skipped when the reactor cannot be adapted to
        L{interfaces.IReactorUNIX}. It returns the client factory's
        Deferred, which is expected to fire with C{'lalala'}; the parent
        service is then stopped, C{TestEcho.d} is awaited, and
        C{self._cbTestUnix} receives the client factory and the parent
        service.
        """
        if not interfaces.IReactorUNIX(reactor, None):
            # Call form of raise: valid on both Python 2 and Python 3
            # (the "raise Class, value" form is Python 2 only).
            raise unittest.SkipTest(
                "This reactor does not support UNIX domain sockets")

        parent = service.MultiService()
        parent.startService()

        # Server side.
        serverFactory = protocol.ServerFactory()
        serverFactory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        server = internet.UNIXServer('echo.skt', serverFactory)
        server.setServiceParent(parent)

        # Client side, attached to the same parent service.
        factory = protocol.ClientFactory()
        factory.protocol = Foo
        factory.d = defer.Deferred()
        factory.line = None
        internet.UNIXClient('echo.skt', factory).setServiceParent(parent)

        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x: parent.stopService())
        factory.d.addCallback(lambda x: TestEcho.d)
        factory.d.addCallback(self._cbTestUnix, factory, parent)
        return factory.d
test_application.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testVolatile(self):
        """
        Check that the C{_port} of a UNIXServer and the C{_connection} of a
        UNIXClient are set while the service runs, are C{None} on a
        C{copy.copy} of the running service, and are C{None} again after
        C{stopService}.

        The test is skipped when the reactor cannot be adapted to
        L{interfaces.IReactorUNIX}.
        """
        if not interfaces.IReactorUNIX(reactor, None):
            # Call form of raise: valid on both Python 2 and Python 3
            # (the "raise Class, value" form is Python 2 only).
            raise unittest.SkipTest(
                "This reactor does not support UNIX domain sockets")

        # Server service.
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._port, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._port, None)
        t.stopService()
        self.assertIdentical(t._port, None)
        self.failIf(t.running)

        # Client service: same checks against _connection.
        factory = protocol.ClientFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXClient('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._connection, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._connection, None)
        t.stopService()
        self.assertIdentical(t._connection, None)
        self.failIf(t.running)
test_ssl.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testOpenSSLBuffering(self):
        """
        Connect an SSL client to an SSL server on 127.0.0.1 and iterate the
        reactor until the client factory's C{buffer} is non-empty, giving up
        after 5000 iterations. The buffer must then equal
        C{["+OK <some crap>\\r\\n"]}.
        """
        serverProto = self.serverProto = SingleLineServerProtocol()
        clientProto = self.clientProto = RecordingClientProtocol()

        server = protocol.ServerFactory()
        client = self.client = protocol.ClientFactory()

        # Each factory always hands back the one pre-built protocol instance.
        server.protocol = lambda: serverProto
        client.protocol = lambda: clientProto
        client.buffer = []

        sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
        cCTX = ssl.ClientContextFactory()

        port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
        reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)

        # Bounded spin: stop as soon as data has been recorded.
        for _ in range(5000):
            if client.buffer:
                break
            reactor.iterate()

        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(client.buffer, ["+OK <some crap>\r\n"])
test_ssl.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testImmediateDisconnect(self):
        """
        Listen with SSL, connect a client whose protocol class is
        C{ImmediatelyDisconnectingProtocol}, and return the client factory's
        C{connectionDisconnected} Deferred with a callback that stops the
        listening port.

        NOTE(review): C{connectionDisconnected} is presumably fired by the
        client protocol when it disconnects; that code is not in this block.
        """
        org = "twisted.test.test_ssl"
        self.setupServerAndClient(
            (org, org + ", client"), {},
            (org, org + ", server"), {})

        # Set up a server, connect to it with a client, which should work
        # since our verifiers allow anything, then disconnect.
        serverProtocolFactory = protocol.ServerFactory()
        serverProtocolFactory.protocol = protocol.Protocol
        self.serverPort = serverPort = reactor.listenSSL(
            0, serverProtocolFactory, self.serverCtxFactory)

        clientProtocolFactory = protocol.ClientFactory()
        clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
        clientProtocolFactory.connectionDisconnected = defer.Deferred()
        # The connector returned by connectSSL is not used by this test, so
        # it is not bound to a name.
        reactor.connectSSL(
            '127.0.0.1', serverPort.getHost().port,
            clientProtocolFactory, self.clientCtxFactory)

        return clientProtocolFactory.connectionDisconnected.addCallback(
            lambda ignoredResult: self.serverPort.stopListening())
test_sslverify.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def loopback(self, serverCertOpts, clientCertOpts,
                 onServerLost=None, onClientLost=None, onData=None):
        """
        Start an SSL server and an SSL client connected to it on 127.0.0.1.

        The server factory uses C{DataCallbackProtocol} and carries the
        C{onLost}/C{onData} Deferreds; the client factory uses
        C{WritingProtocol} and carries its own C{onLost}. The listening port
        and the connector are stored as C{self.serverPort} and
        C{self.clientConn}.

        Any Deferred not supplied is created here. Only the "lost" Deferreds
        created here are also stored on C{self}; caller-supplied ones and
        C{onData} are not.
        """
        if onServerLost is None:
            onServerLost = defer.Deferred()
            self.onServerLost = onServerLost
        if onClientLost is None:
            onClientLost = defer.Deferred()
            self.onClientLost = onClientLost
        if onData is None:
            onData = defer.Deferred()

        srv = protocol.ServerFactory()
        srv.protocol = DataCallbackProtocol
        srv.onLost = onServerLost
        srv.onData = onData

        cli = protocol.ClientFactory()
        cli.protocol = WritingProtocol
        cli.onLost = onClientLost

        self.serverPort = reactor.listenSSL(0, srv, serverCertOpts)
        listeningPort = self.serverPort.getHost().port
        self.clientConn = reactor.connectSSL(
            '127.0.0.1', listeningPort, cli, clientCertOpts)
p2p.py 文件源码 项目:p2pool-bch 作者: amarian12 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store=None,
                 connect_addrs=None, desired_outgoing_conns=10,
                 max_outgoing_attempts=30, max_incoming_conns=50,
                 preferred_storage=1000, known_txs_var=None,
                 mining_txs_var=None, advertise_ip=True, external_ip=None):
        """
        Store the node configuration and build its client/server factories.

        Mutable defaults are created per instance. Previously the default
        set and the default variable.Variable objects were built once at
        definition time and stored as-is, so every instance constructed
        without those arguments shared them.

        addr_store -- mapping copied into self.addr_store (default: empty).
        connect_addrs -- stored as-is (default: a new empty set).
        known_txs_var, mining_txs_var -- stored as-is (default: a new
            variable.Variable({}) each).
        desired_outgoing_conns, max_outgoing_attempts -- passed to
            ClientFactory.
        max_incoming_conns -- passed to ServerFactory.
        All other arguments are stored on self under the same name.
        """
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store) if addr_store is not None else {}
        self.connect_addrs = set() if connect_addrs is None else connect_addrs
        self.preferred_storage = preferred_storage
        if known_txs_var is None:
            known_txs_var = variable.Variable({})
        self.known_txs_var = known_txs_var
        if mining_txs_var is None:
            mining_txs_var = variable.Variable({})
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip
        self.external_ip = external_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
p2p.py 文件源码 项目:p2pool-unitus 作者: amarian12 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store=None,
                 connect_addrs=None, desired_outgoing_conns=10,
                 max_outgoing_attempts=30, max_incoming_conns=50,
                 preferred_storage=1000, known_txs_var=None,
                 mining_txs_var=None, advertise_ip=True):
        """
        Store the node configuration and build its client/server factories.

        Mutable defaults are created per instance. Previously the default
        set and the default variable.Variable objects were built once at
        definition time and stored as-is, so every instance constructed
        without those arguments shared them.

        addr_store -- mapping copied into self.addr_store (default: empty).
        connect_addrs -- stored as-is (default: a new empty set).
        known_txs_var, mining_txs_var -- stored as-is (default: a new
            variable.Variable({}) each).
        desired_outgoing_conns, max_outgoing_attempts -- passed to
            ClientFactory.
        max_incoming_conns -- passed to ServerFactory.
        All other arguments are stored on self under the same name.
        """
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store) if addr_store is not None else {}
        self.connect_addrs = set() if connect_addrs is None else connect_addrs
        self.preferred_storage = preferred_storage
        if known_txs_var is None:
            known_txs_var = variable.Variable({})
        self.known_txs_var = known_txs_var
        if mining_txs_var is None:
            mining_txs_var = variable.Variable({})
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
p2p.py 文件源码 项目:p2pool-dgb-sha256 作者: ilsawa 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store=None,
                 connect_addrs=None, desired_outgoing_conns=10,
                 max_outgoing_attempts=30, max_incoming_conns=50,
                 preferred_storage=1000, known_txs_var=None,
                 mining_txs_var=None, advertise_ip=True, external_ip=None):
        """
        Store the node configuration and build its client/server factories.

        Mutable defaults are created per instance. Previously the default
        set and the default variable.VariableDict objects were built once at
        definition time and stored as-is, so every instance constructed
        without those arguments shared them.

        addr_store -- mapping copied into self.addr_store (default: empty).
        connect_addrs -- stored as-is (default: a new empty set).
        known_txs_var, mining_txs_var -- stored as-is (default: a new
            variable.VariableDict({}) each).
        desired_outgoing_conns, max_outgoing_attempts -- passed to
            ClientFactory.
        max_incoming_conns -- passed to ServerFactory.
        All other arguments are stored on self under the same name.
        """
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store) if addr_store is not None else {}
        self.connect_addrs = set() if connect_addrs is None else connect_addrs
        self.preferred_storage = preferred_storage
        if known_txs_var is None:
            known_txs_var = variable.VariableDict({})
        self.known_txs_var = known_txs_var
        if mining_txs_var is None:
            mining_txs_var = variable.VariableDict({})
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip
        self.external_ip = external_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
EchoServer.py 文件源码 项目:_ 作者: zengchunyun 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    """Listen on 127.0.0.1:8000 with EchoServer as the protocol and run the reactor."""
    # Build a ServerFactory and tell it which protocol class to use.
    echo_factory = protocol.ServerFactory()
    echo_factory.protocol = EchoServer
    reactor.listenTCP(8000, echo_factory, interface="127.0.0.1")
    # Notes carried over from the original author. The original text was
    # mis-encoded, so these are reconstructed from the legible fragments and
    # are NOT verified against the Twisted source:
    #   - type(reactor) is twisted.internet.selectreactor.SelectReactor,
    #     which builds on twisted.internet.posixbase.PosixReactorBase.
    #   - listenTCP takes (port, factory, backlog=50, interface=''), so the
    #     listen backlog defaults to 50; it returns a
    #     twisted.internet.tcp.Port.
    #   - PosixReactorBase builds on
    #     twisted.internet.base._SignalReactorMixin, which supplies run().
    reactor.run()
    # Further reconstructed notes (same caveat):
    #   - run() calls startRunning (ReactorBase.startRunning) and then
    #     mainLoop.
    #   - mainLoop calls SelectReactor.doIteration(t), which relies on
    #     select.select and dispatches through self._doReadOrWrite to
    #     twisted.internet.tcp.Connection.doRead.
    #   - doRead leads to self._dataReceived(data), i.e.
    #     self.protocol.dataReceived(data); the protocol comes from the
    #     factory handed to listenTCP, so EchoServer().dataReceived(data)
    #     ends up being called.
test_amp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Start a loopback TCP server/client pair for the test.

        Returns a Deferred that fires once both factories' C{onMade}
        Deferreds have fired; at that point C{self.cli} and C{self.svr} are
        set from the factories' C{theProto} attributes.
        """
        from twisted.internet import reactor

        # Server half: factory, protocol class and its notification Deferred.
        srvFactory = self.serverFactory = protocol.ServerFactory()
        srvFactory.protocol = self.serverProto
        srvFactory.onMade = defer.Deferred()

        # Client half, configured the same way.
        cliFactory = self.clientFactory = protocol.ClientFactory()
        cliFactory.protocol = self.clientProto
        cliFactory.onMade = defer.Deferred()

        # Port 0 asks for any free port; the client connects to whatever
        # port number the listening port reports.
        self.serverPort = reactor.listenTCP(0, srvFactory)
        boundPort = self.serverPort.getHost().port
        self.clientConn = reactor.connectTCP('127.0.0.1', boundPort, cliFactory)

        def rememberProtocols(ignored):
            self.cli = self.clientFactory.theProto
            self.svr = self.serverFactory.theProto

        bothMade = defer.DeferredList([cliFactory.onMade, srvFactory.onMade])
        bothMade.addCallback(rememberProtocols)
        return bothMade
test_application.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testPrivileged(self):
        """
        Start a TCPServer flagged as privileged via
        C{privilegedStartService}, connect a TCPClient to the port it bound,
        and expect the client factory's Deferred to fire with C{'lalala'}.

        Afterwards the client and server services are stopped and the test
        waits on C{TestEcho.d}.
        """
        echoFactory = protocol.ServerFactory()
        echoFactory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        server = internet.TCPServer(0, echoFactory)
        server.privileged = 1
        server.privilegedStartService()
        # The port object is read right after privilegedStartService.
        portNumber = server._port.getHost().port

        clientFactory = protocol.ClientFactory()
        clientFactory.d = defer.Deferred()
        clientFactory.protocol = Foo
        clientFactory.line = None
        client = internet.TCPClient('127.0.0.1', portNumber, clientFactory)
        client.startService()

        finished = clientFactory.d
        finished.addCallback(self.assertEqual, 'lalala')
        finished.addCallback(lambda ignored: client.stopService())
        finished.addCallback(lambda ignored: server.stopService())
        finished.addCallback(lambda ignored: TestEcho.d)
        return finished
test_application.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def testUNIX(self):
        """
        Run a UNIXServer and a UNIXClient on the socket path C{'echo.skt'}
        under one L{service.MultiService}.

        The test is skipped when the reactor cannot be adapted to
        L{interfaces.IReactorUNIX}. It returns the client factory's
        Deferred, which is expected to fire with C{'lalala'}; the parent
        service is then stopped, C{TestEcho.d} is awaited, and
        C{self._cbTestUnix} receives the client factory and the parent
        service.
        """
        if not interfaces.IReactorUNIX(reactor, None):
            # Call form of raise: valid on both Python 2 and Python 3
            # (the "raise Class, value" form is Python 2 only).
            raise unittest.SkipTest(
                "This reactor does not support UNIX domain sockets")

        parent = service.MultiService()
        parent.startService()

        # Server side.
        serverFactory = protocol.ServerFactory()
        serverFactory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        server = internet.UNIXServer('echo.skt', serverFactory)
        server.setServiceParent(parent)

        # Client side, attached to the same parent service.
        factory = protocol.ClientFactory()
        factory.protocol = Foo
        factory.d = defer.Deferred()
        factory.line = None
        internet.UNIXClient('echo.skt', factory).setServiceParent(parent)

        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x: parent.stopService())
        factory.d.addCallback(lambda x: TestEcho.d)
        factory.d.addCallback(self._cbTestUnix, factory, parent)
        return factory.d
test_application.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def testVolatile(self):
        """
        Check that the C{_port} of a UNIXServer and the C{_connection} of a
        UNIXClient are set while the service runs, are C{None} on a
        C{copy.copy} of the running service, and are C{None} again after
        C{stopService}.

        The test is skipped when the reactor cannot be adapted to
        L{interfaces.IReactorUNIX}.
        """
        if not interfaces.IReactorUNIX(reactor, None):
            # Call form of raise: valid on both Python 2 and Python 3
            # (the "raise Class, value" form is Python 2 only).
            raise unittest.SkipTest(
                "This reactor does not support UNIX domain sockets")

        # Server service.
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._port, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._port, None)
        t.stopService()
        self.assertIdentical(t._port, None)
        self.failIf(t.running)

        # Client service: same checks against _connection.
        factory = protocol.ClientFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXClient('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._connection, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._connection, None)
        t.stopService()
        self.assertIdentical(t._connection, None)
        self.failIf(t.running)
test_ssl.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def testOpenSSLBuffering(self):
        """
        Connect an SSL client to an SSL server on 127.0.0.1 and iterate the
        reactor until the client factory's C{buffer} is non-empty, giving up
        after 5000 iterations. The buffer must then equal
        C{["+OK <some crap>\\r\\n"]}.
        """
        serverProto = self.serverProto = SingleLineServerProtocol()
        clientProto = self.clientProto = RecordingClientProtocol()

        server = protocol.ServerFactory()
        client = self.client = protocol.ClientFactory()

        # Each factory always hands back the one pre-built protocol instance.
        server.protocol = lambda: serverProto
        client.protocol = lambda: clientProto
        client.buffer = []

        sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
        cCTX = ssl.ClientContextFactory()

        port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
        reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)

        # Bounded spin: stop as soon as data has been recorded.
        for _ in range(5000):
            if client.buffer:
                break
            reactor.iterate()

        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(client.buffer, ["+OK <some crap>\r\n"])
test_ssl.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testImmediateDisconnect(self):
        """
        Listen with SSL, connect a client whose protocol class is
        C{ImmediatelyDisconnectingProtocol}, and return the client factory's
        C{connectionDisconnected} Deferred with a callback that stops the
        listening port.

        NOTE(review): C{connectionDisconnected} is presumably fired by the
        client protocol when it disconnects; that code is not in this block.
        """
        org = "twisted.test.test_ssl"
        self.setupServerAndClient(
            (org, org + ", client"), {},
            (org, org + ", server"), {})

        # Set up a server, connect to it with a client, which should work
        # since our verifiers allow anything, then disconnect.
        serverProtocolFactory = protocol.ServerFactory()
        serverProtocolFactory.protocol = protocol.Protocol
        self.serverPort = serverPort = reactor.listenSSL(
            0, serverProtocolFactory, self.serverCtxFactory)

        clientProtocolFactory = protocol.ClientFactory()
        clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
        clientProtocolFactory.connectionDisconnected = defer.Deferred()
        # The connector returned by connectSSL is not used by this test, so
        # it is not bound to a name.
        reactor.connectSSL(
            '127.0.0.1', serverPort.getHost().port,
            clientProtocolFactory, self.clientCtxFactory)

        return clientProtocolFactory.connectionDisconnected.addCallback(
            lambda ignoredResult: self.serverPort.stopListening())
test_sslverify.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def loopback(self, serverCertOpts, clientCertOpts,
                 onServerLost=None, onClientLost=None, onData=None):
        """
        Start an SSL server and an SSL client connected to it on 127.0.0.1.

        The server factory uses C{DataCallbackProtocol} and carries the
        C{onLost}/C{onData} Deferreds; the client factory uses
        C{WritingProtocol} and carries its own C{onLost}. The listening port
        and the connector are stored as C{self.serverPort} and
        C{self.clientConn}.

        Any Deferred not supplied is created here. Only the "lost" Deferreds
        created here are also stored on C{self}; caller-supplied ones and
        C{onData} are not.
        """
        if onServerLost is None:
            onServerLost = defer.Deferred()
            self.onServerLost = onServerLost
        if onClientLost is None:
            onClientLost = defer.Deferred()
            self.onClientLost = onClientLost
        if onData is None:
            onData = defer.Deferred()

        srv = protocol.ServerFactory()
        srv.protocol = DataCallbackProtocol
        srv.onLost = onServerLost
        srv.onData = onData

        cli = protocol.ClientFactory()
        cli.protocol = WritingProtocol
        cli.onLost = onClientLost

        self.serverPort = reactor.listenSSL(0, srv, serverCertOpts)
        listeningPort = self.serverPort.getHost().port
        self.clientConn = reactor.connectSSL(
            '127.0.0.1', listeningPort, cli, clientCertOpts)
p2p.py 文件源码 项目:p2pool-ltc 作者: ilsawa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store=None,
                 connect_addrs=None, desired_outgoing_conns=10,
                 max_outgoing_attempts=30, max_incoming_conns=50,
                 preferred_storage=1000, known_txs_var=None,
                 mining_txs_var=None, advertise_ip=True, external_ip=None):
        """
        Store the node configuration and build its client/server factories.

        Mutable defaults are created per instance. Previously the default
        set and the default variable.Variable objects were built once at
        definition time and stored as-is, so every instance constructed
        without those arguments shared them.

        addr_store -- mapping copied into self.addr_store (default: empty).
        connect_addrs -- stored as-is (default: a new empty set).
        known_txs_var, mining_txs_var -- stored as-is (default: a new
            variable.Variable({}) each).
        desired_outgoing_conns, max_outgoing_attempts -- passed to
            ClientFactory.
        max_incoming_conns -- passed to ServerFactory.
        All other arguments are stored on self under the same name.
        """
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store) if addr_store is not None else {}
        self.connect_addrs = set() if connect_addrs is None else connect_addrs
        self.preferred_storage = preferred_storage
        if known_txs_var is None:
            known_txs_var = variable.Variable({})
        self.known_txs_var = known_txs_var
        if mining_txs_var is None:
            mining_txs_var = variable.Variable({})
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip
        self.external_ip = external_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
p2p.py 文件源码 项目:p2pool-bsty 作者: amarian12 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store=None,
                 connect_addrs=None, desired_outgoing_conns=10,
                 max_outgoing_attempts=30, max_incoming_conns=50,
                 preferred_storage=1000, known_txs_var=None,
                 mining_txs_var=None, advertise_ip=True):
        """
        Store the node configuration and build its client/server factories.

        Mutable defaults are created per instance. Previously the default
        set and the default variable.Variable objects were built once at
        definition time and stored as-is, so every instance constructed
        without those arguments shared them.

        addr_store -- mapping copied into self.addr_store (default: empty).
        connect_addrs -- stored as-is (default: a new empty set).
        known_txs_var, mining_txs_var -- stored as-is (default: a new
            variable.Variable({}) each).
        desired_outgoing_conns, max_outgoing_attempts -- passed to
            ClientFactory.
        max_incoming_conns -- passed to ServerFactory.
        All other arguments are stored on self under the same name.
        """
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store) if addr_store is not None else {}
        self.connect_addrs = set() if connect_addrs is None else connect_addrs
        self.preferred_storage = preferred_storage
        if known_txs_var is None:
            known_txs_var = variable.Variable({})
        self.known_txs_var = known_txs_var
        if mining_txs_var is None:
            mining_txs_var = variable.Variable({})
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
p2p.py 文件源码 项目:p2pool-cann 作者: ilsawa 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store=None,
                 connect_addrs=None, desired_outgoing_conns=10,
                 max_outgoing_attempts=30, max_incoming_conns=50,
                 preferred_storage=1000, known_txs_var=None,
                 mining_txs_var=None, advertise_ip=True):
        """
        Store the node configuration and build its client/server factories.

        Mutable defaults are created per instance. Previously the default
        set and the default variable.Variable objects were built once at
        definition time and stored as-is, so every instance constructed
        without those arguments shared them.

        addr_store -- mapping copied into self.addr_store (default: empty).
        connect_addrs -- stored as-is (default: a new empty set).
        known_txs_var, mining_txs_var -- stored as-is (default: a new
            variable.Variable({}) each).
        desired_outgoing_conns, max_outgoing_attempts -- passed to
            ClientFactory.
        max_incoming_conns -- passed to ServerFactory.
        All other arguments are stored on self under the same name.
        """
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store) if addr_store is not None else {}
        self.connect_addrs = set() if connect_addrs is None else connect_addrs
        self.preferred_storage = preferred_storage
        if known_txs_var is None:
            known_txs_var = variable.Variable({})
        self.known_txs_var = known_txs_var
        if mining_txs_var is None:
            mining_txs_var = variable.Variable({})
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
main.py 文件源码 项目:rotest 作者: gregoil 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, port=RESOURCE_MANAGER_PORT,
                 parser=DEFAULT_PARSER, log_to_screen=True):
        """Set up the resource manager server and start listening.

        Builds the logger, a ServerFactory using Worker as its protocol, a
        SelectReactor listening on the given TCP port, and a ManagerThread
        whose request queue is shared with the factory.

        Args:
            port (number): client listener port.
            parser (object): messages parser of type `AbstractParser`;
                it is called with no arguments and the result is stored on
                the protocol class.
            log_to_screen (bool): Enable log prints to screen.
        """
        logger = self.logger = get_logger(log_to_screen)

        factory = self._factory = ServerFactory()
        factory.protocol = Worker
        factory.logger = logger
        # NOTE: the parser instance is set on the protocol class itself
        # (factory.protocol is Worker), not on an individual connection.
        factory.protocol.parser = parser()

        self._port = port
        select_reactor = self._reactor = SelectReactor()
        select_reactor.listenTCP(port, factory)

        manager = ManagerThread(select_reactor, logger)
        self._resource_manager = manager
        factory.request_queue = manager.request_queue
test_socket.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_invalidDescriptor(self):
        """
        L{IReactorSocket.adoptStreamPort} raises L{socket.error} when the
        integer it is given does not refer to a socket.
        """
        reactor = self.buildReactor()

        # Take the descriptor number of a socket and then close the socket,
        # so the number no longer refers to it.
        closedSocket = socket.socket()
        staleDescriptor = closedSocket.fileno()
        closedSocket.close()

        error = self.assertRaises(
            socket.error,
            reactor.adoptStreamPort,
            staleDescriptor, socket.AF_INET, ServerFactory())

        # Windows on Python 3 is expected to report WSAENOTSOCK; everything
        # else is expected to report EBADF.
        expectedErrno = errno.EBADF
        if platform.isWindows() and _PY3:
            expectedErrno = errno.WSAENOTSOCK
        self.assertEqual(error.args[0], expectedErrno)
test_socket.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_invalidAddressFamily(self):
        """
        L{IReactorSocket.adoptStreamPort} raises
        L{UnsupportedAddressFamily} when given an address family it does
        not support.
        """
        reactor = self.buildReactor()

        # A real listening socket, so only the address family is invalid.
        listener = socket.socket()
        listener.bind(("127.0.0.1", 0))
        listener.listen(1)
        self.addCleanup(listener.close)

        bogusFamily = 2 ** 16 + 7

        self.assertRaises(
            UnsupportedAddressFamily,
            reactor.adoptStreamPort,
            listener.fileno(), bogusFamily, ServerFactory())
test_socket.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_invalidAddressFamily(self):
        """
        L{IReactorSocket.adoptStreamConnection} raises
        L{UnsupportedAddressFamily} when given an address family it does
        not support.
        """
        reactor = self.buildReactor()

        # A real socket, so only the address family is invalid.
        sock = socket.socket()
        self.addCleanup(sock.close)

        bogusFamily = 2 ** 16 + 7

        self.assertRaises(
            UnsupportedAddressFamily,
            reactor.adoptStreamConnection,
            sock.fileno(), bogusFamily, ServerFactory())
test_ftp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _makeDataConnection(self, ignored=None):
        """
        Set up an active data connection (the server connects to the client).

        Listens on a free port on 127.0.0.1, records the port in
        C{self.dataPorts}, and queues a C{PORT} command for it on
        C{self.client}. Returns a Deferred that is called back with the
        protocol built for the incoming connection.
        """
        connected = defer.Deferred()

        class DataFactory(protocol.ServerFactory):
            protocol = _BufferingProtocol

            def buildProtocol(self, addr):
                built = protocol.ServerFactory.buildProtocol(self, addr)
                # Deliver the protocol on a later reactor turn rather than
                # from inside buildProtocol.
                reactor.callLater(0, connected.callback, built)
                return built

        dataPort = reactor.listenTCP(0, DataFactory(), interface='127.0.0.1')
        self.dataPorts.append(dataPort)
        hostPort = ftp.encodeHostPort('127.0.0.1', dataPort.getHost().port)
        self.client.queueStringCommand('PORT ' + hostPort)
        return connected
test_application.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testTCP(self):
        """
        Run a TCPServer and a TCPClient under one L{service.MultiService}
        and expect the client factory's Deferred to fire with C{b'lalala'}.

        Afterwards the parent service is stopped and the test waits on
        C{TestEcho.d}.
        """
        parent = service.MultiService()
        parent.startService()

        # Server side: port 0, then read back the port number it reports.
        echoFactory = protocol.ServerFactory()
        echoFactory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        server = internet.TCPServer(0, echoFactory)
        server.setServiceParent(parent)
        portNumber = server._port.getHost().port

        # Client side, attached to the same parent service.
        clientFactory = protocol.ClientFactory()
        clientFactory.d = defer.Deferred()
        clientFactory.protocol = Foo
        clientFactory.line = None
        client = internet.TCPClient('127.0.0.1', portNumber, clientFactory)
        client.setServiceParent(parent)

        finished = clientFactory.d
        finished.addCallback(self.assertEqual, b'lalala')
        finished.addCallback(lambda ignored: parent.stopService())
        finished.addCallback(lambda ignored: TestEcho.d)
        return finished
test_application.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testPrivileged(self):
        """
        Start a TCPServer flagged as privileged via
        C{privilegedStartService}, connect a TCPClient to the port it bound,
        and expect the client factory's Deferred to fire with C{b'lalala'}.

        Afterwards the client and server services are stopped and the test
        waits on C{TestEcho.d}.
        """
        echoFactory = protocol.ServerFactory()
        echoFactory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        server = internet.TCPServer(0, echoFactory)
        server.privileged = 1
        server.privilegedStartService()
        # The port object is read right after privilegedStartService.
        portNumber = server._port.getHost().port

        clientFactory = protocol.ClientFactory()
        clientFactory.d = defer.Deferred()
        clientFactory.protocol = Foo
        clientFactory.line = None
        client = internet.TCPClient('127.0.0.1', portNumber, clientFactory)
        client.startService()

        finished = clientFactory.d
        finished.addCallback(self.assertEqual, b'lalala')
        finished.addCallback(lambda ignored: client.stopService())
        finished.addCallback(lambda ignored: server.stopService())
        finished.addCallback(lambda ignored: TestEcho.d)
        return finished
test_application.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testUNIX(self):
        """
        Run a UNIXServer and a UNIXClient on the socket path C{'echo.skt'}
        under one L{service.MultiService}.

        Returns the client factory's Deferred, which is expected to fire
        with C{b'lalala'}; the parent service is then stopped,
        C{TestEcho.d} is awaited, and C{self._cbTestUnix} receives the
        client factory and the parent service.
        """
        parent = service.MultiService()
        parent.startService()

        # Server side.
        echoFactory = protocol.ServerFactory()
        echoFactory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        server = internet.UNIXServer('echo.skt', echoFactory)
        server.setServiceParent(parent)

        # Client side, attached to the same parent service.
        clientFactory = protocol.ClientFactory()
        clientFactory.protocol = Foo
        clientFactory.d = defer.Deferred()
        clientFactory.line = None
        client = internet.UNIXClient('echo.skt', clientFactory)
        client.setServiceParent(parent)

        finished = clientFactory.d
        finished.addCallback(self.assertEqual, b'lalala')
        finished.addCallback(lambda ignored: parent.stopService())
        finished.addCallback(lambda ignored: TestEcho.d)
        finished.addCallback(self._cbTestUnix, clientFactory, parent)
        return finished
test_application.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testVolatile(self):
        """
        Check that the C{_port} of a UNIXServer and the C{_connection} of a
        UNIXClient are set while the service runs, are C{None} on a
        C{copy.copy} of the running service, and are C{None} again after
        C{stopService}.
        """
        # Server service.
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        # assertIsNotNone matches the assertIsNone/assertFalse style used by
        # the rest of this test (was the legacy failIfIdentical alias).
        self.assertIsNotNone(t._port)
        t1 = copy.copy(t)
        self.assertIsNone(t1._port)
        t.stopService()
        self.assertIsNone(t._port)
        self.assertFalse(t.running)

        # Client service: same checks against _connection.
        factory = protocol.ClientFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXClient('echo.skt', factory)
        t.startService()
        self.assertIsNotNone(t._connection)
        t1 = copy.copy(t)
        self.assertIsNone(t1._connection)
        t.stopService()
        self.assertIsNone(t._connection)
        self.assertFalse(t.running)


问题


面经


文章

微信
公众号

扫码关注公众号