python类succeed()的实例源码

test_refresher.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_log_error_if_fetch_by_fingerprint_returns_wrong_key(self):
        """
        Refreshing a key whose remote lookup by fingerprint yields a key with
        a different fingerprint must be reported through ``Logger.error``.
        """
        scheme = openpgp.OpenPGPScheme(
            self._soledad, gpgbinary=self.gpg_binary_path)
        key_manager = self._key_manager()

        with patch.object(Logger, 'error') as mock_logger_error:
            refresher = RandomRefreshPublicKey(scheme, key_manager)

            # The refresher picks a key carrying KEY_FINGERPRINT ...
            picked_key = OpenPGPKey(fingerprint=KEY_FINGERPRINT)
            refresher._get_random_key = Mock(
                return_value=defer.succeed(picked_key))

            # ... while the lookup by fingerprint answers with PUBLIC_KEY_2.
            key_manager._nicknym.fetch_key_with_fingerprint = Mock(
                return_value=defer.succeed(PUBLIC_KEY_2))

            yield refresher.maybe_refresh_key()

            expected_message = ERROR_UNEQUAL_FINGERPRINTS % (
                KEY_FINGERPRINT, KEY_FINGERPRINT_2)
            mock_logger_error.assert_called_with(expected_message)
downloadhandlers.py 文件源码 项目:NetEaseMusicCrawler 作者: yaochao 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _response(self, _, driver, spider):
        """
        Build a response object from the page currently loaded in ``driver``.

        :param _: ignored (unused positional argument).
        :param driver: browser driver holding the loaded page; it is either
            put back on ``self.queue`` or quit, depending on the outcome.
        :param spider: object that may expose an optional
            ``response_failed(response, driver)`` callable.
        :returns: a Deferred that fires with the response, or a failed
            Deferred when the spider's ``response_failed`` hook returns a
            true value.
        """
        body = driver.execute_script('return document.documentElement.innerHTML')
        # A document starting with an empty <head></head> is re-read as plain
        # text content instead of markup.
        # NOTE(review): presumably this is the browser wrapping a non-HTML
        # payload in a synthetic HTML document - confirm.
        if body.startswith("<head></head>"):
            body = driver.execute_script('return document.documentElement.textContent')
        url = driver.current_url
        # Only the first 100 characters are used to choose the response class.
        respcls = responsetypes.from_args(url=url, body=body[:100].encode('utf-8'))
        response = respcls(url=url, body=body, encoding='utf-8')

        response_failed = getattr(spider, 'response_failed', None)
        if callable(response_failed) and response_failed(response, driver):
            driver.quit()
            # Failure() without arguments requires an exception to be in
            # flight and raises NoCurrentExceptionError otherwise, so wrap an
            # explicit exception to really hand back a failed Deferred.
            return defer.fail(Failure(RuntimeError(
                'response_failed hook rejected the response for %s' % url)))

        self.queue.put(driver)  # give the driver back to the queue for reuse
        return defer.succeed(response)  # fire with the built response
test_acme_util.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_store_unexpected_response(self):
        """
        Storing through a wrapped certificate store whose ``store()`` result
        is something other than None must fail with a RuntimeError that
        mentions the unexpected value.
        """
        class BrokenCertificateStore(object):
            """Certificate store stub whose ``store()`` fires with 'foo'."""

            def store(self, server_name, pem_objects):
                return succeed('foo')

        wrapped_store = BrokenCertificateStore()
        mlb_store = MlbCertificateStore(wrapped_store, self.client)

        result = mlb_store.store('example.com', EXAMPLE_PEM_OBJECTS)

        expected_failure = failed(WithErrorTypeAndMessage(
            RuntimeError,
            "Wrapped certificate store returned something non-None. "
            "Don't know what to do with 'foo'."))
        assert_that(result, expected_failure)
