def validate_mailto(self, parts, verify_exists=False):
"""
Validates a mailto URL, by using Django's EmailValidator.
`verify_exists` does nothing at this time.
:param parts:
:param verify_exists:
:return:
"""
validator = EmailValidator()
try:
validator(parts['path'])
except ValidationError:
return False
else:
return True
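# A standalone sketch of the same check, assuming Django is installed. The
# class wrapping validate_mailto above is not shown, so this version takes
# the raw URL instead of pre-split parts.
from urllib.parse import urlparse

from django.core.exceptions import ValidationError
from django.core.validators import EmailValidator

def is_valid_mailto(url):
    # For mailto: URLs, urlparse places the address in the `path` component.
    try:
        EmailValidator()(urlparse(url).path)
    except ValidationError:
        return False
    return True

# is_valid_mailto('mailto:someone@example.com')  -> True
# is_valid_mailto('mailto:not-an-address')       -> False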
def to_config(config_cls, environ=os.environ):
if config_cls._prefix:
app_prefix = (config_cls._prefix,)
else:
app_prefix = ()
def default_get(environ, metadata, prefix, name):
ce = metadata[CNF_KEY]
if ce.name is not None:
var = ce.name
else:
var = ("_".join(app_prefix + prefix + (name,))).upper()
        log.debug("looking for env var '%s'.", var)
val = environ.get(var, ce.default)
if val is RAISE:
raise MissingEnvValueError(var)
return val
return _to_config(config_cls, default_get, environ, ())
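# Worked example of the variable-name derivation inside default_get above:
# the app prefix, any nested prefixes, and the attribute name are joined
# with underscores and upper-cased (names here are illustrative only).
app_prefix = ("app",)
prefix = ("db",)
name = "port"
assert "_".join(app_prefix + prefix + (name,)).upper() == "APP_DB_PORT"
# So an attribute `port` on a nested `db` section of a config class with
# prefix "app" is read from the APP_DB_PORT environment variable.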
def _get(self, environ, metadata, prefix, name):
ce = metadata[CNF_KEY]
if ce.name is not None:
var = ce.name
else:
if callable(self.vault_prefix):
vp = self.vault_prefix(environ)
else:
vp = self.vault_prefix
var = "_".join(
((vp,) + prefix + (name,))
).upper()
    log.debug("looking for env var '%s'.", var)
val = environ.get(var, ce.default)
if val is RAISE:
raise MissingSecretError(var)
return _SecretStr(val)
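# Sketch of how _get resolves a secret name when vault_prefix is the callable
# form (names illustrative): the prefix is computed from the environment,
# then joined and upper-cased exactly like the app-config case above.
def example_vault_prefix(environ):
    return environ["ENV_NAME"]

vp = example_vault_prefix({"ENV_NAME": "staging"})
assert "_".join((vp,) + ("db",) + ("password",)).upper() == "STAGING_DB_PASSWORD"
# The raw value is returned wrapped in _SecretStr, presumably so the secret
# is masked if the config object ends up in a repr or a log line.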
def _ensure_dir_exists(filename):
# type: (str) -> None
"""Creates a directory tree if it does not already exist.
:param str filename: Full path to file in destination directory
"""
dest_final_dir = filename.rsplit(os.sep, 1)[0]
if dest_final_dir == filename:
# File is in current directory
_LOGGER.debug('Target dir is current dir')
return
try:
os.makedirs(dest_final_dir)
except _file_exists_error():
# os.makedirs(... exist_ok=True) does not work in 2.7
pass
else:
_LOGGER.info('Created directory: %s', dest_final_dir)
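# Minimal usage sketch, assuming the module-level _LOGGER and
# _file_exists_error helpers referenced above are in scope. Only the
# directory part of the path is created; a bare filename (no os.sep)
# is treated as "current directory" and skipped.
import os
import tempfile

dest = os.path.join(tempfile.mkdtemp(), 'nested', 'deeper', 'result.txt')
_ensure_dir_exists(dest)
assert os.path.isdir(os.path.dirname(dest))
_ensure_dir_exists('result.txt')  # no-op: no directory component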
def output_filename(source_filename, destination_dir, mode, suffix):
# type: (str, str, str, str) -> str
"""Duplicates the source filename in the destination directory, adding or stripping
a suffix as needed.
:param str source_filename: Full file path to source file
:param str destination_dir: Full file path to destination directory
:param str mode: Operating mode (encrypt/decrypt)
:param str suffix: Suffix to append to output filename
:returns: Full file path of new destination file in destination directory
:rtype: str
"""
if suffix is None:
suffix = OUTPUT_SUFFIX[mode]
else:
_LOGGER.debug('Using custom suffix "%s" to create output file', suffix)
filename = source_filename.rsplit(os.sep, 1)[-1]
_LOGGER.debug('Duplicating filename %s into %s', filename, destination_dir)
return os.path.join(destination_dir, filename) + suffix
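# Worked example, assuming OUTPUT_SUFFIX maps modes to default suffixes,
# e.g. {'encrypt': '.encrypted', 'decrypt': '.decrypted'} (assumed values):
#
#   output_filename('/in/report.pdf', '/out', 'encrypt', None)
#   # -> '/out/report.pdf.encrypted'  (mode default from OUTPUT_SUFFIX)
#   output_filename('/in/report.pdf', '/out', 'encrypt', '.enc')
#   # -> '/out/report.pdf.enc'        (custom suffix wins)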
def read_tokens(self, db) -> Iterator[DeviceConfig]:
"""Read device information out from a given database file.
:param str db: Database file"""
self.db = db
    _LOGGER.info("Reading database from %s", db)
self.conn = sqlite3.connect(db)
self.conn.row_factory = sqlite3.Row
with self.conn:
is_android = self.conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone() is not None
is_apple = self.conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None
if is_android:
yield from self.read_android()
elif is_apple:
yield from self.read_apple()
else:
_LOGGER.error("Error, unknown database type!")
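# Usage sketch (the class owning read_tokens is not shown, so `reader` is
# hypothetical). The two sqlite_master probes above pick the schema: an
# Android backup exposes a `devicerecord` table, an Apple backup `ZDEVICE`.
#
#   for device in reader.read_tokens('backup.sqlite'):
#       print(device)  # DeviceConfig entries from read_android()/read_apple()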
@defer.inlineCallbacks  # assumed: the bare `yield`s on Deferreds below imply twisted.internet.defer.inlineCallbacks
def do_start(self):
"""
    Make this transport begin listening on the specified interface and UDP port.
    The interface must be an IP address.
"""
# save a TorConfig so we can later use it to send messages
self.torconfig = txtorcon.TorConfig(control=self.tor.protocol)
yield self.torconfig.post_bootstrap
hs_strings = []
if len(self.onion_unix_socket) == 0:
local_socket_endpoint_desc = "tcp:interface=%s:%s" % (self.onion_tcp_interface_ip, self.onion_tcp_port)
else:
local_socket_endpoint_desc = "unix:%s" % self.onion_unix_socket
onion_service_endpoint = endpoints.serverFromString(self.reactor, local_socket_endpoint_desc)
datagram_proxy_factory = OnionDatagramProxyFactory(received_handler=lambda x: self.datagram_received(x))
yield onion_service_endpoint.listen(datagram_proxy_factory)
if len(self.onion_unix_socket) == 0:
hs_strings.append("%s %s:%s" % (self.onion_port, self.onion_tcp_interface_ip, self.onion_tcp_port))
else:
hs_strings.append("%s unix:%s" % (self.onion_port, self.onion_unix_socket))
hs = txtorcon.torconfig.EphemeralHiddenService(hs_strings, key_blob_or_type=self.onion_key)
yield hs.add_to_tor(self.tor.protocol)
def validate(self):
"""
Cross-checks the settings we have against the options the Container has
"""
# Verify all link targets are possible
for alias, target in list(self.links.items()):
if isinstance(target, str):
raise ValueError("Link target {} is still a string!".format(target))
if target.container not in self.container.graph.dependencies(self.container):
warnings.warn("It is not possible to link %s to %s as %s" % (target, self.container, alias))
del self.links[alias]
# Verify devmodes exist
for devmode in list(self.devmodes):
if devmode not in self.container.devmodes:
warnings.warn("Invalid devmode %s on container %s" % (devmode, self.container.name))
self.devmodes.remove(devmode)
def resolve_links(self):
"""
Resolves any links that are still names to instances from the formation
"""
for alias, target in list(self.links.items()):
        # If it's a string, it came from an introspection pass where we couldn't
        # resolve it into an instance at the time (as not all instances existed yet)
if isinstance(target, str):
try:
target = self.formation[target]
except KeyError:
# We don't error here as that would prevent you stopping orphaned containers;
# instead, we delete the link and warn the user. The deleted link means `up` will recreate it
# if it's orphaned.
del self.links[alias]
warnings.warn("Could not resolve link {} to an instance for {}".format(target, self.name))
else:
self.links[alias] = target
elif isinstance(target, ContainerInstance):
pass
else:
raise ValueError("Invalid link value {}".format(repr(target)))
def convert(self, value, param, ctx):
self.context = ctx
# Exact match
if value in self.choices:
return value
# Match through normalization
if ctx is not None and \
ctx.token_normalize_func is not None:
value = ctx.token_normalize_func(value)
for choice in self.choices:
if ctx.token_normalize_func(choice) == value:
return choice
self.fail('invalid choice: %s. %s' % (PURPLE(value), self.get_missing_message(param, value)), param, ctx)
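# Sketch of the normalization path above, using click's real
# token_normalize_func hook; the command, option, and ChoiceType names
# are hypothetical.
#
#   @click.command(context_settings={'token_normalize_func': str.lower})
#   @click.option('--color', type=ChoiceType(['Red', 'Blue']))
#   def cli(color): ...
#
#   # Input 'RED' normalizes to 'red'; the choice 'Red' also normalizes
#   # to 'red', so convert() returns the canonical spelling 'Red'.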
def image_version(self, image_name, image_tag):
"""
Returns the Docker image hash of the requested image and tag, or
raises ImageNotFoundException if it's not available on the host.
"""
if image_tag == "local":
image_tag = "latest"
try:
docker_info = self.host.client.inspect_image("{}:{}".format(image_name, image_tag))
return docker_info['Id']
except NotFound:
# TODO: Maybe auto-build if we can?
raise ImageNotFoundException(
"Cannot find image {}:{}".format(image_name, image_tag),
image=image_name,
image_tag=image_tag,
)
def attach(app, container, host, command):
"""
Attaches to a container
"""
if command:
shell = ['/bin/bash', '-lc', ' '.join(command)]
else:
shell = ['/bin/bash']
# See if the container is running
formation = FormationIntrospector(host, app.containers).introspect()
for instance in formation:
if instance.container == container:
# Work out anything to put before the shell (e.g. ENV)
pre_args = []
if os.environ.get("TERM", None):
pre_args = ["env", "TERM=%s" % os.environ['TERM']]
# Launch into an attached shell
status_code = subprocess.call(["docker", "exec", "-it", instance.name] + pre_args + shell)
sys.exit(status_code)
# It's not running ;(
click.echo(RED("Container {name} is not running. It must be started to attach - try `bay run {name}`.".format(
name=container.name,
)))
def destroy(app, host, name):
"""
Destroys a single volume
"""
task = Task("Destroying volume {}".format(name))
# Run GC first to clean up stopped containers
from .gc import GarbageCollector
GarbageCollector(host).gc_all(task)
# Remove the volume
formation = FormationIntrospector(host, app.containers).introspect()
instance_conflicts = [instance.container.name for instance in formation.get_instances_using_volume(name)]
if instance_conflicts:
task.finish(status="Volume {} is in use by container(s): {}".format(
name, ",".join(instance_conflicts)), status_flavor=Task.FLAVOR_BAD)
else:
try:
host.client.remove_volume(name)
except NotFound:
task.add_extra_info("There is no volume called {}".format(name))
task.finish(status="Not found", status_flavor=Task.FLAVOR_BAD)
else:
task.finish(status="Done", status_flavor=Task.FLAVOR_GOOD)
def setenv(self, key, val):
"""
Set internal environment variable.
Changes internal environment in which subprocesses will be run.
Does not change the process's own environment.
    :param key: name of the variable
    :param val: value of the variable; passing ``None`` removes it
"""
if val is None:
if key in self._env:
del self._env[key]
return
key = str(key) # keys must be strings
val = str(val) # vals must be strings
self._env[key] = val
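# Minimal usage sketch (`sh` stands in for whatever object carries _env):
#
#   sh.setenv('PAGER', 'cat')   # subprocesses spawned later see PAGER=cat
#   sh.setenv('RETRIES', 3)     # stored as the string '3'
#   sh.setenv('PAGER', None)    # removes PAGER from the internal environment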
def fetch_suite(self, suite):
flat = suite.endswith('/')
if flat:
baseurl = joinurl(self.uri, suite)
else:
baseurl = joinurl(self.uri, 'dists', suite)
log.info('Fetching InRelease file from %s', baseurl)
r = self.session.get(joinurl(baseurl, 'InRelease'))
if not (400 <= r.status_code < 500):
r.raise_for_status()
release = ReleaseFile.parse_signed(r.content)
else:
log.info('Server returned %d; fetching Release file instead',
r.status_code)
r = self.session.get(joinurl(baseurl, 'Release'))
r.raise_for_status()
release = ReleaseFile.parse(r.content)
### TODO: Handle/fetch/verify PGP stuff
if flat:
return FlatRepository(self, suite, release)
else:
return Suite(self, suite, release)
def _iterClass(self, cls, prefix=''):
"""
    Descend a Klein()'s url_map and generate a ConvertedRule() for each rule.
"""
iterableRules = [(prefix, cls, cls.app.url_map.iter_rules())]
for prefix, currentClass, i in iter(iterableRules):
for rule in i:
converted = dumpRule(currentClass, rule, prefix)
if converted.branch:
continue
if converted.subKlein:
clsDown = namedAny(converted.subKlein)
iterableRules.append((converted.rulePath, clsDown, clsDown.app.url_map.iter_rules()))
yield converted
def statistics(self):
"""Return an object containing debugging information.
Currently the following fields are defined:
* ``borrowed_tokens``: The number of tokens currently borrowed from
the sack.
* ``total_tokens``: The total number of tokens in the sack. Usually
      this will be larger than ``borrowed_tokens``, but it's possible for
      it to be smaller if :attr:`total_tokens` was recently decreased.
* ``borrowers``: A list of all tasks or other entities that currently
hold a token.
* ``tasks_waiting``: The number of tasks blocked on this
      :class:`CapacityLimiter`'s :meth:`acquire` or
:meth:`acquire_on_behalf_of` methods.
"""
return _CapacityLimiterStatistics(
borrowed_tokens=len(self._borrowers),
total_tokens=self._total_tokens,
# Use a list instead of a frozenset just in case we start to allow
# one borrower to hold multiple tokens in the future
borrowers=list(self._borrowers),
tasks_waiting=len(self._lot),
)
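# Usage sketch, assuming the trio-style CapacityLimiter this method is from:
#
#   limiter = trio.CapacityLimiter(10)
#   async with limiter:                  # borrow one token
#       stats = limiter.statistics()
#       assert stats.borrowed_tokens == 1
#       assert stats.total_tokens == 10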
def statistics(self):
"""Return an object containing debugging information.
Currently the following fields are defined:
* ``locked``: boolean indicating whether the lock is held.
* ``owner``: the :class:`trio.hazmat.Task` currently holding the lock,
or None if the lock is not held.
* ``tasks_waiting``: The number of tasks blocked on this lock's
:meth:`acquire` method.
"""
return _LockStatistics(
locked=self.locked(),
owner=self._owner,
tasks_waiting=len(self._lot),
)
def statistics(self):
"""Returns an object containing debugging information.
Currently the following fields are defined:
* ``qsize``: The number of items currently in the queue.
* ``capacity``: The maximum number of items the queue can hold.
* ``tasks_waiting_put``: The number of tasks blocked on this queue's
:meth:`put` method.
* ``tasks_waiting_get``: The number of tasks blocked on this queue's
      :meth:`get` method.
    * ``tasks_waiting_join``: The number of tasks blocked on this queue's
      :meth:`join` method.
    """
return _QueueStats(
qsize=len(self._data),
capacity=self.capacity,
tasks_waiting_put=self._put_semaphore.statistics().tasks_waiting,
tasks_waiting_get=self._get_semaphore.statistics().tasks_waiting,
tasks_waiting_join=self._join_lot.statistics().tasks_waiting
)
def current_default_worker_thread_limiter():
"""Get the default :class:`CapacityLimiter` used by
:func:`run_sync_in_worker_thread`.
The most common reason to call this would be if you want to modify its
:attr:`~CapacityLimiter.total_tokens` attribute.
"""
try:
limiter = _limiter_local.limiter
except AttributeError:
limiter = _limiter_local.limiter = CapacityLimiter(DEFAULT_LIMIT)
return limiter
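# Sketch of the use case named in the docstring: raising the limit on
# concurrent worker threads by adjusting total_tokens in place.
#
#   limiter = current_default_worker_thread_limiter()
#   limiter.total_tokens = 100   # default is DEFAULT_LIMIT (trio-internal)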
# Eventually we might build this into a full-fledged deadlock-detection
# system; see https://github.com/python-trio/trio/issues/182
# But for now we just need an object to stand in for the thread, so we can
# keep track of who's holding the CapacityLimiter's token.
def test_main_and_task_both_crash(recwarn):
# If main crashes and there's also a task crash, then we get both in a
# MultiError
async def crasher():
raise ValueError
async def main(wait):
async with _core.open_nursery() as nursery:
crasher_task = nursery.spawn(crasher)
if wait:
await crasher_task.wait()
raise KeyError
for wait in [True, False]:
with pytest.raises(_core.MultiError) as excinfo:
_core.run(main, wait)
print(excinfo.value)
assert set(type(exc) for exc in excinfo.value.exceptions) == {
ValueError, KeyError
}
def test_broken_abort():
async def main():
# These yields are here to work around an annoying warning -- we're
# going to crash the main loop, and if we (by chance) do this before
# the run_sync_soon task runs for the first time, then Python gives us
# a spurious warning about it not being awaited. (I mean, the warning
# is correct, but here we're testing our ability to deliver a
# semi-meaningful error after things have gone totally pear-shaped, so
# it's not relevant.) By letting the run_sync_soon_task run first, we
# avoid the warning.
await _core.checkpoint()
await _core.checkpoint()
with _core.open_cancel_scope() as scope:
scope.cancel()
# None is not a legal return value here
await _core.wait_task_rescheduled(lambda _: None)
with pytest.raises(_core.TrioInternalError):
_core.run(main)
# Because this crashes, various __del__ methods print complaints on
# stderr. Make sure that they get run now, so the output is attached to
# this test.
gc_collect_harder()
def test_system_task_crash_KeyboardInterrupt():
async def ki():
raise KeyboardInterrupt
async def main():
_core.spawn_system_task(ki)
await sleep_forever()
# KI doesn't get wrapped with TrioInternalError
with pytest.raises(KeyboardInterrupt):
_core.run(main)
# This used to fail because checkpoint was a yield followed by an immediate
# reschedule. So we had:
# 1) this task yields
# 2) this task is rescheduled
# ...
# 3) next iteration of event loop starts, runs timeouts
# 4) this task has timed out
# 5) ...but it's on the run queue, so the timeout is queued to be delivered
# the next time that it's blocked.
def test_exc_info_after_yield_error():
child_task = None
async def child():
nonlocal child_task
child_task = _core.current_task()
try:
raise KeyError
except Exception:
try:
await sleep_forever()
except Exception:
pass
raise
with pytest.raises(KeyError):
async with _core.open_nursery() as nursery:
nursery.start_soon(child)
await wait_all_tasks_blocked()
_core.reschedule(child_task, _core.Error(ValueError()))
# Similar to previous test -- if the ValueError() gets sent in via 'throw',
# then Python's normal implicit chaining stuff is broken.
async def test_TrioToken_run_sync_soon_massive_queue():
# There are edge cases in the wakeup fd code when the wakeup fd overflows,
# so let's try to make that happen. This is also just a good stress test
# in general. (With the current-as-of-2017-02-14 code using a socketpair
# with minimal buffer, Linux takes 6 wakeups to fill the buffer and MacOS
# takes 1 wakeup. So 1000 is overkill if anything. Windows OTOH takes
# ~600,000 wakeups, but has the same code paths...)
COUNT = 1000
token = _core.current_trio_token()
counter = [0]
def cb(i):
# This also tests FIFO ordering of callbacks
assert counter[0] == i
counter[0] += 1
for i in range(COUNT):
token.run_sync_soon(cb, i)
await wait_all_tasks_blocked()
assert counter[0] == COUNT
def __exit__(self, etype, exc, tb):
if exc is not None:
filtered_exc = MultiError.filter(self._handler, exc)
if filtered_exc is exc:
# Let the interpreter re-raise it
return False
if filtered_exc is None:
# Swallow the exception
return True
# When we raise filtered_exc, Python will unconditionally blow
# away its __context__ attribute and replace it with the original
# exc we caught. So after we raise it, we have to pause it while
# it's in flight to put the correct __context__ back.
old_context = filtered_exc.__context__
try:
raise filtered_exc
finally:
_, value, _ = sys.exc_info()
assert value is filtered_exc
value.__context__ = old_context
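# Why the __context__ dance above is needed, in miniature: re-raising a new
# exception inside an except block makes Python set its __context__ to the
# exception currently being handled, clobbering any context it already had.
#
#   try:
#       raise KeyError('original')
#   except KeyError:
#       filtered = ValueError('filtered')  # may carry its own __context__
#       raise filtered                     # __context__ is forced to the
#                                          # KeyError unless restored, as above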
def flags(self):
flags = 0
if self.read_task is not None:
flags |= select.EPOLLIN
if self.write_task is not None:
flags |= select.EPOLLOUT
if not flags:
return None
# XX not sure if EPOLLEXCLUSIVE is actually safe... I think
# probably we should use it here unconditionally, but:
# https://stackoverflow.com/questions/41582560/how-does-epolls-epollexclusive-mode-interact-with-level-triggering
#flags |= select.EPOLLEXCLUSIVE
# We used to use ONESHOT here also, but it turns out that it's
# confusing/complicated: you can't use ONESHOT+EPOLLEXCLUSIVE
    # together, and ONESHOT doesn't delete the registration but just
# "disables" it so you re-enable with CTL rather than ADD (or
# something?)...
# https://lkml.org/lkml/2016/2/4/541
return flags
def test_do():
@attr.s
class EDo:
arg = attr.ib()
@do
def do_func(a, b):
done_a = yield Effect(EDo(a))
done_b = yield Effect(EDo(b))
return [done_a, done_b]
effect = do_func(1, 2)
assert isinstance(effect, Effect)
assert isinstance(effect.intent, ChainedIntent)
dispatcher = TypeDispatcher({
EDo: lambda intent: 'done: %s' % intent.arg
})
ret = sync_perform(dispatcher, effect)
assert ret == ['done: 1', 'done: 2']
async def test_chained_intent(self):
@attr.s
class ENumToString:
num = attr.ib()
def collect_intent_results():
intent_results = []
for i in range(5):
res = yield Effect(ENumToString(i))
intent_results.append(res)
return ''.join(intent_results)
effect = Effect(ChainedIntent(collect_intent_results()))
dispatcher = TypeDispatcher({
ENumToString: lambda intent: str(intent.num)
})
ret = await asyncio_perform(dispatcher, effect)
assert ret == '01234'
def block_connection_factory(url):
if url.startswith('s3:'):
try:
from parsec.core.block_s3 import S3BlockConnection
_, region, bucket, key_id, key_secret = url.split(':')
except ImportError as exc:
raise SystemExit('Parsec needs boto3 to support S3 block storage (error: %s).' %
exc)
except ValueError:
            raise SystemExit('Invalid s3 block store '
                             '(should be `s3:<region>:<bucket>:<id>:<secret>`).')
return S3BlockConnection(region, bucket, key_id, key_secret)
elif url.startswith('http://'):
return RESTBlockConnection(url)
else:
raise SystemExit('Unknown block store `%s`.' % url)
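# Usage sketch for the factory (URLs and credentials are illustrative):
#
#   conn = block_connection_factory('s3:eu-west-1:my-bucket:KEYID:SECRET')
#   conn = block_connection_factory('http://blockstore.example.com')
#   block_connection_factory('ftp://nope')  # -> SystemExit('Unknown block store ...')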