def command(dir_in, recursive):
    # Collects the files found in ``dir_in`` (walking subdirectories when
    # ``recursive`` is truthy), passes each path through ``cwl_file`` and
    # writes the result to stdout as a JSON object keyed by 'out_files'.
    if recursive:
        # The recursive walk starts from the absolute form of ``dir_in``,
        # so the joined paths are absolute in this branch only.
        listing = [
            cwl_file(os.path.join(parent, name))
            for parent, _subdirs, names in os.walk(os.path.abspath(dir_in))
            for name in names
        ]
    else:
        listing = []
        for entry in os.listdir(dir_in):
            candidate = os.path.join(dir_in, entry)
            # Skip anything that is not a regular file (e.g. directories).
            if not os.path.isfile(candidate):
                continue
            listing.append(cwl_file(candidate))
    # Sort the entries by the value stored under their 'path' key.
    listing.sort(key=lambda record: record.get('path'))
    click.get_text_stream('stdout').write(json.dumps({'out_files': listing}))
# Python: example source code for get_text_stream()
def notify(ctx, provider):
    """Send a notification to a passed provider.
    Data should be passed via a key=value input like so:
    notifiers notify pushover token=foo user=bar message=test
    """
    p = get_notifier(provider)
    data = {}
    for item in ctx.args:
        # Split on the FIRST '=' only, so values that themselves contain '='
        # (tokens, URLs with query strings, ...) are kept intact.
        name, separator, value = item.partition('=')
        if not separator:
            # A bare word used to surface as an opaque ValueError from
            # dict.update(); report it as a regular CLI error instead.
            raise click.ClickException(
                "Invalid argument '{}': expected key=value".format(item)
            )
        data[name] = value
    if 'message' not in data:
        # No explicit message: fall back to whatever was piped on stdin.
        message = click.get_text_stream('stdin').read()
        if not message:
            raise click.ClickException(
                "'message' option is required. "
                "Either pass it explicitly or pipe into the command"
            )
        data['message'] = message
    rsp = p.notify(**data)
    rsp.raise_on_errors()
def validate_text(ctx, param, value):
    """Normalise the text value, reading it from piped stdin when absent.

    ``ctx`` and ``param`` are not used; the configuration is taken from
    ``click.get_current_context().obj["conf"]``.

    A tuple ``value`` is joined with single spaces. If the value is still
    empty and stdin is not a TTY, the whole of stdin is read instead. The
    text is stripped and, when ``conf.character_warning`` is set and the
    text is longer than it, the user must confirm (declining aborts).

    Returns the stripped text.
    Raises ``click.BadArgumentUsage`` when the text is empty after stripping.
    """
    conf = click.get_current_context().obj["conf"]
    if isinstance(value, tuple):
        value = " ".join(value)
    if not value and not sys.stdin.isatty():
        value = click.get_text_stream("stdin").read()
    # Strip BEFORE the emptiness check so whitespace-only input (e.g. a lone
    # newline piped on stdin) is rejected rather than returned as "".
    value = value.strip() if value else ""
    if not value:
        raise click.BadArgumentUsage("Text can’t be empty.")
    if conf.character_warning and len(value) > conf.character_warning:
        click.confirm("? Warning: Tweet is longer than {0} characters. Are you sure?".format(
            conf.character_warning), abort=True)
    return value
def staging():
    # Iterates over click's text-mode stdin and hands every line, unchanged,
    # to print() (one print call per line read).
    for incoming in click.get_text_stream('stdin'):
        print(incoming)
