python类gatherResults()的实例源码

test_imap.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def testRename(self):
        """
        Rename a mailbox through the client and verify the account's
        mailbox listing afterwards.
        """
        def make_source_mailbox():
            return self.server.theAccount.addMailbox('oldmbox')

        def authenticate():
            return self.client.login(TEST_USER, TEST_PASSWD)

        def do_rename():
            return self.client.rename('oldmbox', 'newname')

        def list_mailboxes(_):
            return self.server.theAccount.account.list_all_mailbox_names()

        def verify(mailbox_names):
            # The renamed mailbox must show up next to the default ones.
            self.assertItemsEqual(
                mailbox_names, DEFAULT_MBOXES + ['newname'])

        # Client side: create the mailbox, log in, rename, stop the client.
        client_chain = self.connected.addCallback(strip(make_source_mailbox))
        client_chain.addCallback(strip(authenticate))
        client_chain.addCallbacks(strip(do_rename), self._ebGeneral)
        client_chain.addCallbacks(self._cbStopClient, self._ebGeneral)
        transport_done = self.loopback()

        outcome = defer.gatherResults([client_chain, transport_done])
        outcome.addCallback(list_mailboxes)
        outcome.addCallback(verify)
        return outcome
test_imap.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testHierarchicalRename(self):
        """
        Rename a mailbox that has child mailboxes and pass the resulting
        mailbox names to ``_cbTestHierarchicalRename``.
        """
        acc = self.server.theAccount

        def make_children():
            # Create both children under 'oldmbox' and wait for the pair.
            pending = [acc.addMailbox(name)
                       for name in ('oldmbox/m1', 'oldmbox/m2')]
            return defer.gatherResults(pending)

        def authenticate():
            return self.client.login(TEST_USER, TEST_PASSWD)

        def do_rename():
            return self.client.rename('oldmbox', 'newname')

        def list_mailboxes(_):
            return acc.account.list_all_mailbox_names()

        client_chain = self.connected.addCallback(strip(make_children))
        client_chain.addCallback(strip(authenticate))
        client_chain.addCallbacks(strip(do_rename), self._ebGeneral)
        client_chain.addCallbacks(self._cbStopClient, self._ebGeneral)
        transport_done = self.loopback()

        outcome = defer.gatherResults([client_chain, transport_done])
        outcome.addCallback(list_mailboxes)
        return outcome.addCallback(self._cbTestHierarchicalRename)
test_imap.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testCapability(self):
        """
        Request the server capabilities through the client and compare
        them with the expected capability mapping.
        """
        received = {}
        expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'LITERAL+': None,
                    'IDLE': None}

        def record_caps(reported):
            # Store what the server reported, then drop the connection.
            received.update(reported)
            self.server.transport.loseConnection()

        def request_caps():
            return self.client.getCapabilities().addCallback(record_caps)

        def verify(_):
            return self.assertEqual(expected, received)

        client_chain = self.connected
        client_chain.addCallback(
            strip(request_caps)).addErrback(self._ebGeneral)

        outcome = defer.gatherResults([self.loopback(), client_chain])
        outcome.addCallback(verify)
        return outcome
test_imap.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testNamespace(self):
        """
        Log in, issue a namespace request and check the arguments the
        client got back.
        """
        self.namespaceArgs = None

        def authenticate():
            return self.client.login(TEST_USER, TEST_PASSWD)

        def remember_namespace(args):
            # Keep the reply for the final assertion and stop the client.
            self.namespaceArgs = args
            self._cbStopClient(None)

        def request_namespace():
            return self.client.namespace().addCallback(remember_namespace)

        def verify(_):
            return self.assertEqual(self.namespaceArgs,
                                    [[['', '/']], [], []])

        client_chain = self.connected.addCallback(strip(authenticate))
        client_chain.addCallback(strip(request_namespace))
        client_chain.addErrback(self._ebGeneral)
        transport_done = self.loopback()

        outcome = defer.gatherResults([client_chain, transport_done])
        outcome.addCallback(verify)
        return outcome
