python类DNSLookupError()的实例源码

myspider.py 文件源码 项目:scrapy_redis_splash_spider 作者: lymlhhj123 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def error_back(self, failure):
        """
        Log why a request failed, based on the type of ``failure``.

        The three checks are independent of each other, so every category
        that matches the failure is logged.

        :param failure: the failure handed to this errback. It must provide
            ``check()``, ``value.response`` (read when it is an ``HttpError``)
            and ``request`` (read for DNS and timeout errors).
        """
        # An HttpError carries the offending response: log its URL and status.
        if failure.check(HttpError):
            bad_response = failure.value.response
            self.logger.error('[%s] GET [%d]', bad_response.url, bad_response.status)

        # The remaining categories only log the URL of the failed request.
        url_only_cases = (
            ((DNSLookupError,), '[%s] DNSLookupError'),
            ((TimeoutError, TCPTimedOutError), '[%s] TimeoutError'),
        )
        for error_types, message in url_only_cases:
            if failure.check(*error_types):
                self.logger.error(message, failure.request.url)
srvconnect.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def connect(self):
        """
        Start connection to remote server.

        When servers are already known, connect straight away (through the
        existing connector if there is one). Otherwise look up the service
        record for ``_service._protocol.domain`` first and connect once the
        answer has been processed; a missing domain or a failed lookup is
        reported through ``connectionFailed``.
        """
        self.factory.doStart()
        self.factory.startedConnecting(self)

        # Servers already known: no lookup needed.
        if self.servers:
            if self.connector is None:
                self._reallyConnect()
            else:
                self.connector.connect()
            return

        # Nothing to look up without a domain.
        if self.domain is None:
            self.connectionFailed(error.DNSLookupError("Domain is not defined."))
            return

        def connectAfterLookup(_ignored):
            # The result of _cbGotServers is not needed here.
            return self._reallyConnect()

        serviceName = '_%s._%s.%s' % (self.service, self.protocol, self.domain)
        lookup = client.lookupService(serviceName)
        lookup.addCallback(self._cbGotServers)
        lookup.addCallback(connectAfterLookup)
        lookup.addErrback(self.connectionFailed)
relaymanager.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _ebMX(self, failure, domain):
        """
        Errback for a failed MX lookup of ``domain``.

        Without ``fallbackToDomain``, a ``DNSNameError`` becomes an
        ``IOError`` and any other failure is returned unchanged. With
        ``fallbackToDomain``, only ``DNSNameError`` is handled (anything else
        is re-raised by ``trap``): the domain itself is resolved and the
        address is wrapped in an MX record; if that lookup also fails with
        ``DNSNameError``, ``DNSLookupError`` is raised.
        """
        from twisted.names import error, dns

        if not self.fallbackToDomain:
            if failure.check(error.DNSNameError):
                raise IOError("No MX found for %r" % (domain,))
            return failure

        failure.trap(error.DNSNameError)
        log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,))

        def asMXRecord(addr):
            return dns.Record_MX(name=addr)

        def lookupFailed(err):
            err.trap(error.DNSNameError)
            raise DNSLookupError()

        # Alright, I admit, this is a bit icky.
        lookup = self.resolver.getHostByName(domain)
        lookup.addCallbacks(asMXRecord, lookupFailed)
        return lookup
test_socks.py 文件源码 项目:telepresence 作者: datawire 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_socks5TorStyleFailedResolution(self):
        """
        A Tor-style name resolution when resolution fails.
        """
        self.assert_handshake()

        # Four header bytes (5, 0xf0, 0, 3), then the length-prefixed name,
        # then a 16-bit big-endian number (presumably the port).
        hostname = b"unknown"
        request = (
            struct.pack('!BBBB', 5, 0xf0, 0, 3)
            + struct.pack("!B", len(hostname))
            + hostname
            + struct.pack("!H", 3401)
        )
        self.deliver_data(self.sock, request)

        reply = self.sock.transport.value()
        self.sock.transport.clear()

        expectedReply = struct.pack('!BBBB', 5, 4, 0, 0)
        self.assertEqual(reply, expectedReply)
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)

        # Exactly one DNSLookupError must have been logged.
        loggedErrors = self.flushLoggedErrors(DNSLookupError)
        self.assertEqual(len(loggedErrors), 1)
