def list(package_name):
    """List a user's packages, or the implementations of one package.

    ``package_name`` is parsed with ``split_package_name``; the result is
    indexed by ``'username'``, ``'package'`` and ``'implementation'``.
    The expected form is ``<username>/<package>`` with ``<package>`` being
    optional (``<username> / <package> , <implementation>`` in full).

    The listing itself is not implemented yet: the function only validates
    the parsed name, prints a message for the two rejected inputs and always
    returns ``None``.
    """
    # Determine if there's a user and package, or just a user.
    p = split_package_name(package_name)

    if p['username'] is None:
        # Without a user there is nothing that could be listed.
        print('Error parsing arguments. Got {}. Specify in format such that: <username>/<package> with <package> being optional.'.format(p))
        return

    if p['package'] is None:
        # TODO: get all of the user's packages and pretty-print their names.
        return

    if p['implementation'] is not None:
        print('Cannot list one specific implementation. Use "print".')
        return

    # TODO: get all implementations of the package and pretty-print their names.
    return
# @cli.command()
# @click.argument('payload')
# def print(payload):
# pass
# Example source code for Python's argument()
def handle_options_and_args(_command, arguments, options):
    """Wrap ``_command`` with click arguments and options and return it.

    Each entry of ``arguments`` is either a plain value passed straight to
    ``click.argument`` or a dict, in which case only its first key/value pair
    is used: the key is the argument name and the value is converted to
    keyword arguments by ``handle_option_and_arg_data``.

    ``options`` is either a dict mapping an option name to its data (converted
    the same way) or any other iterable of option names.
    """
    for entry in arguments:
        if not isinstance(entry, dict):
            _command = click.argument(entry)(_command)
            continue
        arg_name = list(entry.keys())[0]
        arg_data = list(entry.values())[0]
        decorate = click.argument(
            arg_name, **handle_option_and_arg_data(arg_data))
        _command = decorate(_command)

    if isinstance(options, dict):
        for opt_name, opt_data in options.items():
            opt_kwargs = handle_option_and_arg_data(opt_data)
            _command = click.option(opt_name, **opt_kwargs)(_command)
    else:
        for opt_name in options:
            _command = click.option(opt_name)(_command)

    return _command
def list_modules():
    """Return the list handed to ``get_all_modules`` for ``'ansible.modules'``."""
    found = []
    # The list is passed in and returned afterwards; presumably
    # get_all_modules fills it in place -- verify against its definition.
    get_all_modules('ansible.modules', found)
    return found
# @cli.command('get-module')
# @click.argument('module_name', nargs=1)
# @click.pass_context
# def get_module(ctx, module_name):
# matches = [x for x in list_modules() if x.endswith(module_name)]
# for m in matches:
# path, name = m.rsplit('.',1)
# module_info = read_module(m)
# pprint.pprint(module_info["doc"]["options"])
# # info_all = read_package(path)
# # module_info = info_all[0][name]
# # yaml_text = yaml.dump(module_info["doc"]["options"], default_flow_style=False)
# # print yaml_text
def cli(rc, jails):
    """
    Looks for the jail supplied and passes the uuid, path and configuration
    location to stop_jail.
    """
    # Neither explicit jails nor the rc flag: report the usage error.
    if not (jails or rc):
        usage_error = {
            "level": "EXCEPTION",
            "message": 'Usage: iocage stop [OPTIONS] JAILS...\n'
                       '\nError: Missing argument "jails".'
        }
        ioc_common.logit(usage_error, exit_on_error=True)

    if rc:
        # rc mode: a single call without a jail name.
        ioc.IOCage(exit_on_error=True, rc=rc, silent=True).stop()
        return

    for jail in jails:
        ioc.IOCage(exit_on_error=True, jail=jail, rc=rc).stop()
def cli(rc, jails):
    """
    Looks for the jail supplied and passes the uuid, path and configuration
    location to start_jail.
    """
    # Neither explicit jails nor the rc flag: report the usage error.
    if not (jails or rc):
        usage_error = {
            "level": "EXCEPTION",
            "message": 'Usage: iocage start [OPTIONS] JAILS...\n'
                       '\nError: Missing argument "jails".'
        }
        ioc_common.logit(usage_error, exit_on_error=True)

    if rc:
        # rc mode: a single call without a jail name.
        ioc.IOCage(exit_on_error=True, rc=rc, silent=True).start()
        return

    for jail in jails:
        ioc.IOCage(exit_on_error=True, jail=jail, rc=rc).start()
