def rm(ctx, key):
    """
    Remove a key from an s3 json file.

    \b
    s3env prod rm API_KEY
    s3env staging rm DEBUG
    """
    remote = get_key(ctx.obj["env"])
    settings = json.loads(remote.get())

    try:
        del settings[key]
    except KeyError:
        raise click.ClickException("No key set for {}".format(key))
    else:
        click.echo("Key removed.")

    # Write the reduced mapping back before listing what is left.
    remote.set(json.dumps(settings, indent=4))

    if settings:
        click.echo("Current key/values are...")
        for name, value in settings.items():
            click.echo("{}={}".format(name, value))
    else:
        click.echo("No key/values are currently set.")
python类ClickException()的实例源码
def get_help_recursive(group, ctx, commands):
    """
    Return the help text for the innermost subcommand named by ``commands``.

    ``commands`` is consumed from the front (it is mutated). Each name is
    looked up on the current group via ``get_command``; the walk stops when
    the names run out or when the current object turns out to be a plain
    command with no ``get_command``.

    Raises click.ClickException when a name does not resolve to a command.
    """
    current = group
    while True:
        try:
            command_name = commands.pop(0)
            current = current.get_command(ctx, command_name)
            if not current:
                raise click.ClickException('Invalid command: {}'.format(command_name))
        except (IndexError, AttributeError):
            # IndexError: end of the subcommand chain.
            # AttributeError: ``current`` is a command with no children.
            return current.get_help(ctx)
def get(ctx, keyword):
    """
    Print a configuration setting.

    \b
    Example:
      farmer config get api_url
    """
    config = ctx.obj['config']
    try:
        # ``keyword`` may be a dotted path; attrgetter resolves it step by step.
        value = operator.attrgetter(keyword)(config)
    except AttributeError as exc:
        # ``exc.message`` only exists on Python 2 exceptions; ``str(exc)``
        # gives the same text and also works on Python 3.
        raise click.ClickException(str(exc))
    if isinstance(value, SCALARS):
        click.echo(value)
    else:
        # Resolve top-most LayeredConfig config and dump it as YAML.
        click.echo(dump_yaml(value))
def setup(config, account_id, pin):
    """Setup the initial config.

    Your account ID and PIN numbers are needed for
    authentication. They will be stored in the
    ini file for later reusage.
    """
    click.echo('Verifying credentials...', file=sys.stderr)
    api = Api(account_id, pin)
    try:
        info = api.authenticate()
    except Exception as e:
        # Surface any authentication failure as a CLI error.
        raise click.ClickException(str(e))

    # Persist the verified credentials and what the API returned on the config.
    config.account_id = account_id
    config.pin = pin
    config.info = info
    config.transaction_id = api.transaction_id
    click.echo(click.style('Success!', fg='green'), file=sys.stderr)

    if config.json:
        # Dump the authentication result; the original passed the unrelated
        # name ``status`` here instead of ``info``.
        print(pretty_json(info))
    else:
        print_vehicle_info(info)
def status(config, details):
    """Show vehicle status."""
    api = api_from_config(config)
    try:
        info = api.status(retry=False)
    except Exception:
        # First attempt failed: re-authenticate once and try again.
        click.echo("Auth expired. Authenticating and retrying...",
                   file=sys.stderr)
        try:
            config.info = api.authenticate()
            info = api.status(retry=False)
        except Exception as retry_error:
            raise click.ClickException(str(retry_error))

    if config.json:
        print(pretty_json(info))
        return

    print_vehicle_info(config.info)
    print_status_info(info, details)
def pair(config):
    """Pair to your account.

    This is still useless, but will be needed for
    certain remote commands on the vehicle.
    """
    api = api_from_config(config)
    if not api.request_pairing():
        raise click.ClickException("Unable to request pairing")
    # The two literals are concatenated, so the first needs a trailing space
    # (the original printed "...your phone.Please enter...").
    click.echo(
        "A Pairing code has been requested to your phone. "
        "Please enter it below!")
    code = click.prompt('Registration Code', type=int)
    # NOTE(review): a truthy result is treated as failure here, while
    # request_pairing() above treats a falsy result as failure. This may be
    # an inverted condition; confirm confirm_pairing()'s return contract.
    if api.confirm_pairing(code):
        raise click.ClickException("Error pairing client")
    click.echo(click.style("Pairing sucessfull!", fg='green'))