spider.py 文件源码 项目:BlogSpider 作者: hack4code 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def errback(self, failure):
        """
        Log a failed request with a message chosen by the kind of failure.

        Exactly one message is logged: DNS failures log the host name parsed
        from the request URL, timeouts log the request URL, HTTP errors log
        the response URL and status, and anything else logs the request URL
        together with ``repr(failure)``.

        :param failure: the failure handed to this errback. It must provide
            ``check()``, ``request`` and, for ``HttpError``,
            ``value.response``.
        """
        if failure.check(DNSLookupError):
            host = urlparse(failure.request.url).hostname
            logger.error('DNSLookupError on host[%s]',
                         host)
        elif failure.check(TimeoutError,
                           TCPTimedOutError):
            request = failure.request
            logger.error('TimeoutError on url[%s]',
                         request.url)
        elif failure.check(HttpError):
            response = failure.value.response
            logger.error('HttpError on url[%s, status=%d]',
                         response.url,
                         response.status)
        else:
            # The message used to leave its '[' unclosed, unlike the three
            # messages above; the closing bracket keeps log lines consistent.
            logger.error('SpiderError on url[%s, error=%s]',
                         failure.request.url,
                         repr(failure))
srvconnect.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect(self):
        """
        Start connection to remote server.

        When servers are already known, connect straight away (through the
        existing connector if there is one). Otherwise look up the service
        record for ``_service._protocol.domain`` first and connect once the
        answer has been processed; a missing domain or a failed lookup is
        reported through ``connectionFailed``.
        """
        self.factory.doStart()
        self.factory.startedConnecting(self)

        # Servers already known: no lookup needed.
        if self.servers:
            if self.connector is None:
                self._reallyConnect()
            else:
                self.connector.connect()
            return

        # Nothing to look up without a domain.
        if self.domain is None:
            self.connectionFailed(error.DNSLookupError("Domain is not defined."))
            return

        def connectAfterLookup(_ignored):
            # The result of _cbGotServers is not needed here.
            return self._reallyConnect()

        serviceName = '_%s._%s.%s' % (self.service, self.protocol, self.domain)
        lookup = client.lookupService(serviceName)
        lookup.addCallback(self._cbGotServers)
        lookup.addCallback(connectAfterLookup)
        lookup.addErrback(self.connectionFailed)
relaymanager.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _ebMX(self, failure, domain):
        """
        Errback for a failed MX lookup of ``domain``.

        Without ``fallbackToDomain``, a ``DNSNameError`` becomes an
        ``IOError`` and any other failure is returned unchanged. With
        ``fallbackToDomain``, only ``DNSNameError`` is handled (anything else
        is re-raised by ``trap``): the domain itself is resolved and the
        address is wrapped in an MX record; if that lookup also fails with
        ``DNSNameError``, ``DNSLookupError`` is raised.
        """
        from twisted.names import error, dns

        if not self.fallbackToDomain:
            if failure.check(error.DNSNameError):
                raise IOError("No MX found for %r" % (domain,))
            return failure

        failure.trap(error.DNSNameError)
        log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,))

        def asMXRecord(addr):
            return dns.Record_MX(name=addr)

        def lookupFailed(err):
            err.trap(error.DNSNameError)
            raise DNSLookupError()

        # Alright, I admit, this is a bit icky.
        lookup = self.resolver.getHostByName(domain)
        lookup.addCallbacks(asMXRecord, lookupFailed)
        return lookup
example.py 文件源码 项目:scrapy_redis_spider 作者: lymlhhj123 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def error_back(self, failure):
        """
        Log why a request failed, based on the type of ``failure``.

        The three checks are independent of each other, so every category
        that matches the failure is logged.

        :param failure: the failure handed to this errback. It must provide
            ``check()``, ``value.response`` (read when it is an ``HttpError``)
            and ``request`` (read for DNS and timeout errors).
        """
        # An HttpError carries the offending response: log its URL and status.
        if failure.check(HttpError):
            bad_response = failure.value.response
            self.logger.error('[%s] GET [%d]', bad_response.url, bad_response.status)

        # The remaining categories only log the URL of the failed request.
        url_only_cases = (
            ((DNSLookupError,), '[%s] DNSLookupError'),
            ((TimeoutError, TCPTimedOutError), '[%s] TimeoutError'),
        )
        for error_types, message in url_only_cases:
            if failure.check(*error_types):
                self.logger.error(message, failure.request.url)