def cli(*args, **kwargs):
    """
    CSVtoTable commandline utility.
    """
    # Convert CSV file
    content = convert.convert(kwargs["input_file"], **kwargs)

    # Serve the temporary file in browser.
    if kwargs["serve"]:
        convert.serve(content)
        return

    # If it is not served and the output file is missing then raise an error.
    # NOTE(review): some click releases define BadOptionUsage as
    # (option_name, message) -- confirm this one-argument call against the
    # pinned click version.
    if not kwargs["output_file"]:
        raise click.BadOptionUsage("Missing argument \"output_file\".")

    # Write to output file, asking first unless overwriting was requested.
    may_write = kwargs["overwrite"] or prompt_overwrite(kwargs["output_file"])
    if not may_write:
        raise click.Abort()

    convert.save(kwargs["output_file"], content)
    success_message = "File converted successfully: {}".format(
        kwargs["output_file"])
    click.secho(success_message, fg="green")
def logs(config):
    """Follow the logs of the ``flask`` container of the API pod.

    Runs ``kubectl get pods -l app=api -o name`` and then
    ``kubectl logs <pod> --container flask --follow`` for the first pod
    listed. Prints a hint and exits with status 1 when no pod is listed.

    ``config`` is accepted for the command-line interface but is not read
    here.

    Raises:
        SystemExit: when no pod is listed.
        subprocess.CalledProcessError: when the pod listing command fails.
    """
    listing = subprocess.check_output(
        ["kubectl", "get", "pods", "-l", "app=api", "-o", "name"])
    # check_output returns bytes on Python 3; normalise to text so the
    # emptiness check and the split below work on both major versions.
    if isinstance(listing, bytes):
        listing = listing.decode()

    pods = listing.split()
    if not pods:
        print("There is no pod running in this environment. You may need to deploy first.")
        raise SystemExit(1)

    # Entries look like "<kind>/<name>"; keep only the name of the first one
    # so several matching pods no longer yield a multi-line "name".
    pod_name = pods[0].rsplit("/", 1)[-1]

    # An argument list (no shell) keeps the pod name from being re-parsed.
    subprocess.call(
        ["kubectl", "logs", pod_name, "--container", "flask", "--follow"])
# @click.command()
# @click.argument("config", type=StarterkitConfig(r'kube/deployments/'))
# def request_certs():
# # Request certs
# subprocess.call("kubectl exec ")
# # Get status of Certs
# subprocess.call("kubectl")
# # If
# pass
def task_id_arg(*args, **kwargs):
    """
    This is the `TASK_ID` argument consumed by many Transfer Task operations.
    It accepts a toggle on whether or not it is required.

    Usage:

    >>> @task_id_arg
    >>> def command_func(task_id):
    >>>     ...

    or

    >>> @task_id_arg(required=False)
    >>> def command_func(task_id):
    >>>     ...

    By default, the task ID is made required; pass `required=False` to the
    decorator arguments to make it optional.
    """
    def inner_decorator(f, required=True):
        # Attach the TASK_ID positional argument to the wrapped command.
        return click.argument('TASK_ID', required=required)(f)

    # detect_and_decorate receives the raw call arguments; presumably it
    # tells the bare-decorator form from the parametrised one -- verify
    # against its definition.
    return detect_and_decorate(inner_decorator, args, kwargs)
def launch(version, raw):
    """
    Launch Minecraft of the given version
    """
    core = LauncherCore(config.mc_dir, config.java_dir)
    if not raw:
        core.launch(version, config.username, config.max_mem)
        return
    # Raw mode: echo whatever launch_raw returns.
    raw_result = core.launch_raw(version, config.username, config.max_mem)
    click.echo(raw_result)
