def init_logger(self):
    """Bind the ``sequanix`` logger to this instance as ``self._mylogger``.

    No handler or formatter is attached here. The earlier setup, which
    streamed coloured records to the file named by ``self._logger_output``,
    is kept below as disabled reference code.
    """
    sequanix_logger = colorlog.getLogger("sequanix")
    self._mylogger = sequanix_logger

    # --- disabled handler setup (kept for reference) -------------------
    # self._fh = open(self._logger_output, "w")
    # self._handler = colorlog.StreamHandler(self._fh)
    # formatter = colorlog.ColoredFormatter(
    #     "%(log_color)s%(levelname)-8s%(reset)s %(blue)s%(message)s",
    #     datefmt=None,
    #     reset=True,
    #     log_colors={
    #         'DEBUG': 'cyan',
    #         'INFO': 'green',
    #         'WARNING': 'yellow',
    #         'ERROR': 'red',
    #         'CRITICAL': 'red',
    #     }
    # )
    # self._handler.setFormatter(formatter)
    # self._mylogger.addHandler(self._handler)
# Example source code for Python's getLogger()
def create_default_logger(level=logging.DEBUG,
                          fname='/tmp/ethereumd-proxy.log'):
    """Attach a colour-formatted file handler to the default logger.

    Args:
        level: Logging level applied to both the new file handler and the
            logger itself.
        fname: Path of the log file the handler writes to.

    Returns:
        The logger obtained from ``colorlog.getLogger()`` (called without a
        name), with the new handler added.
    """
    record_format = (
        "%(log_color)s[%(asctime)s %(levelname)s "
        "%(name)s - %(module)s:%(funcName)s:%(lineno)d]"
        "%(reset)s %(blue)s%(message)s"
    )
    level_colors = {
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'red,bg_white',
    }

    file_handler = logging.FileHandler(fname)
    file_handler.setFormatter(colorlog.ColoredFormatter(
        record_format,
        datefmt=None,
        reset=True,
        log_colors=level_colors,
        secondary_log_colors={},
        style='%'
    ))
    file_handler.setLevel(level)

    default_logger = colorlog.getLogger()
    default_logger.addHandler(file_handler)
    default_logger.setLevel(level)
    return default_logger
def cb_crawler_after_finish(self, queue):
    """Crawler callback that reports the outcome once crawling has finished.

    Logs whether any requests were cancelled, then either lists the URL of
    every item in ``self.__vulnerable_items`` or warns that none were found.

    Args:
        queue (obj): The current crawling queue.

    """
    cancelled_items = queue.get_all(QueueItem.STATUS_CANCELLED)
    log = colorlog.getLogger()

    if not cancelled_items:
        log.info("Detective scanner finished.")
    else:
        log.warning("Detective scanner finished (but some requests were cancelled).")

    # Nothing to list: warn and leave early.
    if not self.__vulnerable_items:
        log.warning("Couldn't find any endpoints with interesting information.")
        return

    log.success("Found " + str(len(self.__vulnerable_items)) + " endpoint(s) with interesting information.")
    log.success("Listing endpoint(s) with interesting information.")
    for vulnerable_item in self.__vulnerable_items:
        log.success(vulnerable_item.request.url)
def cb_request_before_start(self, queue, queue_item):
    """Crawler callback invoked right before a request is started.

    Logs the URL under investigation and decides whether crawling goes on.
    Once a vulnerable item is known and ``stop_if_vulnerable`` is set,
    ``self.stopping`` is raised so that this and later calls stop the crawl.

    Args:
        queue (:class:`nyawc.Queue`): The current crawling queue.
        queue_item (:class:`nyawc.QueueItem`): The queue item that's about to start.

    Returns:
        str: A crawler action (either DO_SKIP_TO_NEXT, DO_STOP_CRAWLING or DO_CONTINUE_CRAWLING).

    """
    colorlog.getLogger().info("Investigating " + queue_item.request.url)

    if self.__vulnerable_items and self.__args.stop_if_vulnerable:
        self.stopping = True

    if self.stopping:
        return CrawlerActions.DO_STOP_CRAWLING
    return CrawlerActions.DO_CONTINUE_CRAWLING
def set_level(self):
    """Apply the logging level selected in the preferences dialog.

    Reads the current text of the ``preferences_options_general_logging_value``
    widget, resolves it to the matching constant of the standard ``logging``
    module and sets that level on the logger returned by
    ``colorlog.getLogger()``.

    Raises:
        AttributeError: If the selected text is not an attribute of the
            ``logging`` module.
    """
    # Resolve level names against the standard library directly instead of
    # going through ``colorlog.logging.logging``, which depends on colorlog's
    # internal module layout rather than its public API.
    import logging

    pref = self.preferences_dialog.ui
    level_name = pref.preferences_options_general_logging_value.currentText()
    colorlog.getLogger().setLevel(getattr(logging, level_name))