def run(input):
    """Read a publisher document and write its records to a JSON file.

    The publisher is the part of the input file's base name before the first
    dot ('rsc', 'acs' or 'springer') and selects the reader. Records are
    written to ``<input path without extension>-out.json``.

    Raises click.ClickException for any other publisher prefix.
    """
    pub = os.path.basename(input.name).split('.', 1)[0]
    if pub == 'rsc':
        reader = RscHtmlReader()
    elif pub == 'acs':
        reader = AcsHtmlReader()
    elif pub == 'springer':
        reader = NlmXmlReader()
    else:
        raise click.ClickException('Invalid publisher')
    doc = reader.read(input)
    # Serialize all records apart from those that are just chemical names or just labels
    records = [record.serialize(primitive=True) for record in doc.records]
    # On Python 3 ``keys()`` is a view that never equals a list, so it must be
    # turned into a list before comparing or nothing is filtered out.
    records = [
        record for record in records
        if list(record.keys()) not in (['names'], ['labels'])
    ]
    with open('%s-out.json' % os.path.splitext(input.name)[0], 'w') as outf:
        json.dump(records, outf, indent=2)
def evaluate(ctx, model, corpus, clusters):
    """Evaluate performance of POS Tagger."""
    click.echo('chemdataextractor.pos.evaluate')
    if corpus == 'wsj':
        evaluation = wsj_evaluation
        # Drop every token tagged '-NONE-' from each WSJ sentence.
        sents = [
            [pair for pair in sent if pair[1] != '-NONE-']
            for sent in evaluation.tagged_sents()
        ]
    elif corpus == 'genia':
        evaluation = genia_evaluation
        sents = list(evaluation.tagged_sents())
        # Translate GENIA bracket tags
        bracket_tags = {'(': '-LRB-', ')': '-RRB-'}
        for sent in sents:
            for idx, (token, tag) in enumerate(sent):
                if tag in bracket_tags:
                    sent[idx] = (token, bracket_tags[tag])
    else:
        raise click.ClickException('Invalid corpus')

    tagger = ChemCrfPosTagger(model=model, clusters=clusters)
    accuracy = tagger.evaluate(sents)
    click.echo('%s on %s: %s' % (model, evaluation, accuracy))
def evaluate_perceptron(ctx, model, corpus):
    """Evaluate performance of Averaged Perceptron POS Tagger."""
    click.echo('chemdataextractor.pos.evaluate')
    if corpus == 'wsj':
        evaluation = wsj_evaluation
        # Drop every token tagged '-NONE-' from each WSJ sentence.
        sents = [
            [pair for pair in sent if pair[1] != u'-NONE-']
            for sent in evaluation.tagged_sents()
        ]
    elif corpus == 'genia':
        evaluation = genia_evaluation
        sents = list(evaluation.tagged_sents())
        # Translate GENIA bracket tags
        bracket_tags = {u'(': u'-LRB-', u')': u'-RRB-'}
        for sent in sents:
            for idx, (token, tag) in enumerate(sent):
                if tag in bracket_tags:
                    sent[idx] = (token, bracket_tags[tag])
    else:
        raise click.ClickException('Invalid corpus')

    tagger = ChemApPosTagger(model=model)
    accuracy = tagger.evaluate(sents)
    click.echo('%s on %s: %s' % (model, evaluation, accuracy))
def synthesize(access_key, secret_key, output_file, voice_name, voice_language,
               codec, text):
    """Synthesize passed text and save it as an audio file"""
    voice_options = dict(
        voice_name=voice_name, language=voice_language, codec=codec,
    )
    try:
        ivona_api = IvonaAPI(access_key, secret_key, **voice_options)
    except (ValueError, IvonaAPIException) as exc:
        reason = "Something went wrong: {}".format(repr(exc))
        raise click.ClickException(reason)

    # Opened in binary mode: the API writes the audio into this handle.
    with click.open_file(output_file, 'wb') as audio:
        ivona_api.text_to_speech(text, audio)

    saved_notice = "File successfully saved as '{}'".format(output_file)
    click.secho(saved_notice, fg='green')
