def output(python_object, format="raw", pager=False):
    """Render ``python_object`` in the requested format and print it.

    Args:
        python_object: The object to render.
        format: One of ``'yaml'`` (``yaml.safe_dump``), ``'json'``
            (``json.dumps`` with sorted keys and a 4-space indent), ``'raw'``
            (``str()``) or ``'pformat'`` (``pprint.pformat``).
        pager: When true the text is shown with ``click.echo_via_pager``,
            otherwise it is written with ``click.echo``.

    Raises:
        ValueError: If ``format`` is not one of the supported names.
    """
    if format == 'yaml':
        output_string = yaml.safe_dump(python_object, default_flow_style=False,
                                       encoding='utf-8', allow_unicode=True)
        # With an explicit encoding the dump comes back as bytes on Python 3.
        # The pager path expects text (a bytes object would be iterated as
        # integers), so decode before printing.
        if isinstance(output_string, bytes):
            output_string = output_string.decode('utf-8')
    elif format == 'json':
        # sort_keys is a boolean flag; the original passed the integer 4.
        output_string = json.dumps(python_object, sort_keys=True, indent=4)
    elif format == 'raw':
        output_string = str(python_object)
    elif format == 'pformat':
        output_string = pprint.pformat(python_object)
    else:
        # ValueError subclasses Exception, so callers catching Exception
        # keep working.
        raise ValueError("No valid output format provided. Supported: 'yaml', 'json', 'raw', 'pformat'")
    if pager:
        click.echo_via_pager(output_string)
    else:
        click.echo(output_string)
# Example source code for Python's echo_via_pager()
def write(self, data):
    """Send ``data`` to the pager, decoding UTF-8 bytes to text first.

    Trailing whitespace is stripped because ``click.echo_via_pager()``
    already appends a newline at the end of the text (#89).
    """
    text = data.decode('utf-8') if isinstance(data, six.binary_type) else data
    click.echo_via_pager(text.rstrip())
def cli(sort):
    """ Lists all available problems. """
    # Order the problems by the requested field (looked up in lower case).
    ordered = sorted(data.problems, key=lambda entry: entry[sort.lower()])
    # One row per problem: id, name and difficulty rendered as a percentage.
    rows = (
        (entry['id'], entry['name'], '%d%%' % entry['difficulty'])
        for entry in ordered
    )
    rendered = tabulate(rows, TABLE_HEADERS, tablefmt='fancy_grid')
    click.echo_via_pager(rendered)
def config_list():
    # Page one "option = value" line for every option found by iterating
    # over `config`; values are read with config.get().
    rendered = '\n'.join(
        '{} = {}'.format(name, config.get(name)) for name in config
    )
    click.echo_via_pager(rendered)
def _show_file(file_path, header_name='Shown'):
    """Print a header naming ``file_path`` and page the file's contents.

    A leading ``file:/`` prefix is removed from the path first. When the
    path does not exist a warning is printed instead of the contents.
    """
    prefix = 'file:/'
    if file_path.startswith(prefix):
        file_path = file_path[len(prefix):]
    click.secho("%s file: %s" % (header_name, file_path), fg='blue')
    if not os.path.exists(file_path):
        click.secho(' The file does not exist', fg='yellow')
        return
    with open(file_path) as handle:
        click.echo_via_pager(handle.read())
def log(ctx, execution_id, client):
    """Get execution log (plain text from ansible-playbook) for a certain
    execution."""
    request_headers = {"Content-Type": "text/plain"}
    response = client.get_execution_log(
        execution_id,
        headers=request_headers,
        raw_response=True,
        stream=True,
    )
    if not ctx.obj["pager"]:
        # Without a pager, echo the log line by line as it is iterated.
        for line in response.iter_lines(decode_unicode=True):
            click.echo(line)
        return
    # With a pager, hand over the whole response text in one call.
    click.echo_via_pager(response.text)