dns.py 文件源码 项目:sample-klein-app 作者: wsanchez 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
async def hostname(self, request: IRequest, name: str) -> KleinRenderable:
        """
        Hostname lookup resource.

        Performs a lookup on the given name and responds with the resulting
        address.

        This must be a coroutine function (``async def``): the body awaits
        the lookup, and ``await`` is a syntax error inside a plain ``def``.

        :param request: The request to respond to.

        :param name: A host name.

        :return: The resolved address on success; otherwise the response
            code is set to :const:`http.NOT_FOUND` and a short error string
            is returned (``"no such host"`` for :exc:`DNSNameError`,
            ``"lookup error"`` for :exc:`DNSLookupError`).
        """
        try:
            address = await getHostByName(name)
        except DNSNameError:
            request.setResponseCode(http.NOT_FOUND)
            return "no such host"
        except DNSLookupError:
            request.setResponseCode(http.NOT_FOUND)
            return "lookup error"

        return address
test_dns.py 文件源码 项目:sample-klein-app 作者: wsanchez 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_hostname_lookup_error(self) -> None:
        """
        :meth:`.dns.Application.hostname` responds with a
        :const:`twisted.web.http.NOT_FOUND` error if there is a DNS lookup
        error.
        """
        def failingLookup(*args, **kwargs) -> Deferred:
            # Stand-in resolver: every lookup fails with DNSLookupError.
            return fail(DNSLookupError())

        self.patch(dns, "getHostByName", failingLookup)

        expected = dict(
            response_data=b"lookup error",
            response_code=http.NOT_FOUND,
        )
        self.assertResponse(b"/gethostbyname/foo.example.com", **expected)
aiqiyi_spider.py 文件源码 项目:video_url_crawler_demo 作者: czs0x55aa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def errback_httpbin(self, failure):
        """
        Log a failed request: first the full failure, then one line that
        depends on the failure's type.

        :param failure: the failure handed to this errback. It must provide
            ``check()``, ``request`` and, for ``HttpError``,
            ``value.response``.
        """
        # Every failure is logged in full, whatever its type.
        self.logger.error(repr(failure))

        # HttpError comes from the HttpError spider middleware and carries
        # the non-200 response.
        if failure.check(HttpError):
            self.logger.error('HttpError on %s', failure.value.response.url)
            return

        # For the remaining types, failure.request is the original request.
        if failure.check(DNSLookupError):
            self.logger.error('DNSLookupError on %s', failure.request.url)
            return

        if failure.check(TimeoutError, TCPTimedOutError):
            self.logger.error('TimeoutError on %s', failure.request.url)
test_resolver.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_failure(self):
        """
        L{SimpleResolverComplexifier} translates a known error result from
        L{IResolverSimple.resolveHostName} into an empty result.
        """
        simpleResolver = SillyResolverSimple()
        complexified = SimpleResolverComplexifier(simpleResolver)
        holder = ResultHolder(self)

        # Nothing has happened before resolution is requested.
        self.assertEqual(holder._started, False)

        complexified.resolveHostName(holder, u"example.com")

        # Resolution has begun but has produced nothing yet.
        self.assertEqual(holder._started, True)
        self.assertEqual(holder._ended, False)
        self.assertEqual(holder._addresses, [])

        # Failing the pending simple lookup ends resolution with no addresses.
        simpleResolver._requests[0].errback(DNSLookupError("nope"))
        self.assertEqual(holder._ended, True)
        self.assertEqual(holder._addresses, [])