test_service.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def mk_marathon_acme(self, sse_kwargs=None, **kwargs):
        """
        Create a MarathonAcme wired to this test case's fake Marathon and
        marathon-lb clients, certificate store and clock.

        ``sse_kwargs`` is passed to the MarathonClient; any other keyword
        arguments are passed through to MarathonAcme.
        """
        def txacme_client_creator():
            # Reads self.txacme_client when called, not when defined.
            return succeed(self.txacme_client)

        marathon_client = MarathonClient(
            ['http://localhost:8080'],
            client=self.fake_marathon_api.client,
            sse_kwargs=sse_kwargs,
            reactor=self.clock)
        mlb_client = MarathonLbClient(
            ['http://localhost:9090'],
            client=self.fake_marathon_lb.client,
            reactor=self.clock)

        acme_args = (
            marathon_client,
            'external',
            self.cert_store,
            mlb_client,
            txacme_client_creator,
            self.clock,
        )
        return MarathonAcme(*acme_args, **kwargs)
test_bulk_utility.py 文件源码 项目:twistes 作者: avihad 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_bulk_verbose_output(self):
        """
        With ``verbose=True``, ``bulk`` yields every item produced by
        ``streaming_bulk``: here two successes and three failures.
        """
        streamed_items = [ITEM_SUCCESS] * 2 + [ITEM_FAILED] * 3
        self.bulk_utility.streaming_bulk = MagicMock(
            return_value=[succeed(streamed_items)])

        items = yield self.bulk_utility.bulk(None, verbose=True)

        successes = [item for item in items if item == ITEM_SUCCESS]
        self.assertEqual([ITEM_SUCCESS] * 2, successes)

        failures = [item for item in items if item == ITEM_FAILED]
        self.assertEqual([ITEM_FAILED] * 3, failures)
default.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getPrivateKey(self):
        """
        Load the private key stored in the last file of C{self.usedFiles}.

        Returns C{None} when that file does not exist.  Otherwise returns a
        Deferred firing with the key object.  If loading fails with the
        'encrypted key with no passphrase' error, a passphrase is requested
        up to three times; when none of the attempts works the returned
        Deferred fails with C{ConchError('bad password')}.  Any other
        C{keys.BadKeyError} is re-raised.  On C{KeyboardInterrupt} the
        reactor is stopped and C{None} is returned implicitly.
        """
        file = os.path.expanduser(self.usedFiles[-1])
        if not os.path.exists(file):
            return None
        try:
            # First attempt: load the key without a passphrase.
            return defer.succeed(keys.getPrivateKeyObject(file))
        except keys.BadKeyError, e:
            if e.args[0] == 'encrypted key with no passphrase':
                # The key is encrypted: give the user three attempts.
                for i in range(3):
                    prompt = "Enter passphrase for key '%s': " % \
                           self.usedFiles[-1]
                    try:
                        p = self._getPassword(prompt)
                        return defer.succeed(keys.getPrivateKeyObject(file, passphrase = p))
                    except (keys.BadKeyError, ConchError):
                        # Wrong passphrase or failed prompt: try again.
                        pass
                return defer.fail(ConchError('bad password'))
            # Any other key error is not handled here.
            raise
        except KeyboardInterrupt:
            # Print an empty line, then shut the reactor down.
            print
            reactor.stop()
default.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getGenericAnswers(self, name, instruction, prompts):
        """
        Ask the user a series of questions on C{/dev/tty}.

        C{name} and C{instruction} are printed first when they are non-empty.
        C{prompts} is an iterable of C{(prompt, echo)} pairs: when C{echo} is
        true the answer is read with C{raw_input}, otherwise with
        C{getpass.getpass}.

        Returns a Deferred that has already fired with the list of answers,
        in prompt order.
        """
        responses = []
        try:
            # Redirect both standard streams to the terminal for the prompts.
            oldout, oldin = sys.stdout, sys.stdin
            sys.stdin = sys.stdout = open('/dev/tty','r+')
            if name:
                print name
            if instruction:
                print instruction
            for prompt, echo in prompts:
                if echo:
                    responses.append(raw_input(prompt))
                else:
                    responses.append(getpass.getpass(prompt))
        finally: 
            # Always restore the original streams, even if a prompt failed.
            sys.stdout,sys.stdin=oldout,oldin
        return defer.succeed(responses)