def format_output_json(ctx, response, error=False):
    """Serialize ``response`` with ``json_dumps``, colorize it and print it.

    Errors are written with ``click.echo(..., err=True)`` and never paged;
    otherwise ``ctx.obj["pager"]`` selects the pager or a plain echo.
    """
    rendered = colorize(json_dumps(response), ctx.obj["color"], "json")
    if error:
        click.echo(rendered, err=True)
        return
    if ctx.obj["pager"]:
        click.echo_via_pager(rendered)
    else:
        click.echo(rendered)
def build_table(self, view_entries, limit, pager, format_method,
                build_urls=True, print_output=True):
    """Build the table used for the gh view command.

    :type view_entries: list
    :param view_entries: A list of `github3` items.

    :type limit: int
    :param limit: Determines the number of items to show.

    :type pager: bool
    :param pager: When printing, a truthy value shows the output with
        `click.echo_via_pager`; otherwise `click.secho` is used.

    :type format_method: callable
    :param format_method: A method called to format each item in the table.

    :type build_urls: bool
    :param build_urls: Determines whether to build urls for the
        gh view # command.

    :type print_output: bool
    :param print_output: determines whether to print the output
        (True) or return the output as a string (False).

    :rtype: str
    :return: None if print_output is True (the output is printed instead),
        else the output string.
    """
    if build_urls:
        self.build_table_urls(view_entries)
    # `index` stays 0 when there are no entries; otherwise it ends up as the
    # 1-based position of the last entry that was formatted.
    index = 0
    rows = []
    for index, view_entry in enumerate(view_entries, start=1):
        view_entry.index = index
        rows.append(format_method(view_entry) + '\n')
        if index >= limit:
            break
    # Join once instead of growing the string with += on every entry.
    output = ''.join(rows)
    if build_urls and len(view_entries) > limit:
        output += click.style((' <Hiding ' +
                               str(len(view_entries) - limit) +
                               ' item(s) with the -l/--limit flag>\n'),
                              fg=self.config.clr_message)
    if index == 0:
        output += click.style('No results found',
                              fg=self.config.clr_message)
    elif build_urls:
        output += click.style(self.create_tip(index))
    else:
        output += click.style('')
    if not print_output:
        return output
    if pager:
        color = None
        if platform.system() == 'Windows':
            color = True
            # Strip out Unicode, which seems to have issues on
            # Windows with click.echo_via_pager.
            output = re.sub(r'[^\x00-\x7F]+', '', output)
        click.echo_via_pager(output, color)
    else:
        click.secho(output)
    return None
def generate_url_contents(self, url):
    """Generate the formatted contents of the given item's url.

    Fetches the url, converts the HTML to text with `self.html_to_text`,
    drops non-ASCII characters and formats the result with
    `self.format_markdown`. On an SSL or connection error an error message
    is returned instead.

    :type url: str
    :param url: The url whose contents to fetch.

    :rtype: str
    :return: The string representation of the formatted url contents.
    """
    user_agent = ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/39.0.2171.95 Safari/537.36')
    try:
        raw_response = requests.get(url,
                                    headers={'User-Agent': user_agent},
                                    verify=self.config.verify_ssl)
    except (requests.exceptions.SSLError,
            requests.exceptions.ConnectionError) as e:
        message = 'Error: ' + str(e) + '\n'
        return message + 'Try running gh view # with the --browser/-b flag\n'
    text = self.html_to_text.handle(raw_response.text)
    # Strip out Unicode, which seems to have issues when html2txt is
    # coupled with click.echo_via_pager.
    ascii_text = re.sub(r'[^\x00-\x7F]+', '', text)
    return self.format_markdown(ascii_text)