# registers a new username
def download_classifications(
    workflow_id,
    output_file,
    generate,
    generate_timeout
):
    """
    Downloads a workflow-specific classifications export for the given workflow.
    OUTPUT_FILE will be overwritten if it already exists. Set OUTPUT_FILE to -
    to output to stdout.
    """
    workflow = Workflow.find(workflow_id)
    if generate:
        click.echo("Generating new export...", err=True)
    export = workflow.get_export(
        'classifications',
        generate=generate,
        wait_timeout=generate_timeout
    )
    # Estimate the number of 1 KiB chunks from Content-Length. Integer
    # division keeps the length an int on Python 3, and a missing or
    # malformed header falls back to "unknown length" instead of crashing.
    try:
        chunk_count = int(export.headers.get('content-length')) // 1024 + 1
    except (TypeError, ValueError):
        chunk_count = None
    # The bar is drawn on stderr so it does not mix with data on stdout.
    with click.progressbar(
        export.iter_content(chunk_size=1024),
        label='Downloading',
        length=chunk_count,
        file=click.get_text_stream('stderr'),
    ) as chunks:
        for chunk in chunks:
            output_file.write(chunk)
def download(project_id, output_file, generate, generate_timeout, data_type):
    """
    Downloads project-level data exports.
    OUTPUT_FILE will be overwritten if it already exists. Set OUTPUT_FILE to -
    to output to stdout.
    """
    project = Project.find(project_id)
    if generate:
        click.echo("Generating new export...", err=True)
    export = project.get_export(
        data_type,
        generate=generate,
        wait_timeout=generate_timeout
    )
    # Estimate the number of 1 KiB chunks from Content-Length. Integer
    # division keeps the length an int on Python 3, and a missing or
    # malformed header falls back to "unknown length" instead of crashing.
    try:
        chunk_count = int(export.headers.get('content-length')) // 1024 + 1
    except (TypeError, ValueError):
        chunk_count = None
    # The bar is drawn on stderr so it does not mix with data on stdout.
    with click.progressbar(
        export.iter_content(chunk_size=1024),
        label='Downloading',
        length=chunk_count,
        file=click.get_text_stream('stderr'),
    ) as chunks:
        for chunk in chunks:
            output_file.write(chunk)
def lowercase(in_file, out_dir):
    # Calls create_dirs(out_dir), then reads all of ``in_file``, lower-cases
    # it and writes the result to click's text-mode stdout.
    create_dirs(out_dir)
    lowered = in_file.read().lower()
    click.get_text_stream('stdout').write(lowered)
def echo_stdout():
    # Obtains click's text-mode stdout and, only when the stream reports
    # itself readable, reads from it. The data read is discarded.
    out = click.get_text_stream('stdout')
    if not out.readable():
        return
    out.read()
def _stdin_user_credentials():
    """Read a username and a password from stdin, one per line.

    All of stdin is consumed and stripped; the first two lines are taken as
    username and password respectively, and any further lines are ignored.

    Returns a ``(username, password)`` tuple.
    Raises ``click.ClickException`` when fewer than two lines are available.
    """
    lines = click.get_text_stream('stdin').read().strip().splitlines()
    try:
        # Unpacking fails with ValueError unless exactly two items remain
        # after slicing, i.e. unless at least two lines were supplied.
        username, password = lines[:2]
    except ValueError:
        raise click.ClickException(
            "Failed to read newline separated username and password from stdin."
        )
    return username, password
def write_csv(rows, delim):
    """Write ``rows`` to stdout as delimited text, one row per line.

    ``rows`` is an iterable of row sequences and ``delim`` the field
    delimiter handed to ``csv.writer``. Lines are terminated with a
    newline character.

    An ``OSError``/``IOError`` raised while writing is not propagated;
    stderr is closed instead.
    """
    writer = csv.writer(click.get_text_stream('stdout'), delimiter=delim, lineterminator='\n')
    try:
        # writerows() replaces a list comprehension that was evaluated only
        # for its side effects and built a throwaway list of return values.
        writer.writerows(rows)
    except (OSError, IOError):
        # NOTE(review): presumably this silences the interpreter's own error
        # output when the consumer of stdout went away (broken pipe) - confirm.
        sys.stderr.close()
