python类UsageError()的实例源码

cli.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def validate_parent_id(ctx, param, value):
    """Validate the parent id against the already-parsed ``resource_id``.

    The ``(ctx, param, value)`` signature matches a click parameter callback.

    Returns:
        ``value`` unchanged.

    Raises:
        click.UsageError: if ``ctx.params['resource_id']`` starts with
            ``sg-`` and ``value`` is falsy.
    """
    is_security_group = ctx.params['resource_id'].startswith('sg-')
    if is_security_group and not value:
        message = "Security Group lock status requires --parent-id flag"
        raise click.UsageError(message)
    return value
cli.py 文件源码 项目:moniqueio-tools 作者: monique-io 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_resp(r):
    """Raise ``click.UsageError`` unless the response body reports success.

    ``r`` must provide ``json()`` and ``status_code``. When the decoded body
    has a truthy ``'success'`` entry the function returns ``None``.

    Raises:
        click.UsageError: with a fixed "wrong API key" message on HTTP 401,
            otherwise with the pretty-printed ``'details'`` entry of the body.
    """
    if not r.json()['success']:
        if r.status_code == 401:
            error_text = 'API call error: wrong API key'
        else:
            error_text = 'API call error:\n%s' % pformat(r.json()['details'])
        raise click.UsageError(error_text)
cli.py 文件源码 项目:moniqueio-tools 作者: monique-io 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def req_api_key(self):
        """Return ``self.api_key``, insisting that it has been set.

        Raises:
            click.UsageError: if ``self.api_key`` is ``None``.
        """
        if self.api_key is not None:
            return self.api_key
        raise click.UsageError('"moniqueio --api-key" argument is not specified')
allocation.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _check_reserve_usage(empty, memory, cpu, disk):
    """Reject ``--empty`` when combined with any explicit capacity value.

    Nothing is checked when ``empty`` is falsy. Otherwise the first truthy
    value among ``memory``, ``cpu`` and ``disk`` (checked in that order)
    triggers a ``click.UsageError`` naming the conflicting option.
    """
    if not empty:
        return

    conflicts = (
        (memory, 'Cannot combine --empty and --memory'),
        (cpu, 'Cannot combine --empty and --cpu'),
        (disk, 'Cannot combine --empty and --disk'),
    )
    for supplied, complaint in conflicts:
        if supplied:
            raise click.UsageError(complaint)
allocation.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _check_tenant_exists(restapi, allocation):
    """Ensure the tenant for ``allocation`` can be fetched from the REST API.

    Issues a GET for ``/tenant/<allocation>`` through ``restclient`` and
    decodes the JSON body; the decoded value itself is discarded.

    Raises:
        click.UsageError: if the request raises ``restclient.NotFoundError``.
    """
    endpoint = '/tenant/{}'.format(allocation)

    try:
        # Only the success of the lookup matters, not its payload.
        restclient.get(restapi, endpoint).json()
    except restclient.NotFoundError:
        hint = (
            'Allocation not found, '
            'run allocation configure {} --systems ...'
        ).format(allocation)
        raise click.UsageError(hint)
spawn.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def init():
    """Build and return the ``spawn`` click command."""

    @click.command()
    @click.option('--run/--no-run', is_flag=True, default=False)
    @click.option('--treadmill-id', help='Treadmill admin user.')
    @click.pass_context
    def spawn(ctx, treadmill_id, run):
        """Installs Treadmill spawn."""
        # Record the cell endpoints in the shared install parameters.
        zk_url = context.GLOBAL.zk.url
        params = ctx.obj['PARAMS']
        params['zookeeper'] = zk_url
        params['ldap'] = context.GLOBAL.ldap.url

        install_dir = params['dir']
        profile = params.get('profile')

        # The wipe runs before the treadmill-id validation below.
        wipe_marker = os.path.join(install_dir, 'wipe_me')
        wipe_script = os.path.join(install_dir, 'bin', 'wipe_spawn.sh')
        bootstrap.wipe(wipe_marker, wipe_script)

        # A run script is only handed to the installer when --run was given.
        run_script = (
            os.path.join(install_dir, 'bin', 'run.sh') if run else None
        )

        # An explicit --treadmill-id overrides whatever the context carried.
        if treadmill_id:
            params['treadmillid'] = treadmill_id

        if not params.get('treadmillid'):
            raise click.UsageError(
                '--treadmill-id is required, '
                'unable to derive treadmill-id from context.')

        bootstrap.install(
            'spawn',
            install_dir,
            params,
            run=run_script,
            profile=profile,
        )

    return spawn
