def global_init(log_level_text, filename):
    level = LogService.__get_logbook_logging_level(log_level_text)

    if not filename:
        logbook.StreamHandler(sys.stdout, level=level).push_application()
    else:
        logbook.TimedRotatingFileHandler(
            filename, level=level,
            date_format="%Y-%m-%d").push_application()

    msg = 'Logging initialized, level: {}, mode: {}'.format(
        log_level_text,
        "stdout mode" if not filename else 'file mode: ' + filename
    )
    LogService.get_startup_log().notice(msg)
Python StreamHandler() class usage examples
Source file: log_service.py
Project: cookiecutter-pyramid-talk-python-starter
Author: mikeckennedy
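The snippets on this page all follow the same pattern: construct a logbook handler (StreamHandler for stdout, TimedRotatingFileHandler for a daily-rotated file) and push it onto the application stack before logging through a Logger. The sketch below is a minimal, standalone illustration of that pattern and is not taken from any of the projects listed here; the init_logging name, the 'demo' logger, and the 'app.log' filename are placeholders.

# Minimal sketch of the shared logbook setup pattern (illustrative only).
import sys
import logbook

def init_logging(filename=None, level=logbook.INFO):
    if filename is None:
        # No file given: log to stdout for the whole application.
        logbook.StreamHandler(sys.stdout, level=level).push_application()
    else:
        # File given: rotate the log file daily.
        logbook.TimedRotatingFileHandler(
            filename, level=level,
            date_format="%Y-%m-%d").push_application()

if __name__ == '__main__':
    init_logging()                      # or init_logging('app.log')
    logbook.Logger('demo').notice('Logging initialized')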
def set_handler(args):
    old_value = get_client_parameter(args.hostname, args.parameter)
    try:
        old = set_client_parameter(args.hostname, args.parameter, args.value)
    except Exception as e:
        sys.exit('Failed to set parameter: {}'.format(e))

    if not old_value:
        with logbook.StreamHandler(sys.stdout, bubble=True):
            log.info('Set parameter {} for host {} to {}',
                     args.parameter, args.hostname, args.value)
    elif old:
        with logbook.StreamHandler(sys.stdout, bubble=True):
            log.info('Changed parameter {} for host {} from {} to {}',
                     args.parameter, args.hostname, old, args.value)
    else:
        print('No changes.')
def unsnooze_handler(args):
    if not (args.host or args.issue_name or args.all):
        sys.exit('If you really want to unsnooze all issues for all hosts,\n'
                 'you need to specify --all.')

    hostname = (None if not args.host else
                args.host[0] if len(args.host) == 1 else
                {'$in': args.host})
    issue_name = (None if not args.issue_name else
                  args.issue_name[0] if len(args.issue_name) == 1
                  else {'$in': args.issue_name})

    ids = unsnooze_issue(hostname, issue_name)
    if not ids:
        print('No matching issues.')
        return

    with logbook.StreamHandler(sys.stdout, bubble=True):
        for doc in get_db().issues.find({'_id': {'$in': ids}}):
            log.info('Unsnoozed {} {} at {}', doc['hostname'], doc['name'],
                     doc['unsnoozed_at'])
def close_handler(args):
    if not (args.host or args.issue_name or args.all):
        sys.exit('If you really want to close all issues for all hosts,\n'
                 'you need to specify --all.')

    hostname = (None if not args.host else
                args.host[0] if len(args.host) == 1 else
                {'$in': args.host})
    issue_name = (None if not args.issue_name else
                  args.issue_name[0] if len(args.issue_name) == 1
                  else {'$in': args.issue_name})

    docs = close_issue(hostname, issue_name)
    if not docs:
        print('No matching issues.')
        return

    with logbook.StreamHandler(sys.stdout, bubble=True):
        for doc in docs:
            log.info('Manually closed {} issue for {}', doc['name'],
                     doc['hostname'])
def start_loop(cfg: Config, noop=False):
    handlers = []
    handlers.append(StreamHandler(sys.stdout, level=cfg.log_level))

    logger = Logger("Heart")
    logger.info("Initializing Oshino v{0}".format(get_version()))
    logger.info("Running forever in {0} seconds interval. Press Ctrl+C to exit"
                .format(cfg.interval))

    if cfg.sentry_dsn:
        try:
            client = SentryClient(cfg.sentry_dsn)
            handlers.append(SentryHandler(client,
                                          level=logbook.ERROR,
                                          bubble=True))
        except InvalidDsn:
            logger.warn("Invalid Sentry DSN '{0}' provided. Skipping"
                        .format(cfg.sentry_dsn))

    setup = NestedSetup(handlers)
    setup.push_application()

    loop = create_loop()
    try:
        loop.run_until_complete(main_loop(cfg,
                                          logger,
                                          cfg.riemann.transport(noop),
                                          forever,
                                          loop=loop))
    finally:
        loop.close()
def debug():
    logbook.StreamHandler(sys.stdout, level=logbook.DEBUG).push_application()
def get_logger(name, debug=True):
    logbook.set_datetime_format('local')
    handler = StreamHandler(sys.stdout) if debug else NullHandler()
    handler.push_application()
    return Logger(os.path.basename(name))
def unset_handler(args):
    try:
        old = set_client_parameter(args.hostname, args.parameter, None)
    except Exception as e:
        sys.exit('Failed to unset parameter: {}'.format(e))

    if old:
        with logbook.StreamHandler(sys.stdout, bubble=True):
            log.info('Unset parameter {} for host {} (was {})', args.parameter,
                     args.hostname, old)
    else:
        print('No changes.')
def snooze_handler(args):
    if not (args.host or args.issue_name or args.all):
        sys.exit('If you really want to snooze all issues for all hosts,\n'
                 'you need to specify --all.')

    if not (args.days or args.hours):
        args.days = 1

    # 'now' is defined elsewhere in the original module (the current time).
    if args.days:
        then = now + datetime.timedelta(days=args.days)
    else:
        then = now + datetime.timedelta(hours=args.hours)

    hostname = (None if not args.host else
                args.host[0] if len(args.host) == 1 else
                {'$in': args.host})
    issue_name = (None if not args.issue_name else
                  args.issue_name[0] if len(args.issue_name) == 1
                  else {'$in': args.issue_name})

    ids = snooze_issue(hostname, issue_name, then)
    if not ids:
        print('No matching issues.')
        return

    with logbook.StreamHandler(sys.stdout, bubble=True):
        for doc in get_db().issues.find({'_id': {'$in': ids}}):
            log.info('Snoozed {} {} until {}', doc['hostname'], doc['name'],
                     then)
def suspend_handler(args):
    matches = suspend_host(args.host)
    if not matches:
        print('No matching, unsuspended hosts.')
        return

    with logbook.StreamHandler(sys.stdout, bubble=True):
        for host in matches:
            log.info('Suspended {}', host)
def open_handler(args):
    with logbook.StreamHandler(sys.stdout, bubble=True):
        for host in args.host:
            for issue in args.issue_name:
                if open_issue(host, issue):
                    log.info('Manually opened {} issue for {}', issue, host)
                else:
                    print('Open {} issue for {} already exists.'.format(
                        issue, host))
def run(self):
    log = Logger("GAF Bot")
    log.handlers.append(StreamHandler(sys.stdout, bubble=True))
    log.handlers.append(FileHandler("bot/logs/last-run.log", bubble=True, mode="w"))
    self.logger = log
    self.logger.notice("Logging started")
    self.logger.notice("Bot process started")

    with open("bot/config/defaults/default.guildconfig.json") as f:
        self.default_guild_config = json.load(f)
        self.logger.debug("Loaded default guild config")

    self.logger.debug("Connecting to DB")
    self.db_conn = sqlite3.connect("bot/config/guild_configs.db")
    self.logger.notice("DB Connection Established")

    self.db_cursor = self.db_conn.cursor()
    self.db_cursor.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='serverSettings'")
    exists = self.db_cursor.fetchone()
    if not exists[0]:
        self.logger.error("No table found in DB! Creating new one now")
        self.db_cursor.execute('''CREATE TABLE serverSettings (id bigint, settings long)''')
        self.logger.debug("Table created")

    self.load_extension("bot.modules.core")
    self.logger.notice("Loaded core module")
    self.logger.notice("Loading other modules")

    # This bar and the time.sleep() stuff is entirely useless
    # Like completely
    # Don't do this
    # It just looks cool and that makes me happy but really this is terrible
    # and a complete waste of time
    time.sleep(0.5)
    for cog in tqdm.tqdm(self.config["modules"].keys(),
                         desc="Loading modules"):
        self.load_extension(f"bot.modules.{cog.lower()}")
        time.sleep(0.2)
    time.sleep(0.5)

    self.logger.debug("Completed loading modules")
    self.logger.notice("Logging into Discord")
    super().run(self.config["token"], reconnect=True)