test_imap.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testFailedStatus(self):
        """
        Issue a status command for a mailbox name that was never created
        and hand the recorded outcome to ``_cbTestFailedStatus``.
        """
        def authenticate():
            return self.client.login(TEST_USER, TEST_PASSWD)

        def request_status():
            return self.client.status(
                'root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')

        def remember_result(result):
            self.statused = result

        def remember_failure(failure):
            self.failure = failure

        # Reset both slots so the final callback can tell which one fired.
        self.statused = None
        self.failure = None

        client_chain = self.connected.addCallback(strip(authenticate))
        client_chain.addCallbacks(strip(request_status), self._ebGeneral)
        client_chain.addCallbacks(remember_result, remember_failure)
        client_chain.addCallbacks(self._cbStopClient, self._ebGeneral)
        transport_done = self.loopback()

        outcome = defer.gatherResults([client_chain, transport_done])
        return outcome.addCallback(self._cbTestFailedStatus)
config.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_config_for_all_services(self, session):
        """
        Start one configuration fetch per sub-service of every provider
        service that has an entry in ``SERVICES_MAP``.

        :param session: object exposing ``fetch_provider_configs``; when it
                        is falsy, ``_fetch_provider_configs_unauthenticated``
                        is used instead.
        :return: the ``defer.gatherResults`` aggregate of all started
                 fetches.
        """
        # Called for its side effects only: its return value used to be
        # stored in ``services_dict`` and was immediately overwritten below.
        # NOTE(review): presumably this populates ``self._provider_config``,
        # which the loop below reads -- confirm.
        self._load_provider_configs()

        configs_path = self._get_configs_path()
        with open(configs_path) as jsonf:
            services_dict = Record(**json.load(jsonf)).services

        pending = []
        base = self._disco.get_base_uri()
        for service in self._provider_config.services:
            # Services without a SERVICES_MAP entry have nothing to fetch.
            if service not in self.SERVICES_MAP:
                continue
            for subservice in self.SERVICES_MAP[service]:
                uri = base + str(services_dict[subservice])
                path = self._get_service_config_path(subservice)
                if session:
                    d = session.fetch_provider_configs(
                        uri, path, method='GET')
                else:
                    d = self._fetch_provider_configs_unauthenticated(
                        uri, path, method='GET')
                pending.append(d)
        return defer.gatherResults(pending)
openpgp.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _repair_docs(self, doclist, cmp_func, log_func):
        """
        Keep one document out of a list of duplicates and delete the rest.

        ``doclist`` is sorted in place, in descending order according to
        ``cmp_func``; the first document after sorting is kept and every
        other one is deleted through ``self._soledad.delete_doc``.

        :param doclist: non-empty list of documents; ``doclist[0]`` is read
                        before and after sorting.
        :param cmp_func: old-style comparison function taking two documents
                         and returning a negative, zero or positive number.
        :param log_func: callable invoked once with every document.
        :return: the ``defer.gatherResults`` aggregate of the deletions,
                 with a callback that replaces its result by the kept
                 document.
        """
        # Imported locally so this method does not depend on a module-level
        # import being present.
        from functools import cmp_to_key

        self.log.error("BUG -------------------------------------------------")
        self.log.error("There is more than one doc of type %s:"
                       % (doclist[0].content[KEY_TYPE_KEY],))

        # ``list.sort(cmp=...)`` only exists on Python 2; ``cmp_to_key``
        # gives the same ordering and works on Python 2.7 and 3.
        doclist.sort(key=cmp_to_key(cmp_func), reverse=True)
        log_func(doclist[0])
        deferreds = []
        for doc in doclist[1:]:
            log_func(doc)
            deferreds.append(self._soledad.delete_doc(doc))

        self.log.error('Error repairing')
        self.log.error("BUG (please report above info) ----------------------")
        d = defer.gatherResults(deferreds, consumeErrors=True)
        d.addCallback(lambda _: doclist[0])
        return d
migrator.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _buildKeyDict(self, keys, actives):
        """
        Group active documents under the key they refer to.

        Every key gets a ``KeyDocs(key, [])`` entry indexed by the id
        derived from its fingerprint.  Active documents carrying a key id
        are appended to the matching entry; those whose key id has no entry
        are deleted through ``self._soledad``.  Active documents without a
        key id are left alone.

        :return: the ``gatherResults`` aggregate of the deletions, with a
                 callback that replaces its result by the built dictionary.
        """
        keydict = {}
        for key in keys:
            key_id = fp2id(key.content[doc.KEY_FINGERPRINT_KEY])
            keydict[key_id] = KeyDocs(key, [])

        deletions = []
        for active in actives:
            if KEY_ID_KEY not in active.content:
                continue
            key_id = active.content[KEY_ID_KEY]
            if key_id in keydict:
                keydict[key_id].active.append(active)
            else:
                # No key matches this active document: remove it.
                deletions.append(self._soledad.delete_doc(active))

        done = gatherResults(deletions)
        done.addCallback(lambda _: keydict)
        return done
