def format(self, record):
"""Apply little arrow and colors to the record.
Arrow and colors are only applied to sphinxcontrib.versioning log statements.
:param logging.LogRecord record: The log record object to log.
"""
formatted = super(ColorFormatter, self).format(record)
if self.verbose or not record.name.startswith(self.SPECIAL_SCOPE):
return formatted
# Arrow.
formatted = '=> ' + formatted
# Colors.
if not self.colors:
return formatted
if record.levelno >= logging.ERROR:
formatted = str(colorclass.Color.red(formatted))
elif record.levelno >= logging.WARNING:
formatted = str(colorclass.Color.yellow(formatted))
else:
formatted = str(colorclass.Color.cyan(formatted))
return formatted
python类LogRecord()的实例源码
def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
"""
Create a LogRecord.
:param name: The name of the logger that produced the log record.
:param level: The logging level (severity) associated with the logging record.
:param fn: The name of the file (if known) where the log entry was created.
:param lno: The line number (if known) in the file where the log entry was created.
:param msg: The log message (or message template).
:param args: Ordinal message format arguments (if any).
:param exc_info: Exception information to be included in the log entry.
:param func: The function (if known) where the log entry was created.
:param extra: Extra information (if any) to add to the log record.
:param sinfo: Stack trace information (if known) for the log entry.
"""
# Do we have named format arguments?
if extra and 'log_props' in extra:
return StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, extra['log_props'])
return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
"""
Create a `LogRecord`.
:param name: The name of the logger that produced the log record.
:param level: The logging level (severity) associated with the logging record.
:param fn: The name of the file (if known) where the log entry was created.
:param lno: The line number (if known) in the file where the log entry was created.
:param msg: The log message (or message template).
:param args: Ordinal message format arguments (if any).
:param exc_info: Exception information to be included in the log entry.
:param func: The function (if known) where the log entry was created.
:param extra: Extra information (if any) to add to the log record.
:param sinfo: Stack trace information (if known) for the log entry.
"""
# Do we have named format arguments?
if extra and 'log_props' in extra:
return StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, extra['log_props'])
return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
def _get_local_timestamp(record):
"""
Get the record's UTC timestamp as an ISO-formatted date / time string.
:param record: The LogRecord.
:type record: StructuredLogRecord
:return: The ISO-formatted date / time string.
:rtype: str
"""
timestamp = datetime.fromtimestamp(
timestamp=record.created,
tz=tzlocal()
)
return timestamp.isoformat(sep=' ')
def __is_kms_encrypt_request(self, record): # pylint: disable=no-self-use
# type: (logging.LogRecord) -> bool
"""Determine if a record contains a kms:Encrypt request.
:param record: Logging record to filter
:type record: logging.LogRecord
:rtype: bool
"""
try:
return all((
record.name == 'botocore.endpoint',
record.msg.startswith('Making request'),
cast(tuple, record.args)[-1]['headers']['X-Amz-Target'] == 'TrentService.Encrypt'
))
except Exception: # pylint: disable=broad-except
return False
def __is_kms_response_with_plaintext(self, record): # pylint: disable=no-self-use
# type: (logging.LogRecord) -> bool
"""Determine if a record contains a KMS response with plaintext.
:param record: Logging record to filter
:type record: logging.LogRecord
:rtype: bool
"""
try:
return all((
record.name == 'botocore.parsers',
record.msg.startswith('Response body:'),
b'KeyId' in cast(tuple, record.args)[0],
b'Plaintext' in cast(tuple, record.args)[0]
))
except Exception: # pylint: disable=broad-except
return False
def __init__(self, name, level, fn, lno, msg, args, exc_info, func, extra):
logging.LogRecord.__init__(self, name, level, fn, lno, msg, args, exc_info, func)
log_details = get_log_details()
log_details.update(extra or {})
for k in log_details.iterkeys():
setattr(self, k, log_details[k])
logger_fields = "levelname", "levelno", "process", "thread", "name", \
"filename", "module", "funcName", "lineno"
for f in logger_fields:
log_details["logger"][f] = getattr(self, f, None)
try:
correlation_id = request.correlation_id
except Exception:
correlation_id = None
log_details["logger"]["correlation_id"] = correlation_id
log_details["logger"]["created"] = datetime.datetime.utcnow().isoformat() + "Z"
for k in log_details.iterkeys():
setattr(self, k, log_details[k])
def test_custom_handler(self, mocker):
handler = DummyHandler()
mock = mocker.MagicMock()
handler.emit = mock
logger = make_logger()
logger.handlers = [handler]
disable(NOTSET)
logger.debug('test')
assert mock.call_count == 1
emit_call = mock.mock_calls[0]
name, args, kwargs = emit_call
assert name == ''
log_record = args[0]
assert isinstance(log_record, LogRecord)
assert log_record.msg == 'test'
assert log_record.levelname == 'DEBUG'
del logger
def format(self, record):
"""Formats the string representation of record.
:param logging.LogRecord record: Record to be formatted
:returns: Formatted, string representation of record
:rtype: str
"""
out = (logging.StreamHandler.format(self, record)
if sys.version_info < (2, 7)
else super(ColoredStreamHandler, self).format(record))
if self.colored and record.levelno >= self.red_level:
return ''.join((util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET))
else:
return out
def test_pretty_xml_handler(self):
# Test that a normal, non-XML log record is passed through unchanged
stream = io.BytesIO() if PY2 else io.StringIO()
stream.isatty = lambda: True
h = PrettyXmlHandler(stream=stream)
self.assertTrue(h.is_tty())
r = logging.LogRecord(name='baz', level=logging.INFO, pathname='/foo/bar', lineno=1, msg='hello', args=(), exc_info=None)
h.emit(r)
h.stream.seek(0)
self.assertEqual(h.stream.read(), 'hello\n')
# Test formatting of an XML record. It should contain newlines and color codes.
stream = io.BytesIO() if PY2 else io.StringIO()
stream.isatty = lambda: True
h = PrettyXmlHandler(stream=stream)
r = logging.LogRecord(name='baz', level=logging.DEBUG, pathname='/foo/bar', lineno=1, msg='hello %(xml_foo)s', args=({'xml_foo': b'<?xml version="1.0" encoding="UTF-8"?><foo>bar</foo>'},), exc_info=None)
h.emit(r)
h.stream.seek(0)
self.assertEqual(
h.stream.read(),
"hello \x1b[36m<?xml version='1.0' encoding='utf-8'?>\x1b[39;49;00m\n\x1b[34;01m<foo\x1b[39;49;00m\x1b[34;01m>\x1b[39;49;00mbar\x1b[34;01m</foo>\x1b[39;49;00m\n\n"
)
def test_record_none_exc_info(django_elasticapm_client):
# sys.exc_info can return (None, None, None) if no exception is being
# handled anywhere on the stack. See:
# http://docs.python.org/library/sys.html#sys.exc_info
record = logging.LogRecord(
'foo',
logging.INFO,
pathname=None,
lineno=None,
msg='test',
args=(),
exc_info=(None, None, None),
)
handler = LoggingHandler()
handler.emit(record)
assert len(django_elasticapm_client.events) == 1
event = django_elasticapm_client.events.pop(0)['errors'][0]
assert event['log']['param_message'] == 'test'
assert event['log']['logger_name'] == 'foo'
assert event['log']['level'] == 'info'
assert 'exception' not in event
def format(self, record):
""" Formats the record.msg string by using click.style()
The record will have attributes set to it when a user logs a message
with any kwargs given. This function looks for any attributes that
are in STYLES. If record has any sytle attributes, the record will
be styled with the given sytle. This makes click logging with a logger very easy.
Args:
record (logging.LogRecord): record which gets styled with click.style()
"""
# Sets the default kwargs
kwargs = dict()
# checks if any click args were passed along in the logger
for kwarg_name in self.STYLES:
if hasattr(record, kwarg_name):
kwargs[kwarg_name] = getattr(record, kwarg_name)
# styles the message of the record if a style was given
if kwargs:
record.msg = click.style(record.msg, **kwargs)
return record
def terminate(self):
if self.terminated:
return
self.terminated = True
def flushevents():
while True:
try:
event = self.event_queue.get(block=False)
except (Empty, IOError):
break
if isinstance(event, logging.LogRecord):
logger.handle(event)
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.procserver.stop()
while self.procserver.is_alive():
flushevents()
self.procserver.join(0.1)
self.ui_channel.close()
self.event_queue.close()
self.event_queue.setexit()
# Wrap Queue to provide API which isn't server implementation specific
def test_connector_log_formatter_dict_format(self):
formatter = ConnectorLogFormatter()
message = {'message': 'fake_message', 'type': 'message_type'}
record = LogRecord('fake_name', 'DEBUG', None, None, message,
None, None)
record.reseller_name = 'fake_reseller_name'
# as return result of formatter.format is a string we will check the substring
actual_record = formatter.format(record)
assert 'fake_reseller_name' in actual_record
assert 'MESSAGE_TYPE' in actual_record
assert datetime.now().isoformat(' ')[:-7] in actual_record
record.msg = {
'text': 'fake_text'
}
actual_record = formatter.format(record)
assert 'fake_text' in actual_record
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def format(self, record):
"""
:param logging.LogRecord record:
"""
super(HtmlFormatter, self).format(record)
if record.funcName:
record.funcName = escape_html(str(record.funcName))
if record.name:
record.name = escape_html(str(record.name))
if record.msg:
record.msg = escape_html(record.getMessage())
if self.use_emoji:
print(record.name, record.levelno, record.levelname)
if record.levelno == logging.DEBUG:
print(record.levelno, record.levelname)
record.levelname += ' ' + EMOJI.WHITE_CIRCLE
elif record.levelno == logging.INFO:
print(record.levelno, record.levelname)
record.levelname += ' ' + EMOJI.BLUE_CIRCLE
else:
record.levelname += ' ' + EMOJI.RED_CIRCLE
return self.fmt % record.__dict__
def format(self, record):
"""Formats the string representation of record.
:param logging.LogRecord record: Record to be formatted
:returns: Formatted, string representation of record
:rtype: str
"""
out = (logging.StreamHandler.format(self, record)
if sys.version_info < (2, 7)
else super(StreamHandler, self).format(record))
if self.colored and record.levelno >= self.red_level:
return ''.join((util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET))
else:
return out
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def test_json_formatter(self):
name = 'name'
line = 42
module = 'some_module'
func = 'some_function'
msg = {'content': 'sample log'}
log_record = LogRecord(
name, INFO, module, line, msg, None, None, func=func
)
formatter = JSONFormatter()
log_result = formatter.format(log_record)
result = json.loads(log_result)
# check some of the fields to ensure json formatted correctly
self.assertEqual(name, result['name'])
self.assertEqual(line, result['lineNumber'])
self.assertEqual(func, result['functionName'])
self.assertEqual(module, result['module'])
self.assertEqual('INFO', result['level'])
self.assertEqual(msg, result['message'])
def _two_masters_error_logged(
self,
log_records: List[logging.LogRecord],
) -> bool:
"""
Return whether a particular error is logged as a DEBUG message.
This is prone to being broken as it checks for a string in the DC/OS
repository.
Args:
log_records: Messages logged from the logger.
Returns:
Whether a particular error is logged as a DEBUG message.
"""
message = 'Must have 1, 3, 5, 7, or 9 masters'
encountered_error = False
for record in log_records:
if record.levelno == logging.DEBUG and message in str(record.msg):
encountered_error = True
return encountered_error
def format(self, record):
"""Formatting function
@param object record: :logging.LogRecord:
"""
entry = {
'name': record.name,
'timestamp': self.formatTime(record,
datefmt="%Y-%m-%d %H:%M:%S"),
'level': record.levelname
}
if hasattr(record, 'message'):
entry['message'] = record.message
else:
entry['message'] = super().format(record)
# add exception information if available
if record.exc_info is not None:
entry['exception'] = {
'message': traceback.format_exception(
*record.exc_info)[-1][:-1],
'traceback': traceback.format_exception(
*record.exc_info)[:-1]
}
return entry
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def logMessages(self, request):
"""Write a log message from the Swarm FE to the log.
Args:
request: A log message request.
Returns:
Void message.
"""
Level = api_backend.LogMessagesRequest.LogMessage.Level
log = logging.getLogger(__name__)
for message in request.messages:
level = message.level if message.level is not None else Level.info
# Create a log record and override the pathname and lineno. These
# messages come from the front end, so it's misleading to say that they
# come from api_backend_service.
record = logging.LogRecord(name=__name__, level=level.number, pathname='',
lineno='', msg=message.message, args=None,
exc_info=None)
log.handle(record)
return message_types.VoidMessage()
def format(self, record: logging.LogRecord) -> str:
"""Return formatted string of log record.
Args:
record: The log record.
Returns:
Formatted string of log record.
"""
try:
record.asctime
except AttributeError:
record.asctime = self.formatTime(record, self.datefmt)
try:
record.message
except AttributeError:
record.message = record.msg
s = '{} - {:>8} - {:>26} - {}'.format(record.asctime,
record.levelname,
record.name,
record.message)
return s
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def handle_new_task(self, task_name, record):
"""
Do everything needed when a task is starting
Params:
task_name (str): name of the task that is starting
record (logging.LogRecord): log record with all the info
Returns:
None
"""
record.msg = ColorFormatter.colored('default', START_TASK_MSG)
record.task = task_name
self.tasks[task_name] = Task(name=task_name, maxlen=self.buffer_size)
if self.should_show_by_depth():
self.pretty_emit(record, is_header=True)
def test_label_nonstring():
"""Logging things that aren't string is fine"""
result = {
"records": [
logging.LogRecord("root", "INFO", "", 0, msg, [], None)
for msg in (
"Proper message",
12,
{"a": "dict"},
list(),
1.0,
)
],
"error": None
}
model_ = model.Terminal()
model_.update_with_result(result)
for item in model_:
assert isinstance(item.data(model.Label), six.text_type), (
"\"%s\" wasn't a string!" % item.data(model.Label))
def emit(self, record):
"""
Emit a log record.
:param record: The LogRecord.
"""
self.log_queue.put(record, block=False)
def _build_event_data(record):
"""
Build an event data dictionary from the specified log record for submission to Seq.
:param record: The LogRecord.
:type record: StructuredLogRecord
:return: A dictionary containing event data representing the log record.
:rtype: dict
"""
if record.args:
# Standard (unnamed) format arguments (use 0-base index as property name).
log_props_shim = get_global_log_properties(record.name)
for (arg_index, arg) in enumerate(record.args or []):
log_props_shim[str(arg_index)] = arg
event_data = {
"Timestamp": _get_local_timestamp(record),
"Level": logging.getLevelName(record.levelno),
"MessageTemplate": record.getMessage(),
"Properties": log_props_shim
}
elif isinstance(record, StructuredLogRecord):
# Named format arguments (and, therefore, log event properties).
event_data = {
"Timestamp": _get_local_timestamp(record),
"Level": logging.getLevelName(record.levelno),
"MessageTemplate": record.msg,
"Properties": record.log_props
}
else:
# No format arguments; interpret message as-is.
event_data = {
"Timestamp": _get_local_timestamp(record),
"Level": logging.getLevelName(record.levelno),
"MessageTemplate": record.getMessage(),
"Properties": _global_log_props
}
return event_data
def __redact_encrypt_request(self, record):
# type: (logging.LogRecord) -> None
"""Redact the ``Plaintext`` value from a kms:Encrypt request.
:param record: Logging record to filter
:type record: logging.LogRecord
"""
try:
parsed_body = json.loads(self.__to_str(cast(tuple, record.args)[-1]['body']))
parsed_body['Plaintext'] = _REDACTED
cast(tuple, record.args)[-1]['body'] = json.dumps(parsed_body, sort_keys=True)
except Exception: # pylint: disable=broad-except
return