def convert(self, value, param, ctx: click.Context) -> int:
"""
Convert method that makes this class usable as a click type.
"""
if isinstance(value, self):
return value
self.fail("{} is no valid time span".format(value), param, ctx)
python类Context()的实例源码
def disable_click_colors():
"""
Set a Click context where colors are disabled. Creates a throwaway BaseCommand
to play nicely with the Context constructor.
The intended side-effect here is that click.echo() checks this context and will
suppress colors.
https://github.com/pallets/click/blob/e1aa43a3/click/globals.py#L39
"""
ctx = Context(BaseCommand('AllYourBaseAreBelongToUs'))
ctx.color = False
push_context(ctx)
def cli(ctx, config, access_token):
# type: (click.Context, str, str) -> None
"""BOCCO API http://api-docs.bocco.me/ ? CLI ????????"""
debug = False
downloads = None
if config:
with open(config, 'r') as f:
config_json = json.load(f)
debug = config_json['debug']
downloads = config_json['downloads']
access_token = config_json['access_token']
ctx.obj['api'] = Client(access_token)
ctx.obj['debug'] = debug
ctx.obj['downloads'] = downloads
def rooms(ctx, verbose):
# type: (click.Context, bool) -> None
"""???????"""
api = ctx.obj['api']
template = u'{index}. {r[name]}\n\t{r[uuid]}'
if verbose:
template = u'''
{index}. {r[name]}
\tUUID: {r[uuid]}
\tMembers({members_count}): {members}
\tSensors({sensors_count}): {sensors}
\tLast message: {last_message_id}
\tUpdated at: {r[updated_at]}
'''.strip()
for i, r in enumerate(api.get_rooms()):
member_names = [m['user']['nickname'] for m in r['members']]
sensor_names = [s['nickname'] for s in r['sensors']]
last_message_id = 0
if 0 < len(r['messages']):
last_message_id = r['messages'][0]['id']
click.echo(template.format( # type: ignore
index=i + 1,
r=r,
members_count=len(member_names),
members=u', '.join(member_names),
sensors_count=len(sensor_names),
last_message_id=last_message_id,
sensors=u', '.join(sensor_names)))
def send(ctx, room_uuid, text):
# type: (click.Context, str, str) -> None
"""????????????."""
api = ctx.obj['api']
click.echo(u'????????...') # type: ignore
api.post_text_message(uuid.UUID(room_uuid), text)
def web(ctx):
# type: (click.Context) -> None
"""Web ????? API ?????????"""
api = ctx.obj['api']
debug = ctx.obj['debug']
downloads = ctx.obj['downloads']
app.config.update(dict(DEBUG=debug, DOWNLOADS=downloads))
app.api = api
app.run()
def _get_ctx():
@click.group()
def cli():
"""A sample command group."""
pass
@cli.command()
def hello():
"""A sample command."""
pass
return click.Context(cli, info_name='cli')
def ensure_project_path(f):
"""Ensure that the project path exists. In the event that it
does not exist, prompt the user to specify the path
"""
@click.pass_context
def new_func(ctx, *args, **kwargs):
"""
:param click.Context ctx:
"""
if get_project_path() is None:
click.echo('Config project_path not set.')
click.echo('You can do one of the following:')
choice = click_utils.prompt_choices([
'Clone the repository to {}.'.format(default_project_path()),
'Specify the path to an existing repo.'
])
if choice == 0:
path = default_project_path()
if os.path.exists(path):
click.confirm('Path {} already exists. Continuing will remove this path.'.format(
path), default=True, abort=True)
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.remove(path)
click.echo('Cloning to {}...'.format(path), nl=False)
clone_project()
click.secho('Done', fg='black', bg='green')
save_project_path(path)
else:
value = click.prompt(
'Please enter the path to your project', type=str)
value = path_utils.canonical_path(value)
if not os.path.exists(value) or not os.path.isdir(value):
click.echo('This directory does not exist.')
ctx.exit(code=exit_codes.OTHER_FAILURE)
click.confirm('Is your project at {}?'.format(
value), default=True, abort=True)
save_project_path(value)
return ctx.invoke(f, *args, **kwargs)
return update_wrapper(new_func, f)
def option_hosted_zone_id(f):
"""Similar to click.option for hosted-zone-id, but in the event
that the user does not specify the option, try to retrieve
the ID from AWS and only error out if that fails or is ambiguous.
"""
@click.option('--hosted-zone-id', '-hz', default=None, help='Route 53 Hosted Zone ID for {}'.format(settings.DOMAIN_NAME))
@click.pass_context
def new_func(ctx, hosted_zone_id, *args, **kwargs):
"""
:param click.Context ctx:
"""
if hosted_zone_id is None:
echo_heading('Trying to find hosted zone for {}.'.format(settings.DOMAIN_NAME), marker='-', marker_color='magenta')
client = boto3.client('route53')
response = client.list_hosted_zones_by_name(
DNSName=settings.DOMAIN_NAME
)
if len(response['HostedZones']) == 0:
click.echo('Unable to find a Hosted Zone for {}. Please specify it explicitly by passing --hosted-zone-id/-hz.')
ctx.exit(code=exit_codes.OTHER_FAILURE)
elif len(response['HostedZones']) > 1:
click.echo('Found multiple Hosted Zones for {}. Please specify one explicitly by passing --hosted-zone-id/-hz.')
ctx.exit(code=exit_codes.OTHER_FAILURE)
hosted_zone_id = response['HostedZones'][0]['Id'].split('/')[-1]
click.echo('Done.')
return ctx.invoke(f, hosted_zone_id=hosted_zone_id, *args, **kwargs)
return update_wrapper(new_func, f)
def skip_if_debug(f):
@click.pass_context
def new_func(ctx, *args, **kwargs):
"""
:param click.Context ctx:
"""
if settings.DEBUG:
click.echo('Skipping because DEBUG is True.')
ctx.exit()
return ctx.invoke(f, *args, **kwargs)
return update_wrapper(new_func, f)
def get_stack(ctx, path):
"""
Parses the path to generate relevant Envrionment and Stack object.
:param ctx: Cli context.
:type ctx: click.Context
:param path: Path to either stack config or environment folder.
:type path: str
"""
return ConfigReader(
ctx.obj["sceptre_dir"], ctx.obj["options"]
).construct_stack(path)
def test_current_config():
with click.Context(main).scope() as ctx:
ctx.params['config'] = {'foo': 'bar'}
assert config.current_config == {'foo': 'bar'}
def test_get_config_path(self):
ctx = click.Context(click.Command('cli'))
ctx.obj = {'project_dir': 'foo/bar/baz/'}
path = _get_config_path(ctx)
self.assertEqual(path, 'foo/bar/baz/.pipetree/config.json')
def cmd_select(ctx, # type: click.Context
backend, # type: str
host, # type: str
port, # type: int
filter_address, # type: str
filter_service_name, # type: str
filter_service_tags, # type: str
hash_method, # type: proxenos.rendezvous.HashMethod
key, # type: str
):
# type: (...) -> None
"""Selects a node from a cluster."""
if port is None:
port = DEFAULT_PORTS[backend]
driver_manager = stevedore.driver.DriverManager(
namespace=proxenos.mappers.NAMESPACE,
name=backend,
invoke_on_load=False)
try:
mapper = driver_manager.driver(host=host, port=port)
except proxenos.errors.ServiceDiscoveryConnectionError as err:
click.secho(str(err), fg='red')
ctx.exit(proxenos.const.ExitCode.CONNECTION_FAILURE)
mapper.update()
cluster = mapper.cluster.copy()
# TODO(darvid): Add filtering support in the mapper API itself
if filter_service_tags:
pattern = re.compile(filter_service_tags)
cluster = {service for service in cluster
if all(pattern.match(tag) for tag in service.tags)}
if filter_service_name:
pattern = re.compile(filter_service_name)
cluster = {service for service in cluster
if pattern.match(service.name)}
if filter_address:
pattern = re.compile(filter_address)
cluster = {service for service in cluster
if pattern.match(str(service.socket_address))}
service = proxenos.rendezvous.select_node(
cluster, key, hash_method=hash_method)
if service is None:
ctx.exit(proxenos.const.ExitCode.SERVICES_NOT_FOUND)
click.echo(str(service.socket_address))
def do_ex(ctx: click.Context, cmd: typing.List[str], cwd: str = '.') -> typing.Tuple[str, str, int]:
"""
Executes a given command
Args:
ctx: Click context
cmd: command to run
cwd: working directory (defaults to ".")
Returns: stdout, stderr, exit_code
"""
def _popen_pipes(cmd_, cwd_):
def _always_strings(env_dict):
"""
On Windows and Python 2, environment dictionaries must be strings
and not unicode.
"""
env_dict.update(
(key, str(value))
for (key, value) in env_dict.items()
)
return env_dict
return subprocess.Popen(
cmd_,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(cwd_),
env=_always_strings(
dict(
os.environ,
# try to disable i18n
LC_ALL='C',
LANGUAGE='',
HGPLAIN='1',
)
)
)
def _ensure_stripped_str(_, str_or_bytes):
if isinstance(str_or_bytes, str):
return '\n'.join(str_or_bytes.strip().splitlines())
else:
return '\n'.join(str_or_bytes.decode('utf-8', 'surogate_escape').strip().splitlines())
exe = find_executable(cmd.pop(0))
if not exe:
exit(-1)
cmd.insert(0, exe)
click.secho(f'{cmd}', nl=False, fg='magenta')
p = _popen_pipes(cmd, cwd)
out, err = p.communicate()
click.secho(f' -> {p.returncode}', fg='magenta')
return _ensure_stripped_str(ctx, out), _ensure_stripped_str(ctx, err), p.returncode
def do(
ctx: click.Context,
cmd: typing.List[str],
cwd: str = '.',
mute_stdout: bool = False,
mute_stderr: bool = False,
# @formatter:off
filter_output: typing.Union[None, typing.Iterable[str]]=None
# @formatter:on
) -> str:
"""
Executes a command and returns the result
Args:
ctx: click context
cmd: command to execute
cwd: working directory (defaults to ".")
mute_stdout: if true, stdout will not be printed
mute_stderr: if true, stderr will not be printed
filter_output: gives a list of partial strings to filter out from the output (stdout or stderr)
Returns: stdout
"""
def _filter_output(input_):
def _filter_line(line):
# noinspection PyTypeChecker
for filter_str in filter_output:
if filter_str in line:
return False
return True
if filter_output is None:
return input_
return '\n'.join(filter(_filter_line, input_.split('\n')))
if not isinstance(cmd, (list, tuple)):
cmd = shlex.split(cmd)
out, err, ret = do_ex(ctx, cmd, cwd)
if out and not mute_stdout:
click.secho(f'{_filter_output(out)}', fg='cyan')
if err and not mute_stderr:
click.secho(f'{_filter_output(err)}', fg='red')
if ret:
click.secho(f'command failed: {cmd}', err=True, fg='red')
exit(ret)
return out
def main_add(ctx, # type: click.Context
options, # type: Dict[str, Any]
build, # type: bool
editable, # type: Iterable[str]
tag, # type: Iterable[str]
install, # type: bool
pre, # type: bool
resolve_canonical_names, # type: bool
resolve_versions, # type: bool
specifiers, # type: Tuple[str, ...]
):
# type: (...) -> None
"""Add packages to requirement source files."""
if not options['directory'].exists():
console.error('run `{} init\' first', ctx.find_root().info_name)
ctx.abort()
specifiers = tuple('-e {}'.format(e) for e in editable) + specifiers
if install:
pip_args = tuple(shlex.split(' '.join(specifiers)))
if pre:
pip_args = ('--pre',) + pip_args
pip_install(ctx, *pip_args)
if not tag:
tag = ('main',)
pip_options, session = reqwire.helpers.requirements.build_pip_session()
src_dir = options['directory'] / options['source_dir']
lookup_index_urls = set() # type: Set[str]
for tag_name in tag:
filename = src_dir / ''.join((tag_name, options['extension']))
if not filename.exists():
continue
_, finder = reqwire.helpers.requirements.parse_requirements(
filename=str(filename))
lookup_index_urls |= set(finder.index_urls)
try:
for tag_name in tag:
console.info('saving requirement(s) to {}', tag_name)
reqwire.scaffold.extend_source_file(
working_directory=str(options['directory']),
tag_name=tag_name,
specifiers=specifiers,
extension=options['extension'],
lookup_index_urls=lookup_index_urls,
prereleases=pre,
resolve_canonical_names=resolve_canonical_names,
resolve_versions=resolve_versions)
except piptools.exceptions.NoCandidateFound as err:
console.error(str(err))
ctx.abort()
if build:
ctx.invoke(main_build, all=False, tag=tag)
def catch_all(func):
"""
Catch (nearly) all exceptions unless in debug mode.
Args:
func (function): function being decorated
"""
def _catch_all(ctx, *args, **kwargs):
"""
Args:
ctx (click.Context): cli context object
args (tuple): tuple of args of the fuction
kwargs (dict): keyword args of the function
"""
def stderr(msg):
click.echo(click.style(msg, fg='red'), file=sys.stderr)
try:
return func(ctx, *args, **kwargs)
except click.Abort:
# on SIGINT click.prompt raises click.Abort
logger.error('') # just to get a newline
except click.ClickException:
raise # Let click deal with it
except Exception:
# generic error string
stderr(
'You have experienced a client-side technical error.')
# only dump the stack traces if the debug flag is set
if kwargs.get('debug') or ctx.obj['debug']:
stderr("\nFunction: {}.{}".format(func.__module__, func.__name__))
stderr("Args: {}".format(args))
stderr("Kwargs: {}".format(kwargs))
stderr("{}".format(traceback.format_exc()))
else:
stderr(
'For more detail, run your command with the debug flag: '
'`21 --debug <command>.`')
sys.exit(1)
return functools.update_wrapper(_catch_all, func)
def test_basic_parameters(self):
"""Validate a combination of parameters.
This exercises the code paths for a group with arguments, options and
environment variables.
"""
@click.group()
@click.option('--param', envvar='PARAM', help='A sample option')
@click.argument('ARG', envvar='ARG')
def cli():
"""A sample command group."""
pass
ctx = click.Context(cli, info_name='cli')
output = list(ext._format_command(ctx, show_nested=False))
self.assertEqual(textwrap.dedent("""
.. program:: cli
.. code-block:: shell
cli [OPTIONS] ARG COMMAND [ARGS]...
.. rubric:: Options
.. option:: --param <param>
A sample option
.. rubric:: Arguments
.. option:: ARG
Required argument
.. rubric:: Environment variables
.. envvar:: PARAM
Provide a default for :option:`--param`
.. envvar:: ARG
Provide a default for :option:`ARG`
""").lstrip(), '\n'.join(output))