__init__.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete_all_keys(self, km):
        """
        Delete every key ``km`` returns, first the private ones and then
        the public ones, asserting after each pass that none remain.

        :return: the ``gatherResults`` aggregate of both passes.
        """
        def remove_each(keys):
            return gatherResults(
                [km._openpgp.delete_key(key) for key in keys])

        def expect_none_left(_, private):
            remaining = km.get_all_keys(private=private)
            remaining.addCallback(
                lambda keys: self.assertEqual(keys, []))
            return remaining

        passes = []
        for private in (True, False):
            chain = km.get_all_keys(private=private)
            chain.addCallback(remove_each)
            # ``private`` is passed as an extra argument so each pass
            # checks its own kind of key.
            chain.addCallback(expect_none_left, private)
            passes.append(chain)
        return gatherResults(passes)
server.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _cbSelectWork(self, mbox, cmdName, tag):
        """
        Callback for selectWork

        * patched to avoid conformance errors due to incomplete UIDVALIDITY
        line.
        * patched to accept deferreds for messagecount and recent count

        Sends a negative response and returns None when ``mbox`` is None or
        carries a ``\\noselect`` flag (compared case-insensitively).
        Otherwise gathers the message count, recent count and next uid and
        forwards them to ``__cbSelectWork``.
        """
        if mbox is None:
            self.sendNegativeResponse(tag, 'No such mailbox')
            return None

        lowered_flags = [flag.lower() for flag in mbox.getFlags()]
        if '\\noselect' in lowered_flags:
            self.sendNegativeResponse(tag, 'Mailbox cannot be selected')
            return None

        # Each getter may return a plain value or a deferred.
        message_count = defer.maybeDeferred(mbox.getMessageCount)
        recent_count = defer.maybeDeferred(mbox.getRecentCount)
        uid_next = defer.maybeDeferred(mbox.getUIDNext)

        gathered = defer.gatherResults(
            [message_count, recent_count, uid_next])
        return gathered.addCallback(
            self.__cbSelectWork, mbox, cmdName, tag)
mail.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def delete_all_flagged(self):
        """
        Delete all messages flagged as \\Deleted.
        Used from IMAPMailbox.expunge()

        NOTE(review): only the two nested helpers are visible here; nothing
        in the visible body calls them or returns a value, so the remainder
        of the method appears to be missing from this excerpt.
        """
        def get_uid_list(hashes):
            # Collect one ``get_uid_from_doc_id`` lookup per hash.
            d = []
            for h in hashes:
                d.append(self.mbox_indexer.get_uid_from_doc_id(
                         self.mbox_uuid, h))
            # NOTE(review): this returns a tuple whose first element is a
            # Deferred.  A Twisted callback chain only waits on a Deferred
            # that is returned directly, so the next callback presumably
            # receives the unfired Deferred as ``uids`` -- confirm.
            return defer.gatherResults(d), hashes

        # Python 2-only tuple parameter: unpacks the pair returned by
        # ``get_uid_list``.
        def delete_uid_entries((uids, hashes)):
            # Collect one ``delete_doc_by_hash`` call per hash.
            d = []
            for h in hashes:
                d.append(self.mbox_indexer.delete_doc_by_hash(
                         self.mbox_uuid, h))

            def return_uids_when_deleted(ignored):
                return uids

            # Replace the gathered deletion results by ``uids``.
            all_deleted = defer.gatherResults(d).addCallback(
                return_uids_when_deleted)
            return all_deleted
__init__.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_all_keys(self, km):
        """
        Remove all keys returned by ``km`` for ``private=True`` and then
        for ``private=False``, asserting after each removal that the key
        list is empty.

        :return: the ``gatherResults`` aggregate of both removal chains.
        """
        def purge(keys):
            removals = [km._openpgp.delete_key(key) for key in keys]
            return gatherResults(removals)

        def assert_empty(_, private):
            listing = km.get_all_keys(private=private)
            listing.addCallback(lambda keys: self.assertEqual(keys, []))
            return listing

        chains = []
        for private in (True, False):
            chain = km.get_all_keys(private=private)
            chain.addCallback(purge)
            # Bind the current ``private`` value as a callback argument.
            chain.addCallback(assert_empty, private)
            chains.append(chain)
        return gatherResults(chains)
