python类connectTCP()的实例源码

connection.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def connect_ip(factory, config, ip_local_default=True, ip_version_default=4):
    '''
    Open TCP connections for factory, one per usable address in config.

    config and ip_version_default may each be a single item or a list; both
    are normalised with _listify() first. An entry is used only when it
    passes validate_connection_config() and contains a 'port'. If the entry
    has an 'ip', that address is connected to; otherwise, when
    ip_local_default is true, one connection is opened to
    IP_CONNECT_DEFAULT[version] for each version in ip_version_default.
    Entries matching neither case are skipped.

    Returns the connectors produced by reactor.connectTCP(), passed through
    _unlistify(). See connect() for details.
    '''
    entries = _listify(config)
    versions = _listify(ip_version_default)
    require_ip = not ip_local_default

    opened = []
    for entry in entries:
        # invalid configs are skipped, not treated as fatal
        if not validate_connection_config(entry, must_have_ip=require_ip):
            continue
        if 'port' not in entry:
            continue
        port = int(entry['port'])
        if 'ip' in entry:
            targets = (entry['ip'],)
        elif ip_local_default:
            # generator: each default address is looked up right before it
            # is dialled
            targets = (IP_CONNECT_DEFAULT[version] for version in versions)
        else:
            targets = ()
        for address in targets:
            opened.append(reactor.connectTCP(address, port, factory))
    return _unlistify(opened)
client.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def downloadPage(url, file, contextFactory=None, *args, **kwargs):
    """Fetch url and store the response in file.

    @param file: path to file on filesystem, or file-like object.
    @param contextFactory: context factory handed to connectSSL for https
        URLs; an ssl.ClientContextFactory() is created when it is None.

    Extra positional and keyword arguments are forwarded to HTTPDownloader.

    @return: the deferred attribute of the HTTPDownloader factory.
    """
    scheme, host, port, path = _parse(url)
    downloader = HTTPDownloader(url, file, *args, **kwargs)

    if scheme != 'https':
        reactor.connectTCP(host, port, downloader)
        return downloader.deferred

    # ssl is imported only on the https path
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(host, port, downloader, contextFactory)
    return downloader.deferred
distrib.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def render(self, request):
        """Render this request, from my server.

        This will always be asynchronous, and therefore return NOT_DONE_YET.
        It spins off a request to the pb client, and either adds it to the list
        of pending issues or requests it immediately, depending on if the
        client is already connected.

        @param request: the request to forward. It is appended to
            self.pending while self.publisher is falsy, otherwise sent with
            self.publisher.callRemote('request', request).
        @return: NOT_DONE_YET in every case.
        """
        if not self.publisher:
            # No publisher yet: park the request.
            self.pending.append(request)
            if not self.waiting:
                # Only the first parked request starts a connection attempt;
                # the flag keeps later requests from starting another one.
                self.waiting = 1
                bf = pb.PBClientFactory()
                timeout = 10
                # A host of "unix" means self.port is handed to connectUNIX
                # rather than used as a TCP port.
                if self.host == "unix":
                    reactor.connectUNIX(self.port, bf, timeout)
                else:
                    reactor.connectTCP(self.host, self.port, bf, timeout)
                d = bf.getRootObject()
                d.addCallbacks(self.connected, self.notConnected)

        else:
            # Publisher available: forward immediately; the Issue object
            # receives the result or the failure.
            i = Issue(request)
            self.publisher.callRemote('request', request).addCallbacks(i.finished, i.failed)
        return NOT_DONE_YET
proxy.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def process(self):
        """
        Relay this request to the server named in self.uri.

        The URI scheme selects the default port (self.ports) and the client
        factory class (self.protocols); a "host:port" network location
        overrides the default port. A 'host' header is added when the request
        has none, the body is read from the start of self.content, and the
        resulting factory is connected over TCP.
        """
        pieces = urlparse.urlparse(self.uri)
        scheme = pieces[0]
        host = pieces[1]
        port = self.ports[scheme]
        if ':' in host:
            host, explicit_port = host.split(':')
            port = int(explicit_port)

        # drop scheme and network location; keep the rest of the URI
        remainder = urlparse.urlunparse(('', '') + pieces[2:])
        if not remainder:
            remainder = remainder + '/'

        factory_class = self.protocols[scheme]
        outgoing_headers = self.getAllHeaders().copy()
        if not outgoing_headers.has_key('host'):
            outgoing_headers['host'] = host

        # rewind so the whole request body is forwarded
        self.content.seek(0, 0)
        body = self.content.read()

        proxy_factory = factory_class(self.method, remainder,
                                      self.clientproto, outgoing_headers,
                                      body, self)
        reactor.connectTCP(host, port, proxy_factory)