test_webclient.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def testPartial(self):
        """
        Chain three download/check rounds against a file pre-filled with
        C{"abcd"}.

        For every C{(partial, expectedData)} pair, C{_cbRunPartial} is
        called with the file name and the partial flag, followed by
        C{_cbPartialTest} with the expected data and the file name.

        Returns the Deferred carrying the whole chain.
        """
        name = self.mktemp()
        f = open(name, "wb")
        try:
            f.write("abcd")
        finally:
            # Close the handle even if the write fails.
            f.close()

        partialDownload = [(True, "abcd456789"),
                           (True, "abcd456789"),
                           (False, "0123456789")]

        d = defer.succeed(None)
        for (partial, expectedData) in partialDownload:
            d.addCallback(self._cbRunPartial, name, partial)
            d.addCallback(self._cbPartialTest, expectedData, name)

        return d
test_ftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def assertCommandResponse(self, command, expectedResponseLines,
                              chainDeferred=None):
        """Queue an FTP command and assert on the response lines it gets.

        The command is queued from a callback added to C{chainDeferred}, or
        to a new, already fired Deferred when none is given.

        Returns the Deferred the callback was added to.
        """
        def checkResponse(actualLines):
            self.assertEquals(expectedResponseLines, actualLines)

        def sendCommand(_):
            pending = self.client.queueStringCommand(command)
            pending.addCallback(checkResponse)
            return pending

        start = chainDeferred
        if start is None:
            start = defer.succeed(None)
        start.addCallback(sendCommand)
        return start
test_reflector.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def createReflector(self):
        """
        Start the database and pool, (re)create the test tables and build
        the reflector.

        When C{self.can_clear} is true, both tables are dropped first and a
        failure while dropping is ignored.  Returns a Deferred that fires
        with a reflector for C{TestRow} and C{ChildRow}.
        """
        self.startDB()
        self.dbpool = self.makePool()
        self.dbpool.start()

        def dropChildTable(_):
            return self.dbpool.runOperation('DROP TABLE childTable')

        def ignoreFailure(_):
            return None

        def createMainTable(_):
            return self.dbpool.runOperation(main_table_schema)

        def createChildTable(_):
            return self.dbpool.runOperation(child_table_schema)

        if not self.can_clear:
            d = defer.succeed(None)
        else:
            d = self.dbpool.runOperation('DROP TABLE testTable')
            d.addCallback(dropChildTable)
            d.addErrback(ignoreFailure)

        d.addCallback(createMainTable)
        d.addCallback(createChildTable)

        if self.escape_slashes:
            reflectorClass = SQLReflector
        else:
            reflectorClass = NoSlashSQLReflector

        def buildReflector(_):
            return reflectorClass(self.dbpool, [TestRow, ChildRow])

        d.addCallback(buildReflector)
        return d
database.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def listRequest(self):
        """
        Return a Deferred firing with one 4-tuple per group:
        (name, highest index + 1, lowest index, flags).

        Both indices are 0 for a group without entries.  Flags is 'm' when
        the group appears in the 'moderators' mapping and 'y' otherwise.
        """
        listing = []
        for group in self.db['groups']:
            indices = self.db[group].keys()
            if len(indices):
                low = min(indices)
                high = max(indices) + 1
            else:
                low = high = 0

            flags = 'y'
            if self.db['moderators'].has_key(group):
                flags = 'm'

            listing.append((group, high, low, flags))
        return defer.succeed(listing)
database.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def articleRequest(self, group, index, id = None):
        """
        Look up the article stored under C{index} in C{group}.

        Returns a Deferred firing with (index, Message-ID header, file-like
        object holding the headers, a CRLF and the body).  The Deferred
        fails with C{ERR_NOGROUP} or C{ERR_NOARTICLE} when the group or the
        index is unknown.  Lookup by C{id} raises C{NotImplementedError}.
        """
        if id is not None:
            raise NotImplementedError

        if not self.db.has_key(group):
            return defer.fail(ERR_NOGROUP)

        articles = self.db[group]
        if not articles.has_key(index):
            return defer.fail(ERR_NOARTICLE)

        article = articles[index]
        messageID = article.getHeader('Message-ID')
        content = StringIO.StringIO(
            article.textHeaders() + '\r\n' + article.body)
        return defer.succeed((index, messageID, content))
