def check_dir(_dir):
    """Return the project directory, creating it when it does not exist.

    Falls back to the current working directory when ``_dir`` is None.
    Prints an error and exits with status 1 when the path is an existing
    file. A failure to create the directory is ignored and the path is
    returned anyway.
    """
    target = os.getcwd() if _dir is None else _dir

    if isfile(target):
        click.secho(
            'Error: project directory is already a file: {0}'.format(target),
            fg='red')
        exit(1)

    if exists(target):
        return target

    try:
        os.makedirs(target)
    except OSError:
        # Best effort: creation errors are deliberately not reported.
        pass
    return target
# Example source code using Python's secho()
def synthesize(access_key, secret_key, output_file, voice_name, voice_language,
               codec, text):
    """Synthesize passed text and save it as an audio file"""
    # Voice settings are forwarded to the API client as keyword arguments.
    voice_options = dict(
        voice_name=voice_name, language=voice_language, codec=codec)
    try:
        client = IvonaAPI(access_key, secret_key, **voice_options)
    except (ValueError, IvonaAPIException) as exc:
        # Surface client construction problems as a regular click error.
        problem = "Something went wrong: {}".format(repr(exc))
        raise click.ClickException(problem)

    # The audio is written in binary mode through click's file helper.
    with click.open_file(output_file, 'wb') as audio_out:
        client.text_to_speech(text, audio_out)

    confirmation = "File successfully saved as '{}'".format(output_file)
    click.secho(confirmation, fg='green')
def list_voices(access_key, secret_key, voice_language, voice_gender):
    """List available Ivona voices"""
    try:
        client = IvonaAPI(access_key, secret_key)
    except (ValueError, IvonaAPIException) as exc:
        raise click.ClickException(
            "Something went wrong: {}".format(repr(exc)))

    click.echo("Listing available voices...")
    available = client.get_available_voices(
        language=voice_language, gender=voice_gender)

    def language_of(voice):
        return voice['Language']

    # groupby only merges adjacent items, so sort by the same key first.
    ordered = sorted(available, key=language_of)
    by_language = {
        language: list(members)
        for language, members in groupby(ordered, key=language_of)
    }

    # One output line per language, listing the voice names.
    for language, members in by_language.items():
        names = ', '.join(voice['Name'] for voice in members)
        click.echo("{}: {}".format(language, names))

    click.secho("All done", fg='green')
def info(ctx, spec_file):
    """
    Describes the specification provided.
    """
    # Prints a green "<title> - <version>. <license> licensed" banner built
    # from the spec's 'info' section, or a red message when the spec has no
    # 'info' section.
    if ctx.obj.debug:
        click.secho(
            "Specification: " + click.format_filename(spec_file),
            fg="yellow"
        )
    spec = Specification(spec_file)
    if 'info' not in spec:
        click.secho(f"No info was found in {spec}.", fg="red")
        return

    details = spec['info']
    version = details['version']
    title = details['title']
    try:
        spec_license = details['license']['name'] or 'Unknown'
    except (KeyError, TypeError):
        # The license object (or its name) is missing or None; report it as
        # unknown instead of failing on the lookup.
        spec_license = 'Unknown'
    banner = f"{title} - {version}. {spec_license} licensed"
    click.secho(banner, fg='green')
def combine_command(ctx, spec_file):
    """
    Describes the specification provided.
    """
    # NOTE(review): the docstring above does not match the body, which prints
    # the combined specification's schema as YAML. Confirm before changing
    # it, since it may be used as CLI help text.
    debug_enabled = ctx.obj.debug
    if debug_enabled:
        label = "Specification: " + click.format_filename(spec_file)
        click.secho(label, fg="yellow")

    combined = Specification(spec_file).combine()
    # TODO: If -y, format as yaml
    rendered = yaml.dump(combined.schema())
    print(rendered)
def spec_command(ctx, spec_file):
    """
    Describes the specification provided.
    """
    # Prints a green "<title> - v<version>. <license> licensed" banner built
    # from the spec's 'info' section, or a red message when the spec has no
    # 'info' section.
    if ctx.obj.debug:
        click.secho(
            "Specification: " + click.format_filename(spec_file),
            fg="yellow"
        )
    spec = Specification(spec_file)
    if 'info' not in spec:
        click.secho(f"No info was found in {spec}.", fg="red")
        return

    # 'version' and 'title' are still assumed to be defined in the spec;
    # only the license lookup is tolerant of missing data.
    details = spec['info']
    version = details['version']
    title = details['title']
    try:
        spec_license = details['license']['name'] or 'Unknown'
    except (KeyError, TypeError):
        # The license object (or its name) is missing or None; report it as
        # unknown instead of failing on the lookup.
        spec_license = 'Unknown'
    banner = f"{title} - v{version}. {spec_license} licensed"
    click.secho(banner, fg='green')
    # TODO: Implement linting of a spec.
    # TODO: implement validation of a spec
