def test_with_other_error_in_acquire_without_raise(self):
    """Run the 'acquire' failure scenario with ``logging.raiseExceptions`` off.

    The shared helper is invoked with an ``IndexError`` to inject; the test
    makes no exception assertion, so it passes only if the helper returns
    normally.
    """
    failing_method, injected_error = 'acquire', IndexError
    logging.raiseExceptions = False
    self._test_with_failure_in_method(failing_method, injected_error)
# Example source code using Python's raiseExceptions (python类raiseExceptions()的实例源码)
def test_with_other_error_in_flush_without_raise(self):
    """Run the 'flush' failure scenario with ``logging.raiseExceptions`` off.

    The shared helper is invoked with an ``IndexError`` to inject; the test
    makes no exception assertion, so it passes only if the helper returns
    normally.
    """
    failing_method, injected_error = 'flush', IndexError
    logging.raiseExceptions = False
    self._test_with_failure_in_method(failing_method, injected_error)
def test_with_other_error_in_acquire_with_raise(self):
    """Expect the injected ``IndexError`` from 'acquire' to propagate.

    ``logging.raiseExceptions`` is switched on before the shared failure
    helper runs.
    """
    logging.raiseExceptions = True
    with self.assertRaises(IndexError):
        self._test_with_failure_in_method('acquire', IndexError)
def test_with_other_error_in_flush_with_raise(self):
    """Expect the injected ``IndexError`` from 'flush' to propagate.

    ``logging.raiseExceptions`` is switched on before the shared failure
    helper runs.
    """
    logging.raiseExceptions = True
    with self.assertRaises(IndexError):
        self._test_with_failure_in_method('flush', IndexError)
def test_with_other_error_in_close_with_raise(self):
    """Expect the injected ``IndexError`` from 'close' to propagate.

    ``logging.raiseExceptions`` is switched on before the shared failure
    helper runs.
    """
    logging.raiseExceptions = True
    with self.assertRaises(IndexError):
        self._test_with_failure_in_method('close', IndexError)
def test_log_invalid_level_with_raise(self):
    """Expect ``TypeError`` when logging with a string level and raiseExceptions on.

    The value ``logging.raiseExceptions`` had on entry is restored by a
    registered cleanup, so the flag change does not leak into other tests.
    """
    old_raise = logging.raiseExceptions
    # The attribute name must match the real module flag exactly: setattr on
    # a misspelled name would silently create an unrelated attribute and
    # leave logging.raiseExceptions modified after the test.
    self.addCleanup(setattr, logging, 'raiseExceptions', old_raise)

    logging.raiseExceptions = True
    self.assertRaises(TypeError, self.logger.log, '10', 'test message')
def test_log_invalid_level_no_raise(self):
    """Log with a string level while raiseExceptions is off; nothing should raise.

    The value ``logging.raiseExceptions`` had on entry is restored by a
    registered cleanup, so the flag change does not leak into other tests.
    """
    old_raise = logging.raiseExceptions
    # The attribute name must match the real module flag exactly: setattr on
    # a misspelled name would silently create an unrelated attribute and
    # leave logging.raiseExceptions set to False for every later test.
    self.addCleanup(setattr, logging, 'raiseExceptions', old_raise)

    logging.raiseExceptions = False
    self.logger.log('10', 'test message')  # no exception happens