def cli(ctx, message, key, config, verbose):
    """Use simplepush and tasker to push text to remote phone's clipboard."""
    # Nothing to do here when a subcommand was invoked.
    if ctx.invoked_subcommand is not None:
        return
    if not os.path.exists(config):
        click.echo("Config file: {}".format(config))
        click.echo("Config file does not exist. Please choose a different file, or initialize with push2clip init <key>")
        # ctx.exit() replaces quit(), which is an interactive helper injected
        # by the `site` module; a non-zero status marks this as a failure.
        ctx.exit(1)
    # The `with` block closes the file; no explicit close() is needed.
    with open(config, 'r') as f:
        conf = json.load(f)
    if not key:
        # Fall back to the configured key; .get() tolerates a config file
        # that has no 'key' entry at all.
        key = conf.get('key')
        if not key:
            click.echo("Error: Simplepush key is missing. Unable to send push.")
            ctx.exit(1)
    if not message:
        # No message argument: take the message from stdin.
        message = click.get_text_stream('stdin').read()
    event = conf.get("event", '')
    title = conf.get("title", '')
    sp = simplepush(key, event=event, title=title, message=message)
    if not sp:
        click.echo("An error was encountered while pushing message.")
    elif verbose:
        click.echo("Push sent successfully.")
        click.echo("Key: {}".format(key))
        click.echo("Event: {}".format(event))
        click.echo("Title: {}".format(title))
        click.echo("Message: {}".format(message))
def cli(ctx, config):
    # Loads a JSON configuration from the path ``config`` and runs convert()
    # over click's text-mode stdin with it. ``ctx`` is not used.
    settings = {}
    try:
        with open(config, 'r') as handle:
            settings = json.load(handle)
            l.INFO("Using custom CSV configuration: %s" % (settings))
    except TypeError:
        # Raised e.g. by open() when ``config`` is None. The warning mentions
        # CSV_DEFAULT_CONFIG, but what is passed on below is the empty dict.
        l.WARN("Using default CSV configuration: %s" % (CSV_DEFAULT_CONFIG))
    convert(click.get_text_stream('stdin'), configuration=settings)
def cli(ctx, config):
    # Loads a JSON S3 configuration from the path ``config`` and runs
    # convert() over click's text-mode stdin with it. ``ctx`` is not used.
    s3_config = {}
    try:
        with open(config, 'r') as config_file:
            s3_config = json.load(config_file)
            # The message previously said "CSV configuration", a copy-paste
            # slip: the object being logged is the S3 configuration.
            l.INFO("Using custom S3 configuration: %s" % (s3_config))
    except TypeError:
        # Raised e.g. by open() when ``config`` is None; processing continues
        # with the empty configuration.
        l.WARN("Unable to parse s3 config")
    input_ = click.get_text_stream('stdin')
    convert(input_, configuration=s3_config)
def cli(ctx, config):
    # Connects to MongoDB on localhost:27017, selects the ``relationships``
    # collection of the ``twitter_connections`` database and passes it,
    # together with click's text-mode stdin, to insert_lines().
    # Neither ``ctx`` nor ``config`` is used.
    relationships = MongoClient('localhost', 27017).twitter_connections.relationships
    insert_lines(relationships, click.get_text_stream('stdin'))
def human(verbosity, datefmt, no_color):
    '''Use this command to format machine-readable logs for humans.
    `human` reads stdin and writes formatted lines to stdout.
    Verbosity Levels:
    FATAL=100 | ERROR=200 | WARN=300 | INFO=400 | DEBUG=500 |
    TRACE=600 (DEFAULT)
    USAGE:
    tail -f myapp.log | human
    cat myapp.log | human -v 400 --datefmt "%Y-%m-%d %H:%M:%S"
    '''
    # One formatter is shared by every line; colour is on unless --no-color.
    formatter = CosmologgerHumanFormatter(
        origin='todo',
        version=0,
        datefmt=datefmt,
        color=not no_color,
    )
    with click.get_text_stream('stdin') as source:
        for raw in source:
            record = raw.strip()
            try:
                process(record, verbosity, formatter)
            except CosmologgerException as err:
                # A line that cannot be processed is reported on stderr and
                # the loop carries on with the next line.
                click.echo(_format_exception(record, err, no_color), err=True)