database.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def articleRequest(self, group, index, id = None):
        """
        Look up an article by C{id}, or by C{group} and C{index}.

        When C{id} is given, the group and index are taken from the first
        cross-reference stored for that message id.  Returns a Deferred
        firing with (index, Message-ID header, file-like object holding the
        headers, a CRLF and the body), or failing with C{NewsServerError}
        when a lookup raises C{KeyError}.
        """
        if id is not None:
            try:
                xref = self.dbm['Message-IDs'][id]
            except KeyError:
                return defer.fail(NewsServerError("No such article: " + id))
            group, index = xref[0]
            index = int(index)

        try:
            article = self.dbm['groups'][group].articles[index]
        except KeyError:
            return defer.fail(NewsServerError("No such group: " + group))

        messageID = article.getHeader('Message-ID')
        content = StringIO.StringIO(
            article.textHeaders() + '\r\n' + article.body)
        return defer.succeed((index, messageID, content))
database.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def headRequest(self, group, index, id = None):
        """
        Look up the headers of an article by C{id}, or by C{group} and
        C{index}.

        When C{id} is given, the group and index are taken from the first
        cross-reference stored for that message id.  Returns a Deferred
        firing with (index, Message-ID header, header text), or failing with
        C{NewsServerError} when a lookup raises C{KeyError}.
        """
        if id is not None:
            try:
                xref = self.dbm['Message-IDs'][id]
            except KeyError:
                return defer.fail(NewsServerError("No such article: " + id))
            group, index = xref[0]
            index = int(index)

        try:
            article = self.dbm['groups'][group].articles[index]
        except KeyError:
            return defer.fail(NewsServerError("No such group: " + group))

        messageID = article.getHeader('Message-ID')
        headers = article.textHeaders()
        return defer.succeed((index, messageID, headers))
database.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def bodyRequest(self, group, index, id = None):
        """
        Look up the body of an article by C{id}, or by C{group} and
        C{index}.

        When C{id} is given, the group and index are taken from the first
        cross-reference stored for that message id.  Returns a Deferred
        firing with (index, Message-ID header, file-like object holding the
        body), or failing with C{NewsServerError} when a lookup raises
        C{KeyError}.
        """
        if id is not None:
            try:
                xref = self.dbm['Message-IDs'][id]
            except KeyError:
                return defer.fail(NewsServerError("No such article: " + id))
            group, index = xref[0]
            index = int(index)

        try:
            article = self.dbm['groups'][group].articles[index]
        except KeyError:
            return defer.fail(NewsServerError("No such group: " + group))

        messageID = article.getHeader('Message-ID')
        bodyFile = StringIO.StringIO(article.body)
        return defer.succeed((index, messageID, bodyFile))
pamauth.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def defConv(items):
    resp = []
    for i in range(len(items)):
        message, kind = items[i]
        if kind == 1: # password
            p = getpass.getpass(message)
            resp.append((p, 0))
        elif kind == 2: # text
            p = raw_input(message)
            resp.append((p, 0))
        elif kind in (3,4):
            print message
            resp.append(("", 0))
        else:
            return defer.fail('foo')
    d = defer.succeed(resp)
    return d
rowjournal.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def sync(self):
        """Commit the queued changes to the database.

        Every queued (kind, object) command is turned into SQL by the
        matching reflector method and the queue is emptied.  When there is
        SQL to run, it is executed through the pool's C{runInteraction} and
        the resulting Deferred (with C{_syncDone} chained) is returned;
        otherwise an already fired Deferred with result 1 is returned.

        Raises C{ValueError} if a sync is already in progress.
        """
        if self.syncing:
            raise ValueError("sync already in progress")

        sqlBuilders = {
            INSERT : self.reflector.insertRowSQL,
            UPDATE : self.reflector.updateRowSQL,
            DELETE : self.reflector.deleteRowSQL,
        }
        statements = [sqlBuilders[kind](obj) for kind, obj in self.commands]
        self.commands = []

        if not statements:
            return defer.succeed(1)

        self.syncing = 1
        d = self.reflector.dbpool.runInteraction(
            self._sync, self.latestIndex, statements)
        return d.addCallback(self._syncDone)