test_tcp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testTcpNoDelay(self):
        """
        Toggle the TCP no-delay flag on both ends of a loopback connection
        and check that getTcpNoDelay() reports each change.

        Returns the Deferred that fires after the checks and port cleanup.
        """
        f = MyServerFactory()
        # Listen on port 0; the actual port number is read back via getHost().
        port = reactor.listenTCP(0, f, interface="127.0.0.1")

        self.n = port.getHost().port
        self.ports.append(port)
        clientF = MyClientFactory()
        reactor.connectTCP("127.0.0.1", self.n, clientF)

        # Wait until f.called is positive and the client factory has a
        # protocol attribute that is not None.
        d = loopUntil(lambda: (f.called > 0 and
                               getattr(clientF, 'protocol', None) is not None))
        def check(x):
            # Expected sequence on each transport: off -> on -> off.
            for p in clientF.protocol, f.protocol:
                transport = p.transport
                self.assertEquals(transport.getTcpNoDelay(), 0)
                transport.setTcpNoDelay(1)
                self.assertEquals(transport.getTcpNoDelay(), 1)
                transport.setTcpNoDelay(0)
                self.assertEquals(transport.getTcpNoDelay(), 0)
        d.addCallback(check)
        # NOTE(review): this addBoth callback ignores its argument and returns
        # the cleanPorts() result, so a failure raised in check() looks like it
        # is replaced by the cleanup result -- confirm the test can still fail.
        d.addBoth(lambda _: self.cleanPorts(clientF.protocol.transport, port))
        return d
test_tcp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testClientStartStop(self):
        """
        Connect a ClientStartStopFactory to a local listener and check that
        it is marked started right after connectTCP() and later stopped.

        Returns the Deferred that fires after the checks and port cleanup.
        """
        f = ClosingFactory()
        # Listen on port 0; the actual port number is read back via getHost().
        p = reactor.listenTCP(0, f, interface="127.0.0.1")
        self.n = p.getHost().port
        self.ports.append(p)
        # The factory is given a reference to its own listening port.
        f.port = p

        # Do not connect before the listening port reports connected.
        d = loopUntil(lambda :p.connected)
        def check(ignored):
            factory = ClientStartStopFactory()
            reactor.connectTCP("127.0.0.1", self.n, factory)
            # started must already be set when connectTCP() returns.
            self.assert_(factory.started)
            return loopUntil(lambda :factory.stopped)
        d.addCallback(check)
        # NOTE(review): addBoth runs cleanup on success and failure, but the
        # lambda returns the cleanPorts() result in place of any failure --
        # confirm that errors from check() still surface.
        d.addBoth(lambda _: self.cleanPorts(*self.ports))
        return d
test_tcp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testReconnect(self):
        """
        Have the client reconnect from clientConnectionLost, wait until the
        factory reports a failure, then check that the protocol was made and
        closed once, that the failure is a ConnectionRefusedError and that
        the factory was stopped once.
        """
        f = ClosingFactory()
        # Listen on port 0; the actual port number is read back via getHost().
        p = reactor.listenTCP(0, f, interface="127.0.0.1")
        n = p.getHost().port
        self.ports.append(p)
        # The factory is given a reference to its own listening port.
        f.port = p

        factory = MyClientFactory()
        d = loopUntil(lambda :p.connected)
        def step1(ignored):
            # Replace the factory hook so that losing the connection
            # immediately asks the connector to connect again.
            def clientConnectionLost(c, reason):
                c.connect()
            factory.clientConnectionLost = clientConnectionLost
            reactor.connectTCP("127.0.0.1", n, factory)
            return loopUntil(lambda :factory.failed)

        def step2(ignored):
            # p here is the client protocol; it shadows the listening port
            # named p in the enclosing scope.
            p = factory.protocol
            self.assertEquals((p.made, p.closed), (1, 1))
            # trap() re-raises unless the stored failure is the expected type.
            factory.reason.trap(error.ConnectionRefusedError)
            self.assertEquals(factory.stopped, 1)
            return self.cleanPorts(*self.ports)

        return d.addCallback(step1).addCallback(step2)
