def validate_parent_id(ctx, param, value):
    """Validate the parent-id value against the already-parsed resource id.

    The signature follows the ``(ctx, param, value)`` shape used by click
    parameter callbacks. A resource id starting with ``sg-`` must be
    accompanied by a non-empty ``value``.

    Returns:
        ``value`` unchanged when the check passes.

    Raises:
        click.UsageError: the resource id starts with ``sg-`` and ``value``
            is empty or missing.
    """
    is_security_group = ctx.params['resource_id'].startswith('sg-')
    if not is_security_group or value:
        return value
    raise click.UsageError(
        "Security Group lock status requires --parent-id flag")
# python类UsageError()的实例源码 (example source code for the Python class UsageError())
def check_resp(r):
    """Raise a usage error unless the API response reports success.

    ``r`` is a response object exposing ``json()`` and ``status_code``.
    Nothing is returned when the decoded body has a truthy ``success``
    entry.

    Raises:
        click.UsageError: on an unsuccessful response. A 401 status is
            reported as a wrong API key; anything else reports the
            pretty-printed ``details`` entry of the body.
    """
    if not r.json()['success']:
        if r.status_code == 401:
            raise click.UsageError('API call error: wrong API key')
        details = pformat(r.json()['details'])
        raise click.UsageError('API call error:\n%s' % details)
def req_api_key(self):
    """Return the configured API key, insisting that one was supplied.

    Raises:
        click.UsageError: ``self.api_key`` is ``None``.
    """
    key = self.api_key
    if key is not None:
        return key
    raise click.UsageError('"moniqueio --api-key" argument is not specified')
def _check_reserve_usage(empty, memory, cpu, disk):
"""Checks params constraints for reserve verb."""
if empty:
if memory:
raise click.UsageError('Cannot combine --empty and --memory')
if cpu:
raise click.UsageError('Cannot combine --empty and --cpu')
if disk:
raise click.UsageError('Cannot combine --empty and --disk')
def _check_tenant_exists(restapi, allocation):
    """Verify that a tenant exists for ``allocation``.

    Issues a GET for ``/tenant/<allocation>`` through ``restclient`` and
    decodes the response; the decoded body itself is discarded.

    Raises:
        click.UsageError: the REST client reported ``NotFoundError``.
    """
    url = '/tenant/{}'.format(allocation)

    try:
        response = restclient.get(restapi, url)
        response.json()
    except restclient.NotFoundError:
        hint = (
            'Allocation not found, '
            'run allocation configure {} --systems ...'
        ).format(allocation)
        raise click.UsageError(hint)
def init():
    """Build and return the click command that installs Treadmill spawn."""

    @click.command()
    @click.option('--run/--no-run', is_flag=True, default=False)
    @click.option('--treadmill-id', help='Treadmill admin user.')
    @click.pass_context
    def spawn(ctx, treadmill_id, run):
        """Installs Treadmill spawn."""
        params = ctx.obj['PARAMS']
        params['zookeeper'] = context.GLOBAL.zk.url
        params['ldap'] = context.GLOBAL.ldap.url

        install_dir = params['dir']
        profile = params.get('profile')

        # The wipe happens before the treadmill-id check below, exactly as
        # in the original statement order.
        bootstrap.wipe(
            os.path.join(install_dir, 'wipe_me'),
            os.path.join(install_dir, 'bin', 'wipe_spawn.sh')
        )

        # The run script is only passed to the installer when --run is set.
        run_script = os.path.join(install_dir, 'bin', 'run.sh') if run else None

        # An explicit --treadmill-id overrides whatever the context holds.
        if treadmill_id:
            params['treadmillid'] = treadmill_id

        if not params.get('treadmillid'):
            raise click.UsageError(
                '--treadmill-id is required, '
                'unable to derive treadmill-id from context.')

        bootstrap.install(
            'spawn',
            install_dir,
            params,
            run=run_script,
            profile=profile,
        )

    return spawn