util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def callRemote(self, name, *args, **kw):
        """Dispatch to a specially named local method.

        If the object has an attribute called C{sync_<name>}, it is called
        and its result is returned untouched.  Otherwise C{async_<name>} is
        called and its result is wrapped in an already fired Deferred; any
        exception raised while looking it up or calling it becomes a failed
        Deferred instead (its traceback is printed first when
        C{self.reportAllTracebacks} is set).
        """
        syncName = 'sync_' + name
        if hasattr(self, syncName):
            syncMethod = getattr(self, syncName)
            return syncMethod(*args, **kw)

        try:
            asyncMethod = getattr(self, "async_" + name)
            result = asyncMethod(*args, **kw)
            return defer.succeed(result)
        except:
            # Deliberately broad: every error is reported via the Deferred.
            reason = Failure()
            if self.reportAllTracebacks:
                reason.printTraceback()
            return defer.fail(reason)
authority.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def lookupZone(self, name, timeout = 10):
        """
        Return every record of the zone when C{name} matches the SOA name
        (case-insensitively).

        The answer list starts and ends with the SOA header; all other
        non-SOA records sit in between.  A record without its own ttl gets
        the larger of the SOA's C{minimum} and C{expire} values.  The
        Deferred fires with (answers, (), ()), or fails with
        C{dns.DomainError} when the name does not match.  C{timeout} is not
        used.
        """
        zoneName, soaRecord = self.soa[0], self.soa[1]
        if zoneName.lower() != name.lower():
            return defer.fail(failure.Failure(dns.DomainError(name)))

        default_ttl = max(soaRecord.minimum, soaRecord.expire)

        def ttlFor(record):
            # A record's own ttl wins over the zone-wide default.
            if record.ttl is not None:
                return record.ttl
            return default_ttl

        soaHeader = dns.RRHeader(
            zoneName, dns.SOA, dns.IN, ttlFor(soaRecord), soaRecord,
            auth=True)
        results = [soaHeader]
        for (owner, recordList) in self.records.items():
            for rec in recordList:
                if rec.TYPE == dns.SOA:
                    continue
                results.append(dns.RRHeader(
                    owner, rec.TYPE, dns.IN, ttlFor(rec), rec, auth=True))
        results.append(soaHeader)
        return defer.succeed((results, (), ()))
test_client.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _lookup(self, name, cls, qtype, timeout):
        """
        Answer any query with a single header of the requested type and a
        ttl of 60.

        The name 'getHostByNameTest' is special-cased: it always gets an A
        header whose payload is the address 127.0.0.1.  The authority and
        additional sections are empty.
        """
        headerArgs = dict(name=name, type=qtype, cls=cls, ttl=60)
        if name == 'getHostByNameTest':
            headerArgs['type'] = dns.A
            headerArgs['payload'] = dns.Record_A(address='127.0.0.1', ttl=60)

        answers = [dns.RRHeader(**headerArgs)]
        return defer.succeed((answers, [], []))
test_names.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_truncatedMessage(self):
        """
        Filtering the answers of a truncated message must trigger a TCP
        query with the same queries, and produce the sections of the TCP
        response.
        """
        truncated = Message(trunc=True)
        truncated.addQuery('example.com')

        def fakeQueryTCP(queries):
            self.assertEqual(queries, truncated.queries)
            tcpResponse = Message()
            tcpResponse.answers = ['answer']
            tcpResponse.authority = ['authority']
            tcpResponse.additional = ['additional']
            return succeed(tcpResponse)

        self.resolver.queryTCP = fakeQueryTCP

        expected = (['answer'], ['authority'], ['additional'])
        d = self.resolver.filterAnswers(truncated)
        d.addCallback(self.assertEqual, expected)
        return d
cache.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _lookup(self, name, cls, type, timeout):
        """
        Answer a query from the cache.

        On a hit, the Deferred fires with (answers, authority, additional),
        each header rebuilt with its ttl reduced by the time elapsed since
        the entry was stored.  On a miss it fails with C{dns.DomainError}.
        Hits are logged when C{self.verbose} is true, misses when it is
        greater than 1.
        """
        now = time.time()
        query = dns.Query(name, type, cls)
        try:
            when, (ans, auth, add) = self.cache[query]
        except KeyError:
            if self.verbose > 1:
                log.msg('Cache miss for ' + repr(name))
            return defer.fail(failure.Failure(dns.DomainError(name)))

        if self.verbose:
            log.msg('Cache hit for ' + repr(name))
        age = now - when

        def aged(records):
            # Same headers, with the elapsed time taken off each ttl.
            return [
                dns.RRHeader(str(r.name), r.type, r.cls, r.ttl - age, r.payload)
                for r in records]

        return defer.succeed((aged(ans), aged(auth), aged(add)))
