def format(self, record):
if self.debug and self.color:
if record.levelno >= logging.CRITICAL:
color = TEXT_RED
elif record.levelno >= logging.ERROR:
color = TEXT_RED
elif record.levelno >= logging.WARNING:
color = TEXT_YELLOW
elif record.levelno >= logging.INFO:
color = TEXT_GREEN
elif record.levelno >= logging.DEBUG:
color = TEXT_CYAN
else:
color = TEXT_NORMAL
record.levelname = "\x1b[%sm%s\x1b[%sm" % (color, record.levelname, TEXT_NORMAL)
return logging.Formatter.format(self, record)
python类ERROR的实例源码
def update_logging_settings(self, file_path=None, level=None, format=None):
"""
Update global logging. If None is set to the arguments, it will keep the previous setting.
Args:
file_path (str): It is Initialized to 'log.log'.
level (str): It can be 'error', 'warning' or 'info'. It is Initialized to 'error'.
format (str): It is Initialized to '%(asctime)s %(levelname)s %(message)s'.
"""
LOGGING_STRING_MAP = {'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR}
if file_path is not None:
self._logger_config['file_path'] = self._get_abs_path(file_path)
if level is not None:
self._logger_config['level'] = level
if format is not None:
self._logger_config['format'] = format
logger = logging.getLogger(Configuration.LOGGER_NAME)
log_file = logging.FileHandler(self._logger_config['file_path'])
logger.addHandler(log_file)
log_file.setFormatter(logging.Formatter(self._logger_config['format']))
logger.setLevel(LOGGING_STRING_MAP[self._logger_config['level']])
self._logger = logger
def get_logging_level(log_level):
logging_level = logging.INFO
if log_level == 'DEBUG':
logging_level = logging.DEBUG
elif log_level == 'INFO':
logging_level = logging.INFO
elif log_level == 'WARNING':
logging_level = logging.WARNING
elif log_level == 'ERROR':
logging_level = logging.ERROR
elif log_level == 'CRITICAL':
logging_level = logging.CRITICAL
else:
print('Unknown or unset logging level. Using INFO')
return logging_level
def _logWriter(self,level,message,exception=None):
self._logger.setLevel(level)
self._fh.setLevel(level)
self._ch.setLevel(level)
if(exception!=None):
exFormatted = self._formatException(exception)
msg = "%s%s" % (message,exFormatted)
if(level==logging.DEBUG):
logging.debug(msg)
elif(level==logging.INFO):
logging.info(msg)
elif(level==logging.WARN):
logging.warn(msg)
elif(level==logging.FATAL):
logging.fatal(msg)
if(level==logging.ERROR):
logging.error(msg)
def send_message(self, email_content):
# ???????
now = datetime.datetime.now()
header = self.smtp_email_header + '[' + str(now.month) + '-' + str(now.day) + ' ' + \
str(now.hour) + ':' + str(now.minute) + ':' + str(now.second) + ']'
msg = MIMEText(email_content, 'plain', 'utf-8')
msg['from'] = self.smtp_from_addr
msg['to'] = self.smtp_to_addr
msg['Subject'] = Header(header, 'utf-8').encode()
# ??
try:
smtp_server = smtplib.SMTP(self.smtp_server_host, self.smtp_server_port)
smtp_server.login(self.smtp_from_addr, self.smtp_server_password)
smtp_server.sendmail(self.smtp_from_addr, [self.smtp_to_addr], msg.as_string())
smtp_server.quit()
except Exception as e:
if log.isEnabledFor(logging.ERROR):
log.error("??????")
log.exception(e)
def check_and_restart(self):
for process_thread in self.processor_list:
if process_thread.thread_status == 'error':
thread_id = process_thread.thread_id
self.processor_list.remove(process_thread)
del process_thread
new_thread = ProcessThread(thread_id, self.redis_connection, self.token_filter,
self.response_buffer, self.is_parser_following_list,
self.is_parser_follower_list, self.is_parser_follow_relation)
self.processor_list.append(new_thread)
new_thread.start()
if log.isEnabledFor(logging.ERROR):
log.error('???????[' + thread_id + ']????')
# URL ??
def run(self):
try:
while True:
# ????????????
while is_scanning:
time.sleep(3)
if proxy_pool.qsize() < PROXY_POOL_SIZE and unchecked_proxy_list.qsize() > 0:
unchecked_proxy = unchecked_proxy_list.get()
is_available = self.dataValidateModule.validate_proxy_ip(unchecked_proxy)
if is_available is True:
proxy_pool.put(unchecked_proxy)
# print(unchecked_proxy)
time.sleep(1)
else:
time.sleep(5)
except Exception as e:
if log.isEnabledFor(logging.ERROR):
log.exception(e)
self.status = 'error'
# ????????????????????
def run(self):
try:
while True:
if proxy_pool.qsize() < PROXY_POOL_SIZE and unchecked_proxy_list.qsize() < PROXY_POOL_SIZE:
self.fetch_and_parse_proxy()
elif proxy_pool.qsize() == PROXY_POOL_SIZE:
if log.isEnabledFor(logging.DEBUG):
log.debug('?????')
self.scan_proxy_pool()
time.sleep(PROXY_POOL_SCAN_INTERVAL)
else:
time.sleep(60)
except Exception as e:
if log.isEnabledFor(logging.ERROR):
log.exception(e)
self.status = 'error'
# ?????????
def init_app(cls, app):
Config.init_app(app)
# email errors to the administrator.
credentials, secure= None, None
if getattr(cls, 'MAIL_USERNAME', None) is not None:
credentials = (cls.MAIL_USERNAME, cls.MAIL_PASSWORD)
if getattr(cls, 'MAIL_USE_TLS', None):
secure = ()
mail_handler = SMTPHandler(
mailhost=(cls.MAIL_SERVER, cls.MAIL_PORT),
fromaddr=cls.MAIL_SENDER,
toaddrs=[cls.FLASK_ADMIN],
subject=cls.MAIL_SUBJECT_PREFIX + ' Application Error',
credentials=credentials,
secure=secure)
mail_handler.setLevel(logging.ERROR)
app.logger.addHandler(mail_handler)
def test_poly_bands(self):
print('\ntesting test_poly_bands...')
mass = 0.25
self.model_params['poly_bands'] = [[[[0.0, 0.0, 0.0], [0.0, mass]]]]
amset = AMSET(calc_dir=self.GaAs_path,material_params=self.GaAs_params,
model_params=self.model_params,
performance_params=self.performance_params,
dopings=[-2e15], temperatures=[300], k_integration=True,
e_integration=True, fermi_type='k',
loglevel=logging.ERROR)
amset.run(self.GaAs_cube, kgrid_tp='coarse', write_outputs=False)
egrid = amset.egrid
diff = abs(np.array(amset.mobility['n']['ACD'][-2e15][300]) - \
np.array(egrid['n']['mobility']['SPB_ACD'][-2e15][300]))
avg = (amset.mobility['n']['ACD'][-2e15][300] + \
egrid['n']['mobility']['SPB_ACD'][-2e15][300]) / 2
self.assertTrue((diff / avg <= 0.01).all())
def __init__(self, log_level=ERROR):
"""Initialize the deployment environment."""
super(OpenStackAmuletUtils, self).__init__(log_level)
def __init__(self, log_level=logging.ERROR):
self.log = self.get_logger(level=log_level)
self.ubuntu_releases = self.get_ubuntu_releases()
def set_verbosity(verbose):
"""
A simple function one can use to set the verbosity of
the app.
"""
# Default
logging.getLogger(SQLALCHEMY_LOGGER).setLevel(logging.ERROR)
logging.getLogger(SQLALCHEMY_ENGINE).setLevel(logging.ERROR)
logging.getLogger(NEWSREAP_LOGGER).setLevel(logging.ERROR)
logging.getLogger(NEWSREAP_CLI).setLevel(logging.ERROR)
logging.getLogger(NEWSREAP_CODEC).setLevel(logging.ERROR)
logging.getLogger(NEWSREAP_HOOKS).setLevel(logging.ERROR)
logging.getLogger(NEWSREAP_ENGINE).setLevel(logging.ERROR)
# Handle Verbosity
if verbose > 0:
logging.getLogger(NEWSREAP_CLI).setLevel(logging.INFO)
logging.getLogger(NEWSREAP_HOOKS).setLevel(logging.INFO)
logging.getLogger(NEWSREAP_ENGINE).setLevel(logging.INFO)
if verbose > 1:
logging.getLogger(NEWSREAP_CLI).setLevel(logging.DEBUG)
logging.getLogger(NEWSREAP_HOOKS).setLevel(logging.DEBUG)
logging.getLogger(NEWSREAP_ENGINE).setLevel(logging.DEBUG)
if verbose > 2:
logging.getLogger(SQLALCHEMY_ENGINE).setLevel(logging.INFO)
logging.getLogger(NEWSREAP_CODEC).setLevel(logging.INFO)
if verbose > 3:
logging.getLogger(NEWSREAP_CODEC).setLevel(logging.DEBUG)
if verbose > 4:
logging.getLogger(SQLALCHEMY_ENGINE).setLevel(logging.DEBUG)
# set initial level to WARN.
def setup(app, version:str='undefined'):
environment = getattr(app.settings, 'ROLE', None) or os.environ.get('ROLE')
dsn = getattr(app.settings, 'SENTRY_DSN', None)
tags = getattr(app.settings, 'SENTRY_TAGS', None)
client = Client(dsn=dsn, transport=AioHttpTransport,
version=version, environment=environment,
tags=tags)
handler = DjaioSentryHandler(client=client)
handler.setLevel(logging.ERROR)
setup_logging(handler)
app.raven = client
def setup_logs(args):
"""
Initialize the api loggers.
Args:
args: dict containing the configuration options.
"""
flask_logging.create_logger = lambda app: use(app.logger_name)
if not args.get("debug", True):
set_level("werkzeug", logging.ERROR)
level = [logging.WARNING, logging.INFO, logging.DEBUG][
min(args.get("verbose", 1), 2)]
internal_error_log = ExceptionHandler()
internal_error_log.setLevel(logging.ERROR)
log.root.setLevel(level)
log.root.addHandler(internal_error_log)
if api.config.get_settings()["email"]["enable_email"]:
severe_error_log = SevereHandler()
severe_error_log.setLevel(logging.CRITICAL)
log.root.addHandler(severe_error_log)
stats_log = StatsHandler()
stats_log.setLevel(logging.INFO)
log.root.addHandler(stats_log)
def ConvertLogLevel( oldstyle_level ):
if oldstyle_level == 0 :
return CF.LogLevels.FATAL
if oldstyle_level == 1 :
return CF.LogLevels.ERROR
if oldstyle_level == 2 :
return CF.LogLevels.WARN
if oldstyle_level == 3 :
return CF.LogLevels.INFO
if oldstyle_level == 4 :
return CF.LogLevels.DEBUG
if oldstyle_level == 5 :
return CF.LogLevels.ALL
return CF.LogLevels.INFO
def ConvertLevelNameToDebugLevel( level_name ):
if level_name == "OFF" : return 0
if level_name == "FATAL" : return 0
if level_name == "ERROR" : return 1
if level_name == "WARN" : return 2
if level_name == "INFO" : return 3
if level_name == "DEBUG" : return 4
if level_name == "TRACE": return 5
if level_name == "ALL" : return 5
return 3
def ConvertLevelNameToCFLevel( level_name ):
if level_name == "OFF" : return CF.LogLevels.OFF
if level_name == "FATAL" : return CF.LogLevels.FATAL
if level_name == "ERROR" : return CF.LogLevels.ERROR
if level_name == "WARN" : return CF.LogLevels.WARN
if level_name == "INFO" : return CF.LogLevels.INFO
if level_name == "DEBUG" : return CF.LogLevels.DEBUG
if level_name == "TRACE": return CF.LogLevels.TRACE
if level_name == "ALL" : return CF.LogLevels.ALL
return CF.LogLevels.INFO
def __init__(self, log_level=ERROR):
"""Initialize the deployment environment."""
super(OpenStackAmuletUtils, self).__init__(log_level)
def __init__(self, log_level=logging.ERROR):
self.log = self.get_logger(level=log_level)
self.ubuntu_releases = self.get_ubuntu_releases()