test_tcp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 190 收藏 0 点赞 0 评论 0
def testWriter(self):
        """
        Connect a writer client to a local TCP listener and, once both
        wired factories report disconnection, check the flags and data
        recorded on each side.
        """
        serverFactory = protocol.Factory()
        serverFactory.protocol = WriterProtocol
        serverFactory.done = 0
        serverFactory.problem = 0
        wiredServer = WiredFactory(serverFactory)
        # Port 0: the actual port number is read back from the listener.
        listener = reactor.listenTCP(0, wiredServer, interface="127.0.0.1")
        portNumber = listener.getHost().port
        self.ports.append(listener)
        clientFactory = WriterClientFactory()
        wiredClient = WiredFactory(clientFactory)
        reactor.connectTCP("127.0.0.1", portNumber, wiredClient)

        def verify(ignored):
            self.failUnless(serverFactory.done,
                            "writer didn't finish, it probably died")
            self.failUnless(serverFactory.problem == 0,
                            "writer indicated an error")
            self.failUnless(clientFactory.done,
                            "client didn't see connection dropped")
            pieces = ["Hello Cleveland!\n",
                      "Goodbye", " cruel", " world", "\n"]
            expected = "".join(pieces)
            self.failUnless(clientFactory.data == expected,
                            "client didn't receive all the data it expected")

        bothClosed = defer.gatherResults([wiredServer.onDisconnect,
                                          wiredClient.onDisconnect])
        return bothClosed.addCallback(verify)
test_adbapi.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _testPool_3(self, res):
        """
        Insert ``self.num_iterations`` rows into the ``simple`` table with
        ``runOperation``, read them back with ``runQuery`` and check that
        the values are 0, 1, 2, ... in order.

        :param res: result of the previous callback; not used.
        :return: the deferred chain ending in the row check.
        """
        # add some rows to simple table (runOperation)
        inserts = [
            self.dbpool.runOperation("insert into simple(x) values(%d)" % i)
            for i in range(self.num_iterations)]
        d = defer.gatherResults(inserts)

        def _select(res):
            # make sure they were added (runQuery)
            return self.dbpool.runQuery("select x from simple order by x")
        d.addCallback(_select)

        def _check(rows):
            self.failUnless(len(rows) == self.num_iterations,
                            "Wrong number of rows")
            # The length check above guarantees the indices cover
            # range(self.num_iterations).
            for i, row in enumerate(rows):
                self.failUnless(len(row) == 1, "Wrong size row")
                self.failUnless(row[0] == i, "Values not returned.")
        d.addCallback(_check)

        return d
test_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def testPIDFile(self):
        """
        Listen on a UNIX socket with ``wantPID=1`` and connect to it with
        ``checkPID=1``.

        Checks that ``<filename>.lock`` is locked while listening and no
        longer locked after the ports have been cleaned up.
        """
        filename = self.mktemp()
        f = Factory(self, filename)
        # ``0o600`` is the octal spelling accepted by Python 2.6+ and 3;
        # the bare ``0600`` form is a syntax error on Python 3.
        port = reactor.listenUNIX(filename, f, mode=0o600, wantPID=1)
        self.failUnless(lockfile.isLocked(filename + ".lock"))
        tcf = TestClientFactory(self, filename)
        c = reactor.connectUNIX(filename, tcf, checkPID=1)
        d = defer.gatherResults([f.deferred, tcf.deferred])

        def _portStuff(ignored):
            self._addPorts(port, c.transport, tcf.protocol.transport,
                           f.protocol.transport)
            return self.cleanPorts(*self.ports)

        def _check(ignored):
            self.failIf(lockfile.isLocked(filename + ".lock"), 'locked')

        d.addCallback(_portStuff)
        d.addCallback(_check)
        return d
