def ctl_each(obj, command, arguments):
'''Invoke any kubectl command directly for each pod matched and collate the output.'''
width, height = click.get_terminal_size()
kubectl = obj.kubey.kubectl
collector = tabular.RowCollector()
ns_pods = defaultdict(list)
for pod in obj.kubey.each_pod(obj.maximum):
ns_pods[pod.namespace].append(pod)
for ns, pods in ns_pods.items():
args = ['-n', ns] + list(arguments) + [p.name for p in pods]
kubectl.call_table_rows(collector.handler_for(ns), command, *args)
kubectl.wait()
if collector.rows:
click.echo(tabular.tabulate(obj, sorted(collector.rows), collector.headers))
if kubectl.final_rc != 0:
click.get_current_context().exit(kubectl.final_rc)
python类get_current_context()的实例源码
def whoami_command():
"""
Executor for `globus whoami`
"""
username = lookup_option(WHOAMI_USERNAME_OPTNAME)
if not username:
safeprint('No login information available. Please try '
'logging in again.', write_to_stderr=True)
click.get_current_context().exit(1)
fields = tuple((x, x) for x in ('Username', 'Name', 'ID', 'Email'))
formatted_print({
'Username': username,
'Name': lookup_option(WHOAMI_NAME_OPTNAME),
'ID': lookup_option(WHOAMI_ID_OPTNAME),
'Email': lookup_option(WHOAMI_EMAIL_OPTNAME)
}, fields=fields,
text_format=(FORMAT_TEXT_RECORD if is_verbose() else FORMAT_TEXT_RAW),
response_key=None if is_verbose() else 'Username')
def shell_complete_option(f):
def callback(ctx, param, value):
if not value or ctx.resilient_parsing:
return
if value == 'BASH':
do_bash_complete()
elif value == 'ZSH':
do_zsh_complete()
else:
raise ValueError('Unsupported shell completion')
click.get_current_context().exit(0)
f = click.option('--shell-complete', cls=HiddenOption,
is_eager=True, expose_value=False,
type=click.Choice(SUPPORTED_SHELLS),
callback=callback)(f)
return f
def autoactivate(client, endpoint_id, if_expires_in=None):
"""
Attempts to auto-activate the given endpoint with the given client
If auto-activation fails, parses the returned activation requirements
to determine which methods of activation are supported, then tells
the user to use 'globus endpoint activate' with the correct options(s)
"""
kwargs = {}
if if_expires_in is not None:
kwargs['if_expires_in'] = if_expires_in
res = client.endpoint_autoactivate(endpoint_id, **kwargs)
if res["code"] == "AutoActivationFailed":
message = ("The endpoint could not be auto-activated and must be "
"activated before it can be used.\n\n" +
activation_requirements_help_text(res, endpoint_id))
safeprint(message, write_to_stderr=True)
click.get_current_context().exit(1)
else:
return res
def style_tweet(tweet, porcelain=False):
conf = click.get_current_context().obj["conf"]
limit = conf.character_limit
if porcelain:
return "{nick}\t{url}\t{tweet}".format(
nick=tweet.source.nick,
url=tweet.source.url,
tweet=str(tweet))
else:
if sys.stdout.isatty() and not tweet.text.isprintable():
return None
styled_text = format_mentions(tweet.text)
len_styling = len(styled_text) - len(click.unstyle(styled_text))
final_text = textwrap.shorten(styled_text, limit + len_styling) if limit else styled_text
timestamp = tweet.absolute_datetime if conf.use_abs_time else tweet.relative_datetime
return "? {nick} ({time}):\n{tweet}".format(
nick=click.style(tweet.source.nick, bold=True),
tweet=final_text,
time=click.style(timestamp, dim=True))
def validate_text(ctx, param, value):
conf = click.get_current_context().obj["conf"]
if isinstance(value, tuple):
value = " ".join(value)
if not value and not sys.stdin.isatty():
value = click.get_text_stream("stdin").read()
if value:
value = value.strip()
if conf.character_warning and len(value) > conf.character_warning:
click.confirm("? Warning: Tweet is longer than {0} characters. Are you sure?".format(
conf.character_warning), abort=True)
return value
else:
raise click.BadArgumentUsage("Text can’t be empty.")
def get_code(shell=None, prog_name=None, env_name=None, extra_env=None):
"""Return the specified completion code"""
from jinja2 import Template
if shell in [None, 'auto']:
shell = get_auto_shell()
prog_name = prog_name or click.get_current_context().find_root().info_name
env_name = env_name or '_%s_COMPLETE' % prog_name.upper().replace('-', '_')
extra_env = extra_env if extra_env else {}
if shell == 'fish':
return Template(FISH_TEMPLATE).render(prog_name=prog_name, complete_var=env_name, extra_env=extra_env)
elif shell == 'bash':
return Template(BASH_COMPLETION_SCRIPT).render(prog_name=prog_name, complete_var=env_name, extra_env=extra_env)
elif shell == 'zsh':
return Template(ZSH_TEMPLATE).render(prog_name=prog_name, complete_var=env_name, extra_env=extra_env)
elif shell == 'powershell':
return Template(POWERSHELL_COMPLETION_SCRIPT).render(prog_name=prog_name, complete_var=env_name, extra_env=extra_env)
else:
raise click.ClickException('%s is not supported.' % shell)
def configure():
"""
Kick things off and configure all the things.
A guess is made as to whether the entrypoint is coming from Click
or from another invocation of `configure()`. If Click, we're able
to pass along the Click context object.
"""
from .settings import discover_configs, configure
try:
ctx = click.get_current_context()
except RuntimeError:
ctx = None
_, py, yaml = discover_configs(ctx)
# TODO(mattrobenolt): Surface this also as a CLI option?
skip_backend_validation = 'SENTRY_SKIP_BACKEND_VALIDATION' in os.environ
configure(ctx, py, yaml, skip_backend_validation)
def _get_current_api_session():
"""
Get an API session, either from the Click context cache, or a new one from the config.
:return: API session
:rtype: APISession
"""
host, token = get_host_and_token()
ctx = click.get_current_context(silent=True) or None
cache_key = force_text('_api_session_%s_%s' % (host, token))
session = (getattr(ctx, cache_key, None) if ctx else None)
if not session:
session = APISession(host, token)
if ctx:
setattr(ctx, cache_key, session)
return session
def lint(filenames):
"""
Lint (syntax-check) a valohai.yaml file.
The return code of this command will be the total number of errors found in all the files.
"""
if not filenames:
project = get_project()
directory = (project.directory if project else get_project_directory())
config_file = os.path.join(directory, 'valohai.yaml')
if not os.path.exists(config_file):
raise CLIException('There is no %s file. Pass in the names of configuration files to lint?' % config_file)
filenames = [config_file]
total_errors = 0
for filename in filenames:
total_errors += validate_file(filename)
if total_errors:
warn('There were %d total errors.' % total_errors)
click.get_current_context().exit(total_errors)
def main(**parsed_settings):
click_context = click.get_current_context()
click_context.color = True # GitLab doesn't report terminal type correctly so we need to force it
settings.update(parsed_settings)
rancher.session.auth = settings['access_key'], settings['secret_key']
deployment.load_from_settings(settings)
hooks.dispatch('before_upgrade')
try:
upgrade()
except Exception as ex:
hooks.dispatch('after_upgrade_failure')
if isinstance(ex, rancher.UpgradeFailed):
return # we handled it gracefully already
raise
else:
hooks.dispatch('after_upgrade_success')
def tail(obj, follow, prefix, number):
'''Show recent logs from containers for each pod matched.
NUMBER is a count of recent lines or a relative duration (e.g. 5s, 2m, 3h)
'''
kubectl = obj.kubey.kubectl
if re.match(r'^\d+$', number):
log_args = ['--tail', str(number)]
else:
log_args = ['--since', number]
if follow:
log_args.append('-f')
for pod in obj.kubey.each_pod(obj.maximum):
for container in pod.containers:
args = ['-n', pod.namespace, '-c', container.name] + log_args + [pod.name]
if prefix:
prefix = '[%s:%s] ' % (pod.name, container.name)
kubectl.call_prefix(prefix, 'logs', *args)
else:
kubectl.call_async('logs', *args)
kubectl.wait()
if kubectl.final_rc != 0:
click.get_current_context().exit(kubectl.final_rc)
def debug_log(msg):
ctx = click.get_current_context(True)
if ctx and ctx.params.get('debug'):
click.echo(msg)
def from_context(cls):
"""Retrieve this class' instance from the current Click context.
:return: Instance of this class.
:rtype: Config
"""
try:
ctx = click.get_current_context()
except RuntimeError:
return cls()
return ctx.find_object(cls)
def click_fail(msg):
"""Convenience failing function."""
click.get_current_context().fail(msg)
def invoke(self, command_name, **kwargs):
"""
Runs a [sub]command by name, passing context automatically.
"""
context = click.get_current_context()
command = cli.get_command(context, command_name)
context.invoke(command, **kwargs)
def configure():
"""
Update configuration options. Configuration will be saved in ~/.dpm/conf or
the file provided with --config option.
"""
config.prompt_config(click.get_current_context().parent.params['config_path'])
def validate():
"""
Validate datapackage in the current dir. Print validation errors if found.
"""
client = click.get_current_context().meta['client']
try:
client.validate()
except (ValidationError, dprclient.ResourceDoesNotExist) as e:
echo('[ERROR] %s\n' % str(e))
sys.exit(1)
echo('datapackage.json is valid\n')
def purge():
"""
Purge datapackage from the registry server.
"""
client = click.get_current_context().meta['client']
client.purge()
echo('purge ok')
def delete():
"""
Delete datapackage from the registry server.
"""
client = click.get_current_context().meta['client']
client.delete()
echo('delete ok')
def undelete():
"""
Undelete datapackage from the registry server.
"""
client = click.get_current_context().meta['client']
client.undelete()
echo('undelete ok')
def tag(tag_string):
"""
Tag datapackage on the server. Create new copy of the latest version of the
datapackage on the server, and assign a tag.
"""
client = click.get_current_context().meta['client']
puburl = client.tag(tag_string)
echo('Datapackage successfully tagged.')
def _parse_boolean(self, string):
"""Parse string representing a boolean into Python bool type.
Supported values match `configparser.ConfigParser.getboolean`.
"""
trues = ['1', 'yes', 'true', 'on']
falses = ['0', 'no', 'false', 'off']
string_lower = string.lower()
if string_lower in trues:
return True
elif string_lower in falses:
return False
else:
import itertools
import click
msg = (
"Error: unrecognized value for --%s flag: %s\n"
"Supported values (case-insensitive): %s" %
(self.cli_name, string,
', '.join(itertools.chain(trues, falses)))
)
click.secho(msg, err=True, fg='red', bold=True)
ctx = click.get_current_context()
ctx.exit(1)
def get_value(self, arguments, fallback=None):
import os
import os.path
import click
try:
path = self._locate_value(arguments, fallback=fallback)
# TODO: do we want a --force like flag?
if os.path.exists(path):
click.secho("Error: --%s directory already exists, won't "
"overwrite." % self.cli_name, err=True, fg='red',
bold=True)
ctx = click.get_current_context()
ctx.exit(1)
os.makedirs(path)
def fallback_(name, cli_name):
return os.path.join(path, name)
return fallback_
except ValueNotFoundException:
# Always fail to find a value as this handler doesn't exist.
def fail(*_):
raise ValueNotFoundException()
return fail
def _error_with_duplicate_in_set(self, elements):
import click
import collections
counter = collections.Counter(elements)
dups = {name for name, count in counter.items() if count > 1}
ctx = click.get_current_context()
click.echo(ctx.get_usage() + '\n', err=True)
click.secho("Error: Option --%s was given these values: %r more than "
"one time, values passed should be unique."
% (self.cli_name, dups), err=True, fg='red', bold=True)
ctx.exit(1)
def handle_in_params(self, kwargs):
import q2cli.handlers
arguments = {}
missing = []
cmd_fallback = self.cmd_config_handler.get_value(kwargs)
verbose = self.verbose_handler.get_value(kwargs, fallback=cmd_fallback)
quiet = self.quiet_handler.get_value(kwargs, fallback=cmd_fallback)
if verbose and quiet:
click.secho('Unsure of how to be quiet and verbose at the '
'same time.', fg='red', bold=True, err=True)
click.get_current_context().exit(1)
for item in self.action['signature']:
if item['type'] == 'input' or item['type'] == 'parameter':
name = item['name']
handler = self.generated_handlers[name]
try:
arguments[name] = handler.get_value(
kwargs, fallback=cmd_fallback
)
except q2cli.handlers.ValueNotFoundException:
missing += handler.missing
return arguments, missing, verbose, quiet
def outformat_is_json():
"""
Only safe to call within a click context.
"""
ctx = click.get_current_context()
state = ctx.ensure_object(CommandState)
return state.outformat_is_json()
def outformat_is_text():
"""
Only safe to call within a click context.
"""
ctx = click.get_current_context()
state = ctx.ensure_object(CommandState)
return state.outformat_is_text()
def get_jmespath_expression():
"""
Only safe to call within a click context.
"""
ctx = click.get_current_context()
state = ctx.ensure_object(CommandState)
return state.jmespath_expr
def verbosity():
"""
Only safe to call within a click context.
"""
ctx = click.get_current_context()
state = ctx.ensure_object(CommandState)
return state.verbosity