def create_logger(logger_type: str = LoggerTypes.MAIN, runid: float = 0, configs=None, to_console: bool = True, filename_args = '') -> 'Logger':
    """Build a ``Logger`` wrapper around the ``logging`` logger named ``logger_type``.

    The underlying ``logging`` logger is looked up by name, so repeated calls
    with the same ``logger_type`` share it. A file handler is attached only
    when that logger has no handlers yet; later calls therefore keep writing
    to the file chosen by the first call, even though the returned object's
    ``filename`` reflects the newly computed name.

    Args:
        logger_type: Name passed to ``logging.getLogger``. ``LoggerTypes.MAIN``
            gets a timestamped line format, every other type logs the bare
            message.
        runid: Forwarded to the path/filename helper.
        configs: Configuration mapping, forwarded to the path/filename helper
            and stored on the result as ``config``. A ``'log_to_console'`` key
            overrides ``to_console``. ``None`` (the default) means a fresh
            empty dict for this call.
        to_console: Initial value for the result's ``to_console`` flag.
        filename_args: Forwarded to the path/filename helper.

    Returns:
        A new ``Logger`` object with ``logger``, ``start_time``,
        ``last_start_time``, ``errors``, ``to_console``, ``filename`` and
        ``config`` populated.
    """
    # A literal ``{}`` default would be one dict shared by every call and
    # every returned object (it is stored as ``config`` below).
    if configs is None:
        configs = {}

    logger = logging.getLogger(logger_type)
    path, filename = Logger.__create_path_and_filename_by_type(logger_type, runid, configs, filename_args)

    if not logger.handlers:
        if logger_type == LoggerTypes.MAIN:
            formatter = logging.Formatter('%(asctime)s - %(message)s')
        else:
            formatter = logging.Formatter('%(message)s')
        # NOTE(review): plain concatenation - presumably ``path`` already ends
        # with a separator; confirm against the path/filename helper.
        file_handler = logging.FileHandler(path + filename)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        # No console handler is attached here; the console preference is only
        # recorded on the returned object as ``to_console``.
        logger.setLevel(logging.INFO)

    log_obj = Logger()
    log_obj.logger = logger
    log_obj.start_time = datetime.now()
    log_obj.last_start_time = datetime.now()
    log_obj.errors = []
    log_obj.to_console = configs['log_to_console'] if 'log_to_console' in configs else to_console
    log_obj.filename = filename
    log_obj.config = configs
    return log_obj
# Example source code for the Python class Logger()
def log_simple(self, msg: str) -> None:
    """Log ``msg`` at INFO level and optionally echo it to the console.

    The text handed to ``self.logger`` is first passed through
    ``self.strip_formatting_tags``. The console copy, printed with
    ``Logger.pprint`` when ``self.to_console`` is truthy, receives ``msg``
    unchanged.
    """
    plain_text = self.strip_formatting_tags(msg)
    self.logger.info(plain_text)
    if not self.to_console:
        return
    Logger.pprint(msg)
def log(self, descr: str, last_start_time: datetime = None, rowcount: int = -1, newline: bool = False, indent_level=0) -> None:
    """Log ``descr`` together with elapsed-time information.

    Args:
        descr: Message text. An empty value logs nothing, but the
            ``last_start_time`` bookkeeping still happens.
        last_start_time: Reference moment for the "executed in" duration;
            falls back to ``self.last_start_time`` when falsy.
        rowcount: When ``>= 0`` the row count is included in the message.
        newline: Append ``'\\r\\n'`` to the message when true.
        indent_level: Number of spaces put in front of the message; from
            level 4 upwards the message is also prefixed with ``'-'``.

    The file logger receives the message with formatting tags stripped;
    ``Logger.pprint`` receives the tagged message when ``self.to_console``
    is set. ``self.last_start_time`` is finally moved to the moment this
    call took its timestamp.
    """
    reference_time = last_start_time if last_start_time else self.last_start_time
    line_end = '\r\n' if newline else ''  # type: str
    finished_at = datetime.now()

    since_start_text = ''
    if reference_time:
        since_start = finished_at - self.start_time
        since_last = finished_at - reference_time
        since_start_text = self.time_descr(since_start)

    if descr:
        if indent_level >= 4:
            descr = '-' + descr
        descr = ' ' * indent_level + descr
        # Pad with spaces until the text without formatting tags is
        # 60 characters wide.
        visible_width = len(self.strip_formatting_tags(descr))
        descr += ' ' * (60 - visible_width)

    if descr and self.logger:
        if not since_start_text:
            descr = '{}{}'.format(descr, line_end)
        elif rowcount >= 0:
            descr = '{0} <lightgray>executed on {1} rows in {2} ({3} since start) {4}</>'.format(descr, rowcount, self.time_descr(since_last), since_start_text, line_end)
        else:
            descr = '{0} <lightgray>executed in {1} ({2} since start) {3}</>'.format(descr, self.time_descr(since_last), since_start_text, line_end)
        self.logger.info(self.strip_formatting_tags(descr))
        if self.to_console:
            Logger.pprint(descr)

    self.last_start_time = finished_at
