python类ClientFactory()的实例源码

msn.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _login(userHandle, passwd, nexusServer, cached=0, authData=''):
    """
    This function is used internally and should not ever be called
    directly.
    """
    cb = Deferred()
    def _cb(server, auth):
        loginFac = ClientFactory()
        loginFac.protocol = lambda : PassportLogin(cb, userHandle, passwd, server, auth)
        reactor.connectSSL(_parsePrimitiveHost(server)[0], 443, loginFac, ClientContextFactory())

    if cached:
        _cb(nexusServer, authData)
    else:
        fac = ClientFactory()
        d = Deferred()
        d.addCallbacks(_cb, callbackArgs=(authData,))
        d.addErrback(lambda f: cb.errback(f))
        fac.protocol = lambda : PassportNexus(d, nexusServer)
        reactor.connectSSL(_parsePrimitiveHost(nexusServer)[0], 443, fac, ClientContextFactory())
    return cb
test_amp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setUp(self):
        from twisted.internet import reactor
        self.serverFactory = protocol.ServerFactory()
        self.serverFactory.protocol = self.serverProto
        self.clientFactory = protocol.ClientFactory()
        self.clientFactory.protocol = self.clientProto
        self.clientFactory.onMade = defer.Deferred()
        self.serverFactory.onMade = defer.Deferred()
        self.serverPort = reactor.listenTCP(0, self.serverFactory)
        self.clientConn = reactor.connectTCP(
            '127.0.0.1', self.serverPort.getHost().port,
            self.clientFactory)
        def getProtos(rlst):
            self.cli = self.clientFactory.theProto
            self.svr = self.serverFactory.theProto
        dl = defer.DeferredList([self.clientFactory.onMade,
                                 self.serverFactory.onMade])
        return dl.addCallback(getProtos)
test_application.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testTCP(self):
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.setServiceParent(s)
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d
test_application.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testPrivileged(self):
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.privileged = 1
        t.privilegedStartService()
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        c = internet.TCPClient('127.0.0.1', num, factory)
        c.startService()
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : c.stopService())
        factory.d.addCallback(lambda x : t.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d
test_application.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testUNIX(self):
        # FIXME: This test is far too dense.  It needs comments.
        #  -- spiv, 2004-11-07
        if not interfaces.IReactorUNIX(reactor, None):
            raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.UNIXServer('echo.skt', factory)
        t.setServiceParent(s)
        factory = protocol.ClientFactory()
        factory.protocol = Foo
        factory.d = defer.Deferred()
        factory.line = None
        internet.UNIXClient('echo.skt', factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        factory.d.addCallback(self._cbTestUnix, factory, s)
        return factory.d
test_application.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testVolatile(self):
        if not interfaces.IReactorUNIX(reactor, None):
            raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._port, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._port, None)
        t.stopService()
        self.assertIdentical(t._port, None)
        self.failIf(t.running)

        factory = protocol.ClientFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXClient('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._connection, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._connection, None)
        t.stopService()
        self.assertIdentical(t._connection, None)
        self.failIf(t.running)
test_ssl.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testOpenSSLBuffering(self):
        serverProto = self.serverProto = SingleLineServerProtocol()
        clientProto = self.clientProto = RecordingClientProtocol()

        server = protocol.ServerFactory()
        client = self.client = protocol.ClientFactory()

        server.protocol = lambda: serverProto
        client.protocol = lambda: clientProto
        client.buffer = []

        sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
        cCTX = ssl.ClientContextFactory()

        port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
        reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)

        i = 0
        while i < 5000 and not client.buffer:
            i += 1
            reactor.iterate()

        self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
test_sslverify.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def loopback(self, serverCertOpts, clientCertOpts,
                 onServerLost=None, onClientLost=None, onData=None):
        if onServerLost is None:
            self.onServerLost = onServerLost = defer.Deferred()
        if onClientLost is None:
            self.onClientLost = onClientLost = defer.Deferred()
        if onData is None:
            onData = defer.Deferred()

        serverFactory = protocol.ServerFactory()
        serverFactory.protocol = DataCallbackProtocol
        serverFactory.onLost = onServerLost
        serverFactory.onData = onData

        clientFactory = protocol.ClientFactory()
        clientFactory.protocol = WritingProtocol
        clientFactory.onLost = onClientLost

        self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
        self.clientConn = reactor.connectSSL('127.0.0.1', self.serverPort.getHost().port,
                                             clientFactory, clientCertOpts)
p2p.py 文件源码 项目:p2pool-bch 作者: amarian12 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True, external_ip=None):
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store)
        self.connect_addrs = connect_addrs
        self.preferred_storage = preferred_storage
        self.known_txs_var = known_txs_var
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip
        self.external_ip = external_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