blackout.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _blackout_server(zkclient, server, reason):
    """Mark ``server`` as blacked out in ZooKeeper and kill its presence node.

    Ensures the blacked-out node for ``server`` exists with a host ACL of
    ``'rwcda'`` and ``str(reason)`` as its data, then calls
    ``presence.kill_node`` for the server.

    Raises:
        click.UsageError: if ``reason`` is falsy.
    """
    if not reason:
        raise click.UsageError('--reason is required.')

    node_path = z.path.blackedout_server(server)
    host_acl = zkutils.make_host_acl(server, 'rwcda')
    zkutils.ensure_exists(
        zkclient, node_path, acl=[host_acl], data=str(reason)
    )
    presence.kill_node(zkclient, server)
mutually_exclusive_option.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle_parse_result(self, ctx, opts, args):
        """Enforce mutual exclusion and expand the options-file option.

        If this option is present in ``opts`` together with any name listed
        in ``self.mutually_exclusive``, a usage error is raised.

        If this option is the options-file option (``_OPTIONS_FILE``) and was
        supplied, ``opts`` is rebuilt: every command parameter is first set
        to its default (or its envvar value, or ``''``), then overridden by
        the section of the YAML manifest keyed by the command name. The
        ``vpc_name`` entry is then renamed to ``vpc_id`` and the result is
        stored on ``ctx.params``.

        Raises:
            click.UsageError: when mutually exclusive options are combined.
            click.BadParameter: when the manifest has no non-empty section
                for the current command. This includes an empty file or a
                document that is not a mapping.
        """
        if self.mutually_exclusive.intersection(opts) and \
           self.name in opts:
            raise click.UsageError(
                "Illegal usage: `{}` is mutually exclusive with "
                "arguments `{}`.".format(
                    self.name,
                    ', '.join(self.mutually_exclusive)
                )
            )
        if self.name == _OPTIONS_FILE and self.name in opts:
            _file = opts.pop(_OPTIONS_FILE)
            # Seed every parameter so that the manifest only has to override.
            for _param in ctx.command.params:
                opts[_param.name] = _param.default or \
                    _param.value_from_envvar(ctx) or ''
            with open(_file, 'r') as stream:
                # NOTE(review): yaml.load without an explicit Loader can
                # construct arbitrary Python objects and is rejected by
                # recent PyYAML releases; consider yaml.safe_load if the
                # manifests never rely on custom tags.
                data = yaml.load(stream)

            _command_name = ctx.command.name
            # An empty file parses to None and a top-level list has no
            # .get(); treat both as "section missing" rather than crashing
            # with AttributeError.
            section = (
                data.get(_command_name) if isinstance(data, dict) else None
            )
            if section:
                opts.update(section)
            else:
                raise click.BadParameter(
                    'Manifest file should have %s scope' % _command_name
                )
            opts['vpc_id'] = opts.pop('vpc_name')
            ctx.params = opts

        return super().handle_parse_result(ctx, opts, args)
lexer.py 文件源码 项目:restcli 作者: dustinrohde 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def error(self, message):
        """Raise ``click.UsageError`` carrying ``message``; never returns."""
        usage_error = click.UsageError(message)
        raise usage_error
click_completion.py 文件源码 项目:pipenv 作者: pypa 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_auto_shell():
    """Return the name of the process that launched this one.

    Uses ``psutil`` to look up the parent process. On Windows the
    grandparent is used instead when there is one. A ``.exe`` substring is
    removed from the reported name.

    Raises:
        click.UsageError: if ``psutil`` cannot be imported.
    """
    try:
        import psutil
        caller = psutil.Process(os.getpid()).parent()
        on_windows = platform.system() == 'Windows'
        if on_windows:
            grandparent = caller.parent()
            if grandparent:
                caller = grandparent
        shell_name = caller.name()
        return shell_name.replace('.exe', '')
    except ImportError:
        advice = ("Please explicitly give the shell type or install the psutil package to activate the"
                  " automatic shell detection.")
        raise click.UsageError(advice)