def update(taxids, conn, force_download, silent):
    """Update local UniProt database"""
    # taxids: comma-separated NCBI taxonomy IDs as one string (may be empty
    # or None). Entries that are not plain digits, optionally padded with
    # spaces, are dropped without notice.
    if not silent:
        click.secho("WARNING: Update is very time consuming and can take several "
                    "hours depending which organisms you are importing!", fg="yellow")
        if not taxids:
            click.echo("Please note that you can restrict import to organisms by "
                       "NCBI taxonomy IDs")
            click.echo("Example (human, mouse, rat):\n")
            click.secho("\tpyuniprot update --taxids 9606,10090,10116\n\n", fg="green")
    if taxids:
        # Raw string so that \d reaches the regex engine instead of being an
        # invalid string escape sequence.
        taxids = [
            int(taxid.strip())
            for taxid in taxids.strip().split(',')
            if re.search(r'^ *\d+ *$', taxid)
        ]
    database.update(taxids=taxids, connection=conn, force_download=force_download, silent=silent)
def _read_conf(conf, version, format):
    """Parse the configuration and fill in defaults.

    ``conf`` is handed to ``parse_yaml``. When parsing fails, an error is
    printed and the process exits with status 1. ``version`` and ``format``
    are stored in the returned dict under the keys of the same name.

    Returns the configuration dict with defaults applied.
    """
    try:
        conf = parse_yaml(conf)
    except Exception:
        # Only real errors are reported as a missing configuration;
        # KeyboardInterrupt and SystemExit propagate untouched.
        click.secho(
            'Missing configuration. Did you put a `beeper.yml` file?',
            blink=True,
            fg='red'
        )
        sys.exit(1)
    conf.setdefault('language', 'python')
    conf.setdefault('python', 'python')
    conf.setdefault('postinstall', [])
    # Must come after the 'postinstall' default: the join reads that key.
    conf.setdefault('postinstall_commands', '\n'.join(conf.get('postinstall')))
    conf.setdefault('manifest', set())
    conf.setdefault('current_dir', os.environ.get('WORK_DIR') or os.getcwd())
    conf.setdefault('scripts', [])
    conf['postbuild'] = conf['scripts']
    conf['version'] = version
    # Normalise to a set whatever container the configuration supplied.
    conf['manifest'] = set(conf['manifest'])
    conf['format'] = format
    return conf
def prompt_overwrite(file_name):
    """Ask whether an existing file may be overwritten.

    Returns True when ``file_name`` does not exist or when the single key
    read from the user is "y" or "Y"; returns False for any other key.
    """
    if os.path.exists(file_name):
        question = (
            "File ({}) already exists. Do you want to overwrite? (y/n): "
        ).format(file_name)
        click.secho(question, nl=False, fg="red")
        answer = click.getchar()
        # Finish the prompt line after the keypress.
        click.echo()
        return answer in ("y", "Y")

    # Nothing to overwrite.
    return True
def cli(*args, **kwargs):
    """
    CSVtoTable commandline utility.
    """
    # Convert CSV file
    content = convert.convert(kwargs["input_file"], **kwargs)

    # Serving in the browser takes precedence over writing a file.
    if kwargs["serve"]:
        convert.serve(content)
        return

    destination = kwargs["output_file"]
    if not destination:
        # Neither serving nor an output file was requested.
        # NOTE(review): newer click releases appear to expect
        # BadOptionUsage(option_name, message) - confirm against the click
        # version this project pins.
        raise click.BadOptionUsage("Missing argument \"output_file\".")

    # Only prompt when overwriting was not explicitly allowed.
    if not (kwargs["overwrite"] or prompt_overwrite(destination)):
        raise click.Abort()

    convert.save(destination, content)
    click.secho(
        "File converted successfully: {}".format(destination), fg="green")
