def pre_clean(**kwargs):
"""
this function gets called prior to running a cleanup
If you return False here, you will skip the clean-up entirely
"""
logger.info(
'DEBUG HOOK pre_clean()\n{kwargs}'.format(
kwargs=pformat(kwargs, indent=4, depth=2),
)
)
python类pformat()的实例源码
def post_clean(**kwargs):
"""
this function gets called after running a cleanup
"""
logger.info(
'DEBUG HOOK pre_upload()\n{kwargs}'.format(
kwargs=pformat(kwargs, indent=4, depth=2),
)
)
def configure_linebot_app(app):
@app.after_request
def commit_database(response):
db.commit()
return response
@app.route("/api/line_webhook", methods=["POST"])
def line_webhook():
signature = request.headers['X-Line-Signature']
body = request.get_data(as_text=True)
logger.debug(f'Incoming message:\n{pformat(body)}')
try:
line_webhook_handler.handle(body, signature)
except InvalidSignatureError:
logger.warning('Message with an invalid signature received')
abort(400)
except LineBotApiError as e:
logger.error(f'{e}\nDetails:\n{pformat(e.error.details)}')
abort(500)
except Exception as e:
logger.error(f'Uncaught error: {e}')
abort(500)
return "OK"
def do_lookup(icao_identifier, date):
ln = LoggingNight(icao_identifier, date, try_cache=True)
if ln.in_zulu:
time_format = '%H%M Zulu'
else:
time_format = '%I:%M %p'
if dev_mode == "true":
result = dict(
airport=icao_identifier,
name=ln.name,
date=date.isoformat(),
sunset=ln.sun_set.strftime(time_format),
end_civil=ln.end_civil_twilight.strftime(time_format),
one_hour=ln.hour_after_sunset.strftime(time_format),
airport_debug=pprint.pformat(ln.airport, indent=4),
usno_debug=pprint.pformat(ln.usno, indent=4)
)
else:
result = dict(
airport=icao_identifier,
name=ln.name,
date=date.isoformat(),
sunset=ln.sun_set.strftime(time_format),
end_civil=ln.end_civil_twilight.strftime(time_format),
one_hour=ln.hour_after_sunset.strftime(time_format)
)
return result
def buildFacade(schema):
cls = type(schema.name, (Type,), dict(name=schema.name,
version=schema.version,
schema=schema))
source = """
class {name}Facade(Type):
name = '{name}'
version = {version}
schema = {schema}
""".format(name=schema.name,
version=schema.version,
schema=textwrap.indent(pprint.pformat(schema), " "))
return cls, source
def _format_unequal_keys(dicts):
return pformat([sorted(d.keys()) for d in dicts])
def assertDictEqual(self, d1, d2, msg=None):
self.assertIsInstance(d1, dict, 'First argument is not a dictionary')
self.assertIsInstance(d2, dict, 'Second argument is not a dictionary')
if d1 != d2:
standardMsg = '%s != %s' % (safe_repr(d1, True), safe_repr(d2, True))
diff = ('\n' + '\n'.join(difflib.ndiff(
pprint.pformat(d1).splitlines(),
pprint.pformat(d2).splitlines())))
standardMsg = self._truncateMessage(standardMsg, diff)
self.fail(self._formatMessage(msg, standardMsg))
def dump_option_dicts(self, header=None, commands=None, indent=""):
from pprint import pformat
if commands is None: # dump all command option dicts
commands = self.command_options.keys()
commands.sort()
if header is not None:
self.announce(indent + header)
indent = indent + " "
if not commands:
self.announce(indent + "no commands known yet")
return
for cmd_name in commands:
opt_dict = self.command_options.get(cmd_name)
if opt_dict is None:
self.announce(indent +
"no option dict for '%s' command" % cmd_name)
else:
self.announce(indent +
"option dict for '%s' command:" % cmd_name)
out = pformat(opt_dict)
for line in out.split('\n'):
self.announce(indent + " " + line)
# -- Config file finding/parsing methods ---------------------------
def print_processed_results(self, counts, failures):
for p, d in zip(self.possible_results, self.result_descriptions):
print('{}: {}'. format(d, counts[p]))
if failures:
print('Failures:')
print(pprint.pformat(failures))
def take_action(self, parsed_args):
job = self.app.client.jobs.get(parsed_args.job_id)
if not job:
raise exceptions.ApiClientException('Job not found')
column = (
'Job ID',
'Client ID',
'User ID',
'Session ID',
'Description',
'Actions',
'Start Date',
'End Date',
'Interval',
)
data = (
job.get('job_id'),
job.get('client_id'),
job.get('user_id'),
job.get('session_id', ''),
job.get('description'),
pprint.pformat(job.get('job_actions')),
job.get('job_schedule', {}).get('schedule_start_date', ''),
job.get('job_schedule', {}).get('schedule_end_date', ''),
job.get('job_schedule', {}).get('schedule_interval', ''),
)
return column, data
def take_action(self, parsed_args):
backup = self.app.client.backups.get(parsed_args.backup_uuid)
if not backup:
raise exceptions.ApiClientException('Backup not found')
column = (
'Backup ID',
'Metadata'
)
data = (
backup.get('backup_uuid'),
pprint.pformat(backup.get('backup_metadata'))
)
return column, data
def take_action(self, parsed_args):
job = self.app.client.jobs.get(parsed_args.job_id)
if not job:
raise exceptions.ApiClientException('Job not found')
column = (
'Job ID',
'Client ID',
'User ID',
'Session ID',
'Description',
'Actions',
'Start Date',
'End Date',
'Interval',
)
data = (
job.get('job_id'),
job.get('client_id'),
job.get('user_id'),
job.get('session_id', ''),
job.get('description'),
pprint.pformat(job.get('job_actions')),
job.get('job_schedule', {}).get('schedule_start_date', ''),
job.get('job_schedule', {}).get('schedule_end_date', ''),
job.get('job_schedule', {}).get('schedule_interval', ''),
)
return column, data
def create_app(self):
"""Send a POST to spinnaker to create a new application with class variables.
Raises:
AssertionError: Application creation failed.
"""
self.appinfo['accounts'] = self.get_accounts()
self.log.debug('Pipeline Config\n%s', pformat(self.pipeline_config))
self.log.debug('App info:\n%s', pformat(self.appinfo))
jsondata = self.retrieve_template()
wait_for_task(jsondata)
self.log.info("Successfully created %s application", self.appname)
return
def retrieve_template(self):
"""Sets the instance links with pipeline_configs and then renders template files
Returns:
jsondata: A json objects containing templates
"""
links = self.retrieve_instance_links()
self.log.debug('Links is \n%s', pformat(links))
self.pipeline_config['instance_links'].update(links)
jsondata = get_template(template_file='infrastructure/app_data.json.j2',
appinfo=self.appinfo, pipeline_config=self.pipeline_config)
self.log.debug('jsondata is %s', pformat(jsondata))
return jsondata
def create_elb(self):
"""Create or Update the ELB after rendering JSON data from configs.
Asserts that the ELB task was successful.
"""
json_data = self.make_elb_json()
LOG.debug('Block ELB JSON Data:\n%s', pformat(json_data))
wait_for_task(json_data)
self.add_listener_policy(json_data)
self.add_backend_policy(json_data)
self.configure_attributes(json_data)
def render_wrapper(self, region='us-east-1'):
"""Generate the base Pipeline wrapper.
This renders the non-repeatable stages in a pipeline, like jenkins, baking, tagging and notifications.
Args:
region (str): AWS Region.
Returns:
dict: Rendered Pipeline wrapper.
"""
base = self.settings['pipeline']['base']
if self.base:
base = self.base
email = self.settings['pipeline']['notifications']['email']
slack = self.settings['pipeline']['notifications']['slack']
deploy_type = self.settings['pipeline']['type']
pipeline_id = self.compare_with_existing(region=region)
data = {
'app': {
'appname': self.app_name,
'base': base,
'deploy_type': deploy_type,
'environment': 'packaging',
'region': region,
'triggerjob': self.trigger_job,
'email': email,
'slack': slack,
},
'id': pipeline_id
}
self.log.debug('Wrapper app data:\n%s', pformat(data))
wrapper = get_template(template_file='pipeline/pipeline_wrapper.json.j2', data=data)
return json.loads(wrapper)
def render_wrapper(self, region='us-east-1'):
"""Generate the base Pipeline wrapper.
This renders the non-repeatable stages in a pipeline, like jenkins, baking, tagging and notifications.
Args:
region (str): AWS Region.
Returns:
dict: Rendered Pipeline wrapper.
"""
base = self.settings['pipeline']['base']
if self.base:
base = self.base
email = self.settings['pipeline']['notifications']['email']
slack = self.settings['pipeline']['notifications']['slack']
deploy_type = self.settings['pipeline']['type']
pipeline_id = self.compare_with_existing(region=region)
data = {
'app': {
'appname': self.app_name,
'base': base,
'deploy_type': deploy_type,
'environment': 'packaging',
'region': region,
'triggerjob': self.trigger_job,
'email': email,
'slack': slack,
},
'id': pipeline_id
}
self.log.debug('Wrapper app data:\n%s', pformat(data))
wrapper = get_template(template_file='pipeline/pipeline_wrapper.json.j2', data=data)
return json.loads(wrapper)
def render_wrapper(self, region='us-east-1'):
"""Generate the base Pipeline wrapper.
This renders the non-repeatable stages in a pipeline, like jenkins, baking, tagging and notifications.
Args:
region (str): AWS Region.
Returns:
dict: Rendered Pipeline wrapper.
"""
base = self.base or self.settings['pipeline']['base']
email = self.settings['pipeline']['notifications']['email']
slack = self.settings['pipeline']['notifications']['slack']
deploy_type = self.settings['pipeline']['type']
pipeline_id = self.compare_with_existing(region=region)
data = {
'app': {
'appname': self.app_name,
'base': base,
'deploy_type': deploy_type,
'environment': 'packaging',
'region': region,
'triggerjob': self.trigger_job,
'email': email,
'slack': slack,
},
'id': pipeline_id
}
self.log.debug('Wrapper app data:\n%s', pformat(data))
wrapper = get_template(template_file='pipeline/pipeline_wrapper.json.j2', data=data)
return json.loads(wrapper)
def debug(*objects):
flask.current_app.logger.debug('\n'.join(map(pprint.pformat, objects)))