test_tcp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testHostAddress(self):
        """
        Check that the address seen by the client matches the listening
        port's getHost(): both the address stored on the client factory and
        the client transport's getPeer().
        """
        f1 = MyServerFactory()
        # Listen on port 0; the actual port number is read back via getHost().
        p1 = reactor.listenTCP(0, f1, interface='127.0.0.1')
        n = p1.getHost().port
        self.ports.append(p1)

        f2 = MyOtherClientFactory()
        # p2 is the object returned by connectTCP(); its state attribute is
        # polled below.
        p2 = reactor.connectTCP('127.0.0.1', n, f2)

        d = loopUntil(lambda :p2.state == "connected")
        def check(ignored):
            self.assertEquals(p1.getHost(), f2.address)
            self.assertEquals(p1.getHost(), f2.protocol.transport.getPeer())
            return p1.stopListening()
        def cleanup(ignored):
            # Register the client transport so cleanPorts() handles it too.
            self.ports.append(p2.transport)
            return self.cleanPorts(*self.ports)
        return d.addCallback(check).addCallback(cleanup)
test_tcp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testWriter(self):
        """
        Connect a WriterClientFactory to a server running WriterProtocol and,
        once both sides have disconnected, check the flags set on the server
        factory and the data collected by the client.
        """
        f = protocol.Factory()
        f.protocol = WriterProtocol
        # Flags inspected in check(); presumably updated by WriterProtocol.
        f.done = 0
        f.problem = 0
        wrappedF = WiredFactory(f)
        # Listen on port 0; the actual port number is read back via getHost().
        p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1")
        n = p.getHost().port
        self.ports.append(p)
        clientF = WriterClientFactory()
        wrappedClientF = WiredFactory(clientF)
        reactor.connectTCP("127.0.0.1", n, wrappedClientF)

        def check(ignored):
            self.failUnless(f.done, "writer didn't finish, it probably died")
            self.failUnless(f.problem == 0, "writer indicated an error")
            self.failUnless(clientF.done,
                            "client didn't see connection dropped")
            # The client must have received exactly these bytes, in order.
            expected = "".join(["Hello Cleveland!\n",
                                "Goodbye", " cruel", " world", "\n"])
            self.failUnless(clientF.data == expected,
                            "client didn't receive all the data it expected")
        # Run the checks only after both wrapped factories have fired their
        # onDisconnect Deferreds.
        d = defer.gatherResults([wrappedF.onDisconnect,
                                 wrappedClientF.onDisconnect])
        return d.addCallback(check)
test_tcp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _testBuildProtocol(self, portno):
        """
        Helper: connect a client bound to local port portno to a fresh
        listener and wait for it to connect and then disconnect.

        @param portno: local port number the client binds to; it is also the
            port of the IPv4Address given to AServerFactory.
        @return: a Deferred chained from the listen factory's deferred that
            fires after the client has disconnected.
        """
        f = AServerFactory(self, IPv4Address('TCP', '127.0.0.1', portno))
        wrappedF = FireOnListenFactory(f)
        p = reactor.listenTCP(0, wrappedF)
        self.ports.append(p)

        def client(ignored):
            # The client factory is given the listener's address.
            acf = AClientFactory(self, IPv4Address("TCP", "127.0.0.1",
                                                   p.getHost().port))
            wired = WiredFactory(acf)
            reactor.connectTCP("127.0.0.1", p.getHost().port, wired,
                               bindAddress=("127.0.0.1", portno))
            d = wired.onConnect
            def _onConnect(ignored):
                self.ports.append(acf.protocol.transport)
                # self.ran is not set in this function; presumably one of the
                # factories sets it -- TODO confirm.
                self.assert_(hasattr(self, "ran"))
                return wired.onDisconnect
            def _onDisconnect(ignored):
                # Remove the marker so it cannot satisfy a later run.
                del self.ran
            d.addCallback(_onConnect)
            d.addCallback(_onDisconnect)
            return d

        return wrappedF.deferred.addCallback(client)
test_factories.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def testStopTrying(self):
        """
        Start a listener using the In protocol and a ReconnectingClientFactory
        using the Out protocol, connect them over 127.0.0.1 and hand both
        factories to self._testStopTrying_1 once the server factory's
        Deferred fires.

        Returns the server factory's Deferred.
        """
        # server side
        server = Factory()
        server.protocol = In
        server.connections = 0
        server.allMessages = []
        server.goal = 2
        server.d = defer.Deferred()

        # client side
        client = ReconnectingClientFactory()
        client.initialDelay = 0.2
        client.delay = 0.2
        client.protocol = Out
        client.howManyTimes = 2

        listener = reactor.listenTCP(0, server)
        self.port = listener
        reactor.connectTCP('127.0.0.1', listener.getHost().port, client)

        server.d.addCallback(self._testStopTrying_1, server, client)
        return server.d
ftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def ftp_PORT(self, address):
        """
        Handle PORT: open a data connection to the address the client named.

        @param address: six comma-separated integers "h1,h2,h3,h4,p1,p2";
            the first four form the dotted IP, the last two the port
            (p1 << 8 | p2).
        @return: self.dtpFactory.deferred with callbacks attached, resulting
            in ENTERING_PORT_MODE on success or CANT_OPEN_DATA_CNX when the
            failure is a PortConnectionError (other failures propagate).
        @raise ValueError: if a field is not an integer.
        @raise IndexError: if fewer than six fields are supplied.
        """
        # list() is required because the result is sliced and indexed; a bare
        # map() is a one-shot iterator on Python 3.
        addr = list(map(int, address.split(',')))
        ip = '%d.%d.%d.%d' % tuple(addr[:4])
        port = addr[4] << 8 | addr[5]

        # if we have a DTP port set up, lose it.
        if self.dtpFactory is not None:
            self.cleanupDTP()

        # NOTE(review): ip comes straight from the client and is not compared
        # with the control connection's peer here -- confirm whether that is
        # intended.
        self.dtpFactory = DTPFactory(pi=self, peerHost=self.transport.getPeer().host)
        self.dtpFactory.setTimeout(self.dtpTimeout)
        self.dtpPort = reactor.connectTCP(ip, port, self.dtpFactory)

        def connected(ignored):
            return ENTERING_PORT_MODE
        def connFailed(err):
            err.trap(PortConnectionError)
            return CANT_OPEN_DATA_CNX
        return self.dtpFactory.deferred.addCallbacks(connected, connFailed)
client.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def lookupZone(self, name, timeout = 10):
        """
        Perform an AXFR request. This is quite different from usual
        DNS requests. See http://cr.yp.to/djbdns/axfr-notes.html for
        more information.

        @param name: the name passed to the AXFRController.
        @param timeout: passed unchanged to DNSClientFactory; the scheduled
            self._timeoutZone call uses it too, falling back to 10 when it
            is falsy.
        @return: a failed Deferred (IOError) when pickServer() returns None,
            otherwise the Deferred given to the controller, with
            self._cbLookupZone attached.
        """
        server = self.pickServer()
        if server is None:
            return defer.fail(IOError('No domain name servers available'))
        host, port = server

        result = defer.Deferred()
        controller = AXFRController(name, result)
        factory = DNSClientFactory(controller, timeout)
        factory.noisy = False  # keep the factory quiet

        from twisted.internet import reactor
        connector = reactor.connectTCP(host, port, factory)

        # a falsy timeout falls back to 10 for the scheduled call
        delay = timeout or 10
        controller.timeoutCall = reactor.callLater(
            delay, self._timeoutZone, result, controller, connector, delay)
        return result.addCallback(self._cbLookupZone, connector)
Downloader.py 文件源码 项目:enigma2 作者: OpenLD 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, url, outputfile, contextFactory=None, *args, **kwargs):
        """
        Start downloading url into outputfile.

        The URL is split with client._parse when that helper exists,
        otherwise with twisted.web.client's URI class (imported as _URI or
        URI, whichever is available). The HTTPProgressDownloader is stored
        on self.factory and the object returned by connectSSL/connectTCP on
        self.connection.

        contextFactory is used only for https URLs; when it is None an
        ssl.ClientContextFactory() is created. Extra arguments are forwarded
        to HTTPProgressDownloader.
        """
        if not hasattr(client, '_parse'):
            try:
                from twisted.web.client import _URI as URI
            except ImportError:
                from twisted.web.client import URI
            parsed = URI.fromBytes(url)
            scheme, host = parsed.scheme, parsed.host
            # fall back to the scheme's usual port when none was parsed
            port = parsed.port or (443 if scheme == 'https' else 80)
            path = parsed.path
        else:
            scheme, host, port, path = client._parse(url)

        self.factory = HTTPProgressDownloader(url, outputfile, *args, **kwargs)
        if scheme != "https":
            self.connection = reactor.connectTCP(host, port, self.factory)
        else:
            from twisted.internet import ssl
            if contextFactory is None:
                contextFactory = ssl.ClientContextFactory()
            self.connection = reactor.connectSSL(host, port, self.factory, contextFactory)
CCcamInfo.py 文件源码 项目:enigma2 作者: OpenLD 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getPage(url, contextFactory=None, *args, **kwargs):
    """
    Fetch url over TCP, adding HTTP Basic auth when the URL has credentials.

    When _parse() yields both a username and a password, the URL is rebuilt
    without them and an "Authorization: Basic ..." header is merged into the
    headers given to HTTPClientFactory.

    contextFactory is accepted but not used by this function. Extra
    positional and keyword arguments are forwarded to HTTPClientFactory.

    Returns the factory's deferred.
    """
    scheme, host, port, path, username, password = _parse(url)

    if username and password:
        url = scheme + '://' + host + ':' + str(port) + path
        basicAuth = encodestring("%s:%s" % (username, password))
        # encodestring() appends a newline, which must not reach the header
        authHeader = "Basic " + basicAuth.strip()

        # Work on a copy so the caller's headers dict does not end up
        # carrying the Authorization header after this call.
        headers = dict(kwargs.get("headers", {}))
        headers["Authorization"] = authHeader
        kwargs["headers"] = headers

    factory = HTTPClientFactory(url, *args, **kwargs)
    reactor.connectTCP(host, port, factory)

    return factory.deferred
