def get_credentials(username=None, password=None, insecure=False,
apikey=None, target_apikey=None):
""" A helper function to get credentials based on cmdline options.
Returns a tuple with 2 strings: (username, password).
When working with registries where only username matters:
missing password leads to auth request to registry authentication service
without 'account' query parameter which breaks login.
"""
if insecure:
return None, None
elif apikey:
return apikey, ' '
elif username:
if password is None:
raise click.BadParameter(
'Password is required when passing username.')
return username, password
elif password:
raise click.BadParameter(
'Username is required when passing password.')
return target_apikey, ' '
python类BadParameter()的实例源码
def startproject(name):
# Must validate projects name before creating its folder
if not name.isidentifier():
message = ('"{name}" is not a valid project name. Please make sure '
'the name is a valid identifier.')
raise click.BadParameter(message.format(name=name))
project_dir = os.path.join(os.getcwd(), name)
os.mkdir(project_dir)
# There's probably a better way to do this :)
template_dir = os.path.join(bottery.__path__[0], 'conf/project_template')
for root, dirs, files in os.walk(template_dir):
for filename in files:
new_filename = filename[:-4] # Removes "-tpl"
src = os.path.join(template_dir, filename)
dst = os.path.join(project_dir, new_filename)
shutil.copy(src, dst)
def validate_ip(ctx, param, value):
try:
ipaddress.ip_address(value)
return value
except ValueError as ex:
raise click.BadParameter("Invalid IP: %s" % ex)
def validate_more_than_one(allow_none=False):
"""
"""
def validate(ctx, param, value):
is_valid = False
if value is None:
is_valid = allow_none
elif value > 1:
is_valid = True
if not is_valid:
raise click.BadParameter("invalid choice: {0}. (choose a value greater than 1)".format(value))
return value
return validate
def import_callable(ctx, param, value):
if value is None:
return None
delim = ':'
if delim not in value:
raise click.BadParameter(
'string to import should have the form '
'pkg.module:callable_attribute')
mod, identifier = value.rsplit(delim, 1)
try:
func_or_cls = getattr(importlib.import_module(mod), identifier)
except AttributeError:
raise click.BadParameter('{} does not exist in {}'
.format(identifier, mod))
if callable(func_or_cls):
return func_or_cls
raise RuntimeError('{} is not callable'.format(value))
def tick(game, template, is_json, no_color):
if not game:
raise click.BadParameter('Missing required parameter "game"')
matches = download_history(game)
if is_json:
click.echo(json.dumps(list(matches), indent=2, sort_keys=True))
return
template = template if template else DEFAULT_TEMPLATE_RECAP
template = Template(template)
for m in matches:
if no_color or not COLOR_ENABLED: # if color is disabled just stdout
print_match(m, template)
continue
if m['t1_score'] > m['t2_score']:
m['t1'] = Fore.GREEN + m['t1'] + Fore.RESET
m['t2'] = Fore.RED + m['t2'] + Fore.RESET
else:
m['t2'] = Fore.GREEN + m['t2'] + Fore.RESET
m['t1'] = Fore.RED + m['t1'] + Fore.RESET
print_match(m, template)
def key_value_strings_callback(ctx, param, values):
"""Option callback to validate a list of key/value arguments.
Converts 'NAME=VALUE' cli parameters to a dictionary.
:rtype: dict
"""
params = {}
if not values:
return params
for value in values:
parts = value.split('=', 1)
if len(parts) != 2:
raise click.BadParameter('Invalid parameter format')
param_name, param_value = parts
params[param_name] = param_value
return params
def test_key_value_strings_callback():
ctx = None
param = None
# Check empty result
assert key_value_strings_callback(ctx, param, None) == {}
# Check result for a list of values
values = ['foo=bar', 'hello=world', 'wacky=value=go']
assert key_value_strings_callback(ctx, param, values) == {
'foo': 'bar',
'hello': 'world',
'wacky': 'value=go',
}
# Check invalid parameter format error
with pytest.raises(click.BadParameter):
key_value_strings_callback(ctx, param, ['boom'])
def validate(type_scheme: Type) -> t.Callable[[click.Context, str, t.Any], t.Any]:
"""
Creates a valid click option validator function that can be passed to click via the callback
parameter.
The validator function expects the type of the value to be the raw type of the type scheme.
:param type_scheme: type scheme the validator validates against
:return: the validator function
"""
def func(ctx, param, value):
param = param.human_readable_name
param = param.replace("-", "")
res = verbose_isinstance(value, type_scheme, value_name=param)
if not res:
raise click.BadParameter(str(res))
return value
return func
def parse_params(ctx, param, tokens):
ret = []
for token in tokens:
if '=' not in token:
raise click.BadParameter('Parameter "%s" should be in form of FIELD=VALUE')
field, value = token.split('=', 1)
try:
pair = (field, json.loads(value))
except:
if value.startswith('{') or value.startswith('['):
# Guard against malformed composite objects being treated as strings.
raise click.BadParameter('Unclear if parameter "%s" should be interperted as a string or data. Use --data or --string instead.' % field)
pair = (field, value)
ret.append(pair)
return ret
def resolve_db_name_multiple(ctx, param, value):
from odoo.service.db import list_dbs
config = (
ctx.obj['config']
)
dbs = config['db_name'].split(',') if config['db_name'] else None
if dbs is None:
dbs = list_dbs(True)
if value:
invalid = [db for db in value if db not in dbs]
if invalid:
raise click.BadParameter(
"No such db '%s'." % invalid[0]
)
return value
return dbs
def __init__(self, content=None, image=None):
self.image = None
self.format = None
if isinstance(image, Image):
self.image = image.image
self.format = image.format
elif image is not None:
self.image = image
elif content:
image_format = imghdr.what(file='', h=content)
if image_format is not None:
image_array = np.fromstring(content, np.uint8)
self.image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
self.format = image_format
if self.image is None:
raise click.BadParameter('Image format not supported')
def validate_single_media(context, param, values):
'''Parse the passed medias when they are "single", such as movies.'''
medias = []
for value in values:
query = value.strip()
try:
media = context.obj['MEDIA_CLASS'](query)
except Media.MediaNotFoundException:
raise click.BadParameter('the media {} was not found'.format(
query
))
except pytvdbapi.error.ConnectionError:
logger.error('You\'re not connected to any network.')
os._exit(1)
medias.append(('single', media))
return medias
def type_cast_value(self, ctx, value):
"""Given a value this runs it properly through the type system.
This automatically handles things like `nargs` and `multiple` as
well as composite types.
"""
if self.type.is_composite:
if self.nargs <= 1:
raise TypeError('Attempted to invoke composite type '
'but nargs has been set to %s. This is '
'not supported; nargs needs to be set to '
'a fixed value > 1.' % self.nargs)
if self.multiple:
return tuple(self.type(x or (), self, ctx) for x in value or ())
return self.type(value or (), self, ctx)
def _convert(value, level):
if level == 0:
if value == "":
#if self.required and self.is_sub_parameter:
# raise click.BadParameter(self.name+" is a required member of its parameter group. Please provide it inline after the associated superparameter.")
return None
return self.type(value, self, ctx)
return tuple(_convert(x, level - 1) for x in value or ())
v = _convert(value, (self.nargs != 1) + bool(self.multiple))
return _convert(value, (self.nargs != 1) + bool(self.multiple))
def compute_validation(j, grades, stops, beta, gamma, logb, rbp,
xrelnum, jrelnum):
if len(stops) != len(grades):
raise click.BadParameter("the size of '-s' must be the same as '-g'")
if beta < 0:
raise click.BadParameter("the value of '--beta' must be positive")
if gamma < 0 or gamma > 1:
raise click.BadParameter("the value of '--gamma' must range from 0 to 1")
if logb < 0:
raise click.BadParameter(
"the value of '--logb' must be 0 (natural log) or more")
if rbp < 0 or rbp > 1:
raise click.BadParameter("the value of '--rbp' must range from 0 to 1")
# TODO: bug report
# THIS EXCEPTION SHOULD NOT BE RAISED
#if j and xrelnum[0] == 0:
# raise Exception("No judged nonrel: bpref etc. not computable")
if jrelnum == 0:
raise Exception(
"No relevance document found in the relevance assessment file")
def init(cloud_url):
"""
Choose a cloud server to use. Sets CLOUD_URL as the cloud server to use and
sets up replication of global databases from that cloud server if a local
database is already initialized (via `openag db init`).
"""
old_cloud_url = config["cloud_server"]["url"]
if old_cloud_url and old_cloud_url != cloud_url:
raise click.ClickException(
'Server "{}" already selected. Call `openag cloud deinit` to '
'detach from that server before selecting a new one'.format(
old_cloud_url
)
)
parsed_url = urlparse(cloud_url)
if not parsed_url.scheme or not parsed_url.netloc or not parsed_url.port:
raise click.BadParameter("Invalid url")
if config["local_server"]["url"]:
utils.replicate_global_dbs(cloud_url=cloud_url)
config["cloud_server"]["url"] = cloud_url
def key_value_strings_callback(ctx, param, values):
"""Option callback to validate a list of key/value arguments.
Converts 'NAME=VALUE' cli parameters to a dictionary.
:rtype: dict
"""
params = {}
if not values:
return params
for value in values:
parts = value.split('=', 1)
if len(parts) != 2:
raise click.BadParameter('Invalid parameter format')
param_name, param_value = parts
params[param_name] = param_value
return params
def test_key_value_strings_callback():
ctx = None
param = None
# Check empty result
assert key_value_strings_callback(ctx, param, None) == {}
# Check result for a list of values
values = ['foo=bar', 'hello=world', 'wacky=value=go']
assert key_value_strings_callback(ctx, param, values) == {
'foo': 'bar',
'hello': 'world',
'wacky': 'value=go',
}
# Check invalid parameter format error
with pytest.raises(click.BadParameter):
key_value_strings_callback(ctx, param, ['boom'])
def create(ctx, image, verify):
"""Creates a new application for image IMAGE."""
# Verify if `image` is an existing docker image
# in this machine
if verify:
msg = ('{error}. You may consider skipping verifying '
'image name against docker with --no-verify.')
try:
client = get_docker_client()
client.inspect_image(image)
except Exception as exception:
raise click.BadParameter(msg.format(error=str(exception)),
ctx=ctx)
session = ctx.obj.session
try:
with orm.transaction(session):
orm_app = orm.Application(image=image)
session.add(orm_app)
except sqlalchemy.exc.IntegrityError:
print_error("Application for image {} already exists".format(image))
else:
print(orm_app.id)
def _resolve_data_source(value, default, param, must_exist=False):
if must_exist:
val_exists = os.path.isdir(value) if value else False
default_exists = os.path.isdir(default)
if val_exists:
path_to_use = value
elif default_exists:
click.echo('==> No source provided')
click.echo('==> Folder {} exists. Assuming that is our {}'
' folder...'.format(default, param))
path_to_use = default
else:
raise click.BadParameter('Cannot find a suitable {} '
'directory.'.format(param))
else:
path_to_use = value if value else default
return os.path.realpath(path_to_use)
def run(s, p):
if s:
server = micro_server("s1", auri=AMQ_URI)
@server.service("foobar")
def h(a):
print a, os.getpid()
return {"b": a}
server.start_service(2, daemon=False)
elif p:
if not os.path.isfile(p) or os.path.splitext(p)[1] != '.yaml':
raise click.BadParameter(
'the param must be yaml config')
w = WORK_FRAME(auri=AMQ_URI, service_group_conf=p)
w.frame_start()
else:
raise click.UsageError(
'Could not find other command. You can run kael run --help to see more information')
def status(n):
if not n:
raise click.BadParameter('namespace is wrong.')
server = WORK_FRAME(n, auri=AMQ_URI)
r = server.command('_get_pkg_version')
res = server.get_response(r)
# print json.dumps(res, indent=2)
for s_name in res:
print s_name
apps = res[s_name]
table = BeautifulTable(max_width=100)
table.column_headers = ['App name', 'version', 'path']
for app in apps:
table.append_row([app, apps[app]['version'], apps[app]['path']])
print(table)
print ''
def dev(p, kael_amqp):
"""
Example usage:
\b
$ kael-web dev
$ kael-web dev -p 5000 --kael_amqp 'amqp://user:****@localhost:5672/api'
"""
app.config['AMQP_URI'] = kael_amqp
print '\n', 'AMQP_URI:', str(kael_amqp), '\n'
if not kael_amqp:
raise click.BadParameter('Use --kael_amqp to set AMQP_URI (AMQP_URI not set)')
app.debug = True
print app.url_map
print ' * Running on 0.0.0.0:{} (Press CTRL+C to quit)'.format(p)
server = WSGIServer(("0.0.0.0", p), app)
server.serve_forever()
def validate_user_name(ctx, param, value: str) -> str:
"""Validates if ``value`` is a correct user name.
Parameters
----------
value : str
Possible user name.
Returns
-------
value : str
Valid user name.
Raises
------
click.BadParameter
If user name is not valid for unix systems.
"""
if re.fullmatch(r'^[a-z][a-z0-9_]{5,}', value):
return value
else:
raise click.BadParameter(
'User names can only contain 0-9, a-z and _.')
def parse_password(password: str) -> str:
"""Checks if ``password`` is suffiently strong.
Parameters
----------
password : str
Possible password.
Returns
-------
password : str
Valid password.
Raises:
click.BadParameter
If password is not sufficiently strong.
"""
if password in ['password', 'hallo123', 'admin']:
raise click.BadParameter('ERROR: You are kidding, right?')
elif re.fullmatch(r'[A-Za-z0-9]{{{},}}'.format(PASSWORD_LENGTH), password):
return password
else:
raise click.BadParameter('ERROR: Use only alphanumeric characters.')
def parse_lower_alpha(string: str) -> str:
"""Checks if ``string`` consists out of at least of 6 characters.
Parameters
----------
string : str
Possible string.
Returns
-------
string : str
Valid string.
Raises
------
click.BadParameter
If string is not sufficiently strong.
"""
if re.fullmatch(r'[a-z]{6,}', string):
return string
else:
raise click.BadParameter(
'ERROR: Use at least 6 lower-case characters.')
def parse_port(port: str) -> str:
"""Checks if port is valid.
Parameters
----------
port : str
Possible port name.
Returns
-------
port : str
Valid port name
Raises
------
click.BadParameter
If ``port`` is not a four-digit number not starting with zero.
"""
if re.fullmatch(r'^[1-9][0-9]{,3}', port):
return port
else:
raise click.BadParameter(
'ERROR: Use up to four digits not starting with zero.')
def parse_table_name(table_name: str) -> str:
"""Checks if ``table_name`` is a valid name for a PostgreSQL table.
Parameters
----------
table_name : str
Possible table name.
Returns
-------
table_name : str
Valid table name
Raises
------
click.BadParameter
If ``table_name`` is not valid for PSQL.
"""
if re.fullmatch(r'^[a-z][a-z_]{5,}', table_name):
return table_name
else:
raise click.BadParameter(
'ERROR: Use only lower-case characters and underscores.')
def validate_wave(ctx, param, value):
"""
Validate the wave file by trying to open it
and checking that its got a single channel.
:param ctx:<class 'click.core.Context'>
:param param:<class 'click.core.Option'>
:param value:str
:return:<class 'wave.Wave_read'>
"""
try:
wave_read = wave.open(value)
if wave_read.getnchannels() != 1:
raise click.BadParameter('Only mono wave files are supported')
return wave_read
except wave.Error as e:
raise click.BadParameter('Not a valid wave file. {}'.format(e.__str__()))
def validate_binary_string(ctx, param, value):
"""
Ensure that a binary string only has 1's and 0's
:param ctx:<class 'click.core.Context'>
:param param:<class 'click.core.Option'>
:param value:str
:return:str
"""
valid_characters = '10'
# If we string the value of valid characters, are there
# any other characters left over?
left_overs = value.strip(valid_characters)
# Except if there are characters left
if left_overs:
raise click.BadParameter('Only the characters "{}" is considered valid bitsring input.'
' The following were invalid: {}'.format(valid_characters, left_overs))
return value