cli.py 文件源码 项目:molminer 作者: gorgitko 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ner(**kwargs):
    # Runs ChemSpot over text taken from stdin (when stdin is not a
    # terminal) or from the configured input file, then prints the result.
    # Documented with comments rather than a docstring: this is presumably
    # a click command, and a docstring would change its --help output.

    # The "no_*" options are inverted in place, so from here on each key
    # holds the positive meaning (e.g. kwargs["no_header"] is True when a
    # header SHOULD be written).
    for negated_flag in (
        "no_standardize",
        "no_convert_ions",
        "no_header",
        "no_normalize_text",
        "no_annotation",
    ):
        kwargs[negated_flag] = not kwargs[negated_flag]

    is_output_file = bool(kwargs["output"])

    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        # Piped stdin takes precedence over any input file.
        kwargs["input_file"] = ""
        input_text = stdin.read().strip()
    # This check used to sit inside the branch above, so it never fired
    # when stdin was a terminal and no input file was given. It now runs
    # unconditionally, matching the `convert` command.
    if not input_text and not kwargs.get("input_file"):
        raise click.UsageError("Cannot perform NER: stdin is empty and input file is not provided.")

    kwargs["opsin_types"] = get_opsin_types(kwargs["opsin_types"])

    init_kwargs = get_kwargs(kwargs, KWARGS_CHS_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_CHS_PROCESS)

    chemspot = ChemSpot(**init_kwargs)
    result = chemspot.process(input_text=input_text, **process_kwargs)

    if kwargs["dry_run"]:
        print(result)
        exit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        exit(0)

    # When an output file was requested nothing is printed here.
    if not is_output_file:
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
cli.py 文件源码 项目:molminer 作者: gorgitko 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def convert(**kwargs):
    # Converts chemical names with OPSIN, reading text from stdin (when
    # stdin is not a terminal) or from the configured input file, then
    # prints the result. Comments are used instead of a docstring so that
    # any click-generated help text stays unchanged.

    # Invert the "no_*" options in place; afterwards each key holds the
    # positive meaning.
    for negated_flag in (
        "no_header",
        "no_normalize_plurals",
        "no_standardize",
        "opsin_no_allow_acids_without_acid",
        "opsin_no_detailed_failure_analysis",
        "opsin_no_allow_radicals",
        "opsin_no_allow_uninterpretable_stereo",
    ):
        kwargs[negated_flag] = not kwargs[negated_flag]

    writes_to_file = bool(kwargs["output"])

    text = ""
    piped = not click.get_text_stream("stdin").isatty()
    if piped:
        # Piped stdin takes precedence over any input file.
        kwargs["input_file"] = ""
        text = click.get_text_stream("stdin").read().strip()
    have_input = bool(text) or bool(kwargs["input_file"])
    if not have_input:
        raise click.UsageError("Cannot do conversion: stdin is empty and input file is not provided.")

    opsin = OPSIN(**get_kwargs(kwargs, KWARGS_OPSIN_INIT)) if False else None
    init_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_PROCESS)

    opsin = OPSIN(**init_kwargs)
    result = opsin.process(input=text, output_formats=["smiles", "inchi", "inchikey"], **process_kwargs)

    if kwargs["dry_run"]:
        print(result)
        exit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        exit(0)

    # When an output file was requested nothing is printed here.
    if writes_to_file:
        return
    print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
__main__.py 文件源码 项目:keedi 作者: t-mart 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def main(word, word_dist, word_dist_rate, keyboard_name):
    """
    keedi, keyboard usage stats for words

    Invoke keedi on a word with the --word <word> option or pass words in on
    stdin.
    """
    # Words come from --word, or from stdin when it is not a terminal.
    if word:
        words = [word]
    elif not sys.stdin.isatty():
        words = filter(None, (transform_word(w) for w in sys.stdin))
    else:
        # The exception used to be constructed but never raised, which left
        # `words` unbound and crashed the loop below with a NameError.
        raise click.ClickException(
            "no --word specified or standard input given.")

    for w in words:
        word_dist_computation = None
        keyboard = KEYBOARDS[keyboard_name]

        wd = None
        if word_dist:
            wd = word_distance(w, keyboard)

        wdr = None
        if word_dist_rate:
            try:
                wdr = word_distance_rate(w, keyboard,
                                         precomputation=word_dist_computation)
            except IncomputableRateException as e:
                # Report the problem; abort in interactive use, but keep
                # going with the next word when reading from a pipe.
                click.UsageError(e).show()
                if sys.stdin.isatty():
                    sys.exit(1)
                else:
                    continue
        # NOTE(review): str(None) is the truthy string 'None', so the
        # `if item` filter never drops an uncomputed column; confirm whether
        # printing 'None' is intended before changing the output format.
        click.echo('\t'.join(item for item in (str(wd), str(wdr), w) if item))
    sys.exit(0)