p2p.py 文件源码 项目:p2pool-bch 作者: amarian12 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _think(self):
        """
        Possibly start one outgoing connection, then return a random delay.

        A connection is attempted only while there are fewer connections
        than self.desired_conns, fewer attempts than self.max_attempts and
        the node's addr_store is non-empty. The peer returned by
        get_good_peers(1) is skipped if it is already being attempted or if
        its ban has not expired yet.

        Errors are logged with log.err() instead of propagating.

        Returns a sample from random.expovariate(1/1); presumably the caller
        uses it as the delay before the next call -- TODO confirm.
        """
        try:
            if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store:
                (host, port), = self.node.get_good_peers(1)

                already_trying = self._host_to_ident(host) in self.attempts
                if not already_trying:
                    banned = host in self.node.bans and self.node.bans[host] > time.time()
                    if not banned:
                        reactor.connectTCP(host, port, self, timeout=5)
        except Exception:
            # Exception, not a bare except: SystemExit and KeyboardInterrupt
            # must not be swallowed and logged as ordinary errors.
            log.err()

        return random.expovariate(1/1)
network.py 文件源码 项目:bptc_wallet 作者: ceddie 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def push_to_member(self, member: Member, ignore_for_statistics=False) -> None:
        """Push to the specified member.

        Builds the data string while holding self.hashgraph.lock, wraps it in
        a PushClientFactory and connects to the member's address from the
        reactor thread via threads.blockingCallFromThread. Nothing is
        connected when the member has no address.

        :param member: the receiver of the push.
        :param ignore_for_statistics: when true, the factory is created with
            network=None instead of network=self.
        """
        # member.address may be None (push() below guards for it), so it must
        # not be dereferenced unconditionally just to log.
        address = member.address
        host = address.host if address is not None else None
        port = address.port if address is not None else None
        bptc.logger.debug('Push to {}... ({}, {})'.format(member.verify_key[:6], host, port))

        with self.hashgraph.lock:
            data_string = self.generate_data_string(self.hashgraph.me,
                                                    self.hashgraph.get_unknown_events_of(member),
                                                    filter_members_with_address(self.hashgraph.known_members.values()))

        network = None if ignore_for_statistics else self
        factory = PushClientFactory(data_string, network=network, receiver=member)

        def push():
            # re-read the address at call time; it is checked where it is used
            if member.address is not None:
                reactor.connectTCP(member.address.host, member.address.port, factory)

        threads.blockingCallFromThread(reactor, push)
imapclient.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    """
    Prompt for IMAP4 server details, connect and run the reactor.

    Asks for hostname, port, username and password. An empty port answer
    selects 143, the default announced in the prompt. The Deferred handed to
    SimpleIMAP4ClientFactory runs cbServerGreeting, then ebConnection on
    error, then cbClose in either case.

    Raises ValueError if a non-empty port answer is not an integer.
    """
    hostname = raw_input('IMAP4 Server Hostname: ')
    port = raw_input('IMAP4 Server Port (the default is 143): ')
    username = raw_input('IMAP4 Username: ')
    password = util.getPassword('IMAP4 Password: ')

    # The prompt promises a default, so honour it instead of failing in
    # int('') when the user just presses Enter.
    port = int(port) if port.strip() else 143

    onConn = defer.Deferred(
    ).addCallback(cbServerGreeting, username, password
                  ).addErrback(ebConnection
                               ).addBoth(cbClose)

    factory = SimpleIMAP4ClientFactory(username, onConn)

    from twisted.internet import reactor
    reactor.connectTCP(hostname, port, factory)
    reactor.run()
p2p.py 文件源码 项目:p2pool-unitus 作者: amarian12 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _think(self):
        """
        Possibly start one outgoing connection, then return a random delay.

        A connection is attempted only while there are fewer connections
        than self.desired_conns, fewer attempts than self.max_attempts and
        the node's addr_store is non-empty. The peer returned by
        get_good_peers(1) is skipped if it is already being attempted or if
        its ban has not expired yet.

        Errors are logged with log.err() instead of propagating.

        Returns a sample from random.expovariate(1/1); presumably the caller
        uses it as the delay before the next call -- TODO confirm.
        """
        try:
            if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store:
                (host, port), = self.node.get_good_peers(1)

                already_trying = self._host_to_ident(host) in self.attempts
                if not already_trying:
                    banned = host in self.node.bans and self.node.bans[host] > time.time()
                    if not banned:
                        reactor.connectTCP(host, port, self, timeout=5)
        except Exception:
            # Exception, not a bare except: SystemExit and KeyboardInterrupt
            # must not be swallowed and logged as ordinary errors.
            log.err()

        return random.expovariate(1/1)
