python类fail()的实例源码

downloadhandlers.py 文件源码 项目:NetEaseMusicCrawler 作者: yaochao 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _response(self, _, driver, spider):
        """
        Build a response object from the page currently loaded in C{driver}.

        The markup is read from the browser through JavaScript.  When the
        spider exposes a callable C{response_failed} hook and that hook
        returns a true value for the response, the driver is quit and a
        failed Deferred is returned.  Otherwise the driver is put back on
        C{self.queue} and the response is returned in a succeeded Deferred.
        """
        html = driver.execute_script('return document.documentElement.innerHTML')
        if html.startswith("<head></head>"):
            # The markup starts with an empty <head> element -- presumably the
            # browser wrapped a non-HTML payload (TODO confirm) -- so the
            # document's text content is used as the body instead.
            html = driver.execute_script('return document.documentElement.textContent')
        page_url = driver.current_url
        # Only the first 100 characters are handed over for picking the
        # response class.
        response_class = responsetypes.from_args(url=page_url, body=html[:100].encode('utf-8'))
        response = response_class(url=page_url, body=html, encoding='utf-8')

        failure_hook = getattr(spider, 'response_failed', None)
        if failure_hook and callable(failure_hook) and failure_hook(response, driver):
            driver.quit()
            # NOTE(review): Failure() is constructed without an explicit
            # exception -- confirm an exception is always active at this point.
            return defer.fail(Failure())

        # Hand the driver back to the queue before returning the response.
        self.queue.put(driver)
        return defer.succeed(response)
userauth.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _pamConv(self, items):
        """
        Send the PAM conversation C{items} as a USERAUTH_INFO_REQUEST packet.

        @param items: iterable of C{(message, kind)} pairs.  Kind 1 is sent
            with echo flag 0 and kind 2 with echo flag 1.

        @return: a new Deferred stored on C{self._pamDeferred}, or a failed
            Deferred carrying a C{ConchError} as soon as a kind other than
            1 or 2 is seen (no packet is sent in that case).
        """
        prompts = []
        for message, kind in items:
            if kind == 1:  # password
                echo = 0
            elif kind == 2:  # text
                echo = 1
            elif kind in (3, 4):
                return defer.fail(error.ConchError('cannot handle PAM 3 or 4 messages'))
            else:
                return defer.fail(error.ConchError('bad PAM auth kind %i' % kind))
            prompts.append((message, echo))

        # Three empty strings, then the prompt count, then each prompt
        # followed by its one-byte echo flag.
        packet = NS('') + NS('') + NS('') + struct.pack('>L', len(prompts))
        for text, echo in prompts:
            packet += NS(text) + chr(echo)
        self.transport.sendPacket(MSG_USERAUTH_INFO_REQUEST, packet)

        pending = defer.Deferred()
        self._pamDeferred = pending
        return pending
default.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getPrivateKey(self):
        """
        Load the private key stored in the most recently used key file.

        @return: C{None} when the file does not exist; otherwise a Deferred
            that succeeds with the key object.  When the key is encrypted,
            the passphrase is requested up to three times through
            C{self._getPassword}; if none of the attempts yields a key, the
            Deferred fails with C{ConchError('bad password')}.

        @raise keys.BadKeyError: when the key is bad for any reason other
            than a missing passphrase.
        """
        key_path = os.path.expanduser(self.usedFiles[-1])
        if not os.path.exists(key_path):
            return None
        try:
            return defer.succeed(keys.getPrivateKeyObject(key_path))
        except keys.BadKeyError as e:
            if e.args[0] != 'encrypted key with no passphrase':
                raise
            for _attempt in range(3):
                prompt = "Enter passphrase for key '%s': " % \
                       self.usedFiles[-1]
                try:
                    passphrase = self._getPassword(prompt)
                    return defer.succeed(
                        keys.getPrivateKeyObject(key_path, passphrase = passphrase))
                except (keys.BadKeyError, ConchError):
                    # Wrong passphrase or failed prompt: try again.
                    pass
            return defer.fail(ConchError('bad password'))
        except KeyboardInterrupt:
            # Interrupted by the user: emit a newline and stop the reactor.
            print
            reactor.stop()