def history(config, num_pages, start, output, arena):
    """Get your game history"""
    _check_creds(config)
    log = config.logger
    collected = []
    fetched = 0
    # Fetch pages until num_pages have been read or the account runs out.
    while fetched != num_pages:
        log.debug('Getting page %d of history', start)
        if arena:
            fetch_page = config.trackobot.arena_history
        else:
            fetch_page = config.trackobot.history
        page = fetch_page(page=start)
        log.debug('Extending games list')
        collected.extend(page['history'])
        fetched += 1
        start += 1
        # Stop once the next page number is past the reported total.
        if start > page['meta']['total_pages']:
            log.info('Hit max pages on account')
            break

    log.debug('Dumping game history to %s', output)
    with open(output, 'w') as out_file:
        json.dump(collected, out_file)
    summary = 'Wrote {} games to {}'.format(len(collected), output)
    click.secho(summary, fg='green')
def __init__(self, *args, **kwargs):
    """Reject typographic quotes/dashes in argv, then initialise the base."""
    import sys

    # Curly single/double quotes and the em dash.
    smart_chars = ("\u2018", "\u2019", "\u201C", "\u201D", "\u2014")
    offending = [
        argument for argument in sys.argv
        if any(char in argument for char in smart_chars)
    ]
    if offending:
        click.secho("Error: Detected invalid character in: %s\n"
                    "Verify the correct quotes or dashes (ASCII) are "
                    "being used." %
                    ', '.join(offending), err=True, fg='red', bold=True)
        sys.exit(-1)

    super().__init__(*args, **kwargs)

    # Plugin state for current deployment that will be loaded from cache.
    # Used to construct the dynamic CLI.
    self._plugins = None
def validate(path, level):
    # Loads `path` as an artifact and validates it at `level`; every failure
    # is reported through q2cli.util.exit_with_error.
    import qiime2.sdk

    try:
        artifact = qiime2.sdk.Artifact.load(path)
    except Exception as load_error:
        q2cli.util.exit_with_error(
            load_error,
            header='There was a problem loading %s as a QIIME 2 Artifact:'
                   % path)

    try:
        artifact.validate(level)
    except qiime2.plugin.ValidationError as invalid:
        message = 'Artifact %s does not appear to be valid at level=%s:' % (
            path, level)
        # The traceback is sent to the null device and the footer is
        # suppressed for validation failures.
        with open(os.devnull, 'w') as sink:
            q2cli.util.exit_with_error(invalid, header=message, file=sink,
                                       suppress_footer=True)
    except Exception as unexpected:
        message = ('An unexpected error has occurred while attempting to '
                   'validate artifact %s:' % path)
        q2cli.util.exit_with_error(unexpected, header=message)
    else:
        click.secho('Artifact %s appears to be valid at level=%s.'
                    % (path, level), fg="green")
def _echo_citations():
    """Print citation text for the framework and for each cached plugin."""
    import q2cli.cache

    click.secho('\nCitations', fg='green')
    click.secho('QIIME 2 framework and command line interface', fg='cyan')
    click.secho('Pending a QIIME 2 publication, please cite QIIME using the '
                'original publication: '
                'http://www.ncbi.nlm.nih.gov/pubmed/20383131')

    installed = q2cli.cache.CACHE.plugins
    if not installed:
        click.secho('\nNo plugins are currently installed.\nYou can browse '
                    'the official QIIME 2 plugins at https://qiime2.org')
        return

    # Plugins are listed in sorted order.
    for plugin_name, details in sorted(installed.items()):
        heading = '\n%s %s' % (plugin_name, details['version'])
        click.secho(heading, fg='cyan')
        click.secho(details['citation_text'])
def exit_with_error(e, header='An error has been encountered:', file=None,
                    suppress_footer=False):
    """Print the traceback of ``e`` plus a red summary, then exit with 1.

    The traceback is written to ``file`` (``sys.stderr`` when None). The
    summary is ``header``, the indented error text and, unless
    ``suppress_footer`` is set, a footer saying where the debug info went.
    The exit goes through the current click context.
    """
    import sys
    import traceback
    import textwrap
    import click

    if file is not None:
        footer = 'Debug info has been saved to %s' % file.name
    else:
        file = sys.stderr
        footer = 'See above for debug info.'

    summary_parts = [header, textwrap.indent(str(e), ' ')]
    if not suppress_footer:
        summary_parts.append(footer)

    # Full traceback first, then the short summary on stderr.
    traceback.print_exception(type(e), e, e.__traceback__, file=file)
    file.write('\n')

    click.secho('\n\n'.join(summary_parts), fg='red', bold=True, err=True)
    click.get_current_context().exit(1)