def list_voices(access_key, secret_key, voice_language, voice_gender):
    """List available Ivona voices"""
    try:
        ivona_api = IvonaAPI(access_key, secret_key)
    except (ValueError, IvonaAPIException) as exc:
        reason = "Something went wrong: {}".format(repr(exc))
        raise click.ClickException(reason)

    click.echo("Listing available voices...")
    voices_list = ivona_api.get_available_voices(
        language=voice_language,
        gender=voice_gender,
    )

    def language_of(voice):
        return voice['Language']

    # Group voices by language (groupby needs input sorted by the same key).
    ordered = sorted(voices_list, key=language_of)
    voices_dict = {
        language: list(members)
        for language, members in groupby(ordered, key=language_of)
    }

    for language, voices in voices_dict.items():
        names = ', '.join([voice['Name'] for voice in voices])
        click.echo("{}: {}".format(language, names))
    click.secho("All done", fg='green')
def cli(target, name):
    """
    Generate man pages for the scripts defined in the ``console_scripts`` entry point.

    The cli application is gathered from entry points of installed packages.

    The generated man pages are written to files in the directory given
    by ``--target``.
    """
    console_scripts = list(iter_entry_points('console_scripts', name=name))
    if not console_scripts:
        raise click.ClickException('"{0}" is not an installed console script.'.format(name))
    # Only generate man pages for first console script
    entry_point = console_scripts[0]

    # create target directory if it does not exist yet
    try:
        os.makedirs(target)
    except OSError:
        # An already-existing directory is fine; any other failure
        # (permissions, a file in the way, ...) must not be hidden.
        if not os.path.isdir(target):
            raise

    click.echo('Load entry point {0}'.format(name))
    cli = entry_point.resolve()
    click.echo('Generate man pages for {0} in {1}'.format(name, target))
    write_man_pages(name, cli, version=entry_point.dist.version, target_dir=target)
def do_tag(config, force, src, requirement, yes):
    # Tag the project with ``config.version`` and push the tag to origin.
    # Refuses to run while the work tree has unstaged, staged or untracked
    # changes. ``force`` adds ``-f`` to both the tag and push commands;
    # ``yes`` skips the confirmation prompt.
    tag = config.version
    if not yes:
        click.confirm('Tag project with {}?'.format(tag), abort=True)
    force_cmd = ['-f'] if force else []

    # Work tree first, then the index: both must be clean.
    for extra_args in ([], ['--cached']):
        if call(['git', 'diff', '--exit-code'] + extra_args) != 0:
            raise click.ClickException("Please commit first.")

    untracked = check_output(['git', 'ls-files', '--other',
                              '--exclude-standard', '--directory'])
    if untracked:
        click.echo(untracked)
        raise click.ClickException("Please commit first.")

    do_tag_requirements(config, force, src, requirement, yes=True)
    check_call(['git', 'tag'] + force_cmd + [tag])
    check_call(['git', 'push', '-q'] + force_cmd + ['origin', 'tag', tag])
def add_monitor(ctx, name, uri, location, frequency, email, validation_string,
                bypass_head_request, verify_ssl, redirect_is_failure, sla_threshold, raw):
    # Create a monitor for the account stored in ctx.obj['ACCOUNT'] and
    # report the result, either as raw JSON or as a coloured OK/Error line.
    if validation_string:
        # The head request must be bypassed when a validation string is given.
        bypass_head_request = True

    with Spinner('Creating monitor: ', remove_message=raw):
        status, message, monitor = newrelic.create_monitor(
            ctx.obj['ACCOUNT'], name, uri, frequency, location, email,
            validation_string, bypass_head_request, verify_ssl, redirect_is_failure,
            sla_threshold)

    if raw:
        print(json.dumps(monitor))
        return

    if status != 0:
        print(click.style(u'Error', fg='red', bold=True))
        raise click.ClickException(message)

    print(click.style(u'OK', fg='green', bold=True))
    print('Monitor: ' + message)
def cli(ctx, email, password, account, environment):
    # Prepare the per-environment cookie jar, authorize unless the 'login'
    # subcommand is running, and publish account/credentials on ctx.obj.
    cookiejar = os.path.expanduser('~/.config/neres/{}.cookies'.format(environment))
    cookie_dir = os.path.dirname(cookiejar)
    if not os.path.exists(cookie_dir):
        os.makedirs(cookie_dir, 0o700)
    newrelic.initialize_cookiejar(cookiejar)

    subcommand = ctx.invoked_subcommand
    if subcommand != 'login':
        with Spinner('Authorizing: '):
            if email and password:
                newrelic.login(email, password)
            elif not newrelic.check_if_logged_in():
                raise click.ClickException('Login first')

        # No account given: fall back to the first one returned.
        if not account and subcommand != 'list-accounts':
            account = newrelic.get_accounts()[0]['id']

    ctx.obj = {}
    ctx.obj['ACCOUNT'] = account
    ctx.obj['EMAIL'] = email
    ctx.obj['PASSWORD'] = password