def main(args=None):
    """
    Console script for renodiff
    Example usage:
    git show | renodiff
    """
    # ``args`` is accepted but not used; the input is all of stdin.
    stdin_text = click.get_text_stream('stdin')
    try:
        run(stdin_text.read())
    except Exception as exc:
        # Catch Exception rather than using a bare ``except`` so that
        # KeyboardInterrupt and SystemExit propagate normally, and report
        # the failure on stderr instead of discarding it silently.
        click.echo("Error: {!r}".format(exc), err=True)
        return -1
    return 0
def main(
    telegram_token: str,
    ns_login: str,
    ns_password: str,
    log_file: click.File,
    verbose: bool,
):
    # Configures logging, builds the Telegram / Ns / Bot objects and runs the
    # bot on the asyncio event loop until the loop stops.
    log_level = logging.DEBUG if verbose else logging.INFO
    # Log to the given file when there is one, otherwise to stderr.
    log_stream = log_file or click.get_text_stream("stderr")
    logging.basicConfig(
        datefmt="%Y-%m-%d %H:%M:%S",
        format="%(asctime)s [%(levelname).1s] %(message)s",
        level=log_level,
        stream=log_stream,
    )
    logging.info("Starting bot…")
    # Each object is wrapped in closing(); they are closed in reverse order
    # of creation when the block is left.
    with closing(Telegram(telegram_token)) as telegram, \
            closing(Ns(ns_login, ns_password)) as ns, \
            closing(Bot(telegram, ns)) as bot:
        try:
            asyncio.ensure_future(bot.run())
            asyncio.get_event_loop().run_forever()
        finally:
            # Always stop the bot, even when the loop exits with an error.
            bot.stop()
# Bot response phrases.
# ----------------------------------------------------------------------------------------------------------------------
def ner(**kwargs):
    # Runs ChemSpot over text taken from stdin or from the configured input
    # file and prints the result (CSV, raw or dry-run, depending on kwargs).
    #
    # The "no_*" switches are stored inverted: after this loop each entry
    # holds the positive meaning (e.g. kwargs["no_header"] is True when a
    # header SHOULD be written).
    for flag in (
        "no_standardize",
        "no_convert_ions",
        "no_header",
        "no_normalize_text",
        "no_annotation",
    ):
        kwargs[flag] = not kwargs[flag]
    is_output_file = bool(kwargs["output"])
    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        input_text = stdin.read().strip()
        # Piped text takes precedence over the input file, but only when
        # something was actually piped. Previously the input file was
        # discarded whenever stdin was not a TTY (cron, CI, subprocess),
        # even if stdin turned out to be empty.
        if input_text:
            kwargs["input_file"] = ""
    if not input_text and not kwargs["input_file"]:
        raise click.UsageError("Cannot perform NER: stdin is empty and input file is not provided.")
    kwargs["opsin_types"] = get_opsin_types(kwargs["opsin_types"])
    init_kwargs = get_kwargs(kwargs, KWARGS_CHS_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_CHS_PROCESS)
    chemspot = ChemSpot(**init_kwargs)
    result = chemspot.process(input_text=input_text, **process_kwargs)
    # SystemExit is raised directly because exit() is a helper injected by
    # the `site` module and is not guaranteed to exist.
    if kwargs["dry_run"]:
        print(result)
        raise SystemExit(0)
    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        raise SystemExit(0)
    if not is_output_file:
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
def convert(**kwargs):
    # Runs OPSIN over text taken from stdin or from the configured input file
    # and prints the result (CSV, raw or dry-run, depending on kwargs).
    #
    # The "*no_*" switches are stored inverted: after this loop each entry
    # holds the positive meaning (e.g. kwargs["no_header"] is True when a
    # header SHOULD be written).
    for flag in (
        "no_header",
        "no_normalize_plurals",
        "no_standardize",
        "opsin_no_allow_acids_without_acid",
        "opsin_no_detailed_failure_analysis",
        "opsin_no_allow_radicals",
        "opsin_no_allow_uninterpretable_stereo",
    ):
        kwargs[flag] = not kwargs[flag]
    is_output_file = bool(kwargs["output"])
    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        input_text = stdin.read().strip()
        # Piped text takes precedence over the input file, but only when
        # something was actually piped. Previously the input file was
        # discarded whenever stdin was not a TTY (cron, CI, subprocess),
        # even if stdin turned out to be empty.
        if input_text:
            kwargs["input_file"] = ""
    if not input_text and not kwargs["input_file"]:
        raise click.UsageError("Cannot do conversion: stdin is empty and input file is not provided.")
    init_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_OPSIN_PROCESS)
    opsin = OPSIN(**init_kwargs)
    result = opsin.process(input=input_text, output_formats=["smiles", "inchi", "inchikey"], **process_kwargs)
    # SystemExit is raised directly because exit() is a helper injected by
    # the `site` module and is not guaranteed to exist.
    if kwargs["dry_run"]:
        print(result)
        raise SystemExit(0)
    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        raise SystemExit(0)
    if not is_output_file:
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
def get_stream(name, file_type):
    """Return the standard stream called ``name`` in text or binary mode.

    The mode is chosen by ``get_io_type(file_type)``: the result "string"
    selects ``click.get_text_stream``; any other result selects
    ``click.get_binary_stream``.
    """
    if get_io_type(file_type) == "string":
        return click.get_text_stream(name)
    return click.get_binary_stream(name)