test_cooperator.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testStopOutstanding(self):
        """
        An iterator that is paused on a third-party Deferred must stop
        properly once the Cooperator's stop() is called.
        """
        controlDeferred = defer.Deferred()
        pendingDeferred = defer.Deferred()

        def pausingIterator():
            reactor.callLater(0, controlDeferred.callback, None)
            yield pendingDeferred
            # The iterator must never be resumed after stop().
            self.fail()

        cooperator = task.Cooperator()
        taskDone = cooperator.coiterate(pausingIterator())

        def stopThenFire(ignored):
            cooperator.stop()
            pendingDeferred.callback('arglebargle')

        def checkResult(result):
            return self.assertEquals(result, self.RESULT)

        controlDeferred.addCallback(stopThenFire)
        taskDone.addCallback(self.cbIter)
        taskDone.addErrback(self.ebIter)

        return taskDone.addCallback(checkResult)
test_threads.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testWakerOverflow(self):
        """
        Queue a large number of calls from a non-reactor thread and fail if
        any C{reactor.callFromThread} call raises or the thread does not
        finish within 120 seconds.
        """
        self.failure = None
        finished = threading.Event()

        def floodReactor():
            # Hopefully a hundred thousand queued calls is enough to
            # trigger the error condition
            for i in xrange(100000):
                try:
                    reactor.callFromThread(lambda: None)
                except:
                    # Capture whatever went wrong so the test thread can
                    # report it, and stop queueing.
                    self.failure = failure.Failure()
                    break
            finished.set()

        reactor.callInThread(floodReactor)
        finished.wait(120)
        if not finished.isSet():
            self.fail("Timed out waiting for event")
        captured = self.failure
        if captured is not None:
            return defer.fail(captured)
test_assertions.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_assertFailure_masked(self):
        """
        One mismatched assertFailure is enough to fail the whole test, even
        when a later assertFailure on the same Deferred would match.
        """
        class ExampleFailure(Exception):
            pass

        def divideByZero():
            return 1/0

        class TC(unittest.TestCase):
            failureException = ExampleFailure

            def test_assertFailure(self):
                d = defer.maybeDeferred(divideByZero)
                # First expectation is wrong, second one is right.
                self.assertFailure(d, OverflowError)
                self.assertFailure(d, ZeroDivisionError)
                return d

        result = reporter.TestResult()
        TC('test_assertFailure').run(result)
        self.assertEqual(1, len(result.failures))
database.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def articleRequest(self, group, index, id = None):
        """
        Look up the article stored at C{index} in C{group}.

        @return: a Deferred that succeeds with a tuple of the index, the
            article's Message-ID header and a file-like object holding the
            headers and body, or fails with C{ERR_NOGROUP} /
            C{ERR_NOARTICLE} when the group or the index is unknown.

        @raise NotImplementedError: when C{id} is given; lookup by
            message id is not supported here.
        """
        if id is not None:
            raise NotImplementedError

        if not self.db.has_key(group):
            return defer.fail(ERR_NOGROUP)
        if not self.db[group].has_key(index):
            return defer.fail(ERR_NOARTICLE)

        article = self.db[group][index]
        messageId = article.getHeader('Message-ID')
        content = StringIO.StringIO(article.textHeaders() + '\r\n' + article.body)
        return defer.succeed((index, messageId, content))
