def __get__(self, instance, owner):
"""This method is called from class.
Args:
owner(object): class instance
Returns:
logging.LoggerAdapter: logger adaptor
Notes:
In case using logger for module level use get() method. __get__() won't be called from module level.
"""
if self.for_exception:
caller_frame = inspect.stack()[2]
module_name = inspect.getmodulename(caller_frame[1])
func_name = caller_frame[3]
try:
class_name = caller_frame[0].f_locals["self"].__class__.__name__
except KeyError:
class_name = ""
_logger_adaptor = self._get_logger(module_name, class_name, func_name)
else:
_logger_adaptor = self._get_logger(owner.__module__, owner.__name__)
return _logger_adaptor
python类LoggerAdapter()的实例源码
def __get__(self, instance, owner):
"""This method is called from class.
Args:
owner (owner): class instance.
Returns:
logging.LoggerAdapter: logger adaptor.
Note:
In case using logger for module level use get() method. __get__() won't be called from module level.
"""
if self.for_exception:
caller_frame = inspect.stack()[2]
module_name = inspect.getmodulename(caller_frame[1])
func_name = caller_frame[3]
try:
class_name = caller_frame[0].f_locals["self"].__class__.__name__
except KeyError:
class_name = ""
_logger_adaptor = self._get_logger(module_name, class_name, func_name)
else:
_logger_adaptor = self._get_logger(owner.__module__, owner.__name__)
return _logger_adaptor
def setUp(self):
super(LoggerAdapterTest, self).setUp()
old_handler_list = logging._handlerList[:]
self.recording = RecordingHandler()
self.logger = logging.root
self.logger.addHandler(self.recording)
self.addCleanup(self.logger.removeHandler, self.recording)
self.addCleanup(self.recording.close)
def cleanup():
logging._handlerList[:] = old_handler_list
self.addCleanup(cleanup)
self.addCleanup(logging.shutdown)
self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
def setUp(self):
super(LoggerAdapterTest, self).setUp()
old_handler_list = logging._handlerList[:]
self.recording = RecordingHandler()
self.logger = logging.root
self.logger.addHandler(self.recording)
self.addCleanup(self.logger.removeHandler, self.recording)
self.addCleanup(self.recording.close)
def cleanup():
logging._handlerList[:] = old_handler_list
self.addCleanup(cleanup)
self.addCleanup(logging.shutdown)
self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
def __get__(self, obj, owner=None):
target = owner if obj is None else obj
if hasattr(target, self.log_attr):
return getattr(target, self.log_attr)
logger = logging.getLogger(self._logger_name(owner))
if target is obj:
if self.context_vars:
extra = dict((k, getattr(target, k, None))
for k in self.context_vars)
elif hasattr(target, '_log_context'):
extra = getattr(target, '_log_context')
else:
extra = _context_search(inspect.currentframe())
if extra:
logger = logging.LoggerAdapter(logger, extra)
setattr(target, self.log_attr, logger)
return logger
def __init__(self, handlers, addrinfo):
self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
asyncore.dispatcher.__init__(self)
self.handlers = handlers
try:
af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
self.create_socket(af, socktype)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
if have_ipv6 and af == socket.AF_INET6:
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.bind(sockaddr)
self.listen(5)
except Exception:
self.logger.exception("Couldn't set up HTTP listener")
self.close()
for h in handlers:
self.logger.debug("Handling %s", h[0])
def __init__(self, handlers, addrinfo):
self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
asyncore.dispatcher.__init__(self)
self.handlers = handlers
try:
af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
self.create_socket(af, socktype)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
if have_ipv6 and af == socket.AF_INET6:
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.bind(sockaddr)
self.listen(5)
except Exception:
self.logger.exception("Couldn't set up HTTP listener")
self.close()
for h in handlers:
self.logger.debug("Handling %s", h[0])
def __init__(self, handlers, addrinfo):
self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
asyncore.dispatcher.__init__(self)
self.handlers = handlers
try:
af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
self.create_socket(af, socktype)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
if have_ipv6 and af == socket.AF_INET6:
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.bind(sockaddr)
self.listen(5)
except Exception:
self.logger.exception("Couldn't set up HTTP listener")
self.close()
for h in handlers:
self.logger.debug("Handling %s", h[0])
def __init__(self, handlers, addrinfo):
self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
asyncore.dispatcher.__init__(self)
self.handlers = handlers
try:
af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
self.create_socket(af, socktype)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
if have_ipv6 and af == socket.AF_INET6:
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.bind(sockaddr)
self.listen(5)
except Exception:
self.logger.exception("Couldn't set up HTTP listener")
self.close()
for h in handlers:
self.logger.debug("Handling %s", h[0])
def __init__(self, handlers, addrinfo):
self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
asyncore.dispatcher.__init__(self)
self.handlers = handlers
try:
af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
self.create_socket(af, socktype)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
if have_ipv6 and af == socket.AF_INET6:
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.bind(sockaddr)
self.listen(5)
except Exception:
self.logger.exception("Couldn't set up HTTP listener")
self.close()
for h in handlers:
self.logger.debug("Handling %s", h[0])
def __init__(self, handlers, addrinfo):
self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
asyncore.dispatcher.__init__(self)
self.handlers = handlers
try:
af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
self.create_socket(af, socktype)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
if have_ipv6 and af == socket.AF_INET6:
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.bind(sockaddr)
self.listen(5)
except Exception:
self.logger.exception("Couldn't set up HTTP listener")
self.close()
for h in handlers:
self.logger.debug("Handling %s", h[0])
def setUp(self):
super(LoggerAdapterTest, self).setUp()
old_handler_list = logging._handlerList[:]
self.recording = RecordingHandler()
self.logger = logging.root
self.logger.addHandler(self.recording)
self.addCleanup(self.logger.removeHandler, self.recording)
self.addCleanup(self.recording.close)
def cleanup():
logging._handlerList[:] = old_handler_list
self.addCleanup(cleanup)
self.addCleanup(logging.shutdown)
self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
def __init__(self, fileObject, size, offset=None, whitelist=()):
"""
Default constructor which loads the raw data from a file.
No sanity check is performed on the size or existence of the file.
"""
# Common to MBR, VBR and IPL
self._suspiciousBehaviour = []
self._signature = []
self._codeHash = None
# MBR-specific
self._partTable = []
self._diskSignature = None
# VBR-specific
self._oemId = None
self._whitelist = whitelist
self._sample = fileObject.name
self._offset = offset
self._logger = logging.LoggerAdapter(logging.getLogger(__file__),
{'objectid': self._sample, 'stage': self._type})
self._raw = fileObject.read(size)
self._parse()
def log_config(event, loglevel=None, botolevel=None):
if 'ResourceProperties' in event.keys():
if 'loglevel' in event['ResourceProperties'] and not loglevel:
loglevel = event['ResourceProperties']['loglevel']
if 'botolevel' in event['ResourceProperties'] and not botolevel:
loglevel = event['ResourceProperties']['botolevel']
if not loglevel:
loglevel = 'warning'
if not botolevel:
botolevel = 'error'
# Set log verbosity levels
loglevel = getattr(logging, loglevel.upper(), 20)
botolevel = getattr(logging, botolevel.upper(), 40)
mainlogger = logging.getLogger()
mainlogger.setLevel(loglevel)
logging.getLogger('boto3').setLevel(botolevel)
logging.getLogger('botocore').setLevel(botolevel)
# Set log message format
#logfmt = '[%(requestid)s][%(asctime)s][%(levelname)s] %(message)s \n'
#if len(mainlogger.handlers) == 0:
# mainlogger.addHandler(logging.StreamHandler())
#mainlogger.handlers[0].setFormatter(logging.Formatter(logfmt))
return logging.LoggerAdapter(mainlogger, {'requestid': event['RequestId']})
test_logger.py 文件源码
项目:cloudformation-validation-pipeline
作者: awslabs
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def test_config(self):
instance = Logger()
self.assertEqual(instance.config(request_id='request_id', original_job_id="original_job_id", job_id='job_id',
artifact_revision_id='artifact_revision_id', pipeline_execution_id='pipeline_execution_id',
pipeline_action='pipeline_action', stage_name='stage_name', pipeline_name='pipeline_name',
loglevel='loglevel', botolevel='botolevel'), None)
self.assertEqual(type(instance.log), logging.LoggerAdapter)
self.assertEqual(logging.getLogger('boto3').level, 40)
self.assertEqual(instance.log.logger.level, 20)
self.assertEqual(instance.request_id, 'request_id')
self.assertEqual(instance.original_job_id, 'original_job_id')
self.assertEqual(instance.job_id, 'job_id')
self.assertEqual(instance.pipeline_execution_id, 'pipeline_execution_id')
self.assertEqual(instance.artifact_revision_id, 'artifact_revision_id')
self.assertEqual(instance.pipeline_action, 'pipeline_action')
self.assertEqual(instance.stage_name, 'stage_name')
crhelper.py 文件源码
项目:aws-service-catalog-validation-pipeline
作者: awslabs
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def log_config(event, loglevel=None, botolevel=None):
if 'ResourceProperties' in event.keys():
if 'loglevel' in event['ResourceProperties'] and not loglevel:
loglevel = event['ResourceProperties']['loglevel']
if 'botolevel' in event['ResourceProperties'] and not botolevel:
loglevel = event['ResourceProperties']['botolevel']
if not loglevel:
loglevel = 'warning'
if not botolevel:
botolevel = 'error'
# Set log verbosity levels
loglevel = getattr(logging, loglevel.upper(), 20)
botolevel = getattr(logging, botolevel.upper(), 40)
mainlogger = logging.getLogger()
mainlogger.setLevel(loglevel)
logging.getLogger('boto3').setLevel(botolevel)
logging.getLogger('botocore').setLevel(botolevel)
# Set log message format
#logfmt = '[%(requestid)s][%(asctime)s][%(levelname)s] %(message)s \n'
#if len(mainlogger.handlers) == 0:
# mainlogger.addHandler(logging.StreamHandler())
#mainlogger.handlers[0].setFormatter(logging.Formatter(logfmt))
return logging.LoggerAdapter(mainlogger, {'requestid': event['RequestId']})
test_logger.py 文件源码
项目:aws-service-catalog-validation-pipeline
作者: awslabs
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def test_config(self):
instance = Logger()
self.assertEqual(instance.config(request_id='request_id', original_job_id="original_job_id", job_id='job_id',
artifact_revision_id='artifact_revision_id', pipeline_execution_id='pipeline_execution_id',
pipeline_action='pipeline_action', stage_name='stage_name', pipeline_name='pipeline_name',
loglevel='loglevel', botolevel='botolevel'), None)
self.assertEqual(type(instance.log), logging.LoggerAdapter)
self.assertEqual(logging.getLogger('boto3').level, 40)
self.assertEqual(instance.log.logger.level, 20)
self.assertEqual(instance.request_id, 'request_id')
self.assertEqual(instance.original_job_id, 'original_job_id')
self.assertEqual(instance.job_id, 'job_id')
self.assertEqual(instance.pipeline_execution_id, 'pipeline_execution_id')
self.assertEqual(instance.artifact_revision_id, 'artifact_revision_id')
self.assertEqual(instance.pipeline_action, 'pipeline_action')
self.assertEqual(instance.stage_name, 'stage_name')
def get_logger(logger_name):
'''If the logger is exist, then return it directly.'''
if logger_name not in LoggerManager.created_loggers:
LoggerManager.create_logger(logger_name)
logger = logging.getLogger(logger_name)
if LoggerManager.log_handler == HANDLER_SYSLOG and platform.system() == 'Linux':
logger = logging.LoggerAdapter(logger, {'logger_name': logger_name})
return logger
def __init__(self, logger, level=ERROR):
# type: (Union[Logger, LoggerAdapter], int) -> None
"""
:param logger: The logger that log messages will get sent to.
:param level: Level of the logged messages.
"""
super(LogHandler, self).__init__()
self.logger = logger
self.level = level
def __init__(self, *args, **kwargs):
logging.LoggerAdapter.__init__(self, *args, **kwargs)
def _get_logger(self, modulename, classname="", caller_func=""):
"""Configure and return loggerAdapter instance.
Args:
modulename(str): module name
classname(str): class name
caller_func(str): function name
Returns:
logging.LoggerAdapter: logger adaptor
"""
if classname:
classname = "{0}.".format(classname)
if self._logger is None or self._logger.name != modulename:
self._logger = logging.getLogger(modulename)
self._logger.setLevel(getattr(logging, self.log_level))
if self.log_stream:
self._log_stream_handler = logging.StreamHandler(sys.stdout)
self._log_stream_handler.setFormatter(self.log_formatter)
present_stream_handlers = [_h for _h in self._logger.handlers if isinstance(_h, logging.StreamHandler)]
if len(present_stream_handlers) == 0:
self._logger.addHandler(self._log_stream_handler)
if self.log_file:
self._log_file_handler = logging.FileHandler(self.log_file)
self._log_file_handler.setFormatter(self.log_formatter)
present_file_handlers = [_h for _h in self._logger.handlers if isinstance(_h, logging.FileHandler)]
if len(present_file_handlers) == 0:
self._logger.addHandler(self._log_file_handler)
if self.for_exception:
self._logger_adapter = logging.LoggerAdapter(self._logger, {'caller_module': modulename, 'caller_func': caller_func, 'caller_class': classname})
else:
self._logger_adapter = logging.LoggerAdapter(self._logger, {'classname': classname})
return self._logger_adapter
def get(self, modulename, classname=""):
"""Return logerAdapter instance for module level loging.
Args:
modulename(str): module name
classname(str): class name
Returns:
logging.LoggerAdapter: logger adaptor
"""
_logger_adaptor = self._get_logger(modulename, classname)
return _logger_adaptor
def module_logger(name="", clsname=""):
"""Return LoggerAdapter for module level logging.
"""
# return ClassLogger(introspection=False).get(name)
return ClassLogger().get(name, clsname)
def get(self, modulename, classname=""):
"""Return logerAdapter instance for module level logging.
Args:
modulename (str): module name.
classname (str): class name.
Returns:
logging.LoggerAdapter: logger adaptor.
"""
_logger_adaptor = self._get_logger(modulename, classname)
return _logger_adaptor
def module_logger(name="", clsname=""):
"""Return LoggerAdapter for module level logging.
"""
return ClassLogger().get(name, clsname)
def _get_logger(self, modulename, classname="", caller_func=""):
"""Configure and return loggerAdapter instance.
"""
if classname:
classname = "{0}.".format(classname)
if self._logger is None or self._logger.name != modulename:
self._logger = logging.getLogger(modulename)
self._logger.setLevel(getattr(logging, self.log_level))
if self.log_stream:
self._log_stream_handler = logging.StreamHandler(sys.stdout)
self._log_stream_handler.setFormatter(self.log_formatter)
present_stream_handlers = [_h for _h in self._logger.handlers if isinstance(_h, logging.StreamHandler)]
if len(present_stream_handlers) == 0:
self._logger.addHandler(self._log_stream_handler)
if self.log_file:
self._log_file_handler = logging.FileHandler(self.log_file)
self._log_file_handler.setFormatter(self.log_formatter)
present_file_handlers = [_h for _h in self._logger.handlers if isinstance(_h, logging.FileHandler)]
if len(present_file_handlers) == 0:
self._logger.addHandler(self._log_file_handler)
if self.for_exception:
self._logger_adapter = logging.LoggerAdapter(self._logger, {'caller_module': modulename, 'caller_func': caller_func, 'caller_class': classname})
else:
self._logger_adapter = logging.LoggerAdapter(self._logger, {'classname': classname})
return self._logger_adapter
def module_logger(name="", clsname=""):
"""Return LoggerAdapter for module level logging.
"""
# return ClassLogger(introspection=False).get(name)
return ClassLogger().get(name, clsname)
def warn(self, msg, *args, **kwargs):
"""
Delegate warn() to warning().
This is provided as a convenience method in Logger but it is apparently
missing from LoggerAdapter, see
https://hg.python.org/cpython/file/2.7/Lib/logging/__init__.py#l1181
"""
self.warning(msg, *args, **kwargs)
def _hydrate_logger(self):
self.logger = logging.LoggerAdapter(logging.getLogger('pr0n_crawler'), {
'site_name': self.site_name,
'videos_current_number': self.crawler_current_videos,
})
def logger(self):
if not self._logger:
self._logger = getLogger(self.name, self.version)
if six.PY3:
# In Python 3, the code fails because the 'manager' attribute
# cannot be found when using a LoggerAdapter as the
# underlying logger. Work around this issue.
self._logger.manager = self._logger.logger.manager
return self._logger