def testExtractOpenPGPHeader(self):
    """
    Test the OpenPGP header key extraction.

    An ``OpenPGP`` header advertising a key URL is added to the test
    email. After the fetcher has run, the mocked ``fetch_key`` must have
    been called exactly once with ``ADDRESS_2`` and that URL.
    """
    KEYURL = "https://leap.se/key.txt"
    header_value = (
        "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,))

    message = Parser().parsestr(self.EMAIL)
    message.add_header("OpenPGP", header_value)

    # Replace the key manager's fetch with a mock returning a fired deferred.
    fetch_key_mock = Mock(return_value=defer.succeed(None))
    self.fetcher._keymanager.fetch_key = fetch_key_mock

    def index_email(email):
        return self._mock_soledad_get_from_index(
            fields.JUST_MAIL_IDX, [email])

    def run_fetch(_):
        return self.fetcher.fetch()

    def fetch_key_called(ret):
        fetch_key_mock.assert_called_once_with(ADDRESS_2, KEYURL)

    d = self._create_incoming_email(message.as_string())
    for step in (index_email, run_fetch, fetch_key_called):
        d.addCallback(step)
    return d
# Example source code using Python's succeed()
def testExtractAttachedKey(self):
    """
    Test that a key attached to a message is handed to the key manager.

    The message carries a ``MIMEApplication`` part with subtype
    ``pgp-keys``. After the fetch, the mocked ``put_raw_key`` must have
    been called exactly once with the key text and ``address=ADDRESS_2``.
    """
    KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."

    message = MIMEMultipart()
    message.add_header("from", ADDRESS_2)
    attachment = MIMEApplication("", "pgp-keys")
    attachment.set_payload(KEY)
    message.attach(attachment)

    # Mock the storage call so the assertion can inspect how it was used.
    put_raw_key_mock = Mock(return_value=defer.succeed(None))
    self.fetcher._keymanager.put_raw_key = put_raw_key_mock

    def put_raw_key_called(_):
        put_raw_key_mock.assert_called_once_with(KEY, address=ADDRESS_2)

    d = self._do_fetch(message.as_string())
    d.addCallback(put_raw_key_called)
    return d
def testExtractAttachedKeyAndNotOpenPGPHeader(self):
    """
    Test a message that has both an attached key and an OpenPGP header.

    The attached key must be stored through ``put_raw_key`` (called once
    with the key text and ``address=ADDRESS_2``), and ``fetch_key`` must
    not be called at all.
    """
    KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
    KEYURL = "https://leap.se/key.txt"
    header_value = (
        "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,))

    message = MIMEMultipart()
    message.add_header("from", ADDRESS_2)
    message.add_header("OpenPGP", header_value)
    attachment = MIMEApplication("", "pgp-keys")
    attachment.set_payload(KEY)
    message.attach(attachment)

    # Both key manager entry points are mocked so their usage can be checked.
    put_raw_key_mock = Mock(return_value=defer.succeed(None))
    self.fetcher._keymanager.put_raw_key = put_raw_key_mock
    fetch_key_mock = Mock()
    self.fetcher._keymanager.fetch_key = fetch_key_mock

    def put_raw_key_called(_):
        put_raw_key_mock.assert_called_once_with(KEY, address=ADDRESS_2)
        self.assertFalse(fetch_key_mock.called)

    d = self._do_fetch(message.as_string())
    d.addCallback(put_raw_key_called)
    return d
def getReply(self, line, proto, transport):
    """
    Feed ``line`` to ``proto`` and return a deferred for the reply.

    Lines whose first four characters are not one of ``HELO``, ``MAIL``,
    ``RCPT`` or ``DATA`` produce an already-fired deferred carrying an
    empty string. For the others the transport is inspected; while
    ``transport.value()`` is empty, another check is scheduled through
    ``reactor.callLater(0, ...)``.
    """
    proto.lineReceived(line)

    expects_reply = line[:4] in ['HELO', 'MAIL', 'RCPT', 'DATA']
    if not expects_reply:
        return succeed("")

    def check_transport(_):
        reply = transport.value()
        if not reply:
            # Nothing written to the transport yet: schedule another check.
            retry = Deferred()
            retry.addCallback(check_transport)
            reactor.callLater(0, retry.callback, None)
            return retry
        transport.clear()
        return succeed(reply)

    return check_transport(None)
def maybe_download_ca_cert(self, ignored, replace=False):
    """
    Download the CA certificate unless it is already present on disk.

    :param ignored: not used by this method
    :param replace: when true, download even if the file already exists
    :return: a deferred; it fires with ``'ca_cert_path_already_exists'``
             when the download is skipped, otherwise it is the
             ``downloadPage`` deferred chained with
             ``self._reload_http_client``. Download failures are
             re-raised as ``NetworkError``.
    :rtype: deferred
    """
    # TODO: doesn't update the cert :((((
    enc_domain = self._domain.encode(sys.getfilesystemencoding())
    path = os.path.join(
        self._basedir, 'providers', enc_domain, 'keys', 'ca', 'cacert.pem')

    already_there = is_file(path) if not replace else False
    if already_there:
        return defer.succeed('ca_cert_path_already_exists')

    def to_network_error(failure):
        raise NetworkError(failure.getErrorMessage())

    uri = self._get_ca_cert_uri()
    mkdir_p(os.path.dirname(path))

    # We don't validate the TLS cert for this connection,
    # just check the fingerprint of the ca.cert
    d = downloadPage(uri, path)
    d.addCallback(self._reload_http_client)
    d.addErrback(to_network_error)
    return d
def _get_collection_by_mailbox(self, name):
    """
    Return a deferred that fires with the collection for mailbox ``name``.

    Collections are cached per user in ``self._collection_mapping``. When
    no truthy entry is cached, the mailbox wrapper is obtained through
    ``self.adaptor.get_or_create_mbox`` and a new ``MessageCollection``
    is built from it, cached, and used as the deferred's result.
    """
    cached = self._collection_mapping[self.user_id].get(name)
    if cached:
        return defer.succeed(cached)

    # imap select will use this, passing the collection to SoledadMailbox
    def get_collection_for_mailbox(mbox_wrapper):
        new_collection = MessageCollection(
            self.adaptor, self.store, self.mbox_indexer, mbox_wrapper)
        self._collection_mapping[self.user_id][name] = new_collection
        return new_collection

    d = self.adaptor.get_or_create_mbox(self.store, name)
    d.addCallback(get_collection_for_mailbox)
    return d
def setUp(self):
    """
    Install a stub fetch function and register canned query results.

    The stub records the keyword arguments of its latest call in
    ``self.kwargs`` and answers from ``self.query_results``: values that
    are ``Exception`` instances become failed deferreds, everything else
    becomes a succeeded deferred.
    """
    super(CloudTest, self).setUp()
    self.query_results = {}
    self.kwargs = {}

    def fetch_stub(url, **kwargs):
        self.kwargs = kwargs
        outcome = self.query_results[url]
        make_deferred = fail if isinstance(outcome, Exception) else succeed
        return make_deferred(outcome)

    self.fetch_func = fetch_stub

    canned_results = (
        ("instance-id", b"i00001"),
        ("ami-id", b"ami-00002"),
        ("instance-type", b"hs1.8xlarge"),
    )
    for key, value in canned_results:
        self.add_query_result(key, value)
def render_POST(self, request):
    """
    Turn a path or show command into a set of candidate protobuf definitions.

    If the submitted ``path`` looks like a show command (it starts with
    ``sh``), schema-describe it to get the set of paths and run the GPB
    generation on each one. Otherwise, just run that on the sole provided
    path.

    The generated text is delivered through ``request.sdata``
    (``set_text`` / ``highlight`` on ``#protobuf_result``); the value
    returned to the caller is always the JSON string ``'{}'``.

    :param request: the incoming request; ``request.args['path'][0]`` is
                    read as the path or show command
    :return: the string ``'{}'``
    """
    path = request.args['path'][0].strip()
    if path.startswith('sh'):
        d = scrape.schema_describe(path, request.sdata)
    else:
        d = defer.succeed([path])

    def request_protobufs(paths):
        print('### PROTOBUF PATHS = {}'.format(paths))
        ds = []
        for path in reversed(paths):
            # Strip parenthesised segments. The pattern is a raw string
            # because '\(' in a plain literal is an invalid escape sequence.
            path = re.sub(r'\(.*?\)', '', path)
            ds.append(request.sdata.api.cli_exec(
                'run telemetry_generate_gpb "{}"'.format(path)))
        return defer.DeferredList(ds)
    d.addCallback(request_protobufs)

    def get_protobufs(replies):
        line = '-' * 77
        sep = '\n//\n// ' + line + '\n//\n\n'
        # Each DeferredList entry is indexed at [1] for its result value.
        text = sep.join([reply[1]['result'] for reply in replies])
        request.sdata.set_text('#protobuf_result', text)
        request.sdata.add_to_push_queue('stop_current_spinner')
        request.sdata.highlight('#protobuf_result')
    d.addCallback(get_protobufs)

    request.setHeader('Content-Type', 'application/json')
    return '{}'
def verifyHostKey(self, ui, hostname, ip, key):
    """
    Accept the host key unconditionally.

    None of the arguments are inspected; the result is an already-fired
    deferred whose value is ``True``.
    """
    accepted = defer.succeed(True)
    return accepted
def request(self, *args, **kwargs):
    """
    Return a fired deferred carrying the ``(args, kwargs)`` of this call.
    """
    call_record = (args, kwargs)
    return succeed(call_record)
def ack(self, container):
    """
    Create an acknowledgement for ``container.message``, log and return it.

    The acknowledgement is created with application ``"ELCID"`` and
    facility ``"UCLH"`` and converted with ``unicode``. It is logged with
    carriage returns replaced by newlines, but the unmodified text is the
    result of the returned, already-fired deferred.
    """
    raw_ack = container.message.create_ack(
        application="ELCID", facility="UCLH"
    )
    ack_message = unicode(raw_ack)
    loggable = ack_message.replace("\r", "\n")
    self.log.info(loggable)
    return defer.succeed(ack_message)
def test_bulk_stats_only(self):
    """
    Check ``bulk`` with ``stats_only=True``.

    ``streaming_bulk`` is mocked to yield two successful items and one
    failed item; the yielded result must unpack to the counts 2 and 1.

    NOTE(review): this is a generator; presumably a decorator outside this
    view (e.g. ``inlineCallbacks``) drives it -- confirm.
    """
    items = [ITEM_SUCCESS, ITEM_SUCCESS, ITEM_FAILED]
    self.bulk_utility.streaming_bulk = MagicMock(
        return_value=[succeed(items)])

    outcome = yield self.bulk_utility.bulk(None, stats_only=True)
    ok_count, failed_count = outcome

    self.assertEqual(2, ok_count)
    self.assertEqual(1, failed_count)
def test_bulk_not_stats_only(self):
    """
    Check ``bulk`` with ``stats_only=False``.

    ``streaming_bulk`` is mocked to yield two successful items and three
    failed items; the yielded result must unpack to a success count of 2
    and a list of three ``ERROR_MSG`` entries.

    NOTE(review): this is a generator; presumably a decorator outside this
    view (e.g. ``inlineCallbacks``) drives it -- confirm.
    """
    items = [ITEM_SUCCESS] * 2 + [ITEM_FAILED] * 3
    self.bulk_utility.streaming_bulk = MagicMock(
        return_value=[succeed(items)])

    outcome = yield self.bulk_utility.bulk(None, stats_only=False)
    ok_count, errors = outcome

    self.assertEqual(ok_count, 2)
    self.assertEqual([ERROR_MSG] * 3, errors)
def next(self):
    """
    Fetch next page from scroll API.

    The first call with pending ``_first_results`` returns a fired
    deferred with the hits extracted from them and clears the attribute.
    Otherwise, if a scroll id is set, the deferred from
    ``_scroll_next_results`` is returned. With neither available,
    ``StopIteration`` is raised.
    """
    if self._first_results:
        page = succeed(EsUtils.extract_hits(self._first_results))
        self._first_results = None
        return page

    if self._scroll_id:
        return self._scroll_next_results()

    raise StopIteration()
def requestAvatar(self, avatarId, mind, *interfaces):
    """
    Return a fired deferred with a three-element avatar tuple.

    The tuple holds the first requested interface, ``None``, and a
    callable that takes no arguments and returns ``None``. ``avatarId``
    and ``mind`` are not inspected.
    """
    def logout():
        return None

    avatar = (interfaces[0], None, logout)
    return defer.succeed(avatar)
def resolveHost(self, host):
    """
    Resolve ``host`` to an address, consulting the DNS cache first.

    :param host: the host name to resolve
    :return: a deferred; already fired with the cached address when
             ``self.dnsCache`` has one, otherwise whatever
             ``reactor.resolve(host)`` returns
    """
    address = self.dnsCache.getCachedAddress(host)

    # Identity check: only a missing entry (None) counts as a cache miss.
    if address is not None:
        logging.debug("Host cached.")
        return defer.succeed(address)

    logging.debug("Host not cached.")
    return reactor.resolve(host)
def resolveHost(self, host):
    """
    Resolve ``host`` to an address, consulting the DNS cache first.

    :param host: the host name to resolve
    :return: a deferred; already fired with the cached address when
             ``self.dnsCache`` has one, otherwise whatever
             ``reactor.resolve(host)`` returns
    """
    address = self.dnsCache.getCachedAddress(host)

    # Identity check: only a missing entry (None) counts as a cache miss.
    if address is not None:
        logging.debug("Host cached.")
        return defer.succeed(address)

    logging.debug("Host not cached.")
    return reactor.resolve(host)
def resolveHost(self, host):
    """
    Resolve ``host`` to an address, consulting the DNS cache first.

    :param host: the host name to resolve
    :return: a deferred; already fired with the cached address when
             ``self.dnsCache`` has one, otherwise whatever
             ``reactor.resolve(host)`` returns
    """
    address = self.dnsCache.getCachedAddress(host)

    # Identity check: only a missing entry (None) counts as a cache miss.
    if address is not None:
        logging.debug("Host cached.")
        return defer.succeed(address)

    logging.debug("Host not cached.")
    return reactor.resolve(host)
def sendMessage(self, text, meta=None):
    """
    Send ``text`` through the account's client and return it in a deferred.

    Raises ``locals.OfflineError`` when the account has no client. When
    ``meta`` has ``"style"`` set to ``"emote"``, the text is wrapped as
    ``"* " + text + "* "`` before sending. The text is passed through
    ``html`` for the ``say`` call; the returned deferred carries the
    (possibly wrapped) plain text.
    """
    client = self.account.client
    if client is None:
        raise locals.OfflineError

    is_emote = bool(meta) and meta.get("style") == "emote"
    if is_emote:
        text = "* " + text + "* "

    client.say(self.name, html(text))
    return succeed(text)
def sendGroupMessage(self, text, meta=None):
    """
    Send ``text`` to the room through the account's client.

    Raises ``locals.OfflineError`` when the account has no client. When
    ``meta`` has ``"style"`` set to ``"emote"``, the text is wrapped as
    ``"* " + text + "* "`` before sending. The text is passed through
    ``html`` for the ``chat_say`` call on ``self.roomID``; the returned
    deferred carries the (possibly wrapped) plain text.
    """
    client = self.account.client
    if client is None:
        raise locals.OfflineError

    is_emote = bool(meta) and meta.get("style") == "emote"
    if is_emote:
        text = "* " + text + "* "

    client.chat_say(self.roomID, html(text))
    return succeed(text)