test_imap.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testWildcard(self):
        """
        Check C{imap4.wildcardToRegexp} with '/' as the delimiter: for each
        wildcard, the first list of names must not match and the second one
        must.
        """
        cases = [
            [
                'foo/%gum/bar',
                ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
                ['foo/xgum/bar', 'foo/gum/bar'],
            ],
            [
                'foo/x%x/bar',
                ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
                ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar'],
            ],
            [
                'foo/xyz*abc/bar',
                ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
                ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
            ],
        ]

        for (pattern, rejected, accepted) in cases:
            regexp = imap4.wildcardToRegexp(pattern, '/')
            for candidate in rejected:
                self.failIf(regexp.match(candidate))
            for candidate in accepted:
                self.failUnless(regexp.match(candidate))
test_imap.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testWildcardNoDelim(self):
        """
        Check C{imap4.wildcardToRegexp} without a delimiter (C{None}): for
        each wildcard, the first list of names must not match and the second
        one must.  The offending name is used as the failure message.
        """
        cases = [
            [
                'foo/%gum/bar',
                ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
                ['foo/xgum/bar', 'foo/gum/bar', 'foo/x/gum/bar'],
            ],
            [
                'foo/x%x/bar',
                ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
                ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar', 'foo/x/x/bar'],
            ],
            [
                'foo/xyz*abc/bar',
                ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
                ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
            ],
        ]

        for (pattern, rejected, accepted) in cases:
            regexp = imap4.wildcardToRegexp(pattern, None)
            for candidate in rejected:
                self.failIf(regexp.match(candidate), candidate)
            for candidate in accepted:
                self.failUnless(regexp.match(candidate), candidate)
test_imap.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testCreate(self):
        """
        Log in, issue a create for every name in both lists, and record 1
        for each create that succeeds and 0 for each that fails.

        The collected results are checked by C{_cbTestCreate}, which also
        receives both name lists.
        """
        expectedToSucceed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX')
        expectedToFail = ('testbox', 'test/box')
        self.result = []

        def recordSuccess():
            self.result.append(1)

        def recordFailure(failure):
            self.result.append(0)

        def doLogin():
            return self.client.login('testuser', 'password-test')

        def doCreates():
            for mailbox in expectedToSucceed + expectedToFail:
                d = self.client.create(mailbox)
                d.addCallback(strip(recordSuccess)).addErrback(recordFailure)
            # Only the Deferred of the last create stops the client.
            d.addCallbacks(self._cbStopClient, self._ebGeneral)

        clientDone = self.connected.addCallback(strip(doLogin))
        clientDone = clientDone.addCallback(strip(doCreates))
        loopbackDone = self.loopback()
        finished = defer.gatherResults([clientDone, loopbackDone])
        return finished.addCallback(
            self._cbTestCreate, expectedToSucceed, expectedToFail)
test_mail.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_failureWithSuccessfulFallback(self):
        """
        When the MX lookup fails but the name can still be resolved with
        C{getHostByName}, L{MXCalculator.getMX} must fire with a Record_MX
        whose name is the resolved address.
        """
        class DummyResolver(object):
            """
            Resolver stub: MX lookups fail with C{DNSNameError}, host name
            lookups succeed with a fixed address.
            """
            def lookupMailExchange(self, domain):
                return defer.fail(DNSNameError())

            def getHostByName(self, domain):
                return defer.succeed("1.2.3.4")

        self.mx.resolver = DummyResolver()

        result = self.mx.getMX("domain")
        expectedRecord = Record_MX(name="1.2.3.4")
        result.addCallback(self.assertEqual, expectedRecord)
        return result
