def test_wraps_at_whitespace(self):
assert len('123 567') > self.handler.width
self.handler.emit(logging.makeLogRecord({'msg': '123 567'}))
self.d.infobox.assert_called_once_with('123\n567', mock.ANY, mock.ANY)
python类makeLogRecord()的实例源码
def test_only_last_lines_are_printed(self):
assert len('a\nb\nc'.split()) > self.handler.height
self.handler.emit(logging.makeLogRecord({'msg': 'a\n\nb\nc'}))
self.d.infobox.assert_called_once_with('b\nc', mock.ANY, mock.ANY)
def handle(self):
"""Handle multiple requests - each expected to be of 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally."""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack(">L", chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unpickle(chunk)
record = logging.makeLogRecord(obj)
self.handle_log_record(record)
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
for _ in range(tries):
try:
os.unlink(fname)
except OSError:
pass
time.sleep(0.004 * random.randint(0, 4))
del_count = 500
log_count = 500
for delay in (False, True):
fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
os.close(fd)
remover = threading.Thread(target=remove_loop, args=(fn, del_count))
remover.daemon = True
remover.start()
h = logging.handlers.WatchedFileHandler(fn, delay=delay)
f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
h.setFormatter(f)
try:
for _ in range(log_count):
time.sleep(0.005)
r = logging.makeLogRecord({'msg': 'testing' })
h.handle(r)
finally:
remover.join()
try:
h.close()
except ValueError:
pass
if os.path.exists(fn):
os.unlink(fn)
# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
def handle(self):
"""Handle multiple requests - each expected to be of 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally."""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack(">L", chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unpickle(chunk)
record = logging.makeLogRecord(obj)
self.handle_log_record(record)
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
for _ in range(tries):
try:
os.unlink(fname)
except OSError:
pass
time.sleep(0.004 * random.randint(0, 4))
del_count = 500
log_count = 500
for delay in (False, True):
fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
os.close(fd)
remover = threading.Thread(target=remove_loop, args=(fn, del_count))
remover.daemon = True
remover.start()
h = logging.handlers.WatchedFileHandler(fn, delay=delay)
f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
h.setFormatter(f)
try:
for _ in range(log_count):
time.sleep(0.005)
r = logging.makeLogRecord({'msg': 'testing' })
h.handle(r)
finally:
remover.join()
try:
h.close()
except ValueError:
pass
if os.path.exists(fn):
os.unlink(fn)
# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
def log(self, msg: Message):
d = msg._asdict()
d['asctime'] = datetime.fromtimestamp(msg.time, self.tz).strftime('%Y-%m-%d %H:%M:%S')
d['srcname'] = msg.src.alias
d['srcid'] = msg.src.id
self.loghandler.emit(logging.makeLogRecord({'msg': self.FORMAT % d}))
def post(self):
self.log.handle(logging.makeLogRecord(self.get_json_body()))
def test_empty_filter(self):
f = logging.Filter()
r = logging.makeLogRecord({'name': 'spam.eggs'})
self.assertTrue(f.filter(r))
#
# First, we define our levels. There can be as many as you want - the only
# limitations are that they should be integers, the lowest should be > 0 and
# larger values mean less information being logged. If you need specific
# level values which do not fit into these limitations, you can use a
# mapping dictionary to convert between your application levels and the
# logging system.
#
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
for _ in range(tries):
try:
os.unlink(fname)
except OSError:
pass
time.sleep(0.004 * random.randint(0, 4))
del_count = 500
log_count = 500
for delay in (False, True):
fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
os.close(fd)
remover = threading.Thread(target=remove_loop, args=(fn, del_count))
remover.daemon = True
remover.start()
h = logging.handlers.WatchedFileHandler(fn, delay=delay)
f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
h.setFormatter(f)
try:
for _ in range(log_count):
time.sleep(0.005)
r = logging.makeLogRecord({'msg': 'testing' })
h.handle(r)
finally:
remover.join()
h.close()
if os.path.exists(fn):
os.unlink(fn)
def handle_datagram(self, request):
slen = struct.pack('>L', 0) # length of prefix
packet = request.packet[len(slen):]
obj = pickle.loads(packet)
record = logging.makeLogRecord(obj)
self.log_output += record.msg + '\n'
self.handled.set()
def get_record(self, name=None):
result = dict(self.common)
if name is not None:
result.update(self.variants[name])
return logging.makeLogRecord(result)
def setUp(self):
self.records = [
logging.makeLogRecord({'msg': 'one'}),
logging.makeLogRecord({'msg': 'two'}),
]
def test_str_rep(self):
r = logging.makeLogRecord({})
s = str(r)
self.assertTrue(s.startswith('<LogRecord: '))
self.assertTrue(s.endswith('>'))
def test_multiprocessing(self):
r = logging.makeLogRecord({})
self.assertEqual(r.processName, 'MainProcess')
try:
import multiprocessing as mp
r = logging.makeLogRecord({})
self.assertEqual(r.processName, mp.current_process().name)
except ImportError:
pass
def test_delay(self):
os.unlink(self.fn)
fh = logging.FileHandler(self.fn, delay=True)
self.assertIsNone(fh.stream)
self.assertFalse(os.path.exists(self.fn))
fh.handle(logging.makeLogRecord({}))
self.assertIsNotNone(fh.stream)
self.assertTrue(os.path.exists(self.fn))
fh.close()
def test_rollover(self):
fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
backupCount=1)
fmt = logging.Formatter('%(asctime)s %(message)s')
fh.setFormatter(fmt)
r1 = logging.makeLogRecord({'msg': 'testing - initial'})
fh.emit(r1)
self.assertLogFile(self.fn)
time.sleep(1.1) # a little over a second ...
r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
fh.emit(r2)
fh.close()
# At this point, we should have a recent rotated file which we
# can test for the existence of. However, in practice, on some
# machines which run really slowly, we don't know how far back
# in time to go to look for the log file. So, we go back a fair
# bit, and stop as soon as we see a rotated file. In theory this
# could of course still fail, but the chances are lower.
found = False
now = datetime.datetime.now()
GO_BACK = 5 * 60 # seconds
for secs in range(GO_BACK):
prev = now - datetime.timedelta(seconds=secs)
fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
found = os.path.exists(fn)
if found:
self.rmfiles.append(fn)
break
msg = 'No rotated files found, went back %d seconds' % GO_BACK
if not found:
#print additional diagnostics
dn, fn = os.path.split(self.fn)
files = [f for f in os.listdir(dn) if f.startswith(fn)]
print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
print('The only matching files are: %s' % files, file=sys.stderr)
for f in files:
print('Contents of %s:' % f)
path = os.path.join(dn, f)
with open(path, 'r') as tf:
print(tf.read())
self.assertTrue(found, msg=msg)
def test_basic(self):
logtype = 'Application'
elh = win32evtlog.OpenEventLog(None, logtype)
num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
h = logging.handlers.NTEventLogHandler('test_logging')
r = logging.makeLogRecord({'msg': 'Test Log Message'})
h.handle(r)
h.close()
# Now see if the event is recorded
self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh))
flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
win32evtlog.EVENTLOG_SEQUENTIAL_READ
found = False
GO_BACK = 100
events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)
for e in events:
if e.SourceName != 'test_logging':
continue
msg = win32evtlogutil.SafeFormatMessage(e, logtype)
if msg != 'Test Log Message\r\n':
continue
found = True
break
msg = 'Record not found in event log, went back %d records' % GO_BACK
self.assertTrue(found, msg=msg)
# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
def handle(self):
"""Handle multiple requests - each expected to be of 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally."""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack(">L", chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unpickle(chunk)
record = logging.makeLogRecord(obj)
self.handle_log_record(record)
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
for _ in range(tries):
try:
os.unlink(fname)
except OSError:
pass
time.sleep(0.004 * random.randint(0, 4))
del_count = 500
log_count = 500
for delay in (False, True):
fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
os.close(fd)
remover = threading.Thread(target=remove_loop, args=(fn, del_count))
remover.daemon = True
remover.start()
h = logging.handlers.WatchedFileHandler(fn, delay=delay)
f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
h.setFormatter(f)
try:
for _ in range(log_count):
time.sleep(0.005)
r = logging.makeLogRecord({'msg': 'testing' })
h.handle(r)
finally:
remover.join()
try:
h.close()
except ValueError:
pass
if os.path.exists(fn):
os.unlink(fn)
# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.