test_resolver.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_simplifier(self):
        """
        L{ComplexResolverSimplifier} translates an L{IHostnameResolver} into an
        L{IResolverSimple} for applications that still expect the old
        interfaces to be in place.
        """
        self.pool, self.doThreadWork = deterministicPool()
        self.reactor, self.doReactorWork = deterministicReactorThreads()
        self.getter = FakeAddrInfoGetter()
        self.resolver = GAIResolver(self.reactor, lambda: self.pool,
                                    self.getter.getaddrinfo)
        simplified = ComplexResolverSimplifier(self.resolver)
        self.getter.addResultForHost('example.com', ('192.168.3.4', 4321))

        knownHost = simplified.getHostByName('example.com')
        unknownHost = simplified.getHostByName('nx.example.com')

        # Pump the thread pool and the reactor once per pending lookup.
        for _ in range(2):
            self.doThreadWork()
            self.doReactorWork()

        self.assertEqual(self.failureResultOf(unknownHost).type, DNSLookupError)
        self.assertEqual(self.successResultOf(knownHost), '192.168.3.4')
srvconnect.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _cbGotServers(self, result):
        """
        Process the result of the service lookup.

        Resets C{self.servers} and C{self.orderedServers}, then stores the
        payload of every SRV answer that has one in C{self.orderedServers},
        in answer order.

        @param result: a 3-tuple whose first element is the list of answers;
            the other two elements are unused here.

        @raise error.DNSLookupError: if the only answer is an SRV record
            whose target is the root name C{'.'}.
        """
        answers, _authority, _additional = result

        if len(answers) == 1:
            only = answers[0]
            if (only.type == dns.SRV and only.payload
                    and only.payload.target == dns.Name(b'.')):
                # decidedly not available
                raise error.DNSLookupError(
                    "Service %s not available for domain %s."
                    % (repr(self.service), repr(self.domain)))

        self.servers = []
        self.orderedServers = []
        for record in answers:
            # Only SRV records that actually carry a payload are kept.
            if record.type == dns.SRV and record.payload:
                self.orderedServers.append(record.payload)
initiatives.py 文件源码 项目:tipi-engine 作者: CIECODE-Madrid 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def errback_httpbin(self, failure):
        """
        Log a failed request: first the full failure, then one line that
        depends on the failure's type.

        :param failure: the failure handed to this errback. It must provide
            ``check()``, ``request`` and, for ``HttpError``,
            ``value.response``.
        """
        # Every failure is logged in full, whatever its type.
        self.logger.error(repr(failure))

        # An HttpError carries the response.
        if failure.check(HttpError):
            self.logger.error('HttpError on %s', failure.value.response.url)
            return

        # For the remaining types, failure.request is the original request.
        if failure.check(DNSLookupError):
            self.logger.error('DNSLookupError on %s', failure.request.url)
            return

        if failure.check(TimeoutError):
            self.logger.error('TimeoutError on %s', failure.request.url)
base.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _fail(self, name, err):
        """
        Build the failure reported when C{name} could not be resolved.

        @param name: the name that was looked up.
        @param err: the underlying error; it is formatted into the message.
        @return: a L{failure.Failure} wrapping an L{error.DNSLookupError}.
        """
        message = "address %r not found: %s" % (name, err)
        return failure.Failure(error.DNSLookupError(message))
base.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def getHostByName(self, name, timeout=(1, 3, 11, 45)):
        """
        Resolve C{name} with a direct call to L{socket.gethostbyname}.

        @param name: the host name to resolve.
        @param timeout: accepted but not used by this implementation.
        @return: an already-fired Deferred: the address on success, or a
            failure wrapping L{error.DNSLookupError} if the lookup raised
            L{socket.error}.
        """
        try:
            resolved = socket.gethostbyname(name)
        except socket.error:
            notFound = error.DNSLookupError("address %r not found" % (name,))
            return defer.fail(notFound)
        return defer.succeed(resolved)
test_udp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testDNSFailure(self):
        """
        Connecting a UDP client to an unresolvable host name fails with
        L{error.DNSLookupError}, and the client is neither started nor
        stopped.
        """
        client = Client()
        d = client.startedDeferred = defer.Deferred()
        # if this domain exists, shoot your sysadmin
        reactor.connectUDP("xxxxxxxxx.zzzzzzzzz.yyyyy.", 8888, client)

        def didNotConnect(ign):
            # assertEqual replaces the deprecated assertEquals alias, which
            # was removed from unittest in Python 3.12.
            self.assertEqual(client.stopped, 0)
            self.assertEqual(client.started, 0)

        d = self.assertFailure(d, error.DNSLookupError)
        d.addCallback(didNotConnect)
        return d