def view_url(self, url):
    """View the given url in a pager.

    The contents come from `self.generate_url_contents`. A header and two
    footer hints are added before the text is shown with
    `click.echo_via_pager`. When the fetched contents are exactly the
    "Not Found" error payload, a message is printed and nothing is paged.

    :type url: str
    :param url: The url to view
    """
    contents = self.generate_url_contents(url)
    # This comparison must use the raw contents: once the header and footers
    # are attached the text can never equal the bare error payload.
    if contents == '{"error":"Not Found"}\n':
        click.secho('Invalid user/repo combination.')
        return
    header = click.style('Viewing ' + url + '\n\n',
                         fg=self.config.clr_message)
    contents = header + contents
    contents += click.style(('\nView this article in a browser with'
                             ' the -b/--browser flag.\n'),
                            fg=self.config.clr_message)
    contents += click.style(('\nPress q to quit viewing this '
                             'article.\n'),
                            fg=self.config.clr_message)
    # Colour is forced on Windows and left to click's default elsewhere.
    color = None
    if platform.system() == 'Windows':
        color = True
    click.echo_via_pager(contents, color)
def delete(config, a, se):
    """
    Deletes all the directory content (files, dirs)
    """
    confirmed = a and click.confirm("Delete all contents of " + config.dir_to_use + " ?")
    if confirmed:
        click.echo("Attempting to delete: " + str(analyzer.get_entries_count()) + " entries...\n")
        cleaner.delete_dir_content(config.dir_to_use)
        filemanager.write_cleanup_report(cleaner.cleanup_data, config.app_dir)
        # Make clean up data persistent
        filemanager.pickle_data("last-cleanup", cleaner.cleanup_data, config.app_dir)
        click.echo("\nDeletion complete!")
        summary = cleaner.cleanup_data
        click.echo("* Deletions: " + str(summary["deletions"]))
        click.echo("* Deletion size: " + converter.human_readable_size(summary["size"]))
        click.echo("* Errors: " + str(summary["error_count"]))
    if not se:
        return
    # Show the errors recorded by the most recent persisted clean-up, if any.
    try:
        last_cleanup = filemanager.unpickle_data("last-cleanup")
        click.echo("Errors encountered during the last deletion [" + last_cleanup["datetime"] + "]:")
        click.echo("Total: " + str(last_cleanup["error_count"]) + "\n")
        error_lines = ["* %s" % error for error in last_cleanup["errors"]]
        click.echo_via_pager("\n\n".join(error_lines))
    except FileNotFoundError:
        click.echo("No error data was found.")
def get_news_articles(source):
    """Fetch the articles of a news source and show each one in the pager.

    Every article's title, description and author are paged, followed by a
    blue separator line. If the response status is not 200 an error message
    is echoed instead.

    Args:
        source: Source identifier sent as the ``source`` query parameter.
    """
    # NOTE(review): the API key is hard-coded in source control; it should be
    # moved to configuration or an environment variable.
    # Passing the query through `params` lets requests URL-encode `source`
    # instead of splicing it into the URL unescaped.
    data = requests.get(
        "https://newsapi.org/v1/articles",
        params=[("source", source),
                ("apiKey", "ba4d03d4a6f84f10962a79fd977e43b2")])
    if data.status_code != 200:
        click.echo("An error occurred try again later")
        return
    for article in data.json()["articles"]:
        click.echo_via_pager(" TITLE: {} \n DESCRIPTION: {} \n AUTHOR: {}".format(
            article['title'], article['description'], article['author']))
        click.secho("=" * 130, fg='blue')
def readme():
    """
    View the README
    """
    # Load README.rst from the "cuv" package, highlight it as reStructuredText
    # for the terminal, then page the result.
    raw_readme = pkg_resources.resource_string("cuv", "README.rst")
    rendered = highlight(
        raw_readme,
        get_lexer_by_name('rst'),
        formatter=TerminalFormatter(),
    )
    click.echo_via_pager(rendered)
def __exit__(self, a, b, c):
    """Page everything collected in ``self._lines`` as one string.

    The three exception arguments are not inspected and nothing is
    returned. Colour is always forced on for the pager.
    """
    click.echo_via_pager(''.join(self._lines), color=True)