Downloader.py 文件源码 项目:enigma2 作者: Openeight 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, url, outputfile, contextFactory=None, *args, **kwargs):
        """
        Start downloading url into outputfile.

        The URL is split with client._parse when that helper exists,
        otherwise with twisted.web.client's URI class (imported as _URI or
        URI, whichever is available). The HTTPProgressDownloader is stored
        on self.factory and the object returned by connectSSL/connectTCP on
        self.connection.

        contextFactory is used only for https URLs; when it is None an
        ssl.ClientContextFactory() is created. Extra arguments are forwarded
        to HTTPProgressDownloader.
        """
        if not hasattr(client, '_parse'):
            try:
                from twisted.web.client import _URI as URI
            except ImportError:
                from twisted.web.client import URI
            parsed = URI.fromBytes(url)
            scheme, host = parsed.scheme, parsed.host
            port = parsed.port
            path = parsed.path
        else:
            scheme, host, port, path = client._parse(url)

        self.factory = HTTPProgressDownloader(url, outputfile, *args, **kwargs)
        if scheme != 'https':
            self.connection = reactor.connectTCP(host, port, self.factory)
        else:
            from twisted.internet import ssl
            if contextFactory is None:
                contextFactory = ssl.ClientContextFactory()
            self.connection = reactor.connectSSL(host, port, self.factory, contextFactory)
CCcamInfo.py 文件源码 项目:enigma2 作者: Openeight 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getPage(url, contextFactory=None, *args, **kwargs):
    """
    Fetch url over TCP, adding HTTP Basic auth when the URL has credentials.

    When _parse() yields both a username and a password, the URL is rebuilt
    without them and an "Authorization: Basic ..." header is merged into the
    headers given to HTTPClientFactory.

    contextFactory is accepted but not used by this function. Extra
    positional and keyword arguments are forwarded to HTTPClientFactory.

    Returns the factory's deferred.
    """
    scheme, host, port, path, username, password = _parse(url)

    if username and password:
        url = scheme + '://' + host + ':' + str(port) + path
        basicAuth = encodestring("%s:%s" % (username, password))
        # encodestring() appends a newline, which must not reach the header
        authHeader = "Basic " + basicAuth.strip()

        # Work on a copy so the caller's headers dict does not end up
        # carrying the Authorization header after this call.
        headers = dict(kwargs.get("headers", {}))
        headers["Authorization"] = authHeader
        kwargs["headers"] = headers

    factory = HTTPClientFactory(url, *args, **kwargs)
    reactor.connectTCP(host, port, factory)

    return factory.deferred
p2p.py 文件源码 项目:p2pool-dgb-sha256 作者: ilsawa 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _think(self):
        """
        Possibly start one outgoing connection, then return a random delay.

        A connection is attempted only while there are fewer connections
        than self.desired_conns, fewer attempts than self.max_attempts and
        the node's addr_store is non-empty. The peer returned by
        get_good_peers(1) is skipped if it is already being attempted or if
        its ban has not expired yet.

        Errors are logged with log.err() instead of propagating.

        Returns a sample from random.expovariate(1/1); presumably the caller
        uses it as the delay before the next call -- TODO confirm.
        """
        try:
            if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store:
                (host, port), = self.node.get_good_peers(1)

                already_trying = self._host_to_ident(host) in self.attempts
                if not already_trying:
                    banned = host in self.node.bans and self.node.bans[host] > time.time()
                    if not banned:
                        reactor.connectTCP(host, port, self, timeout=5)
        except Exception:
            # Exception, not a bare except: SystemExit and KeyboardInterrupt
            # must not be swallowed and logged as ordinary errors.
            log.err()

        return random.expovariate(1/1)