# @main.command('download')
# @click.argument('version')
# @click.option('-c', '--client', is_flag=True)
# @click.option('-a', '--assets', is_flag=True)
# @click.option('-l', '--libraries', is_flag=True)
# @click.option('-F', '--forge', is_flag=True)
# @click.option('-L', '--liteloader', is_flag=True)
# @click.option('-E', '--external', is_flag=True, default=False, help='Open the download link externally.\
# Useful when you want to download files in another manner. Default is False')
# def _download(version, **components):
# """
# Download Minecraft, components or mods
# """
# pass
def config_(action, **kw):
    """
    Configure your launcher and game preferences.
    The 'action' argument is optional, and can be 'reset' or 'wizard'.
    If it's left blank, only given options will be set.
    Else, given options will be set AFTER the corresponding action is executed.
    """
    if action == 'reset':
        ok = click.prompt('Are you sure you want to reset you settings?', confirmation_prompt=True)
        if ok:
            config.reset()
    elif action == 'wizard':
        _wizard()

    for k, v in kw.items():
        if v is None:
            # This option was not given. Skip only this one: stopping the
            # loop here would silently drop every option that follows it.
            continue
        # Drop a single leading '=' from string values; startswith() is
        # also safe for an empty string, where v[0] would raise IndexError.
        if isinstance(v, str) and v.startswith('='):
            v = v[1:]
        # Keys are stored with underscores in place of dashes.
        config[k.replace('-', '_')] = v
    config.save()
def list_(src, min, max):
    """
    List Minecraft versions in a range(if given).
    The 'src' argument can be 'local'(default) or 'remote'.
    The former will check valid Minecraft versions on your computer.
    The latter will get a list of valid versions released by Mojang.
    """
    if src != 'local':
        # Any other source currently does nothing.
        return

    core = LauncherCore(config.mc_dir, config.java_dir)
    versions = core.versions.list

    # An unknown lower bound means "start from the first entry".
    try:
        first = versions.index(min)
    except ValueError:
        first = 0

    # An unknown upper bound means "run to the end"; a known one is inclusive.
    try:
        stop = versions.index(max) + 1
    except ValueError:
        stop = len(versions)

    selected = versions[first:stop]
    click.echo('\n'.join(selected))
def main(source, out, raw, format, view):
    """
    Generate CRCDiagrams from SOURCE saving them as OUT.
    \n
    The default output format is png.
    \n
    Example:\n
    crc-diagram source_file.py output.png
    """
    # Directories and single paths go through different crc_diagram readers.
    read_cards = (crc_diagram.folder_to_crc if os.path.isdir(source)
                  else crc_diagram.to_crc)
    crc_cards = read_cards(source)

    if not raw:
        if out is None:
            raise click.UsageError('Missing argument "out".')
        DotRender(crc_cards, format=format).render(out, view=view)
        return

    # Raw mode: dump the cards as JSON, to stdout when no OUT was given.
    stream = path_to_stream(out or sys.stdout, 'w')
    json.dump([card.to_dict() for card in crc_cards], stream, indent=4)
def tokens2topbigrams(sep, measure, freq, scores, tokens):
    '''Find top most interesting bi-grams in a token document.
    Uses the --measure argument to determine what measure to use to define
    'interesting'.
    '''
    words = read_tokens(tokens)
    finder = nltk.collocations.BigramCollocationFinder.from_words(words)
    finder.apply_freq_filter(freq)
    scored = finder.score_ngrams(MEASURES[measure])

    if scores:
        # Append the score, as a string, to each bigram tuple.
        rows = [item[0] + (str(item[1]),) for item in scored]
    else:
        rows = [item[0] for item in scored]

    write_csv(rows, str(sep))
def cli(problem, path):
    """
    Generate the resource files for problems.
    These resources are either images - serving as helpful illustrations -
    or text files containing specific data - referenced in the problem.
    If the PROBLEM argument isn't specified, all resources will be
    generated.
    """
    if problem is not None:
        if 'resources' not in problem:
            sys.exit('Problem %s has no resource files' % problem['id'])
        resources = problem['resources']
    else:
        # No problem given: take every entry of the resources directory.
        resources = os.listdir('%s/resources' % paths.DATA)

    generate_resources(resources, path)
def argument(self, *args, **kwargs):
    """
    Registers a click.argument which falls back to a configmanager Item
    if user hasn't provided a value in the command line.
    Item must be the last of ``args``.

    Raises ``TypeError`` unless ``required=False`` is passed explicitly.
    """
    is_required = kwargs.get('required', True)
    if not is_required:
        args, kwargs = _config_parameter(args, kwargs)
        return self._click.argument(*args, **kwargs)

    raise TypeError(
        'In click framework, arguments are mandatory, unless marked required=False. '
        'Attempt to use configmanager as a fallback provider suggests that this is an optional option, '
        'not a mandatory argument.'
    )