def paged_echo():
    """
    Return a context manager for building up paged output line by line.

    Click's .echo_via_pager() has to be fed all of the lines at once.
    The returned context manager lets you call '.echo' (same as
    'click.echo') incrementally and only sends the output to the pager
    when the context closes.
    """
    echo_context = _PagedEcho()
    return echo_context
def _echo(output, min_lines=10):
    """Print ``output``, using the pager only for long text.

    The pager is used when the current click context's ``obj`` has a truthy
    ``use_pager`` entry and ``output`` contains more than ``min_lines``
    newline characters; otherwise the text is echoed to ``sys.stdout``.

    Args:
        output: The text to print.
        min_lines: Newline count that must be exceeded before paging.
    """
    ctx = click.get_current_context()
    if ctx.obj.get('use_pager') and output.count('\n') > min_lines:
        # The second positional parameter of echo_via_pager is `color`, not a
        # file, so sys.stdout must not be passed to it.
        click.echo_via_pager(output)
    else:
        click.echo(output, file=sys.stdout)
def pager(self, text, end=None):
    """Show ``text`` with ``echo_via_pager`` and return that call's result.

    ``end`` is accepted but not used by this implementation.
    """
    result = echo_via_pager(text)
    return result
def readme(packages):
    """
    View packages' long descriptions.
    If stdout is a terminal, the descriptions are passed to a pager program
    (e.g., `less(1)`).
    Packages can be specified as either ``packagename`` to show the latest
    version or as ``packagename==version`` to show the long description for
    ``version``.
    """
    # Each package gets its own pager invocation, in the order given.
    descriptions = (pkg["info"]["description"] for pkg in packages)
    for description in descriptions:
        click.echo_via_pager(description)
def list(ctx, **kwargs):
    """List jobs as a table of name, ID, status and last run time.

    When ``ctx.json`` is set the raw job list is handed to
    ``ctx.print_response`` instead. Nothing is printed when there are no
    jobs.
    """
    jobs = JobAPI(ecx_session=ctx.ecx_session).list()
    if ctx.json:
        ctx.print_response(jobs)
        return
    job_table_info = [
        (x['name'], x['id'], x['status'], format_last_run_time(x['lastRunTime']))
        for x in jobs
    ]
    if not job_table_info:
        return
    # A bare `print` only emits a blank line on Python 2 (it is a no-op
    # expression on Python 3); click.echo() prints the newline on both.
    click.echo()
    click.echo_via_pager(tabulate(job_table_info, headers=["Name", "ID", "Status", "Last run"]))
    click.echo()
def list(ctx, **kwargs):
    """List resources of ``ctx.restype`` as a table.

    The response key holding the resources is taken from the ``listfield``
    keyword, then ``resource_to_listfield``, then ``ctx.restype + 's'``.
    When ``ctx.json`` is set or that key is missing, the whole response is
    handed to ``ctx.print_response``. Columns come from the comma-separated
    ``fields`` keyword and default to ``name`` and ``id``.
    """
    resp = ctx.ecx_session.get(restype=ctx.restype, endpoint=ctx.endpoint)
    list_field = kwargs.get('listfield') or resource_to_listfield.get(ctx.restype) or (ctx.restype + 's')
    if ctx.json or list_field not in resp:
        ctx.print_response(resp)
        return
    resources = resp[list_field]
    # .get() keeps this lookup consistent with 'listfield' above and falls
    # back to the defaults when 'fields' was not supplied at all.
    requested_fields = kwargs.get('fields')
    if requested_fields:
        fields = [x.strip() for x in requested_fields.split(',')]
    else:
        fields = ["name", "id"]
    table_data = [[res.get(field) for field in fields] for res in resources]
    if not table_data:
        return
    # A bare `print` only emits a blank line on Python 2 (it is a no-op
    # expression on Python 3); click.echo() prints the newline on both.
    click.echo()
    click.echo_via_pager(tabulate(table_data, headers=fields))
    click.echo()