database.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def articleRequest(self, group, index, id = None):
        """
        Fetch an article either by message id or by group and index.

        When C{id} is given, the group and index are taken from the first
        cross-reference stored for that message id, replacing the C{group}
        and C{index} arguments.

        @return: a Deferred that succeeds with a tuple of the index, the
            article's Message-ID header and a file-like object holding the
            headers and body.  It fails with C{NewsServerError} when the
            message id is unknown or the article lookup raises C{KeyError}.
        """
        if id is not None:
            try:
                xref = self.dbm['Message-IDs'][id]
            except KeyError:
                return defer.fail(NewsServerError("No such article: " + id))
            group, index = xref[0]
            index = int(index)

        try:
            article = self.dbm['groups'][group].articles[index]
        except KeyError:
            return defer.fail(NewsServerError("No such group: " + group))

        messageId = article.getHeader('Message-ID')
        content = StringIO.StringIO(article.textHeaders() + '\r\n' + article.body)
        return defer.succeed((index, messageId, content))
database.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def headRequest(self, group, index, id = None):
        """
        Fetch the headers of an article by message id or by group and index.

        When C{id} is given, the group and index are taken from the first
        cross-reference stored for that message id, replacing the C{group}
        and C{index} arguments.

        @return: a Deferred that succeeds with a tuple of the index, the
            article's Message-ID header and its header text.  It fails with
            C{NewsServerError} when the message id is unknown or the article
            lookup raises C{KeyError}.
        """
        if id is not None:
            try:
                xref = self.dbm['Message-IDs'][id]
            except KeyError:
                return defer.fail(NewsServerError("No such article: " + id))
            group, index = xref[0]
            index = int(index)

        try:
            article = self.dbm['groups'][group].articles[index]
        except KeyError:
            return defer.fail(NewsServerError("No such group: " + group))

        messageId = article.getHeader('Message-ID')
        return defer.succeed((index, messageId, article.textHeaders()))
checkers.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def requestAvatarId(self, c):
        """
        Check the credentials C{c} against the stored entry for its username.

        With a hash function configured on C{self.hash}, the supplied
        password is hashed and compared with the stored value.  Without one,
        the comparison is delegated to C{c.checkPassword}.

        @return: a Deferred that succeeds with the stored username, fails
            with C{UnauthorizedLogin}, or, when no hash function is
            configured, whatever C{self._cbPasswordMatch} produces.
        """
        try:
            storedUser, storedPassword = self.getUser(c.username)
        except KeyError:
            return defer.fail(error.UnauthorizedLogin())

        up = credentials.IUsernamePassword(c, None)
        if not self.hash:
            d = defer.maybeDeferred(c.checkPassword, storedPassword)
            return d.addCallback(self._cbPasswordMatch, storedUser)

        if up is not None:
            hashed = self.hash(up.username, up.password, storedPassword)
            if hashed == storedPassword:
                return defer.succeed(storedUser)
        return defer.fail(error.UnauthorizedLogin())
pamauth.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def defConv(items):
    resp = []
    for i in range(len(items)):
        message, kind = items[i]
        if kind == 1: # password
            p = getpass.getpass(message)
            resp.append((p, 0))
        elif kind == 2: # text
            p = raw_input(message)
            resp.append((p, 0))
        elif kind in (3,4):
            print message
            resp.append(("", 0))
        else:
            return defer.fail('foo')
    d = defer.succeed(resp)
    return d
util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def callRemote(self, name, *args, **kw):
        """
        Invoke a specially-named local method as if it were remote.

        A method called C{sync_<name>} is tried first; its return value is
        handed back untouched.  Otherwise C{async_<name>} is looked up and
        called, and its result is wrapped in a succeeded Deferred.  Any
        exception raised while looking up or calling the async method is
        wrapped in a failed Deferred instead, and its traceback is printed
        when C{self.reportAllTracebacks} is set.
        """
        syncName = 'sync_' + name
        if hasattr(self, syncName):
            syncMethod = getattr(self, syncName)
            return syncMethod(*args, **kw)
        try:
            asyncMethod = getattr(self, "async_" + name)
            outcome = asyncMethod(*args, **kw)
            return defer.succeed(outcome)
        except:
            # Deliberately broad: every error becomes a failed Deferred.
            captured = Failure()
            if self.reportAllTracebacks:
                captured.printTraceback()
            return defer.fail(captured)
ftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def ftp_USER(self, username):
        """
        Handle the first step of login: record the username the peer wants
        to authenticate as and move to the INAUTH state.

        @return: a failed Deferred carrying C{CmdSyntaxError} when no
            username was supplied; C{GUEST_NAME_OK_NEED_EMAIL} when
            anonymous access is allowed and the anonymous username was
            given; otherwise a C{(USR_NAME_OK_NEED_PASS, username)} tuple.
        """
        if not username:
            return defer.fail(CmdSyntaxError('USER requires an argument'))

        self._user = username
        self.state = self.INAUTH

        anonymousLogin = (self.factory.allowAnonymous
                          and self._user == self.factory.userAnonymous)
        if anonymousLogin:
            return GUEST_NAME_OK_NEED_EMAIL
        return (USR_NAME_OK_NEED_PASS, username)

    # TODO: add max auth try before timeout from ip...
    # TODO: need to implement minimal ABOR command
amp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def dispatchCommand(self, box):
        """
        Dispatch a received box that carries a command key to the local
        handler registered for that command.

        @param box: an AmpBox to be dispatched.

        @return: the result of calling the handler found by
            C{self.lookupFunction} through C{maybeDeferred}; when no handler
            is found, a failed Deferred carrying a C{RemoteAmpError} with
            C{UNHANDLED_ERROR_CODE}.
        """
        command = box[COMMAND]
        handler = self.lookupFunction(command)
        if handler is not None:
            return maybeDeferred(handler, box)

        unhandled = RemoteAmpError(
            UNHANDLED_ERROR_CODE,
            "Unhandled Command: %r" % (command,),
            False,
            local=Failure(UnhandledCommand()))
        return fail(unhandled)
client.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def queryTCP(self, queries, timeout = 10):
        """
        Issue one or more DNS queries over TCP.

        An existing connection is reused when there is one.  Otherwise a
        TCP connection to a server chosen by C{self.pickServer} is started
        and the request is parked on C{self.pending}.

        @type queries: Any non-zero number of C{dns.Query} instances
        @param queries: The queries to make.

        @type timeout: C{int}
        @param timeout: The number of seconds after which to fail.

        @rtype: C{Deferred}
        @return: the result of the existing connection's C{query}, a new
            Deferred for the parked request, or a failed Deferred carrying
            C{IOError} when no server is available.
        """
        if len(self.connections):
            return self.connections[0].query(queries, timeout)

        address = self.pickServer()
        if address is None:
            return defer.fail(IOError("No domain name servers available"))

        host, port = address
        from twisted.internet import reactor
        reactor.connectTCP(host, port, self.factory)
        parked = defer.Deferred()
        self.pending.append((parked, queries, timeout))
        return parked
authority.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def lookupZone(self, name, timeout = 10):
        """
        Return every record of this zone when C{name} matches the zone's
        SOA name (compared case-insensitively).

        @return: a Deferred that succeeds with C{(results, (), ())}, where
            C{results} starts with the SOA header, continues with all
            non-SOA records, and ends with the SOA header again.  It fails
            with C{dns.DomainError} when C{name} is not this zone.
        """
        if self.soa[0].lower() != name.lower():
            return defer.fail(failure.Failure(dns.DomainError(name)))

        soaName, soaRecord = self.soa[0], self.soa[1]
        # Records without an explicit TTL fall back to this value.
        default_ttl = max(soaRecord.minimum, soaRecord.expire)
        soa_ttl = soaRecord.ttl if soaRecord.ttl is not None else default_ttl

        soaHeader = dns.RRHeader(soaName, dns.SOA, dns.IN, soa_ttl, soaRecord, auth=True)
        results = [soaHeader]
        for (owner, ownerRecords) in self.records.items():
            for rec in ownerRecords:
                ttl = rec.ttl if rec.ttl is not None else default_ttl
                if rec.TYPE != dns.SOA:
                    results.append(dns.RRHeader(owner, rec.TYPE, dns.IN, ttl, rec, auth=True))
        # The first header is repeated as the last entry.
        results.append(results[0])
        return defer.succeed((results, (), ()))
