def main(qatif, stats, outdir, cloudmask):
if not stats and not outdir and not cloudmask:
raise click.UsageError(
"Specify --stats, --cloudmask MASK, or --outdir DIR")
if outdir:
stats = True
with rasterio.open(qatif) as src:
arr = src.read(1)
profile = src.profile
if cloudmask:
write_cloud_mask(arr, profile=profile, cloudmask=cloudmask)
if stats:
base = os.path.basename(qatif)
summary = summary_stats(arr, basename=base, outdir=outdir, profile=profile)
click.echo(json.dumps(summary, indent=2))
if outdir:
click.echo("QA variables written as uint8 tifs to {}".format(outdir),
err=True)
python类UsageError()的实例源码
def rename_command(source, destination):
"""
Executor for `globus rename`
"""
source_ep, source_path = source
dest_ep, dest_path = destination
if source_ep != dest_ep:
raise click.UsageError(('rename requires that the source and dest '
'endpoints are the same, {} != {}')
.format(source_ep, dest_ep))
endpoint_id = source_ep
client = get_client()
autoactivate(client, endpoint_id, if_expires_in=60)
res = client.operation_rename(endpoint_id, oldpath=source_path,
newpath=dest_path)
formatted_print(res, text_format=FORMAT_TEXT_RAW, response_key='message')
def endpoint_search(filter_fulltext, filter_owner_id, filter_scope):
"""
Executor for `globus endpoint search`
"""
if filter_scope == 'all' and not filter_fulltext:
raise click.UsageError(
'When searching all endpoints (--filter-scope=all, the default), '
'a full-text search filter is required. Other scopes (e.g. '
'--filter-scope=recently-used) may be used without specifying '
'an additional filter.')
client = get_client()
owner_id = filter_owner_id
if owner_id:
owner_id = maybe_lookup_identity_id(owner_id)
search_iterator = client.endpoint_search(
filter_fulltext=filter_fulltext, filter_scope=filter_scope,
filter_owner_id=owner_id)
formatted_print(search_iterator, fields=ENDPOINT_LIST_FIELDS,
json_converter=iterable_response_to_dict)
def role_create(role, principal, endpoint_id):
"""
Executor for `globus endpoint role show`
"""
principal_type, principal_val = principal
client = get_client()
if principal_type == 'identity':
principal_val = maybe_lookup_identity_id(principal_val)
if not principal_val:
raise click.UsageError(
'Identity does not exist. '
'Use --provision-identity to auto-provision an identity.')
elif principal_type == 'provision-identity':
principal_val = maybe_lookup_identity_id(principal_val, provision=True)
principal_type = 'identity'
role_doc = assemble_generic_doc(
'role', principal_type=principal_type, principal=principal_val,
role=role)
res = client.add_endpoint_role(endpoint_id, role_doc)
formatted_print(res, simple_text='ID: {}'.format(res['id']))
def copy_template(template_path: Path, path: Path, variables: dict):
for d in template_path.iterdir():
target_path = path / d.relative_to(template_path)
if d.is_dir():
copy_template(d, target_path, variables)
elif target_path.exists():
# better not overwrite any existing files!
raise click.UsageError('Target file "{}" already exists. Aborting!'.format(target_path))
else:
with Action('Writing {}..'.format(target_path)):
target_path.parent.mkdir(parents=True, exist_ok=True)
with d.open() as fd:
contents = fd.read()
template = string.Template(contents)
contents = template.safe_substitute(variables)
with target_path.open('w') as fd:
fd.write(contents)
def main(source, out, raw, format, view):
"""
Generate CRCDiagrams from SOURCE saving they as OUT.
\n
The default output format is png.
\n
Example:\n
crc-diagram source_file.py output.png
"""
if os.path.isdir(source):
crc_cards = crc_diagram.folder_to_crc(source)
else:
crc_cards = crc_diagram.to_crc(source)
if raw:
out = path_to_stream(out or sys.stdout, 'w')
json.dump([crc.to_dict() for crc in crc_cards], out, indent=4)
else:
if out is None:
raise click.UsageError('Missing argument "out".')
DotRender(crc_cards, format=format).render(out, view=view)
def generate(filename, data_file, output_dir, template_file, skip_first_row):
if not os.path.exists(filename):
raise click.UsageError("Layout not found: %s\n" % filename)
try:
module = _import_file(filename)
except (ImportError, ValueError) as e:
raise click.UsageError("Unable to load %r: %s\n" % (filename, e))
layout_classes = list(_iter_layout_classes(module))
if not layout_classes:
raise click.UsageError("No layout found in file: %s\n" % filename)
layout_cls = layout_classes.pop()
click.secho('Generating documents...', fg='white')
layout = layout_cls(data_file, output_dir, template_file, skip_first_row)
# Check files/paths
layout._check_paths()
data = csv.reader(open(layout.data_file))
if layout.skip_first_row:
next(data)
# TODO: Show info about rows count
for row in data:
layout.generate_document(row)
def resolve_command(self, ctx, args):
"""
Override clicks ``resolve_command`` method and appends *Did you mean ...* suggestions to the
raised exception message.
"""
try:
return super(AliasedGroup, self).resolve_command(ctx, args)
except click.exceptions.UsageError as error:
error_msg = str(error)
original_cmd_name = click.utils.make_str(args[0])
matches = difflib.get_close_matches(
original_cmd_name,
self.list_commands(ctx),
self.max_suggestions,
self.cutoff)
if matches:
error_msg += '{0}{0}Did you mean one of these?{0} {1}'.format(
os.linesep,
'{0} '.format(os.linesep).join(matches, ))
raise click.exceptions.UsageError(error_msg, error.ctx)
def freeze_app(app, freezer, path, base_url):
if not base_url:
raise click.UsageError('No base URL provided, use --base-url')
print('Generating HTML...')
app.config['FREEZER_DESTINATION'] = path
app.config['FREEZER_BASE_URL'] = base_url
app.config['SERVER_NAME'] = urllib.parse.urlparse(base_url).netloc
# make sure Frozen Flask warnings are treated as errors
warnings.filterwarnings('error', category=flask_frozen.FrozenFlaskWarning)
try:
freezer.freeze()
except flask_frozen.FrozenFlaskWarning as w:
print('Error:', w, file=sys.stderr)
sys.exit(1)
route53cache.py 文件源码
项目:docker-service-registrator-route53
作者: mvanholsteijn
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def get_dns_name_for_hosted_zone_id(self, hosted_zone_id):
"""
gets the dns name associated with the 'hosted_zone_id'.
ensure:
self.hosted_zone_id == hosted_zone_id
self.dns_name[-1] == '.'
"""
assert hosted_zone_id is not None
self.hosted_zone_id = hosted_zone_id
try:
response = self.route53.get_hosted_zone(Id=hosted_zone_id)
self.dns_name = response['HostedZone']['Name']
except ClientError as e:
raise click.UsageError('%s' % e)
log.info('found hosted zone %s for dns name %s' % (self.hosted_zone_id, self.dns_name))
assert self.dns_name[-1] == '.'
assert self.hosted_zone_id is not None
route53cache.py 文件源码
项目:docker-service-registrator-route53
作者: mvanholsteijn
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def get_hosted_zone_id_for_dns_name(self, dns_name):
"""
gets the hosted_zone_id associated with the 'dns_name'.
require:
there is exactly 1 hosted zone for 'dns_name' in Route53.
ensure:
self.hosted_zone_id == hosted_zone_id
self.dns_name[-1] == '.'
self.dns_name[-1] == dns_name
"""
self.dns_name = dns_name if dns_name[-1] == '.' else '%s.' % dns_name
response = self.route53.list_hosted_zones_by_name(DNSName=self.dns_name)
zones = filter(lambda r: r['Name'] == self.dns_name, response['HostedZones'])
if len(zones) == 1:
self.hosted_zone_id = zones[0]['Id'].split('/')[-1]
elif len(zones) > 1:
raise click.UsageError('There are %d hosted zones for the DNS name %s, please specify --hosted-zone-id' % (len(zones), self.dns_name))
else:
raise click.UsageError('There is no hosted zones for the DNS name %s' % self.dns_name)
assert self.dns_name[-1] == '.'
assert self.hosted_zone_id is not None
log.info('found dns name %s for hosted zone id %s' %
(self.dns_name, self.hosted_zone_id))
def import_(ctx, dirs):
for dir in dirs:
if not os.path.isdir(dir):
raise click.UsageError('<%s> must be a directory' % dir)
def do_import(report_name, instances):
count = 0
try:
for instance in instances:
import_report_instance(ctx, report_name, instance)
count += 1
finally:
echo_info('Imported %d %s instances' % (count, report_name))
def instances(dir):
for filename in glob.glob(os.path.join(dir, '*.json')):
with open(filename) as f:
yield json.loads(f.read())
for dir in dirs:
report_name = dir.rstrip('/').split('/')[-1]
do_import(report_name, instances(dir))
def jsoncsv(output, input, expand_, restore_, safe, separator):
if expand_ and restore_:
raise click.UsageError('can not choose both, default is `-e`')
func = expand
if restore_:
func = restore
for line in input:
obj = json.loads(line)
new = func(obj, separator=separator, safe=safe)
content = json.dumps(new, ensure_ascii=False).encode('utf-8')
output.write(content)
output.write('\n')
input.close()
output.close()
def parent(ctx, input, depth):
"""Takes a [x, y, z] tile as input and writes its parent to stdout
in the same form.
$ echo "[486, 332, 10]" | mercantile parent
Output:
[243, 166, 9]
"""
src = normalize_input(input)
for line in iter_lines(src):
tile = json.loads(line)[:3]
if tile[2] - depth < 0:
raise click.UsageError("Invalid parent level: {0}".format(tile[2] - depth))
for i in range(depth):
tile = mercantile.parent(tile)
output = json.dumps(tile)
click.echo(output)
def run(s, p):
if s:
server = micro_server("s1", auri=AMQ_URI)
@server.service("foobar")
def h(a):
print a, os.getpid()
return {"b": a}
server.start_service(2, daemon=False)
elif p:
if not os.path.isfile(p) or os.path.splitext(p)[1] != '.yaml':
raise click.BadParameter(
'the param must be yaml config')
w = WORK_FRAME(auri=AMQ_URI, service_group_conf=p)
w.frame_start()
else:
raise click.UsageError(
'Could not find other command. You can run kael run --help to see more information')
def stop(counters, all=False):
"""
Stop one or more in-progress executions.
"""
project = get_project(require=True)
params = {'project': project.id}
if counters and all:
raise click.UsageError('Pass either an execution # or `--all`, not both.')
elif counters:
params['counter'] = sorted(IntegerRange.parse(counters).as_set())
elif all:
params['status'] = 'incomplete'
else:
warn('Nothing to stop (pass #s or `--all`)')
return 1
for execution in request('get', '/api/v0/executions/', params=params).json()['results']:
click.echo('Stopping #{counter}... '.format(counter=execution['counter']), nl=False)
resp = request('post', execution['urls']['stop'])
click.echo(resp.text)
success('Done.')
def validate_git_specifier(refspec, branch, commit, tag):
"""
Validate that the set of specifiers given is consistent.
The set is valid if:
- only a branch is given
- only a commit is given
- only a tag is given
- a refspec and target non-master branch is given
:param refspec: provided refspec like 'pull/1/head'
:param branch: provided branch like 'master'
:param commit: provided commit SHA like '2cbd73cbd5aacc965ecfa480fa90164a85191489'
:param tag: provided tag like 'v1.3.0-rc2'
"""
if commit and (refspec or branch or tag):
raise UsageError('If a commit is specified, neither a refspec, branch, or tag can also be specified.')
if tag and (commit or refspec or branch):
raise UsageError('If a tag is specified, neither a refspec, branch, or commit can also be specified.')
if refspec and not branch:
raise UsageError('If a refspec is specified, the name of the branch to create for it is required.')
if refspec and branch == 'master':
raise UsageError('The branch specified for a refspec cannot be the master branch.')
def stop_executions(config, context_id, endpoint, all_contexts):
"""Stop running executions."""
client = from_config(config, endpoint=endpoint)
if not bool(context_id) ^ all_contexts:
raise click.UsageError(
'Either specify context id or use --all-contexts')
if context_id:
contexts = (client.contexts[cid] for cid in context_id)
else:
contexts = client.contexts
for context in contexts:
for execution in context.executions:
try:
click.echo(
'Stopping execution {0.id} on context {1.id} ... '.format(
execution, context),
nl=False)
execution.stop()
click.secho('OK', fg='green')
except errors.APIError:
click.secho('FAIL', fg='red')
def extend(from_region: str,
to_region: str,
cluster_name: str,
ring_size: int,
dc_suffix: str,
num_tokens: int,
instance_type: str,
volume_type: str,
volume_size: int,
volume_iops: int,
no_termination_protection: bool,
use_dmz: bool,
hosted_zone: str,
artifact_name: str,
docker_image: str,
environment: list,
sns_topic: str,
sns_email: str):
if from_region != to_region and not(use_dmz):
raise click.UsageError('Extending to a new region requires --use-dmz')
extend_cluster(options=locals())
def validate_artifact_version(options: dict) -> dict:
conflict_options_msg = """Conflicting options: --artifact-name and
--docker-image cannot be specified at the same time"""
if not options['docker_image']:
if not options['artifact_name']:
options['artifact_name'] = 'planb-cassandra-3.0'
image_version = get_latest_docker_image_version(options['artifact_name'])
docker_image = 'registry.opensource.zalan.do/stups/{}:{}' \
.format(options['artifact_name'], image_version)
info('Using docker image: {}'.format(docker_image))
else:
if options['artifact_name']:
raise click.UsageError(conflict_options_msg)
image_version = options['docker_image'].split(':')[-1]
docker_image = options['docker_image']
return dict(options, docker_image=docker_image, image_version=image_version)
def backport(self):
if not self.branches:
raise click.UsageError("At least one branch must be specified.")
self.fetch_upstream()
for maint_branch in self.sorted_branches:
click.echo(f"Now backporting '{self.commit_sha1}' into '{maint_branch}'")
cherry_pick_branch = self.get_cherry_pick_branch(maint_branch)
self.checkout_branch(maint_branch)
commit_message = ""
try:
self.cherry_pick()
commit_message = self.amend_commit_message(cherry_pick_branch)
except subprocess.CalledProcessError as cpe:
click.echo(cpe.output)
click.echo(self.get_exit_message(maint_branch))
except CherryPickException:
click.echo(self.get_exit_message(maint_branch))
raise
else:
if self.push:
self.push_to_remote(maint_branch,
cherry_pick_branch,
commit_message)
self.cleanup_branch(cherry_pick_branch)
else:
click.echo(\
f"""
Finished cherry-pick {self.commit_sha1} into {cherry_pick_branch} \U0001F600
--no-push option used.
... Stopping here.
To continue and push the changes:
$ cherry_picker --continue
To abort the cherry-pick and cleanup:
$ cherry_picker --abort
""")
def do_pylintcmd(load_plugins, rcfile, module, expected, pylint_options):
# import pdb; pdb.set_trace()
if not module:
module = []
candidate_addons_dirs = (
opj('odoo', 'addons'),
'odoo_addons',
'.',
)
for candidate_addons_dir in candidate_addons_dirs:
if os.path.isdir(candidate_addons_dir):
module.extend(
opj(candidate_addons_dir, addon) for addon in
get_installable_addons(candidate_addons_dir)
)
if not module:
raise click.UsageError("Please provide module or package "
"to lint (--module).")
cmd = [
'--load-plugins', load_plugins,
'--rcfile', rcfile,
] + list(pylint_options) + list(module)
log_cmd(['pylint'] + cmd, level=logging.INFO)
lint_res = pylint.lint.Run(cmd[:], exit=False)
sys.stdout.flush()
sys.stderr.flush()
expected = _consolidate_expected(rcfile, expected)
fails, no_fails = _get_failures(lint_res.linter.stats, expected)
if fails or no_fails:
msg = cmd_string(['pylint'] + cmd)
msg += '\n'
msg += _failures_to_str(fails, no_fails)
click.echo('\n')
click.echo(msg)
if fails:
raise click.ClickException("pylint errors detected.")
def remove(ctx, service_names, is_all):
"""
Remove a service from 21 sell
\b
Removing a service
$ 21 sell remove <service_name>
\b
Removing all services from 21 sell
$ 21 sell remove --all
"""
if not service_names and is_all is False:
raise click.UsageError('No service selected.', ctx=ctx)
manager = ctx.obj['manager']
logger.info(click.style("Removing services.", fg=cli_helpers.TITLE_COLOR))
def service_successfully_removed_hook(tag):
cli_helpers.print_str(tag, ["Removed"], "TRUE", True)
def service_does_not_exists_hook(tag):
cli_helpers.print_str(tag, ["Doesn't exist"], "FALSE", False)
def service_failed_to_remove_hook(tag):
cli_helpers.print_str(tag, ["Failed to remove"], "FALSE", False)
if is_all:
services_to_remove = manager.available_user_services()
else:
services_to_remove = service_names
for service_name in services_to_remove:
manager.remove_service(service_name, service_successfully_removed_hook, service_does_not_exists_hook,
service_failed_to_remove_hook)
def task_event_list(task_id, limit, filter_errors, filter_non_errors):
"""
Executor for `globus task-event-list`
"""
client = get_client()
# cannot filter by both errors and non errors
if filter_errors and filter_non_errors:
raise click.UsageError("Cannot filter by both errors and non errors")
elif filter_errors:
filter_string = "is_error:1"
elif filter_non_errors:
filter_string = "is_error:0"
else:
filter_string = ""
event_iterator = client.task_event_list(
task_id, num_results=limit, filter=filter_string)
formatted_print(event_iterator,
fields=(('Time', 'time'), ('Code', 'code'),
('Is Error', 'is_error'), ('Details', 'details')),
json_converter=iterable_response_to_dict)
def create_command(principal, permissions, endpoint_plus_path):
"""
Executor for `globus endpoint permission create`
"""
if not principal:
raise click.UsageError(
'A security principal is required for this command')
endpoint_id, path = endpoint_plus_path
principal_type, principal_val = principal
client = get_client()
if principal_type == 'identity':
principal_val = maybe_lookup_identity_id(principal_val)
if not principal_val:
raise click.UsageError(
'Identity does not exist. '
'Use --provision-identity to auto-provision an identity.')
elif principal_type == 'provision-identity':
principal_val = maybe_lookup_identity_id(principal_val, provision=True)
principal_type = 'identity'
rule_data = assemble_generic_doc(
'access', permissions=permissions, principal=principal_val,
principal_type=principal_type, path=path)
res = client.add_endpoint_acl_rule(endpoint_id, rule_data)
formatted_print(res, text_format=FORMAT_TEXT_RECORD,
fields=[('Message', 'message'), ('Rule ID', 'access_id')])
def exclusive(ctx_params, exclusive_params, error_message):
"""
Enables defining mutually exclusive options.
https://gist.github.com/thebopshoobop/51c4b6dce31017e797699030e3975dbf
:param ctx_params:
:param exclusive_params:
:param error_message:
:return:
"""
if sum([1 if ctx_params[p] else 0 for p in exclusive_params]) > 1:
raise click.UsageError(error_message)
def _globals(command):
def proxy(verbose, alwaysprompt, neverprompt, **kwargs):
# handle these global options
setverbose(verbose)
if alwaysprompt:
if neverprompt:
raise UsageError("--alwaysprompt and --neverprompt options"
" cannot be used together")
setwantprompt(PROMPT_ALWAYS)
elif neverprompt:
setwantprompt(PROMPT_NEVER)
# pass all other arguments on to the command
command(**kwargs)
proxy.__name__ = command.__name__
proxy.__doc__ = command.__doc__
proxy = option('-a', '--alwaysprompt', is_flag=True,
help="Always prompt the user to answer questions, even"
" named questions that they have answered on previous runs"
)(proxy)
proxy = option('-n', '--neverprompt', is_flag=True,
help="Never prompt the user to answer questions. Questions"
" will be answered automatically using the user's previous"
" answer or the `noprompt` value.")(proxy)
proxy = option('-v', '--verbose', 'verbose', is_flag=True,
help="Produce extra output")(proxy)
return proxy
def _check_paths(self):
if not os.path.exists(self.data_file):
raise click.UsageError("Data file not found: %s\n" % self.data_file)
if self.template_file is not None and not os.path.exists(self.template_file):
raise click.UsageError("Template file not found: %s\n" % self.template_file)
if os.path.exists(self.output_dir) and not os.access(self.output_dir, os.W_OK):
raise click.UsageError("Output is not writable: %s\n" % self.output_dir)
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
def handle_parse_result(self, ctx, opts, args):
if (self.name in opts) and self.mutually_exclusive.intersection(opts):
raise click.UsageError('Illegal usage: {0}'.format(self._message))
return super(MutuallyExclusiveOption, self).handle_parse_result(ctx, opts, args)
def main(az, workflow, resume, config):
try:
c = Config(az=az, workflow=workflow, config_file=config)
except ConfigError as e:
raise click.UsageError(str(e))
p = Provisioner(c)
p.run(resume)