def _blackout_server(zkclient, server, reason):
    """Mark ``server`` as blacked out and kill its presence node.

    The blackout node is created (if missing) with a host ACL granting
    ``rwcda`` and with the stringified ``reason`` as its data.

    Raises:
        click.UsageError: ``reason`` is empty or missing.
    """
    if not reason:
        raise click.UsageError('--reason is required.')

    blackout_node = z.path.blackedout_server(server)
    host_acl = zkutils.make_host_acl(server, 'rwcda')
    zkutils.ensure_exists(
        zkclient,
        blackout_node,
        acl=[host_acl],
        data=str(reason)
    )

    presence.kill_node(zkclient, server)
def handle_parse_result(self, ctx, opts, args):
    """Enforce mutual exclusion and expand the options-file parameter.

    When this option is the ``_OPTIONS_FILE`` option and was supplied, every
    command parameter is first reset to its default (or environment value,
    or ``''``), then overridden with the section of the YAML manifest named
    after the current command. ``vpc_name`` is renamed to ``vpc_id`` and the
    result replaces ``ctx.params``.

    Raises:
        click.UsageError: this option was given together with one of the
            options it is mutually exclusive with.
        click.BadParameter: the manifest has no (non-empty) section for the
            current command, including when the file is empty or is not a
            YAML mapping.
    """
    if self.mutually_exclusive.intersection(opts) and self.name in opts:
        raise click.UsageError(
            "Illegal usage: `{}` is mutually exclusive with "
            "arguments `{}`.".format(
                self.name,
                ', '.join(self.mutually_exclusive)
            )
        )

    if self.name == _OPTIONS_FILE and self.name in opts:
        _file = opts.pop(_OPTIONS_FILE)
        for _param in ctx.command.params:
            opts[_param.name] = _param.default or \
                _param.value_from_envvar(ctx) or ''

        with open(_file, 'r') as stream:
            # safe_load instead of yaml.load: the manifest is user-supplied
            # input, and yaml.load() without an explicit Loader is unsafe on
            # old PyYAML and a TypeError on PyYAML >= 6.
            data = yaml.safe_load(stream)

        # An empty file loads as None and a non-mapping document has no
        # .get(); treat both as "no scope for this command".
        if not isinstance(data, dict):
            data = {}

        _command_name = ctx.command.name
        if data.get(_command_name, None):
            opts.update(data[_command_name])
        else:
            raise click.BadParameter(
                'Manifest file should have %s scope' % _command_name
            )
        opts['vpc_id'] = opts.pop('vpc_name')
        ctx.params = opts

    return super().handle_parse_result(ctx, opts, args)
def error(self, message):
    """Report ``message`` by raising it as a ``click.UsageError``."""
    usage_error = click.UsageError(message)
    raise usage_error
def get_auto_shell():
    """Return the name of the shell that is calling this process.

    The name is taken from the parent process (on Windows, from the
    grandparent when there is one), with a trailing ``.exe`` removed.

    Raises:
        click.UsageError: ``psutil`` is not installed, or no parent process
            could be determined.
    """
    # Keep the try body to the import alone so that only a missing psutil
    # is reported as "install psutil".
    try:
        import psutil
    except ImportError:
        raise click.UsageError("Please explicitly give the shell type or install the psutil package to activate the"
                               " automatic shell detection.")

    # Process.parent() may return None; guard before dereferencing it.
    parent = psutil.Process(os.getpid()).parent()
    if parent is not None and platform.system() == 'Windows':
        parent = parent.parent() or parent
    if parent is None:
        raise click.UsageError("Unable to detect the calling shell. Please explicitly give the shell type.")

    shell_name = parent.name()
    # Strip the extension only when it is a suffix, not anywhere in the name.
    if shell_name.endswith('.exe'):
        shell_name = shell_name[:-len('.exe')]
    return shell_name
