def _write_requirements(ctx: click.Context, packages_list, outfile, prefix_list=None):
with open('temp', 'w') as source_file:
source_file.write('\n'.join(packages_list))
packages, _, ret = do_ex(
ctx,
[
'pip-compile',
'--index',
'--upgrade',
'--annotate',
'--no-header',
'-n',
'temp'
]
)
os.remove('temp')
with open(outfile, 'w') as req_file:
if prefix_list:
for prefix in prefix_list:
req_file.write(f'{prefix}\n')
for package in packages.splitlines():
req_file.write(f'{package}\n')
python类Context()的实例源码
def generate_man_page(ctx, version=None):
"""
Generate documentation for the given command.
:param click.Context ctx: the click context for the
cli application.
:rtype: str
:returns: the generate man page from the given click Context.
"""
# Create man page with the details from the given context
man_page = ManPage(ctx.command_path)
man_page.version = version
man_page.short_help = ctx.command.short_help
man_page.description = ctx.command.help
man_page.synopsis = ' '.join(ctx.command.collect_usage_pieces(ctx))
man_page.options = [x.get_help_record(None) for x in ctx.command.params if isinstance(x, click.Option)]
commands = getattr(ctx.command, 'commands', None)
if commands:
man_page.commands = [(k, v.short_help) for k, v in commands.items()]
return str(man_page)
def write_man_pages(name, cli, parent_ctx=None, version=None, target_dir=None):
"""
Generate man page files recursively
for the given click cli function.
:param str name: the cli name
:param cli: the cli instance
:param click.Context parent_ctx: the parent click context
:param str target_dir: the directory where the generated
man pages are stored.
"""
ctx = click.Context(cli, info_name=name, parent=parent_ctx)
man_page = generate_man_page(ctx, version)
path = '{0}.1'.format(ctx.command_path.replace(' ', '-'))
if target_dir:
path = os.path.join(target_dir, path)
with open(path, 'w+') as f:
f.write(man_page)
commands = getattr(cli, 'commands', {})
for name, command in commands.items():
write_man_pages(name, command, parent_ctx=ctx, version=version, target_dir=target_dir)
def main(ctx, # type: click.Context
directory, # type: click.Path
quiet, # type: bool
extension, # type: str
source_directory, # type: str
build_directory, # type: str
):
# type: (...) -> None
"""reqwire: micromanages your requirements."""
requirements_dir = pathlib.Path(str(directory))
console.verbose = not quiet
ctx.obj = {
'build_dir': build_directory,
'directory': requirements_dir,
'extension': extension,
'source_dir': source_directory,
}
def deploy(ctx, autogen_policy, profile, api_gateway_stage, stage,
connection_timeout):
# type: (click.Context, Optional[bool], str, str, str, int) -> None
factory = ctx.obj['factory'] # type: CLIFactory
factory.profile = profile
config = factory.create_config_obj(
chalice_stage_name=stage, autogen_policy=autogen_policy,
api_gateway_stage=api_gateway_stage,
)
session = factory.create_botocore_session(
connection_timeout=connection_timeout)
d = factory.create_default_deployer(session=session,
ui=UI())
deployed_values = d.deploy(config, chalice_stage_name=stage)
record_deployed_values(deployed_values, os.path.join(
config.project_dir, '.chalice', 'deployed.json'))
def url(ctx, stage):
# type: (click.Context, str) -> None
factory = ctx.obj['factory'] # type: CLIFactory
config = factory.create_config_obj(stage)
deployed = config.deployed_resources(stage)
if deployed is not None:
click.echo(
"https://{api_id}.execute-api.{region}.amazonaws.com/{stage}/"
.format(api_id=deployed.rest_api_id,
region=deployed.region,
stage=deployed.api_gateway_stage)
)
else:
e = click.ClickException(
"Could not find a record of deployed values to chalice stage: '%s'"
% stage)
e.exit_code = 2
raise e
def generate_sdk(ctx, sdk_type, stage, outdir):
# type: (click.Context, str, str, str) -> None
factory = ctx.obj['factory'] # type: CLIFactory
config = factory.create_config_obj(stage)
session = factory.create_botocore_session()
client = TypedAWSClient(session)
deployed = config.deployed_resources(stage)
if deployed is None:
click.echo("Could not find API ID, has this application "
"been deployed?", err=True)
raise click.Abort()
else:
rest_api_id = deployed.rest_api_id
api_gateway_stage = deployed.api_gateway_stage
client.download_sdk(rest_api_id, outdir,
api_gateway_stage=api_gateway_stage,
sdk_type=sdk_type)
def validate(type_scheme: Type) -> t.Callable[[click.Context, str, t.Any], t.Any]:
"""
Creates a valid click option validator function that can be passed to click via the callback
parameter.
The validator function expects the type of the value to be the raw type of the type scheme.
:param type_scheme: type scheme the validator validates against
:return: the validator function
"""
def func(ctx, param, value):
param = param.human_readable_name
param = param.replace("-", "")
res = verbose_isinstance(value, type_scheme, value_name=param)
if not res:
raise click.BadParameter(str(res))
return value
return func
def messages(ctx,
room_uuid,
newer_than,
older_than,
limit,
verbose):
# type: (click.Context, str, int, int, int, bool) -> None
"""???????????????"""
api = ctx.obj['api']
messages = api.get_messages(uuid.UUID(room_uuid),
newer_than=newer_than,
older_than=older_than)
template = u'{m[date]} {m[user][nickname]} {m[text]}'
if verbose:
template = u'''
{m[date]} {m[user][nickname]} {m[text]}
\tid: {m[id]}
\tunique_id: {m[unique_id]}
\tmedia: {m[media]}
\tmessage_type: {m[message_type]}
\tdictated: {m[dictated]}
'''.strip()
for m in messages[-limit:]:
click.echo(template.format(m=m)) # type: ignore
def test_no_parameters(self):
"""Validate a `click.Group` with no parameters.
This exercises the code paths for a group with *no* arguments, *no*
options and *no* environment variables.
"""
@click.group()
def cli():
"""A sample command group."""
pass
ctx = click.Context(cli, info_name='cli')
output = list(ext._format_command(ctx, show_nested=False))
self.assertEqual(textwrap.dedent("""
.. program:: cli
.. code-block:: shell
cli [OPTIONS] COMMAND [ARGS]...
""").lstrip(), '\n'.join(output))
def require_executable(executable):
"""Check if an executable is installed. If not, exit with an
error.
"""
def decorator(f):
@click.pass_context
def new_func(ctx, *args, **kwargs):
"""
:param click.Context ctx:
"""
if utils.which(executable) is None:
click.echo('{} is not installed. Please install it.'.format(executable))
ctx.exit(code=exit_codes.OTHER_FAILURE)
return ctx.invoke(f, *args, **kwargs)
return update_wrapper(new_func, f)
return decorator
def ensure_executable_exists(name, get_executable):
"""Ensures that the private executable exists. If it doesn't, then
call get_executable, which must be a callable that installs
the executable.
"""
def decorator(f):
@click.pass_context
def new_func(ctx, *args, **kwargs):
"""
@type ctx: click.Context
"""
path = path_utils.executable_path(name)
if not os.path.exists(path):
echo_heading('Installing {}.'.format(name), marker='-', marker_color='magenta')
get_executable()
assert os.path.exists(path)
return ctx.invoke(f, *args, **kwargs)
return update_wrapper(new_func, f)
return decorator
def get_stack_or_env(ctx, path):
"""
Parses the path to generate relevant Envrionment and Stack object.
:param ctx: Cli context.
:type ctx: click.Context
:param path: Path to either stack config or environment folder.
:type path: str
"""
stack = None
env = None
config_reader = ConfigReader(ctx.obj["sceptre_dir"], ctx.obj["options"])
if os.path.splitext(path)[1]:
stack = config_reader.construct_stack(path)
else:
env = config_reader.construct_environment(path)
return (stack, env)
def main(ctx):
# type: (click.Context) -> None
"""proxenos-cli: Consistently select nodes from service discovery."""
def validate_taxdump(value, method):
if (os.path.isfile("%s/%s" % (value, "names.dmp")) and
os.path.isfile("%s/%s" % (value, "nodes.dmp")) and
os.path.isfile("%s/%s" % (value, "merged.dmp")) and
os.path.isfile("%s/%s" % (value, "delnodes.dmp"))):
return value
else:
with click.Context(method) as ctx:
click.echo(ctx.get_help())
raise click.BadParameter("Could not find names.dmp in taxdump, specify a value or make sure the files are present")
def cd(path):
"""
Context to temporarily change the working directory
Args:
path: working directory to cd into
"""
old_dir = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(old_dir)
def _print_version(ctx: click.Context, param, value):
if not value or ctx.resilient_parsing:
return
ensure_repo()
_get_version(ctx)
exit(0)
# @click.group(invoke_without_command=True)
def reqs(ctx: click.Context, prod, test, dev):
"""
Write requirements files
"""
if not find_executable('pip-compile'):
click.secho('Missing module "pip-tools".\n'
'Install it manually with: "pip install pip-tools"\n'
'Or install all dependencies with: "pip install -r requirements-dev.txt"',
err=True, fg='red')
exit(-1)
if prod:
sys.path.insert(0, os.path.abspath('.'))
from setup import install_requires
_write_requirements(
ctx,
packages_list=install_requires,
outfile='requirements.txt'
)
sys.path.pop(0)
if test:
"""Writes requirements-test.txt"""
from setup import test_requires
_write_requirements(
ctx,
packages_list=test_requires,
outfile='requirements-test.txt',
prefix_list=['-r requirements.txt']
)
if dev:
"""Writes requirements-dev.txt"""
from setup import dev_requires
_write_requirements(
ctx,
packages_list=dev_requires,
outfile='requirements-dev.txt',
prefix_list=['-r requirements.txt', '-r requirements-test.txt']
)
def pip_install(ctx, *specifiers):
# type: (click.Context, str) -> None
try:
result = sh.pip.install(*specifiers, _err_to_out=True, _iter=True)
for line in result:
click.echo(line, nl=False)
except sh.ErrorReturnCode:
ctx.abort()
def main_build(ctx, # type: click.Context
options, # type: Dict[str, Any]
all, # type: bool
tag, # type: Iterable[str]
pip_compile_options, # type: Iterable[str]
):
# type: (...) -> None
"""Build requirements with pip-compile."""
if not options['directory'].exists():
console.error('run `{} init\' first', ctx.find_root().info_name)
ctx.abort()
if not all and not tag:
console.error('either --all or --tag must be provided.')
ctx.abort()
src_dir = options['directory'] / options['source_dir']
dest_dir = options['directory'] / options['build_dir']
if not dest_dir.exists():
dest_dir.mkdir()
default_args = ['-r']
if not tag:
pattern = '*{}'.format(options['extension'])
tag = (path.stem for path in src_dir.glob(pattern))
for tag_name in tag:
src = src_dir / ''.join((tag_name, options['extension']))
dest = dest_dir / '{}.txt'.format(tag_name)
console.info('building {}', click.format_filename(str(dest)))
args = default_args[:]
args += [str(src)]
args += list(pip_compile_options)
with atomicwrites.AtomicWriter(str(dest), 'w', True).open() as f:
f.write(reqwire.scaffold.MODELINES_HEADER)
with tempfile.NamedTemporaryFile() as temp_file:
args += ['-o', temp_file.name]
sh.pip_compile(*args, _out=f, _tty_out=False)
def main_init(ctx, # type: click.Context
options, # type: Dict[str, Any]
force, # type: bool
index_url, # type: str
tag, # type: Iterable[str]
extra_index_url, # type: Tuple[str]
):
# type: (...) -> None
"""Initialize reqwire in the current directory."""
if not force and options['directory'].exists():
console.error('requirements directory already exists')
ctx.abort()
src_dir = reqwire.scaffold.init_source_dir(
options['directory'], exist_ok=force, name=options['source_dir'])
console.info('created {}', click.format_filename(str(src_dir)))
build_dir = reqwire.scaffold.init_source_dir(
options['directory'], exist_ok=force, name=options['build_dir'])
console.info('created {}', click.format_filename(str(build_dir)))
if not tag:
tag = ('docs', 'main', 'qa', 'test')
for tag_name in tag:
filename = reqwire.scaffold.init_source_file(
working_directory=options['directory'],
tag_name=tag_name,
extension=options['extension'],
index_url=index_url,
extra_index_urls=extra_index_url)
console.info('created {}', click.format_filename(str(filename)))
def get_full_name(ctx):
"""
Return the fully qualified name of the command.
>>> main_ctx = click.Context(click.Command('main'))
>>> market_ctx = click.Context(click.Command('market'), parent=main_ctx)
>>> join_ctx = click.Context(click.Command('join'), parent=market_ctx)
>>> get_full_name(join_ctx)
'21_market_join'
"""
if ctx.parent is None: # This is `two1.cli.main`
return '21'
else:
return get_full_name(ctx.parent) + '_' + ctx.command.name
def cli(ctx, project_dir, debug=False):
# type: (click.Context, str, bool) -> None
if project_dir is None:
project_dir = os.getcwd()
ctx.obj['project_dir'] = project_dir
ctx.obj['debug'] = debug
ctx.obj['factory'] = CLIFactory(project_dir, debug)
os.chdir(project_dir)
def local(ctx, host='127.0.0.1', port=8000, stage=DEFAULT_STAGE_NAME):
# type: (click.Context, str, int, str) -> None
factory = ctx.obj['factory'] # type: CLIFactory
run_local_server(factory, host, port, stage, os.environ)
def delete(ctx, profile, stage):
# type: (click.Context, str, str) -> None
factory = ctx.obj['factory'] # type: CLIFactory
factory.profile = profile
config = factory.create_config_obj(chalice_stage_name=stage)
session = factory.create_botocore_session()
d = factory.create_default_deployer(session=session, ui=UI())
d.delete(config, chalice_stage_name=stage)
remove_stage_from_deployed_values(stage, os.path.join(
config.project_dir, '.chalice', 'deployed.json'))
def logs(ctx, num_entries, include_lambda_messages, stage, profile):
# type: (click.Context, int, bool, str, str) -> None
factory = ctx.obj['factory'] # type: CLIFactory
factory.profile = profile
config = factory.create_config_obj(stage, False)
deployed = config.deployed_resources(stage)
if deployed is not None:
session = factory.create_botocore_session()
retriever = factory.create_log_retriever(
session, deployed.api_handler_arn)
display_logs(retriever, num_entries, include_lambda_messages,
sys.stdout)
def package(ctx, single_file, stage, out):
# type: (click.Context, bool, str, str) -> None
factory = ctx.obj['factory'] # type: CLIFactory
config = factory.create_config_obj(stage)
packager = factory.create_app_packager(config)
if single_file:
dirname = tempfile.mkdtemp()
try:
packager.package_app(config, dirname)
create_zip_file(source_dir=dirname, outfile=out)
finally:
shutil.rmtree(dirname)
else:
packager.package_app(config, out)
def generate_pipeline(ctx, codebuild_image, source, buildspec_file, filename):
# type: (click.Context, str, str, str, str) -> None
"""Generate a cloudformation template for a starter CD pipeline.
This command will write a starter cloudformation template to
the filename you provide. It contains a CodeCommit repo,
a CodeBuild stage for packaging your chalice app, and a
CodePipeline stage to deploy your application using cloudformation.
You can use any AWS SDK or the AWS CLI to deploy this stack.
Here's an example using the AWS CLI:
\b
$ chalice generate-pipeline pipeline.json
$ aws cloudformation deploy --stack-name mystack \b
--template-file pipeline.json --capabilities CAPABILITY_IAM
"""
from chalice import pipeline
factory = ctx.obj['factory'] # type: CLIFactory
config = factory.create_config_obj()
p = pipeline.CreatePipelineTemplate()
params = pipeline.PipelineParameters(
app_name=config.app_name,
lambda_python_version=config.lambda_python_version,
codebuild_image=codebuild_image,
code_source=source,
)
output = p.create_template(params)
if buildspec_file:
extractor = pipeline.BuildSpecExtractor()
buildspec_contents = extractor.extract_buildspec(output)
with open(buildspec_file, 'w') as f:
f.write(buildspec_contents)
with open(filename, 'w') as f:
f.write(serialize_to_json(output))
def convert(self, value, param, ctx: click.Context) -> t.List[str]:
"""
Convert method that makes this class usable as a click type.
"""
if isinstance(value, self):
return value
elif isinstance(value, str):
value = str(value)
return value.split(",")
self.fail("{} is no valid comma separated string list".format(value), param, ctx)
def convert(self, value, param, ctx: click.Context) -> t.Optional[bool]:
"""
Convert method that makes this class usable as a click type.
"""
if isinstance(value, self):
return value
elif isinstance(value, str):
value = value.lower()
if value == "true" :
return True
elif value == "false":
return False
elif value == "none":
return None
self.fail("{} is no valid bool or 'none'".format(value), param, ctx)