p2p.py 文件源码 项目:p2pool-unitus 作者: amarian12 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True):
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store)
        self.connect_addrs = connect_addrs
        self.preferred_storage = preferred_storage
        self.known_txs_var = known_txs_var
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
vnc_proxy_server.py 文件源码 项目:universe 作者: openai 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connectionMade(self):
        logger.info('[%s] Connection received from VNC client', self.id)
        factory = protocol.ClientFactory()
        factory.protocol = VNCProxyClient
        factory.vnc_server = self
        factory.deferrable = defer.Deferred()
        endpoint = endpoints.clientFromString(reactor, self.factory.vnc_address)

        def _established_callback(client):
            if self._broken:
                client.close()
            self.vnc_client = client
            self.flush()
        def _established_errback(reason):
            logger.error('[VNCProxyServer] Connection succeeded but could not establish session: %s', reason)
            self.close()
        factory.deferrable.addCallbacks(_established_callback, _established_errback)

        def _connect_errback(reason):
            logger.error('[VNCProxyServer] Connection failed: %s', reason)
            self.close()
        endpoint.connect(factory).addErrback(_connect_errback)

        self.send_ProtocolVersion_Handshake()
p2p.py 文件源码 项目:p2pool-dgb-sha256 作者: ilsawa 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.VariableDict({}), mining_txs_var=variable.VariableDict({}), advertise_ip=True, external_ip=None):
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store)
        self.connect_addrs = connect_addrs
        self.preferred_storage = preferred_storage
        self.known_txs_var = known_txs_var
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip
        self.external_ip = external_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
EchoClient.py 文件源码 项目:_ 作者: zengchunyun 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def main():
    f = EchoFactory()  # ???EchoFactory
    reactor.connectTCP("localhost", 8000, f)
    # twisted.internet.selectreactor.SelectReactor
    # ??????SelectReactor???twisted.internet.posixbase.PosixReactorBase???
    # connectTCP(self, host, port, factory, timeout=30, bindAddress=None):??
    #
    #
    # ???????twisted.internet.tcp.Connector(),????????client?ClientFactory,
    #
    reactor.run()  # ?????????????
    # run?????????startRunning??,startRunning???ReactorBase??startRunning??
    # run?????????mainLoop??
    # mainLoop?????????SelectReactor.doIteration(t)??,???????????select.select????
    # ???????,??self._doReadOrWrite??,?????????,????????client,????????????,
    # ??twisted.internet.tcp.BaseClient().doConnect,???self._connectDone(),?????self.protocol.makeConnection(self)
    # ?????????self.connectionMade(),??????????????,??????EchoClient().connectionMade()
msn.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _login(userHandle, passwd, nexusServer, cached=0, authData=''):
    """
    This function is used internally and should not ever be called
    directly.
    """
    cb = Deferred()
    def _cb(server, auth):
        loginFac = ClientFactory()
        loginFac.protocol = lambda : PassportLogin(cb, userHandle, passwd, server, auth)
        reactor.connectSSL(_parsePrimitiveHost(server)[0], 443, loginFac, ClientContextFactory())

    if cached:
        _cb(nexusServer, authData)
    else:
        fac = ClientFactory()
        d = Deferred()
        d.addCallbacks(_cb, callbackArgs=(authData,))
        d.addErrback(lambda f: cb.errback(f))
        fac.protocol = lambda : PassportNexus(d, nexusServer)
        reactor.connectSSL(_parsePrimitiveHost(nexusServer)[0], 443, fac, ClientContextFactory())
    return cb