def ner(**kwargs):
    # Run ChemSpot named-entity recognition on text from stdin or from an
    # input file, then print the result or leave it to the output file.
    #
    # Documented with comments rather than a docstring on purpose: if this
    # function is registered as a click command, a docstring would become
    # its --help text.
    #
    # Raises click.UsageError when neither stdin text nor an input file is
    # available.

    # The CLI exposes negative "--no-*" flags; flip them in place so each
    # key now carries the positive meaning (e.g. "no_header" == write header).
    for flag in ("no_standardize", "no_convert_ions", "no_header",
                 "no_normalize_text", "no_annotation"):
        kwargs[flag] = not kwargs[flag]

    is_output_file = bool(kwargs["output"])

    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        input_text = stdin.read().strip()
        # Piped text takes precedence over the input file, but only when
        # there actually is some: a non-tty yet empty stdin must not discard
        # a supplied input file.
        if input_text:
            kwargs["input_file"] = ""

    if not input_text and not kwargs["input_file"]:
        raise click.UsageError("Cannot perform NER: stdin is empty and input file is not provided.")

    kwargs["opsin_types"] = get_opsin_types(kwargs["opsin_types"])

    init_kwargs = get_kwargs(kwargs, KWARGS_CHS_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_CHS_PROCESS)

    chemspot = ChemSpot(**init_kwargs)
    result = chemspot.process(input_text=input_text, **process_kwargs)

    # SystemExit is raised directly; the ``exit`` builtin is only injected
    # by the ``site`` module and is not guaranteed to exist.
    if kwargs["dry_run"]:
        print(result)
        raise SystemExit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        raise SystemExit(0)

    # Nothing is printed here when an output file was requested.
    if not is_output_file:
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
def convert(**kwargs):
    # Convert chemical names from stdin or from an input file with OPSIN
    # (requesting smiles, inchi and inchikey), then print the result or
    # leave it to the output file.
    #
    # Documented with comments rather than a docstring on purpose: if this
    # function is registered as a click command, a docstring would become
    # its --help text.
    #
    # Raises click.UsageError when neither stdin text nor an input file is
    # available.

    # The CLI exposes negative "--no-*" flags; flip them in place so each
    # key now carries the positive meaning (e.g. "no_header" == write header).
    for flag in ("no_header",
                 "no_normalize_plurals",
                 "no_standardize",
                 "opsin_no_allow_acids_without_acid",
                 "opsin_no_detailed_failure_analysis",
                 "opsin_no_allow_radicals",
                 "opsin_no_allow_uninterpretable_stereo"):
        kwargs[flag] = not kwargs[flag]

    is_output_file = bool(kwargs["output"])

    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        input_text = stdin.read().strip()
        # Piped text takes precedence over the input file, but only when
        # there actually is some: a non-tty yet empty stdin must not discard
        # a supplied input file.
        if input_text:
            kwargs["input_file"] = ""

    if not input_text and not kwargs["input_file"]:
        raise click.UsageError("Cannot do conversion: stdin is empty and input file is not provided.")

    init_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_PROCESS)

    opsin = OPSIN(**init_kwargs)
    result = opsin.process(input=input_text, output_formats=["smiles", "inchi", "inchikey"], **process_kwargs)

    # SystemExit is raised directly; the ``exit`` builtin is only injected
    # by the ``site`` module and is not guaranteed to exist.
    if kwargs["dry_run"]:
        print(result)
        raise SystemExit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        raise SystemExit(0)

    # Nothing is printed here when an output file was requested.
    if not is_output_file:
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
def main(word, word_dist, word_dist_rate, keyboard_name):
    """
    keedi, keyboard usage stats for words
    Invoke keedi on a word with the --word <word> option or pass words in on
    stdin.
    """
    # Words come from --word when given, otherwise from piped stdin.
    if word:
        words = [word]
    elif not sys.stdin.isatty():
        words = filter(None, (transform_word(w) for w in sys.stdin))
    else:
        # The exception must actually be raised: merely constructing it left
        # ``words`` unbound and the loop below failed with a NameError.
        raise click.ClickException("no --word specified or standard input given.")

    for w in words:
        word_dist_computation = None
        keyboard = KEYBOARDS[keyboard_name]

        wd = None
        if word_dist:
            wd = word_distance(w, keyboard)

        wdr = None
        if word_dist_rate:
            try:
                wdr = word_distance_rate(w, keyboard,
                                         precomputation=word_dist_computation)
            except IncomputableRateException as e:
                # Report the problem; abort when stdin is a terminal,
                # otherwise skip this word and carry on with the next one.
                click.UsageError(str(e)).show()
                if sys.stdin.isatty():
                    sys.exit(1)
                else:
                    continue

        # NOTE(review): str(None) is the truthy string 'None', so the
        # ``if item`` filter never drops an uncomputed column — confirm
        # whether printing 'None' is intended.
        click.echo('\t'.join(item for item in (str(wd), str(wdr), w) if item))
    sys.exit(0)