def init(directory: str, dsn: str):
    """
    Initializes a migrations directory.
    """
    try:
        os.makedirs(directory)
    except FileExistsError:
        # Reported, but initialisation continues in the existing directory.
        click.secho("Unable to make directory (it exists)!", fg='red')

    root = Path(directory)
    # A DSN is wrapped in double quotes before being substituted into the
    # env.py template; None is passed through unchanged.
    rendered_dsn = dsn if dsn is None else '"{}"'.format(dsn)

    click.secho("Writing env.py...", fg='cyan')
    (root / "env.py").write_text(env_file.format(dsn=rendered_dsn))

    click.secho("Making versions directory...", fg='cyan')
    (root / "versions").mkdir(mode=0o755)
    (root / "README").write_text("Basic asql-migrate setup.")

    click.secho("Done!", fg='green')
def new(message: str):
    """
    Creates a new migration file.
    """
    existing = _get_files()
    # Revisions are numbered from the count of existing files.
    revision = len(existing) + 1

    # NOTE(review): message is joined with spaces, so it is presumably a
    # sequence of words despite the `str` annotation - confirm with caller.
    full_message = ' '.join(message)

    # Filename slug: first 32 characters, lowercased, spaces turned into
    # underscores, everything outside a-z and "_" dropped.
    allowed = string.ascii_lowercase + "_"
    slug_source = full_message[:32].lower().replace(" ", "_")
    slug = ''.join(char for char in slug_source if char in allowed)
    file_name = "{:03d}_{}.py".format(revision, slug)

    contents = migration_template.format(
        revision=revision, message=full_message)
    target = migrations_dir / "versions" / file_name
    target.write_text(contents)

    click.secho("Created new migration file {}.".format(file_name))
def validate(text, file):
    """Validate JSON input using dependencies-schema"""
    payload = None

    if text:
        print('Validating text input...')
        payload = text

    # A file, when given, takes precedence over the text input.
    if file:
        print('Validating file input...')
        payload = file.read()

    if payload is None:
        click.secho(
            'Please give either text input or a file path. See help for more details.',
            fg='red')
        exit(1)

    try:
        validate_json(payload)
        click.secho('Valid JSON schema!', fg='green')
    except Exception as failure:
        # Report the failure, then let the original error propagate.
        click.secho('Invalid JSON schema!', fg='red')
        raise failure
def connect(config):
    """Open a Cassandra session and store it in the module-level ``session``.

    Reads 'seeds', 'port' (defaults to 9042), 'user' and 'password' from
    ``config`` via ``config.get``. Password authentication is used only when
    a user is configured.

    Returns the connected session. When the connection fails, an error is
    printed and the process exits with status 1.
    """
    global session
    # Connect to cassandra
    auth_provider = None
    if config.get('user'):
        auth_provider = PlainTextAuthProvider(
            username=config.get('user'), password=config.get('password'))
    cluster = Cluster(
        contact_points=config.get('seeds'),
        port=int(config.get('port') or 9042),
        auth_provider=auth_provider
    )
    try:
        session = cluster.connect()
    except Exception:
        # Exit with a failure status so callers and scripts can detect the
        # error; KeyboardInterrupt and SystemExit are not intercepted.
        click.secho("Unable to connect to Cassandra", fg='red')
        sys.exit(1)
    return session
def create_migration_table(keyspace):
    """Create the shift_migrations table in ``keyspace`` if it is missing.

    Uses the module-level ``session``. Returns ``(True, None)`` on success
    and ``(False, exception)`` when executing the statement fails.
    """
    # The statement text is kept flush-left so the literal stays unchanged.
    statement = """
CREATE TABLE IF NOT EXISTS shift_migrations(
type text,
time timeuuid,
migration text,
hash text,
PRIMARY KEY (type, time)
)
WITH CLUSTERING ORDER BY(time DESC)
"""
    session.set_keyspace(keyspace)
    click.echo("Creating shift_migrations table... ", nl=False)
    try:
        session.execute(statement)
        click.secho('OK', fg='green', bold=True)
        outcome = (True, None)
    except Exception as failure:
        click.secho('ERROR', fg='red', bold=True)
        outcome = (False, failure)
    return outcome