def filter_keyed_by_status(keys, values_with_status, context=None, level=logbook.ERROR):
"""Filter `values_with_status` (a `list` of `(bool, value)`); return dict with corresponding keys.
>>> dict(filter_keyed_by_status(['one', 'two'], [(True, 'foo'), (False, 'bar')]))
{'one': 'foo'}
"""
values = collections.OrderedDict()
for key, (status, value_or_traceback) in zip(keys, values_with_status):
if not status:
logger.log(level, "[{!s}] failed to retrieve data for '{!s}':\n{!s}", context, key,
value_or_traceback)
else:
values[key] = value_or_traceback
return values
python类ERROR的实例源码
def start_loop(cfg: Config, noop=False):
handlers = []
handlers.append(StreamHandler(sys.stdout, level=cfg.log_level))
logger = Logger("Heart")
logger.info("Initializing Oshino v{0}".format(get_version()))
logger.info("Running forever in {0} seconds interval. Press Ctrl+C to exit"
.format(cfg.interval))
if cfg.sentry_dsn:
try:
client = SentryClient(cfg.sentry_dsn)
handlers.append(SentryHandler(client,
level=logbook.ERROR,
bubble=True))
except InvalidDsn:
logger.warn("Invalid Sentry DSN '{0}' providen. Skipping"
.format(cfg.sentry_dsn))
setup = NestedSetup(handlers)
setup.push_application()
loop = create_loop()
try:
loop.run_until_complete(main_loop(cfg,
logger,
cfg.riemann.transport(noop),
forever,
loop=loop))
finally:
loop.close()
def get_color(self, record):
level = record.level
if level >= ERROR:
return 'red'
if level >= NOTICE:
return 'yellow'
if level == DEBUG:
return 'darkteal'
return 'lightgray'
def __get_logbook_logging_level(level_str):
# logbook levels:
# CRITICAL = 15
# ERROR = 14
# WARNING = 13
# NOTICE = 12
# INFO = 11
# DEBUG = 10
# TRACE = 9
# NOTSET = 0
level_str = level_str.upper().strip()
if level_str == 'CRITICAL':
return logbook.CRITICAL
elif level_str == 'ERROR':
return logbook.ERROR
elif level_str == 'WARNING':
return logbook.WARNING
elif level_str == 'NOTICE':
return logbook.NOTICE
elif level_str == 'INFO':
return logbook.INFO
elif level_str == 'DEBUG':
return logbook.DEBUG
elif level_str == 'TRACE':
return logbook.TRACE
elif level_str == 'NOTSET':
return logbook.NOTSET
else:
raise ValueError("Unknown logbook log level: {}".format(level_str))
def __get_logging_method(log_level):
method_table = {
logbook.DEBUG: logger.debug,
logbook.INFO: logger.info,
logbook.WARNING: logger.warning,
logbook.ERROR: logger.error,
logbook.CRITICAL: logger.critical,
}
method = method_table.get(log_level)
if method is None:
raise ValueError("unknown log level: {}".format(log_level))
return method
log_service.py 文件源码
项目:cookiecutter-pyramid-talk-python-starter
作者: mikeckennedy
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def __get_logbook_logging_level(level_str):
# logbook levels:
# CRITICAL = 15
# ERROR = 14
# WARNING = 13
# NOTICE = 12
# INFO = 11
# DEBUG = 10
# TRACE = 9
# NOTSET = 0
level_str = level_str.upper().strip()
if level_str == 'CRITICAL':
return logbook.CRITICAL
elif level_str == 'ERROR':
return logbook.ERROR
elif level_str == 'WARNING':
return logbook.WARNING
elif level_str == 'NOTICE':
return logbook.NOTICE
elif level_str == 'INFO':
return logbook.INFO
elif level_str == 'DEBUG':
return logbook.DEBUG
elif level_str == 'TRACE':
return logbook.TRACE
elif level_str == 'NOTSET':
return logbook.NOTSET
else:
raise ValueError("Unknown logbook log level: {}".format(level_str))
def __get_logbook_logging_level(level_str):
# logbook levels:
# CRITICAL = 15
# ERROR = 14
# WARNING = 13
# NOTICE = 12
# INFO = 11
# DEBUG = 10
# TRACE = 9
# NOTSET = 0
level_str = level_str.upper().strip()
if level_str == 'CRITICAL':
return logbook.CRITICAL
elif level_str == 'ERROR':
return logbook.ERROR
elif level_str == 'WARNING':
return logbook.WARNING
elif level_str == 'NOTICE':
return logbook.NOTICE
elif level_str == 'INFO':
return logbook.INFO
elif level_str == 'DEBUG':
return logbook.DEBUG
elif level_str == 'TRACE':
return logbook.TRACE
elif level_str == 'NOTSET':
return logbook.NOTSET
else:
raise ValueError("Unknown logbook log level: {}".format(level_str))
def filter_by_status(values_with_status, context=None, level=logbook.ERROR):
"""Filter a `list` of `(bool, value)` pairs on the `bool` value, warning on `False` values.
>>> filter_by_status([(True, 'foo'), (False, 'bar')])
['foo']
"""
values = list()
for (status, value_or_traceback) in values_with_status:
if not status:
logger.log(level, "[{!s}] failed to retrieve data:\n{!s}", context, value_or_traceback)
else:
values.append(value_or_traceback)
return values
def _get_device_details(self, devices_response):
# logger.debug("bitfit devices:\n{!s}", json.dumps(devices_response, indent=2))
deferreds = [self._get_device_by_id(device['id'])
for device in devices_response.get('items', [])]
deferred_list = defer.DeferredList(deferreds, consumeErrors=True)
# shouldn't fail since we're working off bitfit's own data for the inputs
deferred_list.addCallback(stethoscope.api.utils.filter_by_status,
context=sys._getframe().f_code.co_name, level=logbook.ERROR)
return deferred_list
def get_devices_by_email(self, email):
deferred_list = defer.DeferredList([
threads.deferToThread(super(DeferredGoogleDataSource, self)._get_mobile_devices_by_email,
email),
threads.deferToThread(super(DeferredGoogleDataSource, self)._get_chromeos_devices_by_email,
email),
], consumeErrors=True)
deferred_list.addCallback(stethoscope.api.utils.filter_by_status,
context=sys._getframe().f_code.co_name, level=logbook.ERROR)
deferred_list.addCallback(chain.from_iterable)
deferred_list.addCallback(list)
return deferred_list
def _get_devices_by_id(self, device_ids):
deferred_list = defer.DeferredList([self._get_device_by_id(device_id) for device_id in
device_ids], consumeErrors=True)
# working off JAMF's own data, so shouldn't fail
deferred_list.addCallback(stethoscope.api.utils.filter_by_status,
context=sys._getframe().f_code.co_name, level=logbook.ERROR)
return deferred_list
def _get_logging_level(verbosity):
# noinspection PyPackageRequirements
import logbook
return {
1: logbook.CRITICAL,
2: logbook.ERROR,
3: logbook.WARNING,
4: logbook.NOTICE,
5: logbook.INFO,
6: logbook.DEBUG,
7: logbook.TRACE,
}[verbosity]
def main():
options = parse_option()
initialize_cli(options)
if is_execute_tc_command(options.tc_command_output):
check_tc_command_installation()
is_delete_all = options.is_delete_all
else:
subprocrunner.SubprocessRunner.default_is_dry_run = True
is_delete_all = True
set_logger(False)
try:
verify_network_interface(options.device)
except NetworkInterfaceNotFoundError as e:
logger.error("{:s}: {}".format(e.__class__.__name__, e))
return errno.EINVAL
subprocrunner.SubprocessRunner.clear_history()
tc = create_tc_obj(options)
if options.log_level == logbook.INFO:
subprocrunner.set_log_level(logbook.ERROR)
normalize_tc_value(tc)
return_code = 0
if is_delete_all:
return_code = tc.delete_all_tc()
else:
return_code = tc.delete_tc()
command_history = "\n".join(tc.get_command_history())
if options.tc_command_output == TcCommandOutput.STDOUT:
print(command_history)
return return_code
elif options.tc_command_output == TcCommandOutput.SCRIPT:
set_logger(True)
write_tc_script(
Tc.Command.TCDEL, command_history, filename_suffix=options.device)
return return_code
logger.debug("command history\n{}".format(command_history))
return return_code