test_udp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_interface(self):
        """
        Test C{getOutgoingInterface} and C{setOutgoingInterface}: both
        transports start at "0.0.0.0" and report "127.0.0.1" after it has
        been set.
        """
        for endpoint in (self.client, self.server):
            self.assertEqual(
                endpoint.transport.getOutgoingInterface(), "0.0.0.0")

        pending = [
            endpoint.transport.setOutgoingInterface("127.0.0.1")
            for endpoint in (self.client, self.server)]
        result = gatherResults(pending)

        def cbInterfaces(ignored):
            for endpoint in (self.client, self.server):
                self.assertEqual(
                    endpoint.transport.getOutgoingInterface(), "127.0.0.1")

        result.addCallback(cbInterfaces)
        return result
test_imap.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testNamespace(self):
        """
        Log in, request the namespace and compare the arguments received
        by the client with the expected value.
        """
        self.namespaceArgs = None

        def authenticate():
            return self.client.login('testuser', 'password-test')

        def keepNamespace(args):
            # Store the reply for the final assertion, then stop the client.
            self.namespaceArgs = args
            self._cbStopClient(None)

        def requestNamespace():
            return self.client.namespace().addCallback(keepNamespace)

        def verify(_):
            return self.assertEquals(self.namespaceArgs,
                                     [[['', '/']], [], []])

        clientChain = self.connected.addCallback(strip(authenticate))
        clientChain.addCallback(strip(requestNamespace))
        clientChain.addErrback(self._ebGeneral)
        transportDone = self.loopback()

        outcome = defer.gatherResults([clientChain, transportDone])
        outcome.addCallback(verify)
        return outcome
test_imap.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testExamine(self):
        """
        Add a mailbox to the account, examine it through the client and
        hand over to ``_cbTestExamine`` once client and loopback are done.
        """
        SimpleServer.theAccount.addMailbox('test-mailbox')
        self.examinedArgs = None

        def authenticate():
            return self.client.login('testuser', 'password-test')

        def keepExamined(args):
            # Store the reply for ``_cbTestExamine`` and stop the client.
            self.examinedArgs = args
            self._cbStopClient(None)

        def requestExamine():
            return self.client.examine('test-mailbox').addCallback(
                keepExamined)

        clientChain = self.connected.addCallback(strip(authenticate))
        clientChain.addCallback(strip(requestExamine))
        clientChain.addErrback(self._ebGeneral)
        transportDone = self.loopback()

        outcome = defer.gatherResults([clientChain, transportDone])
        return outcome.addCallback(self._cbTestExamine)
test_imap.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testCreate(self):
        """
        Issue a create command for each name in ``succeed`` followed by
        each name in ``fail``, appending 1 to ``self.result`` for every
        success and 0 for every failure, then pass both tuples to
        ``_cbTestCreate``.
        """
        succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX')
        fail = ('testbox', 'test/box')

        def recordSuccess():
            self.result.append(1)

        def recordFailure(failure):
            self.result.append(0)

        def authenticate():
            return self.client.login('testuser', 'password-test')

        def createAll():
            # Returns None, so the outer chain does not wait on these
            # deferreds; only the last request stops the client.
            for name in succeed + fail:
                request = self.client.create(name)
                request.addCallback(strip(recordSuccess))
                request.addErrback(recordFailure)
            request.addCallbacks(self._cbStopClient, self._ebGeneral)

        self.result = []
        clientChain = self.connected.addCallback(strip(authenticate))
        clientChain = clientChain.addCallback(strip(createAll))
        transportDone = self.loopback()
        outcome = defer.gatherResults([clientChain, transportDone])
        return outcome.addCallback(self._cbTestCreate, succeed, fail)
test_imap.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testIllegalInboxDelete(self):
        """
        Try to delete 'inbox' through the client and check that the
        outcome stored by the chain is a ``failure.Failure``.
        """
        self.stashed = None

        def authenticate():
            return self.client.login('testuser', 'password-test')

        def requestDelete():
            return self.client.delete('inbox')

        def keepOutcome(result):
            # Runs for both results and failures (attached with addBoth).
            self.stashed = result

        def verify(_):
            return self.failUnless(
                isinstance(self.stashed, failure.Failure))

        clientChain = self.connected.addCallback(strip(authenticate))
        clientChain.addCallbacks(strip(requestDelete), self._ebGeneral)
        clientChain.addBoth(keepOutcome)
        clientChain.addCallbacks(self._cbStopClient, self._ebGeneral)
        transportDone = self.loopback()

        outcome = defer.gatherResults([clientChain, transportDone])
        outcome.addCallback(verify)
        return outcome


问题


面经


文章

微信
公众号

扫码关注公众号