cache.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _lookup(self, name, cls, type, timeout):
        """
        Answer a query from the cache.

        @return: a Deferred that succeeds with the cached answer, authority
            and additional sections, each rebuilt with its TTLs reduced by
            the time elapsed since the entry was cached.  It fails with
            C{dns.DomainError} on a cache miss.
        """
        now = time.time()
        query = dns.Query(name, type, cls)
        try:
            when, (ans, auth, add) = self.cache[query]
        except KeyError:
            if self.verbose > 1:
                log.msg('Cache miss for ' + repr(name))
            return defer.fail(failure.Failure(dns.DomainError(name)))

        if self.verbose:
            log.msg('Cache hit for ' + repr(name))
        age = now - when

        def aged(records):
            # Copy each header with its TTL lowered by the entry's age.
            return [dns.RRHeader(str(r.name), r.type, r.cls, r.ttl - age, r.payload)
                    for r in records]

        return defer.succeed((aged(ans), aged(auth), aged(add)))
imap4.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __cbAuthenticate(self, caps, secret):
        """
        Start authentication with the first scheme listed under C{AUTH} in
        C{caps} for which an authenticator is registered.

        When no listed scheme is usable and TLS has not been started yet,
        TLS is started and the capabilities are fetched again before
        continuing with C{__cbAuthTLS}.

        @return: the result of sending the AUTHENTICATE command, the
            Deferred of the TLS attempt, or a failed Deferred carrying
            C{NoSupportedAuthentication}.
        """
        auths = caps.get('AUTH', ())
        for scheme in auths:
            if scheme.upper() not in self.authenticators:
                continue
            cmd = Command('AUTHENTICATE', scheme, (),
                          self.__cbContinueAuth, scheme,
                          secret)
            return self.sendCommand(cmd)

        def noSupportedAuth():
            return defer.fail(NoSupportedAuthentication(
                auths, self.authenticators.keys()))

        if self.startedTLS:
            return noSupportedAuth()

        def ebStartTLS(err):
            err.trap(IMAP4Exception)
            # We couldn't negotiate TLS for some reason
            return noSupportedAuth()

        d = self.startTLS()
        d.addErrback(ebStartTLS)
        d.addCallback(lambda _: self.getCapabilities())
        d.addCallback(self.__cbAuthTLS, secret)
        return d
test_mail.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_failureWithSuccessfulFallback(self):
        """
        When the MX lookup fails, fallback is enabled and the name has an A
        record, the Deferred returned by L{MXCalculator.getMX} fires with a
        Record_MX instance carrying the address from the A record.
        """
        class DummyResolver(object):
            """
            Resolver stub whose MX lookup fails while its getHostByName
            call succeeds.
            """
            def lookupMailExchange(self, domain):
                return defer.fail(DNSNameError())

            def getHostByName(self, domain):
                return defer.succeed("1.2.3.4")

        expectedRecord = Record_MX(name="1.2.3.4")
        self.mx.resolver = DummyResolver()
        lookup = self.mx.getMX("domain")
        lookup.addCallback(self.assertEqual, expectedRecord)
        return lookup
pop3client.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _login(self, caps, username, password):
        """
        Pick a login method and log in.

        APOP is used when a server challenge is available.  Otherwise TLS is
        started first when it has not been started, the server lists STLS,
        and the transport can switch to TLS and is not already encrypted.
        A plaintext login is used when the connection is already protected
        or C{self.allowInsecureLogin} is set.

        @return: the Deferred of the chosen login method, or a failed
            Deferred carrying C{InsecureAuthenticationDisallowed}.
        """
        if self.serverChallenge is not None:
            return self._apop(username, password, self.serverChallenge)

        serverOffersTLS = 'STLS' in caps

        # Whether the transport supports switching to TLS.
        canSwitchToTLS = interfaces.ITLSTransport(self.transport, None) is not None

        # Whether the transport is not already using TLS.
        notYetEncrypted = interfaces.ISSLTransport(self.transport, None) is None

        if not self.startedTLS and serverOffersTLS and canSwitchToTLS and notYetEncrypted:
            d = self.startTLS()
            d.addCallback(self._loginTLS, username, password)
            return d

        if self.startedTLS or not notYetEncrypted or self.allowInsecureLogin:
            return self._plaintext(username, password)
        return defer.fail(InsecureAuthenticationDisallowed())