def usedby(ctx, id, **kwargs):
    """Show the resources that use the resource ``id`` of ``ctx.restype``.

    The ``"resources"`` entry of the association lookup is printed as a
    table of type, ID and name, or handed to ``ctx.print_response`` when
    ``ctx.json`` is set.
    """
    resp = AssociationAPI(ecx_session=ctx.ecx_session).get_using_resources(ctx.restype, id)["resources"]
    if ctx.json:
        ctx.print_response(resp)
        return
    table_data = [(x["type"], x["resourceId"], x["name"]) for x in resp]
    # A bare `print` only emits a blank line on Python 2 (it is a no-op
    # expression on Python 3); click.echo() prints the newline on both.
    click.echo()
    click.echo_via_pager(tabulate(table_data, headers=["Type", "ID", "Name"]))
    click.echo()
def print_response(self, resp):
    """Page ``resp`` as JSON indented by four spaces.

    ``remove_links`` is called on ``resp`` first unless ``self.links`` is
    truthy.
    """
    if not self.links:
        remove_links(resp)
    rendered = json.dumps(resp, indent=4)
    click.echo_via_pager(rendered)
def cli(issue, since):
    """ View comments on an issue/PR """
    # The issue body comes first, followed by each comment fetched with the
    # `since` parameter, one per line group.
    rendered = [show_comment(issue.data)]
    rendered.extend(
        show_comment(comment)
        for comment in issue.comments.get(params={"since": since})
    )
    # echo_via_pager adds a newline, so remove the "extra" newline at the end
    click.echo_via_pager('\n'.join(rendered).rstrip('\r\n'))
def cli(issue, since):
    """ View comments on an issue/PR """
    # The issue body comes first, followed by each comment fetched with the
    # `since` parameter, one per line group.
    rendered = [show_comment(issue.data)]
    rendered.extend(
        show_comment(comment)
        for comment in issue.comments.get(params={"since": since})
    )
    # echo_via_pager adds a newline, so remove the "extra" newline at the end
    click.echo_via_pager('\n'.join(rendered).rstrip('\r\n'))
def log(self):
    """Page the history returned by ``self.get_history()``.

    Each record is rendered as a green header (version or update number,
    checksum and date), one line per ``user_config`` entry, and the wrapped
    message when one is present. Records are paged in reverse history order.
    A ``timestamp`` key is written into every record as a side effect.
    """
    history = self.get_history()
    wrapper = textwrap.TextWrapper(
        initial_indent=' ',
        subsequent_indent=' ',
        width=66)
    entries = []
    for position, record in enumerate(history):
        parsed = time.strptime(record['updated'], '%Y%m%d-%H%M%S')
        record['timestamp'] = time.strftime(
            '%a, %d %b %Y %H:%M:%S +0000', parsed)
        details = ' ({algorithm} {checksum})\nDate: {timestamp}'.format(
            **record)
        if self.versioned:
            label = 'version {}'.format(record['version'])
        else:
            # Unversioned records are numbered by their distance from the
            # end of the history.
            label = 'update {}'.format(len(history) - position)
        # The leading empty string reproduces the newline that precedes the
        # header of every entry.
        lines = ['', click.style(label + details, fg='green')]
        lines.extend(
            '{:<10} {}'.format(attr + ':', val)
            for attr, val in sorted(record['user_config'].items()))
        entry = '\n'.join(lines)
        if record.get('message') is not None:
            wrapped = wrapper.wrap(str(record['message']))
            entry += '\n\n' + '\n'.join(wrapped)
        entries.append(entry)
    click.echo_via_pager('\n\n'.join(reversed(entries)) + '\n')