def main():
    """Register the subcommands on ``cli`` and run it with local error handling.

    ``cli`` is invoked with ``standalone_mode=False`` so that click
    exceptions propagate here. Usage errors are shown through click; other
    click or project errors are logged (with traceback at debug level).
    Both cases exit with status 1.
    """
    try:
        for subcommand in (sync_command, update_password, daemon, edit,
                           browserhelp):
            cli.add_command(subcommand)
        cli(standalone_mode=False)
    except click.UsageError as usage_error:
        usage_error.show()
        exit(1)
    except (exceptions.Error, click.ClickException) as failure:
        logger.error(failure)
        logger.debug(failure, exc_info=True)
        exit(1)
def init(ctx):
    """Initializes the database."""
    database = ctx.obj.db
    location = database.url

    # An sqlite database that is already present is never overwritten.
    already_there = is_sqlitedb_url(location) and sqlitedb_present(location)
    if already_there:
        raise click.UsageError("Refusing to overwrite database "
                               "at {}".format(location))

    database.reset()
# -------------------------------------------------------------------------
# User commands
def user(ctx):
    """Subcommand to manage users."""
    location = ctx.obj.db.url

    # The sqlalchemy sqlite driver creates an empty file on commit as a side
    # effect. To avoid that, bail out up front when the database file is
    # missing and therefore cannot have been initialized.
    if not is_sqlitedb_url(location):
        return
    if not sqlitedb_present(location):
        raise click.UsageError("Could not find database at {}".format(location))
def app(ctx):
    """Subcommand to manage applications."""
    location = ctx.obj.db.url

    # Only sqlite URLs are checked for an existing database file.
    if not is_sqlitedb_url(location):
        return
    if not sqlitedb_present(location):
        raise click.UsageError("Could not find database at {}".format(location))
def cuv(ctx, coverage_fname, exclude, branch):
    """
    Cuv'ner provides ways to visualize your project's coverage data.
    Everything works on the console and assumes a unicode and
    256-color capable terminal. There must be a .coverage file which
    is loaded for coverage data; it is assumed to be in the top level
    of your source code checkout.
    """
    # Imported locally so the block does not depend on a file-level import.
    # click.get_terminal_size() was deprecated in click 8.0 and removed in
    # 8.1; shutil.get_terminal_size() is the standard-library replacement.
    import shutil

    if coverage_fname is None:
        coverage_fname = find_coverage_data('.')
        # coverage_fname still could be None

    # The config object is attached to the context before the data check,
    # so it is set even when the usage error below is raised.
    cfg = Config()
    ctx.obj = cfg

    # Cap the display width at 80 columns.
    cfg.nice_width = min(80, shutil.get_terminal_size()[0])
    cfg.exclude = exclude
    cfg.branch = branch

    if coverage_fname is None:
        raise click.UsageError(
            "No coverage data. Do you have a .coverage file?"
        )

    cfg.data = coverage.Coverage(data_file=coverage_fname)
    cfg.data.load()
def url_validation(cls, url):
    """Return ``url`` if it looks like an http(s) URL, else raise.

    The accepted shape is ``http://`` or ``https://``, a dotted host ending
    in a 2-6 lowercase-letter label, and an optional path made of word
    characters and dots.

    Raises:
        click.UsageError: ``url`` does not match the expected pattern.
    """
    # Raw string: the pattern text is unchanged, but sequences such as "\/"
    # and "\w" are no longer invalid string escapes (a warning on modern
    # Python).
    r = re.match(r"^https?:\/\/[\w\-\.]+\.[a-z]{2,6}\.?(\/[\w\.]*)*\/?$", url)
    if r is None:
        raise click.UsageError('Please, type valid URL')
    return url
def test_persistfile_exists(self, filename, add_postfix=True):
    """Refuse to continue when the resolved persist file already exists.

    ``filename`` is first resolved through ``self._get_persist_filename``.

    Raises:
        click.UsageError: the resolved path is an existing regular file.
    """
    resolved = self._get_persist_filename(filename, add_postfix=add_postfix)
    if not os.path.isfile(resolved):
        return
    raise click.UsageError("File '%s' already exists" % resolved)