test_incoming_mail.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testExtractInvalidAttachedKey(self):
        """
        Fetching a message with an attached key calls put_raw_key once with
        the key and the sender address, even though put_raw_key fails with
        KeyAddressMismatch.
        """
        KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."

        attachment = MIMEApplication("", "pgp-keys")
        attachment.set_payload(KEY)
        message = MIMEMultipart()
        message.add_header("from", ADDRESS_2)
        message.attach(attachment)

        rejected = defer.fail(KeyAddressMismatch())
        self.fetcher._keymanager.put_raw_key = Mock(return_value=rejected)

        def verify_put_raw_key(_):
            self.fetcher._keymanager.put_raw_key.assert_called_once_with(
                KEY, address=ADDRESS_2)

        fetched = self._do_fetch(message.as_string())
        fetched.addCallback(verify_put_raw_key)
        fetched.addErrback(log.err)
        return fetched
test_incoming_mail.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testLogErrorIfDecryptFails(self):
        """
        A failing decrypt call makes _decrypt_doc log an error that names
        the document id.
        """
        expected_message = ('_decrypt_doc: '
                            'Error decrypting document with '
                            'ID 1')

        def assert_failure(_):
            mock_logger_error.assert_any_call(expected_message)

        with patch.object(Logger, 'error') as mock_logger_error:
            doc = SoledadDocument()
            doc.doc_id = '1'
            doc.content = {'_enc_json': ''}

            self.fetcher._process_decrypted_doc = Mock()
            failing_decrypt = Mock(return_value=defer.fail(Exception()))
            self.km.decrypt = failing_decrypt

            decrypted = self.fetcher._decrypt_doc(doc)
            decrypted.addCallback(assert_failure)
            return decrypted
cred.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def checkSoledadToken(self, username, password, service):
        """
        Check C{password} against the service token of the user's soledad
        session.

        @return: a failed Deferred when there is no soledad session for
            C{username}; otherwise the Deferred returned by the session's
            C{get_or_create_service_token}, with a callback added that
            returns C{username} when the token equals C{password} and raises
            C{RuntimeError} when the token is missing or different.
        """
        session = self._soledad_sessions.get(username)
        if session:
            def match_token(token):
                if token is None:
                    raise RuntimeError('no token')
                if token == password:
                    return username
                raise RuntimeError('bad token')

            token_lookup = session.get_or_create_service_token(service)
            token_lookup.addCallback(match_token)
            return token_lookup

        return defer.fail(Exception("No soledad"))