def get_ytvideos(query, ilogger):
    """
    Resolve a YouTube search query into a list of items to queue.

    Only the first search hit is considered. A video hit yields a single
    ``[url, title]`` entry; a playlist hit yields whatever
    ``get_queue_from_playlist`` returns for that playlist id.

    Args:
        query (str): The YouTube search query
        ilogger (logging.logger): The logger to log API calls to

    Returns:
        queue (list): The items obtained from the YouTube search; empty when
            the search has no results or the hit is neither a video nor a
            playlist
    """
    response = ytdiscoveryapi.search().list(
        q=query,
        part="id,snippet",
        maxResults=1,
        type="video,playlist"
    ).execute()

    if not response["items"]:
        return []

    top_hit = response["items"][0]
    title = top_hit["snippet"]["title"]
    ilogger.info("Queueing {}".format(title))

    hit_id = top_hit["id"]
    kind = hit_id["kind"]
    if kind == "youtube#video":
        video_url = "https://www.youtube.com/watch?v={}".format(hit_id["videoId"])
        return [[video_url, title]]
    if kind == "youtube#playlist":
        return get_queue_from_playlist(hit_id["playlistId"])
    return []
def make_instance_catalog(self, obsHistID, band, boundLength, outfile=None):
    """
    Write a phosim instance catalog for one visit.

    Star objects (``self.star_objs``) are written first, followed by galaxy
    objects (``self.gal_objs``). The output file is created and given its
    header while handling the first star object; every catalog is then
    appended to that file.

    Parameters
    ----------
    obsHistID : int
        obsHistID for the desired visit from the opsim db file.
    band : str
        Desired LSST filter to use, ugrizy. Within this method it is only
        used to build the default output file name.
    boundLength : float
        Radius in degrees of sky cone in which to produce objects.
    outfile : str, optional
        File name of the instance catalog to be produced. If None,
        a default name will be generated, e.g.,
        phosim_input_0000230_r_0.3deg.txt.
    """
    if outfile is None:
        name_parts = (obsHistID, band, boundLength)
        outfile = 'phosim_input_%07i_%s_%.1fdeg.txt' % name_parts

    # Only the first metadata entry returned for the visit is used.
    visit_metadata = self.gen.getObservationMetaData(obsHistID=obsHistID,
                                                     boundLength=boundLength)[0]
    append_options = dict(write_mode='a', write_header=False, chunk_size=20000)

    for position, star_id in enumerate(self.star_objs):
        self.logger.info("processing %s", star_id)
        star_db = CatalogDBObject.from_objid(star_id, **self.db_config)
        star_catalog = PhoSimCatalogPoint(star_db, obs_metadata=visit_metadata)
        if position == 0:
            # Opening with 'w' both truncates the file and writes the header.
            with open(outfile, 'w') as header_file:
                star_catalog.write_header(header_file)
        star_catalog.write_catalog(outfile, **append_options)

    for galaxy_id in self.gal_objs:
        self.logger.info("processing %s", galaxy_id)
        galaxy_db = CatalogDBObject.from_objid(galaxy_id, **self.db_config)
        galaxy_catalog = PhoSimCatalogSersic2D(galaxy_db, obs_metadata=visit_metadata)
        galaxy_catalog.write_catalog(outfile, **append_options)
def get_module_logger(modulename = 'experiment', loglevel = logging.INFO):
    """Return a console logger for ``modulename`` with reasonable defaults.

    1. resolve ``loglevel`` (a ``logging`` level, or one of the strings
       'debug', 'info', 'warn'; unknown strings fall back to INFO)
    2. shorten the logger name: a leading 'smp_graphs' package component is
       dropped and only the last 20 characters are kept
    3. set the level on the logger and on its stream handler
    4. attach the stream handler and formatter on first use only
    5. disable propagation and return the logger

    Calling this repeatedly for the same name reuses the handler created by
    the first call instead of stacking a new one each time, so every record
    is emitted once.
    """
    loglevels = {'debug': logging.DEBUG, 'info': logging.INFO, 'warn': logging.WARNING}
    if isinstance(loglevel, str):
        loglevel = loglevels.get(loglevel, logging.INFO)

    if modulename.startswith('smp_graphs'):
        # Dropping the first dotted component of a bare 'smp_graphs' leaves
        # '', which would select (and reconfigure) the root logger; keep the
        # original name in that case.
        modulename = '.'.join(modulename.split('.')[1:]) or modulename

    if len(modulename) > 20:
        modulename = modulename[-20:]

    logger = logging.getLogger(modulename)
    logger.setLevel(loglevel)

    # Handlers created here are tagged so a later call can find and reuse
    # them without touching handlers attached by other code.
    marker = '_added_by_get_module_logger'
    handler = next((h for h in logger.handlers if getattr(h, marker, False)), None)
    if handler is None:
        handler = logging.StreamHandler()
        setattr(handler, marker, True)
        handler.setFormatter(logging.Formatter('%(levelname)8s: %(name)20s: %(message)s'))
        logger.addHandler(handler)
    handler.setLevel(loglevel)

    # suppress double log output through ancestor loggers
    logger.propagate = False
    return logger
# function composition
# https://mathieularose.com/function-composition-in-python/