# ---------------------------------------------------------------
# More GUI / reading the snakefile (sequana or generic)
# ---------------------------------------------------------------
def sequana_debug_level(level="WARNING"):
    """A debug level setter at top level of the library.

    Sets the level of the logger returned by ``logger.getLogger()``
    (``logger`` is a module-level name defined outside this function;
    presumably the colorlog module).

    :param str level: one of DEBUG, INFO, WARNING, ERROR, CRITICAL
    """
    # Level constants come from the standard library; looking them up via
    # ``logger.logging.logging`` relied on the wrapped package's internal
    # module layout rather than on a public API.
    import logging

    assert level in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    logger.getLogger().setLevel(getattr(logging, level))
def create_module_dbg_logger(module_name: str):
    """Build a debug-message emitter for one module.

    The returned callable emits ``"<short name>: <msg>"`` through ``debug``
    only when the module's short name (the part after the last dot) or
    ``"*"`` is listed in ``config.CFG.glob["LOG_DBG_MODULES"]`` and
    ``config.CFG.glob["LOG_LEVEL"]`` equals ``"debug"``. While emitting, the
    logger from ``getLogger()`` is switched to DEBUG and afterwards set to
    INFO.
    """
    module_name_simple = module_name.rpartition(".")[2]
    selectors = {module_name_simple, "*"}

    def module_dbg_logger(msg: str):
        # Module selection is checked first, then the configured level.
        if selectors.isdisjoint(config.CFG.glob["LOG_DBG_MODULES"]):
            return
        if config.CFG.glob["LOG_LEVEL"] != "debug":
            return
        target = getLogger()
        target.setLevel(logging.DEBUG)
        debug(module_name_simple + ": " + msg)
        target.setLevel(logging.INFO)

    return module_dbg_logger
def test_example():
    """Tests the usage example from the README"""
    import colorlog

    stream_handler = colorlog.StreamHandler()
    readme_formatter = colorlog.ColoredFormatter(
        '%(log_color)s%(levelname)s:%(name)s:%(message)s')
    stream_handler.setFormatter(readme_formatter)

    example_logger = colorlog.getLogger('example')
    example_logger.addHandler(stream_handler)
def test_colorlog_basicConfig(test_logger):
    """Run the ``test_logger`` checker on the default logger after ``colorlog.basicConfig()``."""
    colorlog.basicConfig()
    default_logger = colorlog.getLogger()
    test_logger(default_logger)
def setup_logger():
    """Setup ColorLog to enable colored logging output.

    Adds a coloured stream handler to the logger returned by
    ``colorlog.getLogger()``, sets that logger to INFO, registers a custom
    ``SUCCESS`` level (numeric value 25, exposed as ``logging.SUCCESS``) and
    attaches a ``success(message, *args, **kwargs)`` function to the logger.
    """
    # Colored logging
    handler = colorlog.StreamHandler()
    handler.setFormatter(colorlog.ColoredFormatter(
        "%(log_color)s[%(levelname)s] %(message)s",
        log_colors={
            "DEBUG": "cyan",
            "INFO": "white",
            "SUCCESS": "green",
            "WARNING": "yellow",
            "ERROR": "red",
            "CRITICAL": "red,bg_white"
        }
    ))

    logger = colorlog.getLogger()
    logger.addHandler(handler)

    # Also show INFO logs
    logger.setLevel(logging.INFO)

    # Add SUCCESS logging
    logging.SUCCESS = 25
    logging.addLevelName(
        logging.SUCCESS,
        "SUCCESS"
    )

    def success(message, *args, **kwargs):
        """Log ``message`` at SUCCESS level, like the built-in level methods."""
        # ``_log`` does no level filtering itself, so check first; otherwise
        # SUCCESS records are emitted even when the logger level is higher.
        if logger.isEnabledFor(logging.SUCCESS):
            logger._log(logging.SUCCESS, message, args, **kwargs)

    setattr(logger, "success", success)
def __signal_handler(self, signum, frame):
    """Flag the crawler as stopping when SIGINT (e.g. CTRL+C) arrives.

    Only the first signal has an effect; later ones are ignored because
    ``self.stopping`` is already set.

    Args:
        signum (int): The signal number.
        frame (obj): The current stack frame.

    """
    if not self.stopping:
        self.stopping = True
        colorlog.getLogger().warning("Received SIGINT, stopping the crawling threads safely. This could take up to 15 seconds (the thread timeout).")
