def add_file_handler(self, path, name, level=None):
    levelname = logging.getLevelName(level) if level is not None \
        else 'DEFAULT'
    filename = '{path}/{name}.{level}.log'.format(
        path=os.path.abspath(path), name=name, level=levelname)
    if filename not in self.file_handlers:
        from logging.handlers import TimedRotatingFileHandler
        file_handler = TimedRotatingFileHandler(filename, when="midnight",
                                                backupCount=7)
        self.file_handlers[filename] = file_handler
        if level is not None:
            file_handler.setLevel(level)
        self.add_handler(file_handler)
Example source code for Python's logging.getLevelName()
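The snippets below all revolve around logging.getLevelName(), which maps in both directions: a registered numeric level returns its name, a registered name returns its number, and anything unknown falls back to the string 'Level %s'. A minimal sketch of that behavior, standard library only:

import logging

assert logging.getLevelName(logging.DEBUG) == 'DEBUG'   # number -> name
assert logging.getLevelName('INFO') == logging.INFO     # name -> number (20)

# Unregistered values come back as a 'Level %s' string, not an int;
# this is why several snippets below guard with isinstance(..., int).
print(logging.getLevelName(15))           # 'Level 15'
print(logging.getLevelName('VERBOSE'))    # 'Level VERBOSE'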
def make_logging_level_names_consistent():
    """Rename the standard library's logging levels to match Twisted's.

    Twisted's new logging system in `twisted.logger` that is.
    """
    for level in list(logging._levelToName):
        if level == logging.NOTSET:
            # When the logging level is not known in Twisted it's rendered as
            # a hyphen. This is not a common occurrence with `logging` but we
            # cater for it anyway.
            name = "-"
        elif level == logging.WARNING:
            # "Warning" is more consistent with the other level names than
            # "warn", so there is a fault in Twisted here. However it's easier
            # to change the `logging` module to match Twisted than vice-versa.
            name = "warn"
        else:
            # Twisted's level names are all lower-case.
            name = logging.getLevelName(level).lower()
        # For a preexisting level this will _replace_ the name.
        logging.addLevelName(level, name)
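Once the rename runs, getLevelName() reflects the new names; a quick check of the effect:

make_logging_level_names_consistent()
assert logging.getLevelName(logging.WARNING) == 'warn'
assert logging.getLevelName(logging.INFO) == 'info'
assert logging.getLevelName(logging.NOTSET) == '-'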
def fetch_url(cls, session, msites, platform_id, purpose):
    """Actual method that performs the fetch-URL action.

    Parameters
    ----------
    msites : list
        a list of Site model instances, containing info to build spiders.
    platform_id : int
        id of the platform; fetched URLs are bound to this id.
    purpose : {'update', 'archive'}
        indicates which URLs to fetch.
    """
    settings = Settings(cls.conf['crawl']['scrapy'])
    settings.set('ITEM_PIPELINES',
                 {'hoaxy.crawl.pipelines.UrlPipeline': 300})
    process = CrawlerProcess(settings)
    sll = cls.conf['logging']['loggers']['scrapy']['level']
    # getLevelName() maps the configured name (e.g. 'INFO') to its number.
    logging.getLogger('scrapy').setLevel(logging.getLevelName(sll))
    for ms in msites:
        for sm in build_spiders_iter(ms, purpose):
            sm['kwargs']['session'] = session
            sm['kwargs']['platform_id'] = platform_id
            process.crawl(sm['cls'], *sm['args'], **sm['kwargs'])
    process.start()
def fetch_html(cls, session, url_tuples):
    """Actual method that performs the fetch-HTML action.

    Parameters
    ----------
    session : object
        a SQLAlchemy session object.
    url_tuples : list
        a list of URL tuples (id, raw, status_code).
    """
    settings = Settings(cls.conf['crawl']['scrapy'])
    settings.set('ITEM_PIPELINES',
                 {'hoaxy.crawl.pipelines.HtmlPipeline': 300})
    process = CrawlerProcess(settings)
    sll = cls.conf['logging']['loggers']['scrapy']['level']
    logging.getLogger('scrapy').setLevel(logging.getLevelName(sll))
    logger.warning('Number of URLs to fetch HTML for: %s', len(url_tuples))
    process.crawl(
        HtmlSpider,
        session=session,
        url_tuples=url_tuples,
        excluded_domains=cls.conf['crawl']['excluded_domains'])
    process.start()
def parse_article(cls, session, url_tuples):
    """Actual method that performs the parse-to-article action.

    Parameters
    ----------
    session : object
        a SQLAlchemy session object.
    url_tuples : list
        a list of URL tuples (id, created_at, date_published,
        canonical, site_id).
    """
    settings = Settings(cls.conf['crawl']['scrapy'])
    settings.set('ITEM_PIPELINES',
                 {'hoaxy.crawl.pipelines.ArticlePipeline': 300})
    process = CrawlerProcess(settings)
    sll = cls.conf['logging']['loggers']['scrapy']['level']
    logging.getLogger('scrapy').setLevel(logging.getLevelName(sll))
    logger.info('Number of URLs to parse: %s', len(url_tuples))
    process.crawl(
        ArticleParserSpider,
        session=session,
        url_tuples=url_tuples,
        api_key=cls.conf['crawl']['article_parser']['webparser_api_key'])
    process.start()
def __init__(self, config):
    super(self.__class__, self).__init__()
    self.dirs = config.dirs
    self.files = config.files
    self.parsing_dirs = config.dirs is not None
    self.config = config
    # Map the configured level name ('debug' -> 'DEBUG' -> 10).
    log_level = logging.getLevelName(config.debug.upper())
    log.setLevel(log_level)
    # The mininet version stores a SeismicStatistics object to hold stats
    # extracted from results.
    self.stats = None
    # This will be set when parsing files to determine the type of
    # experimental results file we're working with. Can be either
    # 'mininet' or 'networkx'.
    self.experiment_type = None
def __init__(self, virtapi, read_only=False):
    super(IronicDriver, self).__init__(virtapi)
    global ironic
    if ironic is None:
        ironic = importutils.import_module('ironicclient')
        # NOTE(deva): work around a lack of symbols in the current version.
        if not hasattr(ironic, 'exc'):
            ironic.exc = importutils.import_module('ironicclient.exc')
        if not hasattr(ironic, 'client'):
            ironic.client = importutils.import_module(
                'ironicclient.client')
    self.firewall_driver = firewall.load_driver(
        default='nova.virt.firewall.NoopFirewallDriver')
    self.node_cache = {}
    self.node_cache_time = 0
    ironicclient_log_level = CONF.ironic.client_log_level
    if ironicclient_log_level:
        level = py_logging.getLevelName(ironicclient_log_level)
        logger = py_logging.getLogger('ironicclient')
        logger.setLevel(level)
    self.ironicclient = client_wrapper.IronicClientWrapper()
def parse_expressions(self, expressions):
    """Parse a list of logger matching expressions of the form
    <regex>=<log-level>. Place the compiled regexes and levels
    in the expressions attribute."""
    lines = expressions.split('\n')
    for line in lines:
        try:
            # Use rsplit so we can have '='s in the regex.
            regex, level = line.rsplit('=', 1)
            pattern = re.compile(regex)
            results = (pattern, logging.getLevelName(level.upper()))
            self.logger.log(
                TraceLogger.TRACE,
                'Appending %s:%s to logger level expressions' % (
                    results[0], results[1]))
            self.expressions.append(results)
        except Exception as ex:
            self.logger.error(
                'Parser error in log configuration file: %s' % line)
            self.logger.exception(ex)
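For illustration, an input blob for parse_expressions() might look like the following (the logger names and the 'parser' owner object are hypothetical). A valid level name upper-cases to an int, while a typo silently becomes a 'Level <name>' string:

expressions = '\n'.join([
    r'myapp\.db\..*=debug',      # stored as (re.compile(...), 10)
    r'requests\..*=warning',     # stored as (re.compile(...), 30)
])
parser.parse_expressions(expressions)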
def setlogdir(logdir):
    '''set the log directory'''
    # set log colors for the INFO and WARNING level names
    logging.addLevelName(logging.INFO, print_style('%s', fore='green')
                         % logging.getLevelName(logging.INFO))
    logging.addLevelName(logging.WARNING, print_style('%s', fore='red')
                         % logging.getLevelName(logging.WARNING))
    ldir = os.path.dirname(logdir)
    writelog = os.path.join(ldir, 'log.log')
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(name)s[line:%(lineno)d] '
                               '%(levelname)s %(message)s',
                        datefmt='%a, %d %b %Y %H:%M:%S',
                        filename=writelog,
                        filemode='w')
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)
def logging_level(self):
    """**Syntax:** logging_level=[CRITICAL|ERROR|WARNING|INFO|DEBUG|NOTSET]

    **Description:** Sets the threshold for the logger of this command
    invocation. Logging messages less severe than `logging_level` will be
    ignored.
    """
    return getLevelName(self._logger.getEffectiveLevel())
def _build_event_data(record):
    """
    Build an event data dictionary from the specified log record for
    submission to Seq.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: A dictionary containing event data representing the log record.
    :rtype: dict
    """
    if record.args:
        # Standard (unnamed) format arguments (use 0-based index as
        # property name).
        log_props_shim = get_global_log_properties(record.name)
        for (arg_index, arg) in enumerate(record.args or []):
            log_props_shim[str(arg_index)] = arg
        event_data = {
            "Timestamp": _get_local_timestamp(record),
            "Level": logging.getLevelName(record.levelno),
            "MessageTemplate": record.getMessage(),
            "Properties": log_props_shim
        }
    elif isinstance(record, StructuredLogRecord):
        # Named format arguments (and, therefore, log event properties).
        event_data = {
            "Timestamp": _get_local_timestamp(record),
            "Level": logging.getLevelName(record.levelno),
            "MessageTemplate": record.msg,
            "Properties": record.log_props
        }
    else:
        # No format arguments; interpret message as-is.
        event_data = {
            "Timestamp": _get_local_timestamp(record),
            "Level": logging.getLevelName(record.levelno),
            "MessageTemplate": record.getMessage(),
            "Properties": _global_log_props
        }
    return event_data
def setup_logger(self, level=logging.INFO):
    """Configure global log settings."""
    if isinstance(level, int):
        # Store the name of the numeric level; Logger.setLevel() accepts
        # level names as well as numbers.
        self.level = logging.getLevelName(level)
    self.logger = logging.getLogger()
    self.logger.setLevel(self.level)
    if not self.logger.handlers:
        ch = logging.StreamHandler(stream=sys.stderr)
        logformat = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        formatter = logging.Formatter(logformat)
        ch.setFormatter(formatter)
        self.logger.addHandler(ch)
def get_log_level():
    # getLevelName() returns an int for a registered name but the string
    # 'Level <name>' for anything else, hence the isinstance() guard.
    level = logging.getLevelName(os.environ.get('LOG_LEVEL', '').upper())
    if not isinstance(level, int):
        level = DEFAULT_LEVEL
    return level
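Why the isinstance() guard above is sufficient, assuming DEFAULT_LEVEL = logging.INFO for this sketch:

import logging
import os

DEFAULT_LEVEL = logging.INFO  # assumption for this sketch

os.environ['LOG_LEVEL'] = 'debug'
assert get_log_level() == logging.DEBUG     # 'DEBUG' is a registered name

os.environ['LOG_LEVEL'] = 'chatty'
assert get_log_level() == DEFAULT_LEVEL     # 'Level CHATTY' is not an int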
def logLevels():
    # ['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
    return [logging.getLevelName(n) for n in range(0, logging.CRITICAL + 1, 10)]
def _log(self, level, msg, event=()):
    event = events.Event(event).union({
        "logtime": time.strftime("%Y-%m-%d %H:%M:%SZ", time.gmtime()),
        "logmsg": msg,
        "loglevel": logging.getLevelName(level)
    })
    return self._logger.log(level, msg, extra={"event": event})
def get_config(config='~/.nyttth/config.yml'):
    global cfg
    if not cfg:
        cfgpath = os.path.expanduser(config)
        log.debug('reading config from {}'.format(cfgpath))
        cfg = dict()
        if os.path.isfile(cfgpath):
            with open(cfgpath, 'r') as stream:
                cfg = yaml.load(stream)
        else:
            print('config not found at {}. Create y/n?'.format(cfgpath))
            if propmt_yn():
                import errno
                try:
                    os.makedirs(os.path.dirname(cfgpath))
                except OSError as e:
                    if e.errno != errno.EEXIST:
                        raise
                with open(cfgpath, 'w') as cfg_file:
                    cfg_file.write(SAMPLE_CONFIG)
                print('Sample configuration has been written to {}.\n'
                      'You will need to edit this configuration with real '
                      'values from your networking environment. '
                      'Exiting.'.format(cfgpath))
            else:
                print('Exiting')
            exit()
        if 'log_level' in cfg:
            # print('setting log level to {}'.format(cfg['log_level']))
            log.setLevel(logging.getLevelName(cfg['log_level']))
        cfg['basedir'] = os.path.dirname(cfgpath)
        cfg['supervisor.conf'] = os.path.join(cfg['basedir'],
                                              'supervisord.conf')
    return cfg
def __new__(cls, value):
    # Intern instances: one shared object per level value.
    if value in cls._level_instances:
        return cls._level_instances[value]
    instance = int.__new__(cls, value)
    instance.name = logging.getLevelName(value)
    cls._level_instances[value] = instance
    return instance
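A minimal sketch of a host class for that __new__ with the interning in action; the class name LogLevel and the _level_instances cache layout are assumptions based on the snippet:

import logging

class LogLevel(int):
    """Hypothetical int subclass; one shared instance per level value."""
    _level_instances = {}

    def __new__(cls, value):
        if value in cls._level_instances:
            return cls._level_instances[value]
        instance = int.__new__(cls, value)
        instance.name = logging.getLevelName(value)
        cls._level_instances[value] = instance
        return instance

debug = LogLevel(logging.DEBUG)
assert debug == 10 and debug.name == 'DEBUG'
assert LogLevel(10) is debug  # interned: same object both times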
def test_log_state():
    lvl = logging.DEBUG
    msg = '{0}: State Msg'.format(logging.getLevelName(lvl))
    regex = '^{0}.*STATE - {1}'.format(logging.getLevelName(lvl), msg)
    lpc = LinchpinCliContext()
    lpc.load_config(config_path)
    lpc.setup_logging()
    lpc.log_state(msg)
    with open(logfile) as f:
        line = f.readline()
    assert_regexp_matches(line, regex)
def test_log_info():
    lvl = logging.INFO
    msg = 'Info Msg'
    regex = '^{0}.*{1}'.format(logging.getLevelName(lvl), msg)
    lpc = LinchpinCliContext()
    lpc.load_config(config_path)
    lpc.setup_logging()
    lpc.log_info(msg)
    with open(logfile) as f:
        line = f.readline()
    assert_regexp_matches(line, regex)