def test_stack_context(self):
with ExpectLog(app_log, "Uncaught exception GET /"):
self.http_client.fetch(self.get_url('/'), self.handle_response)
self.wait()
self.assertEqual(self.response.code, 500)
self.assertTrue(b'got expected exception' in self.response.body)
python类ExpectLog()的实例源码
def test_exception_logging(self):
"""Uncaught exceptions get logged by the IOLoop."""
# Use a NullContext to keep the exception from being caught by
# AsyncTestCase.
with NullContext():
self.io_loop.add_callback(lambda: 1 / 0)
self.io_loop.add_callback(self.stop)
with ExpectLog(app_log, "Exception in callback"):
self.wait()
def test_exception_logging_future(self):
"""The IOLoop examines exceptions from Futures and logs them."""
with NullContext():
@gen.coroutine
def callback():
self.io_loop.add_callback(self.stop)
1 / 0
self.io_loop.add_callback(callback)
with ExpectLog(app_log, "Exception in callback"):
self.wait()
def test_spawn_callback(self):
# An added callback runs in the test's stack_context, so will be
# re-arised in wait().
self.io_loop.add_callback(lambda: 1 / 0)
with self.assertRaises(ZeroDivisionError):
self.wait()
# A spawned callback is run directly on the IOLoop, so it will be
# logged without stopping the test.
self.io_loop.spawn_callback(lambda: 1 / 0)
self.io_loop.add_callback(self.stop)
with ExpectLog(app_log, "Exception in callback"):
self.wait()
def test_non_ssl_request(self):
# Make sure the server closes the connection when it gets a non-ssl
# connection, rather than waiting for a timeout or otherwise
# misbehaving.
with ExpectLog(gen_log, '(SSL Error|uncaught exception)'):
with ExpectLog(gen_log, 'Uncaught exception', required=False):
self.http_client.fetch(
self.get_url("/").replace('https:', 'http:'),
self.stop,
request_timeout=3600,
connect_timeout=3600)
response = self.wait()
self.assertEqual(response.code, 599)
def test_error_logging(self):
# No stack traces are logged for SSL errors.
with ExpectLog(gen_log, 'SSL Error') as expect_log:
self.http_client.fetch(
self.get_url("/").replace("https:", "http:"),
self.stop)
response = self.wait()
self.assertEqual(response.code, 599)
self.assertFalse(expect_log.logged_stack)
# Python's SSL implementation differs significantly between versions.
# For example, SSLv3 and TLSv1 throw an exception if you try to read
# from the socket before the handshake is complete, but the default
# of SSLv23 allows it.
def test_malformed_body(self):
# parse_qs is pretty forgiving, but it will fail on python 3
# if the data is not utf8. On python 2 parse_qs will work,
# but then the recursive_unicode call in EchoHandler will
# fail.
if str is bytes:
return
with ExpectLog(gen_log, 'Invalid x-www-form-urlencoded body'):
response = self.fetch(
'/echo', method="POST",
headers={'Content-Type': 'application/x-www-form-urlencoded'},
body=b'\xe9')
self.assertEqual(200, response.code)
self.assertEqual(b'{}', response.body)
def test_malformed_first_line(self):
with ExpectLog(gen_log, '.*Malformed HTTP request line'):
self.stream.write(b'asdf\r\n\r\n')
# TODO: need an async version of ExpectLog so we don't need
# hard-coded timeouts here.
self.io_loop.add_timeout(datetime.timedelta(seconds=0.01),
self.stop)
self.wait()
def test_unix_socket_bad_request(self):
# Unix sockets don't have remote addresses so they just return an
# empty string.
with ExpectLog(gen_log, "Malformed HTTP message from"):
self.stream.write(b"garbage\r\n\r\n")
self.stream.read_until_close(self.stop)
response = self.wait()
self.assertEqual(response, b"")
def test_gzip_unsupported(self):
# Gzip support is opt-in; without it the server fails to parse
# the body (but parsing form bodies is currently just a log message,
# not a fatal error).
with ExpectLog(gen_log, "Unsupported Content-Encoding"):
response = self.post_gzip('foo=bar')
self.assertEquals(json_decode(response.body), {})
def test_large_headers(self):
with ExpectLog(gen_log, "Unsatisfiable read", required=False):
response = self.fetch("/", headers={'X-Filler': 'a' * 1000})
# 431 is "Request Header Fields Too Large", defined in RFC
# 6585. However, many implementations just close the
# connection in this case, resulting in a 599.
self.assertIn(response.code, (431, 599))
def test_large_body_buffered(self):
with ExpectLog(gen_log, '.*Content-Length too long'):
response = self.fetch('/buffered', method='PUT', body=b'a' * 10240)
self.assertEqual(response.code, 599)
def test_large_body_buffered_chunked(self):
with ExpectLog(gen_log, '.*chunked body too large'):
response = self.fetch('/buffered', method='PUT',
body_producer=lambda write: write(b'a' * 10240))
self.assertEqual(response.code, 599)
def test_large_body_streaming_chunked(self):
with ExpectLog(gen_log, '.*chunked body too large'):
response = self.fetch('/streaming', method='PUT',
body_producer=lambda write: write(b'a' * 10240))
self.assertEqual(response.code, 599)
def test_timeout(self):
stream = IOStream(socket.socket())
try:
yield stream.connect(('127.0.0.1', self.get_http_port()))
# Use a raw stream because AsyncHTTPClient won't let us read a
# response without finishing a body.
stream.write(b'PUT /streaming?body_timeout=0.1 HTTP/1.0\r\n'
b'Content-Length: 42\r\n\r\n')
with ExpectLog(gen_log, 'Timeout reading body'):
response = yield stream.read_until_close()
self.assertEqual(response, b'')
finally:
stream.close()
def test_coroutine_exception_handler(self):
# Make sure we get an error and not a timeout
with ExpectLog(app_log, "Uncaught exception GET /coroutine_exception"):
response = self.fetch('/coroutine_exception')
self.assertEqual(500, response.code)
def test_cookie_tampering_future_timestamp(self):
handler = CookieTestRequestHandler()
# this string base64-encodes to '12345678'
handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
version=1)
cookie = handler._cookies['foo']
match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
self.assertTrue(match)
timestamp = match.group(1)
sig = match.group(2)
self.assertEqual(
_create_signature_v1(handler.application.settings["cookie_secret"],
'foo', '12345678', timestamp),
sig)
# shifting digits from payload to timestamp doesn't alter signature
# (this is not desirable behavior, just confirming that that's how it
# works)
self.assertEqual(
_create_signature_v1(handler.application.settings["cookie_secret"],
'foo', '1234', b'5678' + timestamp),
sig)
# tamper with the cookie
handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
to_basestring(timestamp), to_basestring(sig)))
# it gets rejected
with ExpectLog(gen_log, "Cookie timestamp in future"):
self.assertTrue(
handler.get_secure_cookie('foo', min_version=1) is None)
def test_error(self):
# Percent signs (encoded as %25) should not mess up printf-style
# messages in logs
with ExpectLog(gen_log, ".*Invalid unicode"):
self.fetch("/group/?arg=%25%e9")
def test_decode_argument_invalid_unicode(self):
# test that invalid unicode in URLs causes 400, not 500
with ExpectLog(gen_log, ".*Invalid unicode.*"):
response = self.fetch("/typecheck/invalid%FF")
self.assertEqual(response.code, 400)
response = self.fetch("/typecheck/invalid?foo=%FF")
self.assertEqual(response.code, 400)
def test_default(self):
with ExpectLog(app_log, "Uncaught exception"):
response = self.fetch("/default")
self.assertEqual(response.code, 500)
self.assertTrue(b"500: Internal Server Error" in response.body)
response = self.fetch("/default?status=503")
self.assertEqual(response.code, 503)
self.assertTrue(b"503: Service Unavailable" in response.body)