def make_django_command(name, django_command=None, help=None):
    """A wrapper to convert a Django subcommand into a Click command.

    ``django_command`` defaults to ``name``. The returned command accepts any
    arguments unprocessed and forwards them, prefixed with ``django_command``,
    to the ``django`` runner command.
    """
    django_command = name if django_command is None else django_command

    @click.command(
        name=name,
        help=help,
        add_help_option=False,
        context_settings={'ignore_unknown_options': True},
    )
    @click.argument('management_args', nargs=-1, type=click.UNPROCESSED)
    @click.pass_context
    def inner(ctx, management_args):
        # Imported at call time rather than when the command is built.
        from munch.runner.commands.django import django
        forwarded = (django_command,) + management_args
        ctx.params['management_args'] = forwarded
        ctx.forward(django)

    return inner
def get_command(self, ctx, cmd_name):
    """Allow aliases for commands.
    """
    aliases = {
        'list': 'ls',
        'search': 'find',
        'gen': 'generate',
        'add': 'insert',
        'remove': 'rm',
        'delete': 'rm',
        'rename': 'mv',
        'copy': 'cp',
    }
    # Names without an alias are looked up unchanged.
    cmd_name = aliases.get(cmd_name, cmd_name)

    # TODO(benedikt) Figure out how to make 'show' the default
    # command and pass cmd_name as the first argument.
    return click.Group.get_command(self, ctx, cmd_name)
def init(ctx, gpg_ids, path):
    """Initialize new password storage and use `gpg-id` for encryption.
    Multiple gpg-ids may be specified, in order to encrypt each
    password with multiple ids.  This command must be run first before
    a password store can be used.  If the specified `gpg-id` is
    different from the one used in any existing files, these files will
    be reencrypted to use the new id.  Note that use of a gpg agent
    is recommended so that the batch decryption does not require as
    much user intervention.  If `--path` or `-p` is specified, along
    with an argument, a specific gpg-id or a set of gpg-ids is
    assigned for that specific sub folder of the password store.  If
    only the gpg-id is given, and it is an empty string then the
    current `.gpg-id` file for the specified `sub-folder` (or root if
    unspecified) is removed.
    """
    try:
        ctx.obj.init_store(list(gpg_ids), path=path)
    except PermissionError:
        click.echo(MSG_PERMISSION_ERROR)
        return 1
    else:
        joined_ids = ','.join(gpg_ids)
        click.echo('Password store initialised for {0}.'.format(joined_ids))
def crawl(ctx, spiders, stats):
    """
    Crawl one or many or all pages.
    What spider(s) to run is determined in the following order:
    1. Spider(s) given as argument(s)
    2. Spider(s) specified in the configuration file
    Note that if a spider is given as an argument, the spiders in the
    configuration file are ignored. All available spiders will be used to
    crawl if no arguments are given and no spiders are configured.
    """
    settings = ctx.obj['settings']
    if stats:
        settings.set('STATS_CLASS',
                     'scrapy.statscollectors.MemoryStatsCollector')

    # Start a new crawler process.
    process = CrawlerProcess(settings)
    selected = spiders_to_crawl(process, spiders)

    if selected:
        for spider in selected:
            logger.info('Starting crawl of {} ...'.format(spider))
            process.crawl(spider)
    else:
        logger.error('Please specify what spiders you want to run!')

    process.start()

    if settings.getbool('HTTPCACHE_ENABLED'):
        run_cleanup_cache(settings)
def test_pair_options_to_argument_args(self):
    """One paired option: positions without ``-o`` get the default ``0``."""
    cli_args = ['im', 't1', '-o', '1', 't2', 't3', '-o', '3']
    expected = [["t1", "t2", "t3"], ["1", 0, "3"]]

    @click.command()
    @click.argument('img')
    @click.argument('arg', nargs=-1, required=True)
    @click.option('-o', '--option', multiple=True)
    @utils.pair_options_to_argument(
        'arg', {'option': 0}, args=cli_args, args_slice=(1, None)
    )
    def command(img, arg, option):
        click.echo(json.dumps((arg, option)))

    result = CliRunner().invoke(command, cli_args)
    output = result.output
    assert 'Error' not in output
    assert expected == json.loads(output)