def append(ctx, docid, password, content):
    """
    Append string to a document
    Tries to find the object, (decrypt,) adds content to the bottom,
    (encrypt,) update date fields and regenerates the title.
    :docid: str (bson object)
    :returns: bool
    """
    coll = db.get_document_collection(ctx)
    doc, docid = db.get_document_by_id(ctx, docid)
    template, c = db.get_content(ctx, doc, password=password)
    d = datetime.datetime.now()
    if not content:
        # No content argument: take everything from stdin in one read
        # instead of concatenating it line by line.
        content = click.get_text_stream('stdin').read()
    content = template + content
    if isinstance(content, unicode):
        content = content.encode("utf-8")
    # Start from the stored title so ``title`` is always bound. Unencrypted
    # documents in the "links" category do not get a regenerated title and
    # previously hit an unbound-variable error further down.
    title = doc.get("title", "")
    if doc["encrypted"] is True:
        # The title must be derived before the content is encrypted.
        title = utils.get_title_from_content(content)
        content = c.encrypt_content(content)
    elif "links" not in doc["categories"]:
        title = utils.get_title_from_content(content)
    if content != template:
        doc["content"] = content
        doc["title"] = title
        doc["updated"] = d
        if validate(doc):
            coll.save(doc)
            transaction.log(ctx, docid, "append", title)
            utils.log_info("Content appended to \"%s\"." % title)
        else:
            utils.log_error("Validation of the updated object did not succeed")
    else:
        utils.log_info("No changes detected for \"%s\"" % title)
    return True
def import_mail(ctx, tag, category):
    # Reads one e-mail message from stdin and stores it as a new, unencrypted
    # document whose title is the decoded Subject header and whose content is
    # a "# <title>" heading followed by the quoted-printable-decoded payload.
    # ``tag`` and ``category`` are stored as lists on the document.
    #
    # Read stdin in a single call instead of concatenating line by line.
    raw_message = click.get_text_stream('stdin').read()
    msg = email.message_from_string(raw_message)
    # title: only the first decoded part of the Subject header is used
    subject, encoding = email.header.decode_header(msg['Subject'])[0]
    if encoding is None:
        encoding = "utf-8"
    title = subject.decode(encoding)
    # content
    content = msg.get_payload(decode=False)
    content = quopri.decodestring(content)
    content = "# " + title + '\n\n' + content
    date = datetime.datetime.now()
    # The collection is looked up once, before validation (it used to be
    # fetched a second time right before the insert).
    coll = db.get_document_collection(ctx)
    item = {
        "title": title,
        "content": content,
        "tags": list(tag),
        "categories": list(category),
        "created": date,
        "updated": date,
        "encrypted": False,
    }
    # insert item if its valid
    if validate(item):
        docid = coll.insert_one(item).inserted_id
        transaction.log(ctx, str(docid), "import", title)
        utils.log_info("Document \"%s\" created." % title)
    else:
        utils.log_error("Validation of the updated object did not succeed")