socks.py 文件源码 项目:telepresence 作者: datawire 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _handle_error(self, failure):
        """
        Handle errors in connecting or resolving.

        Logs the failure, then writes a response whose error code depends on
        the failure type: 5 for C{ConnectionRefusedError}, 4 for
        C{DNSLookupError}, 1 for anything else.
        """
        log.err(failure)

        # NOTE(review): 1/4/5 look like SOCKS5 reply codes (general failure,
        # host unreachable, connection refused) -- confirm against RFC 1928.
        dnsFailed = failure.check(DNSLookupError)
        refused = failure.check(ConnectionRefusedError)

        # A refused connection takes precedence over a DNS failure.
        if refused:
            error_code = 5
        elif dnsFailed:
            error_code = 4
        else:
            error_code = 1

        self._write_response(error_code, "0.0.0.0", 0)
test_socks.py 文件源码 项目:telepresence 作者: datawire 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def resolve(self, hostname):
        """
        Resolve a hostname by looking it up in the C{names} dictionary.

        @return: an already-fired Deferred: the stored value for a known
            name, or a failure wrapping L{DNSLookupError} for an unknown one.
        """
        try:
            address = self.names[hostname]
        except KeyError:
            message = "FakeResolverReactor couldn't find {}".format(hostname)
            return defer.fail(DNSLookupError(message))
        return defer.succeed(address)
base.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _fail(self, name, err):
        """
        Build the failure reported when C{name} could not be resolved.

        @param name: the name that was looked up.
        @param err: the underlying error; it is formatted into the message.
        @return: a L{failure.Failure} wrapping an L{error.DNSLookupError}.
        """
        message = "address %r not found: %s" % (name, err)
        return failure.Failure(error.DNSLookupError(message))
base.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getHostByName(self, name, timeout=(1, 3, 11, 45)):
        """
        Resolve C{name} with a direct call to L{socket.gethostbyname}.

        @param name: the host name to resolve.
        @param timeout: accepted but not used by this implementation.
        @return: an already-fired Deferred: the address on success, or a
            failure wrapping L{error.DNSLookupError} if the lookup raised
            L{socket.error}.
        """
        try:
            resolved = socket.gethostbyname(name)
        except socket.error:
            notFound = error.DNSLookupError("address %r not found" % (name,))
            return defer.fail(notFound)
        return defer.succeed(resolved)
test_udp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testDNSFailure(self):
        """
        Connecting a UDP client to an unresolvable host name fails with
        L{error.DNSLookupError}, and the client is neither started nor
        stopped.
        """
        client = Client()
        d = client.startedDeferred = defer.Deferred()
        # if this domain exists, shoot your sysadmin
        reactor.connectUDP("xxxxxxxxx.zzzzzzzzz.yyyyy.", 8888, client)

        def didNotConnect(ign):
            # assertEqual replaces the deprecated assertEquals alias, which
            # was removed from unittest in Python 3.12.
            self.assertEqual(client.stopped, 0)
            self.assertEqual(client.started, 0)

        d = self.assertFailure(d, error.DNSLookupError)
        d.addCallback(didNotConnect)
        return d
downloadermiddlewares.py 文件源码 项目:scrapy_redis_spider 作者: lymlhhj123 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def process_exception(self, request, exception, spider):
        """
        Send a crawler signal for DNS and timeout errors, then hand the
        request back.

        :param request: the request whose processing raised ``exception``.
        :param exception: the exception that was raised.
        :param spider: the spider, passed along with each signal.
        :return: always ``request`` itself.
            NOTE(review): presumably so that the framework reschedules the
            request -- confirm this unconditional retry is intended.
        """
        dns_failed = isinstance(exception, DNSLookupError)
        timed_out = isinstance(exception, (TimeoutError, TCPTimedOutError))

        if dns_failed:
            self.crawler.signals.send_catch_log(dnslookuperror, spider=spider)

        if timed_out:
            self.crawler.signals.send_catch_log(timeouterror, spider=spider)

        return request