__main__.py 文件源码 项目:connect 作者: openhsr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    """Register the subcommands, run the CLI, and exit with 1 on failure.

    Click runs with ``standalone_mode=False``, so its exceptions propagate
    here: usage errors are shown through click's own ``show()``; other click
    errors and project errors are logged (with traceback at debug level).
    """
    try:
        subcommands = (sync_command, update_password, daemon, edit,
                       browserhelp)
        for subcommand in subcommands:
            cli.add_command(subcommand)
        cli(standalone_mode=False)
    except click.UsageError as usage_error:
        usage_error.show()
        exit(1)
    except (exceptions.Error, click.ClickException) as failure:
        logger.error(failure)
        logger.debug(failure, exc_info=True)
        exit(1)
__main__.py 文件源码 项目:simphony-remote 作者: simphony 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def init(ctx):
    """Initializes the database."""
    database = ctx.obj.db
    url = database.url

    # Never reset over an sqlite database file that is already present.
    already_there = is_sqlitedb_url(url) and sqlitedb_present(url)
    if already_there:
        refusal = ("Refusing to overwrite database "
                   "at {}").format(url)
        raise click.UsageError(refusal)
    database.reset()


# -------------------------------------------------------------------------
# User commands
__main__.py 文件源码 项目:simphony-remote 作者: simphony 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def user(ctx):
    """Subcommand to manage users."""
    url = ctx.obj.db.url

    # The sqlite driver for sqlalchemy creates an empty file on commit as a
    # side effect. To avoid that, bail out early when the database is
    # sqlite-backed and its file is missing, since it cannot have been
    # initialized.
    database_missing = is_sqlitedb_url(url) and not sqlitedb_present(url)
    if database_missing:
        raise click.UsageError("Could not find database at {}".format(url))
__main__.py 文件源码 项目:simphony-remote 作者: simphony 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def app(ctx):
    """Subcommand to manage applications."""
    url = ctx.obj.db.url

    # Refuse to continue when an sqlite-backed database file is missing.
    database_missing = is_sqlitedb_url(url) and not sqlitedb_present(url)
    if database_missing:
        raise click.UsageError("Could not find database at {}".format(url))
cli.py 文件源码 项目:cuvner 作者: meejah 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cuv(ctx, coverage_fname, exclude, branch):
    """
    Cuv'ner provides ways to visualize your project's coverage data.

    Everything works on the console and assumes a unicode and
    256-color capable terminal. There must be a .coverage file which
    is loaded for coverage data; it is assumed to be in the top level
    of your source code checkout.
    """
    # Fall back to searching the current directory; the search may still
    # come back with None.
    if coverage_fname is None:
        coverage_fname = find_coverage_data('.')

    config = Config()
    ctx.obj = config

    # Cap the display width at 80 columns.
    # NOTE(review): click.get_terminal_size was removed in Click 8.1;
    # confirm the pinned click version.
    terminal_columns = click.get_terminal_size()[0]
    config.nice_width = min(80, terminal_columns)
    config.exclude = exclude
    config.branch = branch

    # The config object is attached to ctx before this check on purpose.
    if coverage_fname is None:
        raise click.UsageError(
            "No coverage data. Do you have a .coverage file?"
        )
    config.data = coverage.Coverage(data_file=coverage_fname)
    config.data.load()
io.py 文件源码 项目:jira_pub_sync 作者: d-lobanov 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def url_validation(cls, url):
        """Return ``url`` unchanged if it matches the accepted URL pattern.

        The pattern accepts ``http://`` or ``https://``, a host made of word
        characters, dashes and dots ending in a 2-6 lowercase-letter label,
        and an optional path whose segments contain only word characters
        and dots.

        Raises:
            click.UsageError: if ``url`` does not match the pattern.
        """
        # Raw string: the escapes (\/, \w, \-, \.) are meant for the regex
        # engine and are invalid escape sequences in a normal string literal.
        r = re.match(r"^https?:\/\/[\w\-\.]+\.[a-z]{2,6}\.?(\/[\w\.]*)*\/?$", url)
        if r is None:
            raise click.UsageError('Please, type valid URL')

        return url
sodium11.py 文件源码 项目:sodium11 作者: trbs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_persistfile_exists(self, filename, add_postfix=True):
        """Fail if the persist file derived from ``filename`` already exists.

        The path is resolved through ``self._get_persist_filename``, passing
        ``add_postfix`` along.

        Raises:
            click.UsageError: if the resolved path is an existing regular
                file.
        """
        target = self._get_persist_filename(filename, add_postfix=add_postfix)
        already_present = os.path.isfile(target)
        if already_present:
            raise click.UsageError("File '%s' already exists" % target)


问题


面经


文章

微信
公众号

扫码关注公众号