def pre_prepare_second_call(**kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    According to the original notes this hook runs before a prepare is
    issued, but after the first pre_prepare() hook; a hook of this kind
    may return False to skip the preparation entirely.  This
    implementation only logs and implicitly returns None.
    """
    # Cap the dump at two nesting levels so the log entry stays readable.
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK pre_prep()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
# Below is an example of how you can tag 1 function to 2 different
# kinds of hooks:
# Example source code using Python's pformat()
def take_action(self, parsed_args):
    """
    Fetch a single session and return it as a (columns, values) pair.

    The session is looked up through ``self.app.client.sessions`` using
    ``parsed_args.session_id``.  A falsy lookup result raises
    ``exceptions.ApiClientException``.  The jobs entry is rendered with
    pprint.pformat(); the other values are returned as stored.
    """
    session_id = parsed_args.session_id
    session = self.app.client.sessions.get(session_id)
    if not session:
        raise exceptions.ApiClientException('Session not found')

    # (column label, session key) pairs for the values returned verbatim.
    plain_fields = (
        ('Session ID', 'session_id'),
        ('Description', 'description'),
        ('Status', 'status'),
    )
    column = tuple(label for label, _ in plain_fields) + ('Jobs',)
    data = tuple(session.get(key) for _, key in plain_fields)
    data += (pprint.pformat(session.get('jobs')),)
    return column, data
def take_action(self, parsed_args):
    """
    Return the column headers and values describing one session.

    Looks the session up via ``self.app.client.sessions.get`` with
    ``parsed_args.session_id`` and raises
    ``exceptions.ApiClientException`` when nothing truthy comes back.
    """
    sessions = self.app.client.sessions
    session = sessions.get(parsed_args.session_id)
    if not session:
        raise exceptions.ApiClientException('Session not found')

    column = ('Session ID', 'Description', 'Status', 'Jobs')

    values = [session.get(key) for key in ('session_id', 'description', 'status')]
    # Jobs is the only value that is pretty-printed into a string.
    values.append(pprint.pformat(session.get('jobs')))
    data = tuple(values)
    return column, data
def _cached_roles(search):
    """
    Return the role names found in the cached minion pillars.

    The cached values from the master pillar are trusted because a downed
    minion would be absent from any dynamic query.  Downed minions outside
    of the search criteria are not a concern.

    ``search`` is passed to MasterPillarUtil as a "compound" target.  The
    role-to-minions mapping is logged at debug level; only its keys are
    returned.
    """
    pillar_util = salt.utils.master.MasterPillarUtil(
        search,
        "compound",
        use_cached_grains=True,
        grains_fallback=False,
        opts=__opts__,
    )
    minion_pillars = pillar_util.get_minion_pillar()

    roles = {}
    for minion in minion_pillars:
        pillar = minion_pillars[minion]
        if 'roles' not in pillar:
            continue
        for role in pillar['roles']:
            roles.setdefault(role, []).append(minion)

    log.debug(pprint.pformat(roles))
    return roles.keys()
def _run(cmd):
    """
    Run ``cmd`` through the shell and capture its output.

    NOTE: Taken from osd.py module.

    Returns a ``(returncode, stdout, stderr)`` tuple.  Both streams are
    returned exactly as read from the pipes, with trailing whitespace
    stripped.
    """
    log.info(cmd)
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    # communicate() drains both pipes while waiting for the child.  The
    # previous wait()-then-read() sequence could deadlock once a pipe
    # buffer filled up, and it read "stderr" from the stdout pipe.
    _stdout, _stderr = proc.communicate()
    _stdout = _stdout.rstrip()
    _stderr = _stderr.rstrip()
    log.debug("return code: {}".format(proc.returncode))
    log.debug(_stdout)
    log.debug(_stderr)
    return proc.returncode, _stdout, _stderr
def __init__(self, _id, **kwargs):
    """
    Initialize settings, connect to Ceph cluster

    ``_id`` is stored as ``self.osd_id`` and used to build the default
    weight filename.  Any keyword argument overrides the matching entry
    in ``self.settings``.

    Raises RuntimeError if connecting to the cluster fails.
    """
    self.osd_id = _id
    self.settings = {
        'conf': "/etc/ceph/ceph.conf",
        # Use the OSD id passed in; the builtin ``id`` function was
        # previously formatted into this path by mistake.
        'filename': '/var/run/ceph/osd.{}-weight'.format(_id),
        'timeout': 60,
        'keyring': '/etc/ceph/ceph.client.admin.keyring',
        'client': 'client.admin',
        'delay': 6
    }
    self.settings.update(kwargs)
    log.debug("settings: {}".format(pprint.pformat(self.settings)))
    self.cluster = rados.Rados(conffile=self.settings['conf'],
                               conf=dict(keyring=self.settings['keyring']),
                               name=self.settings['client'])
    try:
        self.cluster.connect()
    except Exception as error:
        raise RuntimeError("connection error: {}".format(error))
def readlink(device, follow=True):
    """
    Return the short name for a symlink device

    Runs ``readlink`` through the shell, adding ``-f`` when ``follow`` is
    true, and returns the command's stdout with trailing whitespace
    stripped.  The command's stderr is only logged.
    """
    option = '-f' if follow else ''
    cmd = "readlink {} {}".format(option, device)
    log.info(cmd)
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    # communicate() reads both pipes while waiting, which avoids the
    # deadlock that wait() followed by read() can hit on a full pipe.
    stdout, stderr = proc.communicate()
    result = stdout.rstrip()
    log.debug(pprint.pformat(result))
    log.debug(pprint.pformat(stderr))
    return result
# pylint: disable=too-many-instance-attributes
def __init__(self, device, **kwargs):
    """
    Set attributes for an OSD

    Resolves ``device`` through readlink() and then fills in each
    attribute by calling the matching helper method, in the order listed
    below.  ``kwargs`` is accepted but not referenced in this method.
    """
    self.device = readlink(device)
    # top-level identifier
    self.tli = self._set_tli()
    # NOTE(review): the helpers below may rely on attributes assigned
    # earlier in this method, so keep the order - confirm before moving.
    self.capacity = self.set_capacity()
    self.size = self.set_bytes()
    self.small = self._set_small()
    self.disk_format = self.set_format()
    self.journal = self.set_journal()
    self.journal_size = self.set_journal_size()
    self.wal_size = self.set_wal_size()
    self.wal = self.set_wal()
    self.db_size = self.set_db_size()
    # pylint: disable=invalid-name
    self.db = self.set_db()
    # default for encryption can be retrieved from the global pillar
    self.encryption = self.set_encryption()
    self.types = self.set_types()
    # Dump every instance attribute assigned above for debugging.
    log.debug("OSD config: \n{}".format(pprint.pformat(vars(self))))
def set_journal(self, default=False):
    """
    Return the journal device, if defined

    For a V1 configuration the journal is looked up in the converted
    ``data+journals`` pillar structure; for V2 it is read from the
    ``journal`` key of this device's entry in ``self.tli``.  When no
    journal is found, ``default`` is returned.
    """
    if self._config_version() == OSDConfig.V1:
        journals = self._convert_data_journals(__pillar__['storage']['data+journals'])
        log.debug("struct: \n{}".format(pprint.pformat(journals)))
        if self.device in journals:
            return journals[self.device]
        log.info("No journal specified for {}".format(self.device))

    if self._config_version() == OSDConfig.V2:
        has_entry = self.device in self.tli
        if has_entry and 'journal' in self.tli[self.device]:
            return self.tli[self.device]['journal']
        log.info("No journal specified for {}".format(self.device))

    return default
# pylint: disable=no-self-use
def _fsck(device, _partition):
    """
    Check filesystem on partition

    Note: xfs_repair returns immediately on success, but takes 3m39s to fail
    on some broken filesystems.  Not good for automation.

    Runs ``xfs_admin -u`` on the partition instead and returns True when
    the command exits with status 0.  A ``p`` is inserted between device
    and partition number when the device name contains ``nvme``.
    """
    prefix = 'p' if 'nvme' in device else ''
    cmd = "/usr/sbin/xfs_admin -u {}{}{}".format(device, prefix, _partition)
    log.info(cmd)
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    # communicate() drains both pipes while waiting; wait() followed by
    # read() can deadlock if the child fills a pipe buffer.
    stdout, stderr = proc.communicate()
    log.debug(pprint.pformat(stdout))
    log.debug(pprint.pformat(stderr))
    log.debug("xfs_admin: {}".format(proc.returncode))
    return proc.returncode == 0
def link_internal(data, fields=('name', 'product', 'location', 'unit')):
    """Link internal exchanges by ``fields``. Creates ``input`` field in newly-linked exchanges.

    Exchanges that already have a truthy ``input`` are skipped.  An
    unlinked exchange of type ``biosphere`` raises ValueError; an exchange
    with no matching reference product raises KeyError.  ``data`` is
    modified in place and also returned.
    """
    # Result is unused here; the call is kept as in the original.
    input_databases = get_input_databases(data)

    def get_tuple(exc):
        # Identity of an exchange/product: its values for ``fields``.
        return tuple([exc[f] for f in fields])

    products = {
        get_tuple(reference_product(ds)): (ds['database'], ds['code'])
        for ds in data
    }

    for dataset in data:
        for exchange in dataset['exchanges']:
            if exchange.get('input'):
                continue
            if exchange['type'] == 'biosphere':
                raise ValueError("Unlinked biosphere exchange:\n{}".format(pformat(exchange)))
            try:
                exchange['input'] = products[get_tuple(exchange)]
            except KeyError:
                raise KeyError("Can't find linking activity for exchange:\n{}".format(pformat(exchange)))
    return data
def meld(got, expected):
    """
    Open the ``meld`` diff tool on two values when they differ.

    Returns immediately when ``got == expected``.  Otherwise both values
    are pretty-printed into files named ``got`` and ``expected`` inside a
    directory named after the calling function, and ``meld`` is launched
    on the two files.
    """
    if got == expected:
        return

    import inspect
    import os
    import subprocess
    from pprint import pformat

    # Frame 1 is the caller; element 3 of its record is the function name.
    frames = inspect.getouterframes(inspect.currentframe(), 2)
    test_name = frames[1][3]
    os.makedirs(test_name, exist_ok=True)

    got_fn = os.path.join(test_name, 'got')
    expected_fn = os.path.join(test_name, 'expected')
    with open(got_fn, 'w') as got_f, open(expected_fn, 'w') as expected_f:
        for handle, value in ((got_f, got), (expected_f, expected)):
            handle.write(pformat(value))

    subprocess.run(['meld', got_fn, expected_fn])
def compare_with_expected_file(test, dirpath, results, basename=None):
    """
    Compare pretty-printed ``results`` against an expected file.

    ``results`` is formatted with pprint (width 120) and written, followed
    by a newline, to ``<basename>.results`` (or ``results`` when no
    basename is given) inside ``dirpath``.  The matching ``expected`` file
    is then read, right-stripped, and compared with
    ``test.assertMultiLineEqual``.
    """
    results_str = pprint.pformat(results, width=120)

    prefix = basename + '.' if basename else ''
    results_fn = prefix + 'results'
    expected_fn = prefix + 'expected'

    # Save the results first so they are on disk even if the compare fails.
    with open(os.path.join(dirpath, results_fn), 'w') as out:
        out.write(results_str + '\n')

    with open(os.path.join(dirpath, expected_fn)) as src:
        expected = src.read().rstrip()

    test.assertMultiLineEqual(expected, results_str)
#
# capture a directory tree as a dict 'tree', where each key is a directory path
# and the value is a sorted list of filenames
#
def _compare_eq_iterable(left, right, verbose=False):
    """
    Build explanation lines describing how two iterables differ.

    Without ``verbose`` only a hint to use ``-v`` is returned.  Otherwise
    the result is a header line followed by the stripped lines of a
    difflib.ndiff() over the pretty-printed operands.
    """
    if not verbose:
        return [u('Use -v to get the full diff')]

    # dynamic import to speedup pytest
    import difflib

    try:
        left_lines = pprint.pformat(left).splitlines()
        right_lines = pprint.pformat(right).splitlines()
        explanation = [u('Full diff:')]
    except Exception:
        # hack: PrettyPrinter.pformat() in python 2 fails when formatting
        # items that can't be sorted(), ie, calling sorted() on a list
        # would raise. See issue #718.
        # As a workaround, the full diff is generated by using the repr()
        # string of each item of each container.
        left_lines = sorted(map(repr, left))
        right_lines = sorted(map(repr, right))
        explanation = [u('Full diff (fallback to calling repr on each item):')]

    for line in difflib.ndiff(left_lines, right_lines):
        explanation.append(line.strip())
    return explanation
def on_open(self):
    """
    Once the connection is made, start the query off and
    start an event loop to wait for a signal to
    stop. Results are yielded within receive().

    The loop runs in a daemon thread stored on ``self.thread``: it sends
    ``self.query.request`` as JSON, then waits on ``self.event`` in
    ``self.gettimeout()`` slices until the event is set.
    """
    def event_loop():
        logger.debug(pformat(self.query.request))
        self.send(json.dumps(self.query.request))
        while not self.event.is_set():
            self.event.wait(self.gettimeout())
        logger.debug('Event loop terminating.')

    self.thread = threading.Thread(target=event_loop)
    # Thread.setDaemon() is deprecated; the ``daemon`` attribute is the
    # supported spelling and has the same effect.
    self.thread.daemon = True
    self.thread.start()
def get_wings_on_edge(self, pos1, side1_name, side2_name):
    """
    Return the wings for ``pos1`` that lie on the given pair of sides.

    A wing ``(wing_pos1, wing_pos2)`` from ``self.get_wings(pos1)`` is kept
    when the names of the sides holding its two positions match
    ``side1_name`` and ``side2_name`` in either order.
    """
    # Either orientation of the two side names counts as a match.
    accepted = (
        (side1_name, side2_name),
        (side2_name, side1_name),
    )
    wings_to_keep = []

    for wing in self.get_wings(pos1):
        wing_pos1, wing_pos2 = wing
        first_side = self.get_side_for_index(wing_pos1)
        second_side = self.get_side_for_index(wing_pos2)

        if (first_side.name, second_side.name) in accepted:
            wings_to_keep.append((wing_pos1, wing_pos2))

    return wings_to_keep
def __repr__(self):
    """Return ``self.value`` followed by the pretty-printed ``self.context``."""
    value = self.value
    context_text = pprint.pformat(self.context)
    return '{} {}'.format(value, context_text)
def context_repr(self):
    """Return the pretty-printed ``self.context``, or '' when it is falsy."""
    return pprint.pformat(self.context) if self.context else ''
def populate(self):
    """
    Rebuild the list widget from the contents of the library.

    The widget is cleared, the library is asked to find() its entries
    again, and one list item is added per (name, info) pair.  Each item
    shows the name, uses the pretty-printed info as its tooltip and, when
    the info has a truthy 'screenshot' entry, gets an icon built from it.
    """
    # Start from an empty list, then refresh the library in case things changed
    self.listWidget.clear()
    self.library.find()

    for name, info in self.library.items():
        # The controller name is the label of the list entry
        item = QtWidgets.QListWidgetItem(name)
        # pprint.pformat renders the info dictionary nicely for the tooltip
        item.setToolTip(pprint.pformat(info))

        screenshot = info.get('screenshot')
        if screenshot:
            # Build an icon from the screenshot path and attach it
            item.setIcon(QtGui.QIcon(screenshot))

        self.listWidget.addItem(item)
# This is a convenience function to display our UI
def pre_prepare(**kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    According to the original notes this hook runs before a prepare is
    issued, and returning False from it skips the preparation entirely.
    This implementation only logs and implicitly returns None.
    """
    # Cap the dump at two nesting levels so the log entry stays readable.
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK pre_prep()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
# Below is an example of how you can define a function name that differs
# from the actual hook you want to tie it to
def pre_and_pre_prepare_call(**kwargs):
    """
    Demo of a function that can be tagged as a hook twice.

    Logs the keyword arguments it receives and implicitly returns None.
    """
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK pre_and_post_prep()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
def post_prepare(**kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    According to the original notes this hook runs after a prepare has
    been issued.
    """
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK post_prep()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
def pre_stage(**kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    According to the original notes this hook runs before a stage is
    issued, and returning False from it skips the staging entirely.
    This implementation only logs and implicitly returns None.
    """
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK pre_stage()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
def post_staged_segment(**kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    According to the original notes this hook runs after a segment has
    been staged for posting.
    """
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK post_staged_segment()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
def post_staged_nzb(**kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    According to the original notes this hook runs after an NZB-File has
    been staged for saving.
    """
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK post_staged_nzb()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
def post_stage(session, **kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    NOTE(review): the name suggests this runs after a stage has been
    issued, although the original note said "prior to" - confirm against
    the hook registration.  ``session`` is accepted but not used here, and
    it is not part of the logged dump.
    """
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK post_stage()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
def pre_upload(**kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    According to the original notes this hook runs before an upload is
    issued, and returning False from it skips the upload entirely.
    This implementation only logs and implicitly returns None.
    """
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK pre_upload()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
def upload_article(**kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    According to the original notes this hook runs before an actual
    article upload, and returning False from it prevents the article from
    being uploaded.  This implementation only logs and implicitly returns
    None.
    """
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK upload_article()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
def pre_verify(**kwargs):
    """
    Debug hook that logs the keyword arguments it receives.

    According to the original notes this hook runs before a verification
    check, and returning False from it skips the verify entirely.
    This implementation only logs and implicitly returns None.
    """
    dumped = pformat(kwargs, indent=4, depth=2)
    message = 'DEBUG HOOK pre_verify()\n{kwargs}'.format(kwargs=dumped)
    logger.info(message)
def post_verify(**kwargs):
    """
    this function gets called after running a verification check

    Logs the keyword arguments it receives (dumped to a depth of two
    levels) and implicitly returns None.
    """
    # The label previously read 'pre_upload()' (copy-paste slip), which
    # made these log entries indistinguishable from the pre_upload hook.
    logger.info(
        'DEBUG HOOK post_verify()\n{kwargs}'.format(
            kwargs=pformat(kwargs, indent=4, depth=2),
        )
    )