test_base.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_failure(self):
        """
        L{ThreadedResolver.getHostByName} returns a L{Deferred} which fires a
        L{Failure} if the call to L{socket.gethostbyname} raises an exception.
        """
        timeout = 30

        fakeReactor = FakeReactor()
        self.addCleanup(fakeReactor._stop)

        def alwaysFails(name):
            raise IOError("ENOBUFS (this is a funny joke)")

        self.patch(socket, 'gethostbyname', alwaysFails)

        failures = []
        lookup = ThreadedResolver(fakeReactor).getHostByName(
            "some.name", (timeout,))
        self.assertFailure(lookup, DNSLookupError)
        lookup.addCallback(failures.append)

        # Let the queued thread call run so the lookup can fail.
        fakeReactor._runThreadCalls()

        self.assertEqual(len(failures), 1)

        # Make sure that any timeout-related stuff gets cleaned up.
        fakeReactor._clock.advance(timeout + 1)
        self.assertEqual(fakeReactor._clock.calls, [])
test_base.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """
        If L{socket.gethostbyname} does not complete before the specified
        timeout elapsed, the L{Deferred} returned by
        L{ThreadedResolver.getHostByName} fails with L{DNSLookupError}.
        """
        timeout = 10

        fakeReactor = FakeReactor()
        self.addCleanup(fakeReactor._stop)

        # The fake lookup blocks on this queue until an exception is put.
        pending = Queue()

        def blockedLookup(name):
            raise pending.get()

        self.patch(socket, 'gethostbyname', blockedLookup)

        failures = []
        lookup = ThreadedResolver(fakeReactor).getHostByName(
            "some.name", (timeout,))
        self.assertFailure(lookup, DNSLookupError)
        lookup.addCallback(failures.append)

        # One tick short of the timeout nothing has failed yet; the final
        # tick triggers the failure.
        fakeReactor._clock.advance(timeout - 1)
        self.assertEqual(failures, [])
        fakeReactor._clock.advance(1)
        self.assertEqual(len(failures), 1)

        # Eventually the socket.gethostbyname does finish - in this case, with
        # an exception.  Nobody cares, though.
        pending.put(IOError("The I/O was errorful"))
test_endpoints.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_failure(self):
        """
        If no address is returned by GAI for a hostname, the connection attempt
        fails with L{error.DNSLookupError}.
        """
        # A resolving reactor that knows no addresses at all.
        resolvingReactor = deterministicResolvingReactor(Clock(), [])
        endpoint = endpoints.HostnameEndpoint(
            resolvingReactor, b"example.com", 80)

        dConnect = endpoint.connect(object())

        exc = self.failureResultOf(dConnect, error.DNSLookupError)
        self.assertIn("example.com", str(exc))
_resolver.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def resolutionComplete(self):
        """
        See L{IResolutionReceiver.resolutionComplete}

        If nothing was resolved (C{self._resolved} is false), fail the
        deferred with a L{DNSLookupError} built from the resolution's name.
        """
        if not self._resolved:
            self._deferred.errback(DNSLookupError(self._resolution.name))
base.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _fail(self, name, err):
        """
        Build the failure reported when C{name} could not be resolved.

        @param name: the name that was looked up.
        @param err: the underlying error; it is formatted into the message.
        @return: a L{failure.Failure} wrapping an L{error.DNSLookupError}.
        """
        message = "address %r not found: %s" % (name, err)
        return failure.Failure(error.DNSLookupError(message))
base.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def getHostByName(self, name, timeout=(1, 3, 11, 45)):
        """
        Resolve C{name} with a direct call to L{socket.gethostbyname}.

        @param name: the host name to resolve.
        @param timeout: accepted but not used by this implementation.
        @return: an already-fired Deferred: the address on success, or a
            failure wrapping L{error.DNSLookupError} if the lookup raised
            L{socket.error}.
        """
        try:
            resolved = socket.gethostbyname(name)
        except socket.error:
            notFound = error.DNSLookupError("address %r not found" % (name,))
            return defer.fail(notFound)
        return defer.succeed(resolved)


问题


面经


文章

微信
公众号

扫码关注公众号