ClientRequest.py 文件源码 项目:mitmfnz 作者: dropnz 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def resolveHost(self, host):
        """
        Resolve ``host`` to an address, returning a Deferred.

        A cached address is used when there is one.  Otherwise the custom
        resolver is queried on the port reported by the URL monitor, and if
        that raises, resolution falls back to ``reactor.resolve``.
        """
        cached = self.dnsCache.getCachedAddress(host)
        if cached != None:
            log.debug("Host cached: {} {}".format(host, cached))
            return defer.succeed(cached)

        log.debug("Host not cached.")
        self.customResolver.port = self.urlMonitor.getResolverPort()

        try:
            log.debug("Resolving with DNSChef")
            answers = self.customResolver.query(host)
            resolved = str(answers[0].address)
            return defer.succeed(resolved)
        except Exception:
            log.debug("Exception occured, falling back to Twisted")
            return reactor.resolve(host)
test_openpgp.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_self_repair_no_keys(self):
        """
        When the private-fingerprint index lookup for KEY_FINGERPRINT comes
        back empty, ``get_key`` must fail with KeyNotFound and delete
        exactly one document.
        """
        scheme = openpgp.OpenPGPScheme(
            self._soledad, gpgbinary=self.gpg_binary_path)
        yield scheme.put_raw_key(PUBLIC_KEY, ADDRESS)

        # Keep the real methods so they can be restored afterwards.
        original_get_from_index = self._soledad.get_from_index
        original_delete_doc = self._soledad.delete_doc

        def get_from_index_hiding_key(*args):
            hides = (args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX and
                     args[2] == KEY_FINGERPRINT)
            if not hides:
                return original_get_from_index(*args)
            return succeed([])

        self._soledad.get_from_index = get_from_index_hiding_key
        self._soledad.delete_doc = Mock(return_value=succeed(None))

        try:
            lookup = scheme.get_key(ADDRESS, private=False)
            yield self.assertFailure(lookup, KeyNotFound)
            # it should have deleted the index
            self.assertEqual(self._soledad.delete_doc.call_count, 1)
        finally:
            self._soledad.get_from_index = original_get_from_index
            self._soledad.delete_doc = original_delete_doc
test_keymanager.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_key_not_found_is_raised_if_key_search_responds_404(self):
        """
        The callback handed to the pinned client's ``request`` must raise
        KeyNotFound, with the matching error message, when it receives a
        response whose code is NOT_FOUND.
        """
        km = self._key_manager(url=NICKSERVER_URI)
        client.readBody = mock.Mock(return_value=defer.succeed(None))
        km._nicknym._async_client_pinned.request = mock.Mock(
            return_value=defer.succeed(None))
        url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS

        def check_key_not_found_is_raised_if_404(_):
            request_mock = km._nicknym._async_client_pinned.request
            # The callback is taken from the keyword arguments of the call.
            check_404_callback = request_mock.call_args[1]['callback']

            not_found_response = mock.Mock()
            not_found_response.code = NOT_FOUND

            expected_message = ('404: Key not found. Request: '
                                '%s' % url.replace('?', '\?'))
            with self.assertRaisesRegexp(errors.KeyNotFound,
                                         expected_message):
                check_404_callback(not_found_response)

        d = km._nicknym._fetch_and_handle_404_from_nicknym(url)
        d.addCallback(check_key_not_found_is_raised_if_404)
        return d
test_keymanager.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _fetch_key_with_address(self, km, address, key):
        """
        Mock the remote fetch so that it answers with ``key`` for
        ``address``, check that a lookup without remote fetching fails with
        KeyNotFound, then look the key up again with the default arguments.

        :returns: a Deferred that fires with the result of the second
                  ``km.get_key(address)`` call.
        """
        payload = json.dumps({'address': address, 'openpgp': key})

        # The response body read by the client is the JSON payload.
        client.readBody = mock.Mock(return_value=defer.succeed(payload))
        km._nicknym._async_client_pinned.request = mock.Mock(
            return_value=defer.succeed(None))
        km.ca_cert_path = 'cacertpath'

        def fetch_with_defaults(_):
            return km.get_key(address)

        # First attempt: no remote fetch, so the key must not be found.
        local_only = km.get_key(address, fetch_remote=False)
        d = self.assertFailure(local_only, errors.KeyNotFound)
        # Second attempt: default arguments.
        d.addCallback(fetch_with_defaults)
        return d


问题


面经


文章

微信
公众号

扫码关注公众号