EchoClient.py 文件源码 项目:_ 作者: zengchunyun 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    """Connect an EchoFactory to localhost:8000 and run the reactor."""
    f = EchoFactory()  # create the EchoFactory instance
    reactor.connectTCP("localhost", 8000, f)
    # NOTE(review): the original non-English comments in this function were
    # destroyed by an encoding error. The notes below are reconstructed from
    # the identifiers that survived; verify them against the Twisted sources.
    #
    # reactor is presumably twisted.internet.selectreactor.SelectReactor,
    # which gets connectTCP from twisted.internet.posixbase.PosixReactorBase:
    #     connectTCP(self, host, port, factory, timeout=30, bindAddress=None)
    # connectTCP appears to create a twisted.internet.tcp.Connector() for the
    # client's ClientFactory.
    reactor.run()  # start the reactor
    # Presumed call chain inside run():
    #   run -> startRunning (the ReactorBase implementation) -> mainLoop
    #   mainLoop -> SelectReactor.doIteration(t), which waits in select.select
    #   ready descriptors -> self._doReadOrWrite
    #   for a connecting client: twisted.internet.tcp.BaseClient().doConnect
    #     -> self._connectDone() -> self.protocol.makeConnection(self)
    #     -> self.connectionMade(), i.e. EchoClient().connectionMade()
client.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def downloadPage(url, file, contextFactory=None, *args, **kwargs):
    """Fetch url and store the response in file.

    @param file: path to file on filesystem, or file-like object.
    @param contextFactory: context factory handed to connectSSL for https
        URLs; an ssl.ClientContextFactory() is created when it is None.

    Extra positional and keyword arguments are forwarded to HTTPDownloader.

    @return: the deferred attribute of the HTTPDownloader factory.
    """
    scheme, host, port, path = _parse(url)
    downloader = HTTPDownloader(url, file, *args, **kwargs)

    if scheme != 'https':
        reactor.connectTCP(host, port, downloader)
        return downloader.deferred

    # ssl is imported only on the https path
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(host, port, downloader, contextFactory)
    return downloader.deferred
distrib.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def render(self, request):
        """Render this request, from my server.

        This will always be asynchronous, and therefore return NOT_DONE_YET.
        It spins off a request to the pb client, and either adds it to the list
        of pending issues or requests it immediately, depending on if the
        client is already connected.

        @param request: the request to forward. It is appended to
            self.pending while self.publisher is falsy, otherwise sent with
            self.publisher.callRemote('request', request).
        @return: NOT_DONE_YET in every case.
        """
        if not self.publisher:
            # No publisher yet: park the request.
            self.pending.append(request)
            if not self.waiting:
                # Only the first parked request starts a connection attempt;
                # the flag keeps later requests from starting another one.
                self.waiting = 1
                bf = pb.PBClientFactory()
                timeout = 10
                # A host of "unix" means self.port is handed to connectUNIX
                # rather than used as a TCP port.
                if self.host == "unix":
                    reactor.connectUNIX(self.port, bf, timeout)
                else:
                    reactor.connectTCP(self.host, self.port, bf, timeout)
                d = bf.getRootObject()
                d.addCallbacks(self.connected, self.notConnected)

        else:
            # Publisher available: forward immediately; the Issue object
            # receives the result or the failure.
            i = Issue(request)
            self.publisher.callRemote('request', request).addCallbacks(i.finished, i.failed)
        return NOT_DONE_YET
proxy.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def process(self):
        """
        Relay this request to the server named in self.uri.

        The URI scheme selects the default port (self.ports) and the client
        factory class (self.protocols); a "host:port" network location
        overrides the default port. A 'host' header is added when the request
        has none, the body is read from the start of self.content, and the
        resulting factory is connected over TCP.
        """
        pieces = urlparse.urlparse(self.uri)
        scheme = pieces[0]
        host = pieces[1]
        port = self.ports[scheme]
        if ':' in host:
            host, explicit_port = host.split(':')
            port = int(explicit_port)

        # drop scheme and network location; keep the rest of the URI
        remainder = urlparse.urlunparse(('', '') + pieces[2:])
        if not remainder:
            remainder = remainder + '/'

        factory_class = self.protocols[scheme]
        outgoing_headers = self.getAllHeaders().copy()
        if not outgoing_headers.has_key('host'):
            outgoing_headers['host'] = host

        # rewind so the whole request body is forwarded
        self.content.seek(0, 0)
        body = self.content.read()

        proxy_factory = factory_class(self.method, remainder,
                                      self.clientproto, outgoing_headers,
                                      body, self)
        reactor.connectTCP(host, port, proxy_factory)