def get_or_create_wallet(wallet_path):
    """Create a new wallet or return the currently existing one."""
    provider = twentyone_provider.TwentyOneProvider(two1.TWO1_PROVIDER_HOST)

    if wallet.Two1Wallet.check_wallet_file(wallet_path):
        return wallet.Wallet(wallet_path=wallet_path, data_provider=provider)

    # configure wallet with default options
    click.pause(uxstring.UxString.create_wallet)
    options = {'data_provider': provider, 'wallet_path': wallet_path}
    configured = wallet.Two1Wallet.configure(options)
    if not configured:
        raise click.ClickException(uxstring.UxString.Error.create_wallet_failed)

    # Display the wallet mnemonic and tell user to back it up.
    # Read the wallet JSON file and extract it.
    with open(wallet_path, 'r') as wallet_file:
        mnemonic = json.load(wallet_file)['master_seed']
    click.pause(uxstring.UxString.create_wallet_done % click.style(mnemonic, fg='green'))

    if wallet.Two1Wallet.check_wallet_file(wallet_path):
        return wallet.Wallet(wallet_path=wallet_path, data_provider=provider)
    # The freshly written wallet file did not pass the check.
    return None
def test_parse_post_data():
    """Test utility functionality for parsing data."""
    form_url = 'type=test&message=hey hey hey!'
    json_data = '{"type": "test", "message": "hey hey hey!"}'
    invalid = 'type: test; message: hey hey hey!'

    # Form-encoded input is passed through as a string.
    parsed, kind = buy._parse_post_data(form_url)
    assert isinstance(parsed, str)
    for fragment in ('type=test', 'message=hey hey hey!'):
        assert fragment in parsed
    assert kind == 'application/x-www-form-urlencoded'

    # JSON input must still decode to the same fields.
    parsed, kind = buy._parse_post_data(json_data)
    decoded = json.loads(parsed)
    assert isinstance(parsed, str)
    assert decoded['type'] == 'test'
    assert decoded['message'] == 'hey hey hey!'
    assert kind == 'application/json'

    # Anything else is rejected with the documented message.
    with pytest.raises(click.ClickException) as e:
        buy._parse_post_data(invalid)
    assert str(e.value) == uxstring.UxString.buy_bad_data_format
def url(ctx, stage):
    # type: (click.Context, str) -> None
    # Print the API Gateway URL recorded for ``stage``; exits with code 2
    # when there is no deployment record for that stage.
    factory = ctx.obj['factory']  # type: CLIFactory
    deployed = factory.create_config_obj(stage).deployed_resources(stage)

    if deployed is None:
        error = click.ClickException(
            "Could not find a record of deployed values to chalice stage: '%s'"
            % stage)
        error.exit_code = 2
        raise error

    endpoint = "https://{api_id}.execute-api.{region}.amazonaws.com/{stage}/".format(
        api_id=deployed.rest_api_id,
        region=deployed.region,
        stage=deployed.api_gateway_stage,
    )
    click.echo(endpoint)
def resolve_project_id(client_secrets, credentials):
    """Resolve project ID from client secrets.

    When ``client_secrets`` is None the path defaults to
    ``client_secret_<credentials.client_id>.json``. Any failure to open or
    parse the file, or to find ``installed.project_id`` in it, is reported as
    a click.ClickException.
    """
    secrets_path = client_secrets
    if secrets_path is None:
        secrets_path = 'client_secret_%s.json' % credentials.client_id

    try:
        with open(secrets_path, 'r') as handle:
            return json.load(handle)['installed']['project_id']
    except Exception as error:
        template = ('Error loading client secret: %s.\n'
                    'Run the device tool '
                    'with --client-secrets '
                    'or --project-id option.\n'
                    'Or copy the %s file '
                    'in the current directory.')
        raise click.ClickException(template % (error, secrets_path))