def setup_logger(cls, args, logging_queue=None, main=False):
    """Configure the ``logging`` module from parsed command-line arguments.

    NOTE(review): the nesting here was reconstructed from a listing whose
    indentation was lost. Confirm against the upstream file that the file
    handlers belong to the no-queue branch and that the "created at" message
    is emitted for every ``main`` call rather than only in dev mode.

    Args:
        cls: Object holding the shared ``_queue`` attribute (presumably the
            class, with a ``@classmethod`` decorator outside this view).
        args: An ``argparse.Namespace``; its ``debug`` and ``dev`` attributes
            are read.
        logging_queue: When truthy, stored on ``cls._queue``.
        main: When truthy, per-library logger levels are adjusted in dev mode
            and the debug log location is reported.
    """
    assert isinstance(args, argparse.Namespace)
    if logging_queue:
        cls._queue = logging_queue

    if args.debug:
        root_level = logging.DEBUG
    else:
        root_level = logging.INFO

    if not args.dev:
        # Don't raise exception if in production mode
        logging.raiseExceptions = False

    handlers = []
    if cls._queue:
        # A queue is available: hand every record to it.
        handlers.append(QueueHandler(cls._queue))
    else:
        if args.dev:
            handlers.append(logging.StreamHandler())

        if args.debug:
            # Make sure the debug log exists before attaching a handler.
            try:
                with open(constants.log_debug, 'x'):
                    pass
            except FileExistsError:
                pass
            debug_handler = logging.FileHandler(
                constants.log_debug, 'w', 'utf-8')
            debug_handler.setLevel(logging.DEBUG)
            handlers.append(debug_handler)

        rotating_targets = (
            (constants.log_normal, logging.INFO),
            (constants.log_error, logging.ERROR),
        )
        for target_path, target_level in rotating_targets:
            # Make sure the log file exists before attaching a handler.
            try:
                with open(target_path, 'x'):
                    pass
            except FileExistsError:
                pass
            rotating_handler = RotatingFileHandler(
                target_path,
                maxBytes=100000 * 10,
                encoding='utf-8',
                backupCount=1)
            rotating_handler.setLevel(target_level)
            handlers.append(rotating_handler)

    logging.basicConfig(
        level=root_level,
        format='%(asctime)-8s %(levelname)-10s %(name)-10s %(message)s',
        datefmt='%d-%m %H:%M',
        handlers=tuple(handlers))

    if not main:
        return

    if args.dev:
        sqlalchemy_levels = (
            ("sqlalchemy.pool", logging.DEBUG),
            ("sqlalchemy.engine", logging.INFO),
            ("sqlalchemy.orm", logging.INFO),
        )
        for logger_name, logger_level in sqlalchemy_levels:
            Logger(logger_name).setLevel(logger_level)

    Logger(__name__).i(
        os.path.split(constants.log_debug)[1],
        "created at",
        os.path.abspath(constants.log_debug),
        stdout=True)
def logger():
    """Yield the root logger at DEBUG level inside a patched environment.

    While the generator is suspended at the ``yield``:

    * ``logging.os.getpid`` returns 111 and ``logging.time.time`` returns
      946725071.111111;
    * ``rfc5424logging.handler.get_localzone`` returns the module-level
      ``timezone`` and ``socket.gethostname`` returns "testhostname";
    * socket ``connect`` and ``sendall`` are routed to ``connect_mock``;
    * the logging level-name tables are wrapped in ``patch.dict``.

    ``logging.raiseExceptions`` is set to True. Every patch that was started
    is stopped and ``Rfc5424SysLogAdapter._extra_levels_enabled`` is reset to
    False on exit, including when setup fails partway or an exception is
    thrown into the generator.

    NOTE(review): presumably used as a pytest fixture; the decorator is not
    visible in this listing.
    """
    patchers = [
        patch('logging.os.getpid', return_value=111),
        patch('logging.time.time', return_value=946725071.111111),
        patch('rfc5424logging.handler.get_localzone', return_value=timezone),
        patch('rfc5424logging.handler.socket.gethostname',
              return_value="testhostname"),
        patch('logging.handlers.socket.socket.connect',
              side_effect=connect_mock),
        patch('logging.handlers.socket.socket.sendall',
              side_effect=connect_mock),
    ]

    if '_levelNames' in logging.__dict__:
        # Python 2.7
        patchers.append(patch.dict(logging._levelNames))
    else:
        # Python 3.x
        patchers.append(patch.dict(logging._levelToName))
        patchers.append(patch.dict(logging._nameToLevel))

    # Track only the patches that actually started, so cleanup never calls
    # stop() on one that did not.
    started = []
    try:
        for patcher in patchers:
            patcher.start()
            started.append(patcher)

        logging.raiseExceptions = True
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)
        yield root_logger
    finally:
        for patcher in started:
            patcher.stop()
        Rfc5424SysLogAdapter._extra_levels_enabled = False