test_tcp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def testTcpNoDelay(self):
        """
        Toggle the TCP no-delay flag on both ends of a loopback connection
        and check that getTcpNoDelay() reports each change.

        Returns the Deferred that fires after the checks and port cleanup.
        """
        f = MyServerFactory()
        # Listen on port 0; the actual port number is read back via getHost().
        port = reactor.listenTCP(0, f, interface="127.0.0.1")

        self.n = port.getHost().port
        self.ports.append(port)
        clientF = MyClientFactory()
        reactor.connectTCP("127.0.0.1", self.n, clientF)

        # Wait until f.called is positive and the client factory has a
        # protocol attribute that is not None.
        d = loopUntil(lambda: (f.called > 0 and
                               getattr(clientF, 'protocol', None) is not None))
        def check(x):
            # Expected sequence on each transport: off -> on -> off.
            for p in clientF.protocol, f.protocol:
                transport = p.transport
                self.assertEquals(transport.getTcpNoDelay(), 0)
                transport.setTcpNoDelay(1)
                self.assertEquals(transport.getTcpNoDelay(), 1)
                transport.setTcpNoDelay(0)
                self.assertEquals(transport.getTcpNoDelay(), 0)
        d.addCallback(check)
        # NOTE(review): this addBoth callback ignores its argument and returns
        # the cleanPorts() result, so a failure raised in check() looks like it
        # is replaced by the cleanup result -- confirm the test can still fail.
        d.addBoth(lambda _: self.cleanPorts(clientF.protocol.transport, port))
        return d
test_tcp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testClientStartStop(self):
        """
        Connect a ClientStartStopFactory to a local listener and check that
        it is marked started right after connectTCP() and later stopped.

        Returns the Deferred that fires after the checks and port cleanup.
        """
        f = ClosingFactory()
        # Listen on port 0; the actual port number is read back via getHost().
        p = reactor.listenTCP(0, f, interface="127.0.0.1")
        self.n = p.getHost().port
        self.ports.append(p)
        # The factory is given a reference to its own listening port.
        f.port = p

        # Do not connect before the listening port reports connected.
        d = loopUntil(lambda :p.connected)
        def check(ignored):
            factory = ClientStartStopFactory()
            reactor.connectTCP("127.0.0.1", self.n, factory)
            # started must already be set when connectTCP() returns.
            self.assert_(factory.started)
            return loopUntil(lambda :factory.stopped)
        d.addCallback(check)
        # NOTE(review): addBoth runs cleanup on success and failure, but the
        # lambda returns the cleanPorts() result in place of any failure --
        # confirm that errors from check() still surface.
        d.addBoth(lambda _: self.cleanPorts(*self.ports))
        return d
test_tcp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testReconnect(self):
        """
        Have the client reconnect from clientConnectionLost, wait until the
        factory reports a failure, then check that the protocol was made and
        closed once, that the failure is a ConnectionRefusedError and that
        the factory was stopped once.
        """
        f = ClosingFactory()
        # Listen on port 0; the actual port number is read back via getHost().
        p = reactor.listenTCP(0, f, interface="127.0.0.1")
        n = p.getHost().port
        self.ports.append(p)
        # The factory is given a reference to its own listening port.
        f.port = p

        factory = MyClientFactory()
        d = loopUntil(lambda :p.connected)
        def step1(ignored):
            # Replace the factory hook so that losing the connection
            # immediately asks the connector to connect again.
            def clientConnectionLost(c, reason):
                c.connect()
            factory.clientConnectionLost = clientConnectionLost
            reactor.connectTCP("127.0.0.1", n, factory)
            return loopUntil(lambda :factory.failed)

        def step2(ignored):
            # p here is the client protocol; it shadows the listening port
            # named p in the enclosing scope.
            p = factory.protocol
            self.assertEquals((p.made, p.closed), (1, 1))
            # trap() re-raises unless the stored failure is the expected type.
            factory.reason.trap(error.ConnectionRefusedError)
            self.assertEquals(factory.stopped, 1)
            return self.cleanPorts(*self.ports)

        return d.addCallback(step1).addCallback(step2)
test_tcp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testHostAddress(self):
        """
        Check that the address seen by the client matches the listening
        port's getHost(): both the address stored on the client factory and
        the client transport's getPeer().
        """
        f1 = MyServerFactory()
        # Listen on port 0; the actual port number is read back via getHost().
        p1 = reactor.listenTCP(0, f1, interface='127.0.0.1')
        n = p1.getHost().port
        self.ports.append(p1)

        f2 = MyOtherClientFactory()
        # p2 is the object returned by connectTCP(); its state attribute is
        # polled below.
        p2 = reactor.connectTCP('127.0.0.1', n, f2)

        d = loopUntil(lambda :p2.state == "connected")
        def check(ignored):
            self.assertEquals(p1.getHost(), f2.address)
            self.assertEquals(p1.getHost(), f2.protocol.transport.getPeer())
            return p1.stopListening()
        def cleanup(ignored):
            # Register the client transport so cleanPorts() handles it too.
            self.ports.append(p2.transport)
            return self.cleanPorts(*self.ports)
        return d.addCallback(check).addCallback(cleanup)


问题


面经


文章

微信
公众号

扫码关注公众号