def cli(ctx, project_id, client_secrets, verbose, api_endpoint, credentials):
    # Load and refresh the OAuth 2.0 credentials stored at ``credentials``,
    # then publish the shared settings on ctx.obj and configure logging.
    try:
        with open(credentials, 'r') as f:
            stored = json.load(f)
            c = google.oauth2.credentials.Credentials(token=None, **stored)
            c.refresh(google.auth.transport.requests.Request())
    except Exception as error:
        raise click.ClickException('Error loading credentials: %s.\n'
                                   'Run google-oauthlib-tool to initialize '
                                   'new OAuth 2.0 credentials.' % error)

    shared = (
        ('API_ENDPOINT', api_endpoint),
        ('API_VERSION', ASSISTANT_API_VERSION),
        ('SESSION', None),
        ('PROJECT_ID', project_id),
        ('CREDENTIALS', c),
        ('CLIENT_SECRETS', client_secrets),
    )
    for key, value in shared:
        ctx.obj[key] = value

    log_level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(format='', level=log_level)
def config(ctx, use_global, set_device, set_app, clear, show):
    """Configurates adbons."""
    if set_device:
        Config.write_value(use_global, Config.SECTION_DEVICE,
                           Config.KEY_DEFAULT, set_device)
    if set_app:
        Config.write_value(use_global, Config.SECTION_APP,
                           Config.KEY_DEFAULT, set_app)
    if clear:
        Config.clear_value(use_global, clear, Config.KEY_DEFAULT)

    if not show:
        return

    stored = Config.read_values(use_global)
    if stored is None:
        raise click.ClickException("No config file found.")

    scope = "Global" if use_global else "Local"
    click.echo("%s config:" % scope)
    for section, values in stored.items():
        if section is None or values is None:
            continue
        # Fall back to a placeholder when the section has no default value.
        default = "not set"
        if values and values[Config.KEY_DEFAULT]:
            default = values[Config.KEY_DEFAULT]
        click.echo("The default " + section + " is " + default)
def notify(ctx, provider):
    """Send a notification to a passed provider.

    Data should be passed via a key=value input like so:

        notifiers notify pushover token=foo user=bar message=test
    """
    p = get_notifier(provider)
    data = {}
    for item in ctx.args:
        # Split on the first '=' only, so values may themselves contain '='.
        key, separator, value = item.partition('=')
        if not separator:
            raise click.ClickException(
                "Invalid argument '{}': expected key=value".format(item)
            )
        data[key] = value
    if 'message' not in data:
        # No explicit message: fall back to whatever was piped on stdin.
        message = click.get_text_stream('stdin').read()
        if not message:
            raise click.ClickException(
                "'message' option is required. "
                "Either pass it explicitly or pipe into the command"
            )
        data['message'] = message
    rsp = p.notify(**data)
    rsp.raise_on_errors()
def worker(**options):
    "Run background worker instance."
    from django.conf import settings
    if getattr(settings, 'CELERY_ALWAYS_EAGER', False):
        raise click.ClickException(
            'Disable CELERY_ALWAYS_EAGER in your '
            'settings file to spawn workers.')

    from munch.core.celery import app

    # The worker types are published through the environment.
    worker_types = options.pop('worker_type')
    os.environ['WORKER_TYPE'] = ','.join(worker_types).lower()

    instance = app.Worker(
        pool_cls=options.pop('pool'),
        queues=settings.CELERY_DEFAULT_QUEUE,
        **options)
    instance.start()
    try:
        sys.exit(instance.exitcode)
    except AttributeError:
        # `worker.exitcode` was added in a newer version of Celery:
        # https://github.com/celery/celery/commit/dc28e8a5
        # so this is an attempt to be forwards compatible
        pass
def main(out_format, no_flip, clean, long, input, output):
    """
    AWS CloudFormation Template Flip is a tool that converts
    AWS CloudFormation templates between JSON and YAML formats,
    making use of the YAML format's short function syntax where possible.
    """
    flip_options = dict(
        out_format=out_format,
        clean_up=clean,
        no_flip=no_flip,
        long_form=long,
    )
    try:
        converted = flip(input.read(), **flip_options)
        output.write(converted)
    except Exception as error:
        # Any read/convert/write failure becomes a plain CLI error message.
        raise click.ClickException("{}".format(error))