def cb_crawler_before_start(self):
    """Crawler callback: announce the scan before the crawler starts crawling."""
    log = colorlog.getLogger()
    log.info("Detective scanner started.")
def cb_request_on_error(self, queue_item, message):
    """Crawler callback that logs a request error.

    Args:
        queue_item (:class:`nyawc.QueueItem`): The queue item that failed.
        message (str): The error message.

    """
    log = colorlog.getLogger()
    log.error(message)
def init(level=logging.INFO):
    """Configure the ``elliptic.Mesh.Mesh`` logger for coloured stream output.

    Sets the logger to ``level`` and adds a new stream handler whose records
    are rendered as ``:<logger name>:<message>`` in the level's colour.
    """
    mesh_logger = colorlog.getLogger('elliptic.Mesh.Mesh')
    mesh_logger.setLevel(level)

    stream_handler = colorlog.StreamHandler()
    color_formatter = colorlog.ColoredFormatter(
        '%(log_color)s:%(name)s:%(message)s')
    stream_handler.setFormatter(color_formatter)

    mesh_logger.addHandler(stream_handler)
def setup_logging(self, fd, level):  # pylint: disable=C0103
    """Point the root logger at ``fd`` and derive its level from ``level``.

    Existing root handlers are dropped and replaced by a single stream
    handler writing to ``fd``. The root level becomes
    ``logging.ERROR - level * 10`` but never less than 1. Records use
    ``self.LOGGING_FORMAT``, colourised when colorlog can be imported.
    """
    root = logging.getLogger()
    root.handlers.clear()

    stream_handler = logging.StreamHandler(fd)
    root.addHandler(stream_handler)
    root.setLevel(max(logging.ERROR - (level * 10), 1))

    try:
        import colorlog
        color_formatter = colorlog.ColoredFormatter('%(log_color)s' + self.LOGGING_FORMAT)
        stream_handler.setFormatter(color_formatter)
        colorlog.getLogger().addHandler(stream_handler)
    except ImportError:
        # color log is just optional feature
        stream_handler.setFormatter(logging.Formatter(self.LOGGING_FORMAT))
def __init__(self, scope=None, proxies=None, browser=None, log_level='DEBUG'):
    """Initialise interpreter state and a dedicated ``friendscript`` logger.

    Args:
        scope: Initial scope; a fresh ``Scope()`` is created when falsy.
        proxies: Proxy mapping; an empty dict is used when falsy.
        browser: Stored as-is on ``self.browser``.
        log_level: Level name for the ``friendscript`` logger; it is
            upper-cased before being resolved.
    """
    self._scope = scope if scope else Scope()
    self.proxies = proxies if proxies else {}
    self.script = None
    self.registered_names = set()
    self.browser = browser
    self._is_ready = False
    self._exec_options = {}

    # These hooks run before the position counters and the logger exist;
    # the order is kept exactly as it was.
    self.register_defaults()
    self.sync_scopes()
    self.ready()

    self._line, self._col = 0, 0

    # Coloured handler on stderr
    self._log_handler = stderr_handler = colorlog.StreamHandler(stream=sys.stderr)
    stderr_handler.setFormatter(colorlog.ColoredFormatter('%(log_color)s%(message)s'))

    # Own logger; records are not passed on to ancestor loggers.
    self.log = script_logger = colorlog.getLogger('friendscript')
    script_logger.propagate = False
    script_logger.setLevel(logging.getLevelName(log_level.upper()))
    script_logger.addHandler(stderr_handler)
def __init__(self, path: str, recursive=False, worker_count=cpu_count() * 2, verbose=0) -> None:
    """
    Create an OCyara object that can scan the specified directory or file and store the results.

    When path is an existing directory, a '*' wildcard is appended to it
    (after a '/' separator if the path does not already end with one).

    Arguments:
        path -- File or directory to be processed

    Keyword Arguments:
        recursive -- Whether the specified path should be recursively searched for images (default False)
        worker_count -- The number of worker processes that should be spawned when
          run() is executed (default available CPU cores * 2)
        verbose -- An int() from 0-2 that sets the verbosity level.
          0 is default, 1 is information and 2 is debug
    """
    self.path = path
    self.recursive = recursive
    self.q = JoinableQueue()
    self.results = {}
    self.threads = worker_count
    self.lock = Lock()
    self.workers = []

    if os.path.isdir(self.path):
        # Compare by value: the previous identity test (`is '/'`) only worked
        # through string interning and triggers a SyntaxWarning on Python 3.8+.
        if self.path.endswith('/'):
            self.path += '*'
        else:
            self.path += '/*'

    self.manager = Manager()
    self.matchedfiles = None
    self.total_items_to_queue = None
    self.total_added_to_queue = None
    self.queue_items_completed = None
    self.tempdir = tempfile.TemporaryDirectory()

    handler = colorlog.StreamHandler()
    handler.setFormatter(colorlog.ColoredFormatter(
        '%(log_color)s%(levelname)s:%(name)s:%(message)s'))
    self.logger = colorlog.getLogger('OCyara')
    self.logger.addHandler(handler)

    # Map the verbosity count onto a logging level name.
    if verbose == 1:
        self.logger.setLevel('INFO')
    elif verbose > 1:
        self.logger.setLevel('DEBUG')
    else:
        self.logger.setLevel('WARN')