def test_two_paired_options_to_argument_args(self):
    """Two paired options, each with its own default (``0`` and ``1``)."""
    cli_args = ['im', 't1', '-o', '1', 't2', '-a', '3', 't3']
    expected = [["t1", "t2", "t3"], ["1", 0, 0], [1, "3", 1]]

    @click.command()
    @click.argument('img')
    @click.argument('arg', nargs=-1, required=True)
    @click.option('-o', '--option', multiple=True)
    @click.option('-a', '--another', multiple=True)
    @utils.pair_options_to_argument(
        'arg', {'option': 0, 'another': 1}, args=cli_args,
        args_slice=(1, None)
    )
    def command(img, arg, option, another):
        click.echo(json.dumps((arg, option, another)))

    result = CliRunner().invoke(command, cli_args)
    output = result.output
    assert 'Error' not in output
    assert expected == json.loads(output)
def test_pair_options_to_argument_args_default(self):
    """No ``-o`` given at all: every position gets the default ``0``."""
    cli_args = ['im', 't1', 't2', 't3']
    expected = [["t1", "t2", "t3"], [0, 0, 0]]

    @click.command()
    @click.argument('img')
    @click.argument('arg', nargs=-1, required=True)
    @click.option('-o', '--option', multiple=True)
    @utils.pair_options_to_argument(
        'arg', {'option': 0}, args=cli_args, args_slice=(1, None)
    )
    def command(img, arg, option):
        click.echo(json.dumps((arg, option)))

    result = CliRunner().invoke(command, cli_args)
    output = result.output
    assert 'Error' not in output
    assert expected == json.loads(output)
def test_pair_options_to_argument(self):
    """Run the decorator in a real interpreter started with its own argv.

    The script below is sent to a child interpreter on stdin (``-``) with
    the command line ``command im t1 -o 1 t2 t3 -o 3``, so the decorator is
    used without an explicit ``args`` list, unlike the CliRunner-based tests.
    """
    # Local import: only this test needs the interpreter path.
    import sys

    code = """
import json
import click
from histonets import utils


@click.group()
def main():
    pass


@main.command()
@click.argument('img')
@click.argument('arg', nargs=-1, required=True)
@click.option('-o', '--option', multiple=True)
@utils.pair_options_to_argument('arg', {'option': 0}, args_slice=(2, None))
def command(img, arg, option):
    click.echo(json.dumps((arg, option)))


main()
"""
    # An argv list plus stdin avoids shell quoting of the script, and
    # sys.executable guarantees the child uses the interpreter (and the
    # installed packages) that is running this test.
    cmd = [sys.executable, '-', 'command',
           'im', 't1', '-o', '1', 't2', 't3', '-o', '3']
    ps = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT)
    output = ps.communicate(code.encode())[0].decode()
    assert 'Error' not in output
    assert [["t1", "t2", "t3"], ["1", 0, "3"]] == json.loads(output)
def add_to_scope(self, method_name, token, variable_type):
    """Record ``token`` in the scope of ``method_name``.

    The name is obtained from ``self.resolve_name``. If the scope already
    holds that name with type ``'argument'``, an ASSIGNED_TO_ARGUMENT message
    is logged at the token's line and column. In every case the scope entry
    is then (re)written with ``variable_type``, ``accessed`` set to ``False``
    and the token itself.
    """
    name = self.resolve_name(token, method_name)

    # Both lookups are kept as separate calls, as get_scope may not be pure.
    if (self.get_scope(method_name, name)
            and self.get_scope(method_name, name)['type'] == 'argument'):
        position = token.metadata
        self.log_message(
            position.ln,
            position.ch,
            ASSIGNED_TO_ARGUMENT,
            'Assigned a value to an argument "{}"'.format(name))

    entry = {
        'type': variable_type,
        'accessed': False,
        'token': token,
    }
    self.scope[method_name][name] = entry