def emit(self, record):
    """Echo the formatted record with click.

    The record is formatted with ``self.format`` and its ``msg`` is printed.
    If the record carries a truthy ``pager`` attribute the message goes
    through ``click.echo_via_pager`` (with the record's ``color`` attribute
    when present); otherwise ``click.echo`` is called with every attribute
    of the record whose name is listed in ``self.ECHO_KWARGS``.

    A user can add {"pager": True} as an extra param to any of the log
    commands to print the content to a pager.

    Args:
        record (logging.LogRecord): record which gets echoed
    """
    try:
        # Formatting comes first; it is what adds the click style.
        formatted_record = self.format(record)
        if getattr(record, "pager", False):
            # Snapshot the environment so it can be put back exactly as it
            # was, whatever happens while paging.
            env_snapshot = dict(os.environ)
            # Force `less` as the pager with the module's LESS options.
            os.environ['PAGER'] = "less"
            os.environ['LESS'] = LESS_ENV
            try:
                click.echo_via_pager(formatted_record.msg,
                                     color=getattr(record, "color", None))
            finally:
                # Restore on success and on failure alike; any exception
                # still propagates afterwards.
                os.environ.clear()
                os.environ.update(env_snapshot)
        else:
            # Forward only the known echo kwargs that the record defines.
            echo_kwargs = {
                name: getattr(record, name)
                for name in self.ECHO_KWARGS
                if hasattr(record, name)
            }
            click.echo(formatted_record.msg, **echo_kwargs)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        self.handleError(record)
def timeline(ctx, pager, limit, twtfile, sorting, timeout, porcelain, source, cache, force_update):
"""Retrieve your personal timeline."""
# NOTE(review): indentation was lost in this copy of the function, so the
# nesting described in these comments is inferred from the statement order
# and should be confirmed against the upstream source before re-indenting.
# Pick the sources to read: a single one when `source` is given (looked up
# by nick, falling back to treating the value as a URL), otherwise everything
# in the configuration's `following` list.
if source:
source_obj = ctx.obj["conf"].get_source_by_nick(source)
if not source_obj:
logger.debug("Not following {0}, trying as URL".format(source))
source_obj = Source(source, source)
sources = [source_obj]
else:
sources = ctx.obj["conf"].following
tweets = []
# With caching enabled, the `cache` flag is rebound to the Cache object
# opened by the `with` statement below.
if cache:
try:
with Cache.discover(update_interval=ctx.obj["conf"].timeline_update_interval) as cache:
# An update is forced when requested or when the cache is not valid.
force_update = force_update or not cache.is_valid
if force_update:
tweets = get_remote_tweets(sources, limit, timeout, cache)
else:
logger.debug("Multiple calls to 'timeline' within {0} seconds. Skipping update".format(
cache.update_interval))
# NOTE(review): presumably the cache read below belongs to the `else`
# branch above (used only when the remote update is skipped) -- TODO confirm.
# Behold, almighty list comprehensions! (I might have gone overboard here…)
tweets = list(chain.from_iterable([cache.get_tweets(source.url) for source in sources]))
except OSError as e:
# On OSError the error is logged and tweets are fetched without a cache.
logger.debug(e)
tweets = get_remote_tweets(sources, limit, timeout)
else:
tweets = get_remote_tweets(sources, limit, timeout)
# Local tweets from `twtfile` are added only when no explicit source was
# requested; `source` is rebound to a Source built from the configured nick
# and twturl.
if twtfile and not source:
source = Source(ctx.obj["conf"].nick, ctx.obj["conf"].twturl, file=twtfile)
tweets.extend(get_local_tweets(source, limit))
# Nothing to show: return without printing.
if not tweets:
return
tweets = sort_and_truncate_tweets(tweets, sorting, limit)
# Show the styled timeline through the pager or a plain echo.
if pager:
click.echo_via_pager(style_timeline(tweets, porcelain))
else:
click.echo(style_timeline(tweets, porcelain))