def _process_image(self, yara_rule: str, save_context: bool) -> None:
    """
    Perform OCR and yara rule matching as a worker.

    process_image() is used by the run() method to create multiple worker processes for
    parallel execution.  process_image normally will not be called directly.

    The worker keeps pulling (image, filepath) pairs from self.q until the queue
    stays empty and the number of items added equals the number of items to queue.

    Arguments:
        yara_rule -- File path pointing to a Yara rule file
        save_context -- When true, the OCR text is stored next to each matched
          rule name; otherwise None is stored in its place
    """
    context = None
    pid = os.getpid()

    handler = colorlog.StreamHandler()
    handler.setFormatter(colorlog.ColoredFormatter(
        '%(log_color)s%(levelname)s:%(name)s:%(message)s'))

    # Creates a logger object for the individual workers that contains the PID as part of the message header
    worker_logger = colorlog.getLogger('worker_' + str(pid))
    worker_logger.addHandler(handler)
    worker_logger.setLevel(self.logger.level)
    worker_logger.info('PID {0} created to process queue'.format(str(pid)))

    while True:
        try:
            image, filepath = self.q.get(timeout=.25)
        except Empty:
            if self.total_added_to_queue[0] == self.total_items_to_queue[0]:
                worker_logger.debug('Queue Empty PID %d exiting' % pid)
                return
            worker_logger.debug('Queue still loading')
            continue

        ocrtext = tesserocr.image_to_text(image)

        try:
            rules = yara.compile(yara_rule)
        except yara.Error:
            worker_logger.error('Invalid rule file/rule file not found: {0}'.format(yara_rule))
            # Without compiled rules there is nothing to match. Previously
            # execution fell through to `rules.match`, raising NameError (or
            # reusing rules from an earlier iteration) and never marking the
            # item done. Account for the item and move on instead.
            # NOTE(review): counting a failed item as completed is an
            # assumption about how the counter is consumed - confirm.
            with self.lock:
                self.queue_items_completed[0] += 1
            self.q.task_done()
            continue

        matches = rules.match(data=ocrtext)

        # If there is a match, store the filename and the rule in a dictionary local to this process.
        # Create an editable copy of the master manager dict.
        with self.lock:
            local_results_dict = self.matchedfiles[0]
            if matches:
                if save_context:
                    context = ocrtext
                for x in matches:
                    local_results_dict.setdefault(filepath, []).append((x.rule, context))
                self.matchedfiles[0] = local_results_dict
            # The counter update is a read-modify-write, so it stays under the lock.
            self.queue_items_completed[0] += 1
        self.q.task_done()
def setup_logger(name="Logger"):
    """ Return a logger called $name, configuring it on first use.

    If the colorlog module is available, the logger will use colors,
    otherwise it will be in b/w. The colorlog module is available at
    https://github.com/borntyping/python-colorlog
    but can also easily be installed with e.g. 'sudo pip3 install colorlog'
    or similar commands.

    A logger that already has handlers is returned untouched. Otherwise its
    level is set to DEBUG and one DEBUG-level stream handler is attached.

    Arguments:
        name: name of the logger

    Returns:
        Logger
    """
    logger_factory = colorlog.getLogger if colorlog else logging.getLogger
    _logger = logger_factory(name)

    if _logger.handlers:
        # Handlers are already attached although we did not add them here,
        # i.e. getLogger handed back an existing, configured logger:
        # nothing left to do.
        return _logger

    _logger.setLevel(logging.DEBUG)

    colorlog_missing = colorlog is None
    if colorlog_missing:
        # no colorlog available:
        sh = logging.StreamHandler()
        formatter = logging.Formatter('%(name)s:%(levelname)s:%(message)s')
    else:
        sh = colorlog.StreamHandler()
        formatter = colorlog.ColoredFormatter(
            '%(log_color)s%(name)s:%(levelname)s:%(message)s',
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'red',
            })

    sh.setFormatter(formatter)
    sh.setLevel(logging.DEBUG)
    _logger.addHandler(sh)

    if colorlog_missing:
        _logger.debug("Module colorlog not available. Log will be b/w.")
    return _logger