test_amp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setUp(self):
        from twisted.internet import reactor
        self.serverFactory = protocol.ServerFactory()
        self.serverFactory.protocol = self.serverProto
        self.clientFactory = protocol.ClientFactory()
        self.clientFactory.protocol = self.clientProto
        self.clientFactory.onMade = defer.Deferred()
        self.serverFactory.onMade = defer.Deferred()
        self.serverPort = reactor.listenTCP(0, self.serverFactory)
        self.clientConn = reactor.connectTCP(
            '127.0.0.1', self.serverPort.getHost().port,
            self.clientFactory)
        def getProtos(rlst):
            self.cli = self.clientFactory.theProto
            self.svr = self.serverFactory.theProto
        dl = defer.DeferredList([self.clientFactory.onMade,
                                 self.serverFactory.onMade])
        return dl.addCallback(getProtos)
test_application.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def testTCP(self):
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.setServiceParent(s)
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d
test_application.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testPrivileged(self):
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.privileged = 1
        t.privilegedStartService()
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        c = internet.TCPClient('127.0.0.1', num, factory)
        c.startService()
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : c.stopService())
        factory.d.addCallback(lambda x : t.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d
test_application.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testUNIX(self):
        # FIXME: This test is far too dense.  It needs comments.
        #  -- spiv, 2004-11-07
        if not interfaces.IReactorUNIX(reactor, None):
            raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.UNIXServer('echo.skt', factory)
        t.setServiceParent(s)
        factory = protocol.ClientFactory()
        factory.protocol = Foo
        factory.d = defer.Deferred()
        factory.line = None
        internet.UNIXClient('echo.skt', factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        factory.d.addCallback(self._cbTestUnix, factory, s)
        return factory.d
test_application.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testVolatile(self):
        if not interfaces.IReactorUNIX(reactor, None):
            raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._port, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._port, None)
        t.stopService()
        self.assertIdentical(t._port, None)
        self.failIf(t.running)

        factory = protocol.ClientFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXClient('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._connection, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._connection, None)
        t.stopService()
        self.assertIdentical(t._connection, None)
        self.failIf(t.running)
test_ssl.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testOpenSSLBuffering(self):
        serverProto = self.serverProto = SingleLineServerProtocol()
        clientProto = self.clientProto = RecordingClientProtocol()

        server = protocol.ServerFactory()
        client = self.client = protocol.ClientFactory()

        server.protocol = lambda: serverProto
        client.protocol = lambda: clientProto
        client.buffer = []

        sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
        cCTX = ssl.ClientContextFactory()

        port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
        reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)

        i = 0
        while i < 5000 and not client.buffer:
            i += 1
            reactor.iterate()

        self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
test_sslverify.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def loopback(self, serverCertOpts, clientCertOpts,
                 onServerLost=None, onClientLost=None, onData=None):
        if onServerLost is None:
            self.onServerLost = onServerLost = defer.Deferred()
        if onClientLost is None:
            self.onClientLost = onClientLost = defer.Deferred()
        if onData is None:
            onData = defer.Deferred()

        serverFactory = protocol.ServerFactory()
        serverFactory.protocol = DataCallbackProtocol
        serverFactory.onLost = onServerLost
        serverFactory.onData = onData

        clientFactory = protocol.ClientFactory()
        clientFactory.protocol = WritingProtocol
        clientFactory.onLost = onClientLost

        self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
        self.clientConn = reactor.connectSSL('127.0.0.1', self.serverPort.getHost().port,
                                             clientFactory, clientCertOpts)
p2p.py 文件源码 项目:p2pool-ltc 作者: ilsawa 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True, external_ip=None):
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store)
        self.connect_addrs = connect_addrs
        self.preferred_storage = preferred_storage
        self.known_txs_var = known_txs_var
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip
        self.external_ip = external_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