test_cloud.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Install a fetch stub that answers from C{self.query_results} and
        seed it with three default query results.
        """
        super(CloudTest, self).setUp()
        self.query_results = {}
        self.kwargs = {}

        def fetch_stub(url, **kwargs):
            # Remember the keyword arguments of the latest call.
            self.kwargs = kwargs
            canned = self.query_results[url]
            # Exception instances stand for failed fetches.
            return fail(canned) if isinstance(canned, Exception) else succeed(canned)

        self.fetch_func = fetch_stub
        defaults = (
            ("instance-id", b"i00001"),
            ("ami-id", b"ami-00002"),
            ("instance-type", b"hs1.8xlarge"),
        )
        for path, payload in defaults:
            self.add_query_result(path, payload)
test_configuration.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_import_from_file_with_empty_client_section(self):
        """
        Importing from an empty file raises ImportOptionError saying there
        is nothing to import at that file.
        """
        config_filename = self.makeFile("", "[client]\n",
                                        basename="final_config")
        import_filename = self.makeFile("", basename="import_config")

        # Use a command line option as well to test the precedence.
        arguments = ["--config", config_filename, "--silent",
                     "--import", import_filename]
        caught = None
        try:
            self.get_config(arguments)
        except ImportOptionError as error:
            caught = error

        if caught is None:
            self.fail("ImportOptionError not raised")
        self.assertEqual(str(caught),
                         "Nothing to import at %s." % import_filename)
test_configuration.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_import_from_url_with_pycurl_error(
            self, mock_fetch, mock_print_text):
        """
        A PyCurlError raised while fetching the import URL is reported as
        an ImportOptionError that includes the error code and message.
        """
        import_url = "https://config.url"
        mock_fetch.side_effect = PyCurlError(60, "pycurl message")

        config_filename = self.makeFile("", basename="final_config")

        caught = None
        try:
            self.get_config(["--config", config_filename, "--silent",
                             "--import", import_url])
        except ImportOptionError as error:
            caught = error

        if caught is None:
            self.fail("ImportOptionError not raised")
        self.assertEqual(str(caught),
                         "Couldn't download configuration from "
                         "https://config.url: Error 60: pycurl message")
        mock_fetch.assert_called_once_with(import_url)
        mock_print_text.assert_called_once_with(
            "Fetching configuration from https://config.url...")
test_configuration.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def test_register_registration_error(self):
        """
        When registration fails, register() returns the error carried by
        the registration response message.
        """
        self.reactor.call_later(0, self.reactor.fire, "registration-failed")

        # Make the remote's register call fail with a registration error.
        self.remote.register = (
            lambda: fail(RegistrationError("max-pending-computers")))

        factory = FakeConnectorFactory(self.remote)
        outcome = register(
            config=self.config, reactor=self.reactor,
            connector_factory=factory, max_retries=99)
        self.assertEqual("max-pending-computers", outcome)
test_watchdog.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_check_daemons(self):
        """
        The daemons are checked periodically.  After the fifth consecutive
        failed check the daemon is stopped and started again.
        """
        clock = Clock()
        daemons = dict(
            broker=AsynchronousPingDaemon("test-broker"),
            monitor=AsynchronousPingDaemon("test-monitor"),
            manager=AsynchronousPingDaemon("test-manager"))
        dog = WatchDog(clock, **daemons)
        dog.start_monitoring()

        def run_check_with_broker_down():
            # One check round: only the broker reports it is not running.
            clock.advance(5)
            dog.broker.fire_running(False)
            dog.monitor.fire_running(True)
            dog.manager.fire_running(True)

        for _ in range(4):
            run_check_with_broker_down()
            self.assertEqual(dog.broker.boots, [])

        run_check_with_broker_down()
        self.assertEqual(dog.broker.boots, [STOP, START])
test_releaseupgrader.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_verify_invalid_signature(self, gpg_mock):
        """
        L{ReleaseUpgrader.verify} logs a warning when the tarball signature
        is not valid.
        """
        tarball_filename = "/some/tarball"
        signature_filename = "/some/signature"
        gpg_mock.return_value = fail(InvalidGPGSignature("gpg error"))

        def check_failure(failure):
            logged = self.logfile.getvalue()
            self.assertIn("WARNING: Invalid signature for upgrade-tool "
                          "tarball: gpg error", logged)
            gpg_mock.assert_called_once_with(
                tarball_filename, signature_filename)

        verification = self.upgrader.verify(
            tarball_filename, signature_filename)
        # Succeeding is the unexpected outcome here.
        verification.addCallback(self.fail)
        verification.addErrback(check_failure)
        return verification
helpers.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def request(self, method, uri, headers=None, bodyProducer=None):
        """
        Ignore the request details and return a failed Deferred carrying
        C{self.error}.
        """
        canned_error = self.error
        return fail(canned_error)


问题


面经


文章

微信
公众号

扫码关注公众号