def __init__(self, main, conf_dir=None, commands_dir=None, **kwargs):
    """Set up the command around ``main``.

    Stores ``commands_dir``, loads the configuration from ``conf_dir`` when
    one is given, and initialises the parent class with the parameters from
    ``_options()`` and ``run(main)`` as the callback.
    """
    self._commands_dir = commands_dir
    if conf_dir:
        configure.load_config_file(conf_dir)

    # This is a bit of a hack, but need to register all parameters from
    # the command line because want to allow tornado to handle them and
    # click doesn't contain an equivalent of the `allow_extra_args`
    # keyword argument that works for options.
    # TODO: don't use tornado command line parsing
    cli_params = _options()

    # NOTE(review): `self` is also passed as the first positional argument
    # to the parent initialiser -- confirm this is intended.
    super(Command, self).__init__(
        self,
        params=cli_params,
        callback=run(main),
        invoke_without_command=True,
        **kwargs
    )
def init():
    """Return top level command handler."""

    @click.command()
    @click.option('--api', required=False, help='API url to use.',
                  envvar='TREADMILL_RESTAPI')
    @click.option('-m', '--manifest', help='App manifest file (stream)',
                  type=click.Path(exists=True, readable=True))
    @click.option('--delete', help='Delete the app.',
                  is_flag=True, default=False)
    @click.argument('appname', required=False)
    @cli.handle_exceptions(restclient.CLI_REST_EXCEPTIONS)
    def configure(api, manifest, delete, appname):
        """Configure a Treadmill app"""
        restapi = context.GLOBAL.admin_api(api)

        # Without an app name the command only lists.
        if not appname:
            return _list(restapi)
        if delete:
            return _delete(restapi, appname)
        return _configure(restapi, manifest, appname)

    return configure
def init():
    """Return top level command handler."""

    @click.command()
    @click.argument('inputfile', type=click.Path(exists=True))
    @click.argument('params', nargs=-1,
                    type=click.Path(exists=True, readable=True))
    def interpolate(inputfile, params):
        """Interpolate input file template."""
        template_dir, template_name = os.path.split(inputfile)
        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(template_dir),
            keep_trailing_newline=True
        )

        # Later parameter files override keys of earlier ones.
        data = {}
        for param in params:
            with io.open(param, 'rb') as fd:
                # NOTE(review): yaml.load is called without an explicit
                # Loader; this is unsafe on untrusted files and rejected by
                # recent PyYAML releases -- consider yaml.safe_load.
                data.update(yaml.load(stream=fd))

        template = env.get_template(template_name)
        cli.out(template.render(data))

    return interpolate
def init():
    """Top level command handler."""

    @click.command()
    @click.option('--approot', type=click.Path(exists=True),
                  envvar='TREADMILL_APPROOT', required=True)
    @click.option('--runtime', envvar='TREADMILL_RUNTIME', required=True)
    @click.argument('eventfile', type=click.Path(exists=True))
    def configure(approot, runtime, eventfile):
        """Configure local manifest and schedule app to run."""
        environment = appenv.AppEnvironment(root=approot)
        container_dir = app_cfg.configure(environment, eventfile, runtime)
        _LOGGER.info('Configured %r', container_dir)

    return configure
def plugin_uninstall(self):
    """Build and return the ``uninstall`` click command."""

    @click.command("uninstall")
    @click.argument("name")
    def command(name):
        """Uninstalls the plugin with the given name."""
        import sys

        # Only names carrying one of the OctoPrint prefixes are accepted
        # (checked case-insensitively).
        accepted_prefixes = ("octoprint_", "octoprint-")
        if not name.lower().startswith(accepted_prefixes):
            click.echo("This doesn't look like an OctoPrint plugin name")
            sys.exit(1)

        pip_call = [sys.executable, "-m", "pip", "uninstall", "--yes", name]
        self.command_caller.call(pip_call)

    return command
def make_django_command(name, django_command=None, help=None):
"A wrapper to convert a Django subcommand a Click command"
if django_command is None:
django_command = name
@click.command(
name=name,
help=help,
add_help_option=False,
context_settings=dict(
ignore_unknown_options=True,
))
@click.argument('management_args', nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def inner(ctx, management_args):
from {{ cookiecutter.module_name }}.runner.commands.django import django
ctx.params['management_args'] = (django_command,) + management_args
ctx.forward(django)
return inner