p2p.py 文件源码 项目:p2pool-bsty 作者: amarian12 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True):
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store)
        self.connect_addrs = connect_addrs
        self.preferred_storage = preferred_storage
        self.known_txs_var = known_txs_var
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
p2p.py 文件源码 项目:p2pool-cann 作者: ilsawa 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True):
        self.best_share_hash_func = best_share_hash_func
        self.port = port
        self.net = net
        self.addr_store = dict(addr_store)
        self.connect_addrs = connect_addrs
        self.preferred_storage = preferred_storage
        self.known_txs_var = known_txs_var
        self.mining_txs_var = mining_txs_var
        self.advertise_ip = advertise_ip

        self.traffic_happened = variable.Event()
        self.nonce = random.randrange(2**64)
        self.peers = {}
        self.bans = {} # address -> end_time
        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
        self.serverfactory = ServerFactory(self, max_incoming_conns)
        self.running = False
test_endpoints.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_failedBuildProtocol(self):
        """
        An exception raised in C{buildProtocol} of our wrappedFactory
        results in our C{onConnection} errback being fired.
        """
        class BogusFactory(ClientFactory):
            """
            A one off factory whose C{buildProtocol} raises an C{Exception}.
            """

            def buildProtocol(self, addr):
                raise ValueError("My protocol is poorly defined.")

        wf = endpoints._WrappingFactory(BogusFactory())

        wf.buildProtocol(None)

        d = self.assertFailure(wf._onConnection, ValueError)
        d.addCallback(lambda e: self.assertEqual(
            e.args,
            ("My protocol is poorly defined.",)))

        return d
test_tcp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_buildProtocolClient(self):
        """
        L{ClientFactory.buildProtocol} should be invoked with the address of
        the server to which a connection has been established, which should
        be the same as the address reported by the C{getHost} method of the
        transport of the server protocol and as the C{getPeer} method of the
        transport of the client protocol.
        """
        serverHost = self.server.protocol.transport.getHost()
        clientPeer = self.client.protocol.transport.getPeer()

        self.assertEqual(
            self.clientWrapper.addresses,
            [IPv4Address('TCP', serverHost.host, serverHost.port)])
        self.assertEqual(
            self.clientWrapper.addresses,
            [IPv4Address('TCP', clientPeer.host, clientPeer.port)])
test_stringtransport.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_connectDestination(self):
        """
        L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and
        L{MemoryReactor.connectUNIX} will return an L{IConnector} whose
        C{getDestination} method returns an L{IAddress} with attributes which
        reflect the values passed.
        """
        memoryReactor = MemoryReactor()
        for connector in [memoryReactor.connectTCP(
                              "test.example.com", 8321, ClientFactory()),
                          memoryReactor.connectSSL(
                              "test.example.com", 8321, ClientFactory(),
                              None)]:
            verifyObject(IConnector, connector)
            address = connector.getDestination()
            verifyObject(IAddress, address)
            self.assertEqual(address.host, "test.example.com")
            self.assertEqual(address.port, 8321)
        connector = memoryReactor.connectUNIX(b"/fake/path", ClientFactory())
        verifyObject(IConnector, connector)
        address = connector.getDestination()
        verifyObject(IAddress, address)
        self.assertEqual(address.name, b"/fake/path")
test_application.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testPrivileged(self):
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.privileged = 1
        t.privilegedStartService()
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        c = internet.TCPClient('127.0.0.1', num, factory)
        c.startService()
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : c.stopService())
        factory.d.addCallback(lambda x : t.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d
test_application.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testUNIX(self):
        # FIXME: This test is far too dense.  It needs comments.
        #  -- spiv, 2004-11-07
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.UNIXServer('echo.skt', factory)
        t.setServiceParent(s)
        factory = protocol.ClientFactory()
        factory.protocol = Foo
        factory.d = defer.Deferred()
        factory.line = None
        internet.UNIXClient('echo.skt', factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        factory.d.addCallback(self._cbTestUnix, factory, s)
        return factory.d
test_application.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def testVolatile(self):
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._port, None)
        t1 = copy.copy(t)
        self.assertIsNone(t1._port)
        t.stopService()
        self.assertIsNone(t._port)
        self.assertFalse(t.running)

        factory = protocol.ClientFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXClient('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._connection, None)
        t1 = copy.copy(t)
        self.assertIsNone(t1._connection)
        t.stopService()
        self.assertIsNone(t._connection)
        self.assertFalse(t.running)


问题


面经


文章

微信
公众号

扫码关注公众号