def run():
    """Initialise the module-level queues and timers, then run ``main``.

    Exits the process with ``main``'s ClickException exit code, with 1 when
    the run is aborted, and with 0 otherwise.
    """
    global creatorQueue, tosQueue, verifierQueue, logQueue, requestSleepTimer, requestSleepTimerB
    global accountStore
    creatorQueue = Queue.Queue()
    tosQueue = Queue.Queue()
    verifierQueue = Queue.Queue()
    logQueue = Queue.Queue()
    requestSleepTimer = 0.1
    requestSleepTimerB = 0.1
    try:
        # The conversion to click.Abort needs its own inner ``try``: an
        # exception raised inside one ``except`` clause is never caught by a
        # sibling clause of the same statement, so in the flat form Ctrl-C/EOF
        # skipped the Abort handler below.
        try:
            main(standalone_mode=False)
        except (EOFError, KeyboardInterrupt):
            raise click.Abort()
    except click.ClickException as e:
        e.show()
        sys.exit(e.exit_code)
    except click.Abort:
        accountStore.done()
        click.echo('Aborted!', file=sys.stderr)
        sys.exit(1)
    sys.exit(0)
def test_create_service_404_error(self, _get_service):
    """A 404 from the services POST becomes a ClickException about the organisation."""
    organisation_id = 'organisation-id'
    service_fields = dict(
        name='service-name', location='https://example.com', service_type='external')

    client = MagicMock()
    client.accounts.organisations = {'organisation-id': MagicMock()}
    post = client.accounts.organisations[organisation_id].services.post
    post.side_effect = HTTPError(404, 'Not Found')

    with pytest.raises(click.ClickException) as exc:
        _create_service(
            client,
            organisation_id,
            service_fields['name'],
            service_fields['location'],
            service_fields['service_type'],
        )

    post.assert_called_once_with(**service_fields)
    assert not _get_service.called
    expected = ('\x1b[31mOrganisation organisation-id cannot be found. '
                'Please check organisation_id.\x1b[0m')
    assert exc.value.message == expected
def _get_service(client, organisation_id, name):
    """
    Look up a service of the given organisation by its name

    :param client: Accounts Service API Client
    :param organisation_id: Id of Organisation
    :param name: Service Name
    :return: Id of the first matching service, or None if none matches
    """
    try:
        response = client.accounts.services.get(organisation_id=organisation_id)
    except httpclient.HTTPError as exc:
        if exc.code != 404:
            raise exc
        # A 404 means the organisation_id is not recognised: report it right away
        msg = ('Organisation {} cannot be found. '
               'Please check organisation_id.'.format(organisation_id))
        raise click.ClickException(click.style(msg, fg='red'))

    for service in response['data']:
        if service['name'] == name:
            return service['id']
    return None
def _get_client_secret(client, service_id):
    """
    Fetch the first client secret registered for a service

    :param client: Accounts Service API Client
    :param service_id: Service ID
    :return: Client secret (if available), otherwise None
    """
    try:
        response = client.accounts.services[service_id].secrets.get()
    except httpclient.HTTPError as exc:
        if exc.code != 404:
            raise exc
        # A 404 means the service_id is not recognised: report it right away
        msg = ('Service {} cannot be found.'.format(service_id))
        raise click.ClickException(click.style(msg, fg='red'))

    secrets = response['data']
    return secrets[0] if secrets else None
def get(ctx, module_id):
    """Get an nFlex module.

    If the module type is inline and the source_code is more than 50
    characters long, the source_code field won't be displayed.
    """
    try:
        result = ctx.nflex.get(module_id)
        # Tolerate a missing or empty source_code instead of failing on it.
        # NOTE(review): assumes ``result`` is a dict-like with ``.get`` - confirm.
        source_code = result.get("source_code")
        if source_code and len(source_code) > 50:
            result["source_code"] = "*** REDACTED ***"
        print_module(result)
    except requests.exceptions.RequestException as e:
        # The original message said "delete", copied from another command.
        raise click.ClickException(
            "Failed to get nFlex module: %s" % prep_err_msg(e)
        )
def download(ctx, module_id):
    """Download an nFlex module.

    If the module type is inline, the source_code will be saved as "main.py"
    in the current working directory, otherwise the contents of the zip file
    will be extracted there.
    """
    try:
        ctx.nflex.download(module_id)
        # The confirmation goes to stderr (err=True).
        confirmation = 'Module %s downloaded in the current directory' % module_id
        click.echo(confirmation, err=True)
    except requests.exceptions.RequestException as error:
        reason = prep_err_msg(error)
        raise click.ClickException(
            "Failed to download nFlex module: %s" % reason
        )