def open(self):
try:
# In a websocket context, many RequestHandler methods
# raise RuntimeErrors.
self.set_status(503)
raise Exception("did not get expected exception")
except RuntimeError:
pass
self.write_message(self.request.headers.get('X-Test', ''))
python类RequestHandler()的实例源码
def get_app(self):
class HelloHandler(RequestHandler):
def get(self):
self.finish('Hello world')
def post(self):
self.finish('Hello world')
class LargeHandler(RequestHandler):
def get(self):
# 512KB should be bigger than the socket buffers so it will
# be written out in chunks.
self.write(''.join(chr(i % 256) * 1024 for i in range(512)))
class FinishOnCloseHandler(RequestHandler):
@asynchronous
def get(self):
self.flush()
def on_connection_close(self):
# This is not very realistic, but finishing the request
# from the close callback has the right timing to mimic
# some errors seen in the wild.
self.finish('closed')
return Application([('/', HelloHandler),
('/large', LargeHandler),
('/finish_on_close', FinishOnCloseHandler)])
def get_app(self):
class BufferedHandler(RequestHandler):
def put(self):
self.write(str(len(self.request.body)))
@stream_request_body
class StreamingHandler(RequestHandler):
def initialize(self):
self.bytes_read = 0
def prepare(self):
if 'expected_size' in self.request.arguments:
self.request.connection.set_max_body_size(
int(self.get_argument('expected_size')))
if 'body_timeout' in self.request.arguments:
self.request.connection.set_body_timeout(
float(self.get_argument('body_timeout')))
def data_received(self, data):
self.bytes_read += len(data)
def put(self):
self.write(str(self.bytes_read))
return Application([('/buffered', BufferedHandler),
('/streaming', StreamingHandler)])
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 79
收藏 0
点赞 0
评论 0
def get_app(self):
class SmallHeaders(RequestHandler):
def get(self):
self.set_header("X-Filler", "a" * 100)
self.write("ok")
class LargeHeaders(RequestHandler):
def get(self):
self.set_header("X-Filler", "a" * 1000)
self.write("ok")
return Application([('/small', SmallHeaders),
('/large', LargeHeaders)])
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def get_app(self):
class SmallBody(RequestHandler):
def get(self):
self.write("a" * 1024 * 64)
class LargeBody(RequestHandler):
def get(self):
self.write("a" * 1024 * 100)
return Application([('/small', SmallBody),
('/large', LargeBody)])
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def get_app(self):
class LargeBody(RequestHandler):
def get(self):
self.write("a" * 1024 * 100)
return Application([('/large', LargeBody)])
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def get_app(self):
class ChunkedWithContentLength(RequestHandler):
def get(self):
# Add an invalid Transfer-Encoding to the response
self.set_header('Transfer-Encoding', 'chunked')
self.write("Hello world")
return Application([('/chunkwithcl', ChunkedWithContentLength)])
def get_app(self):
class ProcessHandler(RequestHandler):
def get(self):
if self.get_argument("exit", None):
# must use os._exit instead of sys.exit so unittest's
# exception handler doesn't catch it
os._exit(int(self.get_argument("exit")))
if self.get_argument("signal", None):
os.kill(os.getpid(),
int(self.get_argument("signal")))
self.write(str(os.getpid()))
return Application([("/", ProcessHandler)])
def get_app(self):
class HelloHandler(RequestHandler):
def get(self):
self.finish('Hello world')
def post(self):
self.finish('Hello world')
class LargeHandler(RequestHandler):
def get(self):
# 512KB should be bigger than the socket buffers so it will
# be written out in chunks.
self.write(''.join(chr(i % 256) * 1024 for i in range(512)))
class FinishOnCloseHandler(RequestHandler):
@asynchronous
def get(self):
self.flush()
def on_connection_close(self):
# This is not very realistic, but finishing the request
# from the close callback has the right timing to mimic
# some errors seen in the wild.
self.finish('closed')
return Application([('/', HelloHandler),
('/large', LargeHandler),
('/finish_on_close', FinishOnCloseHandler)])
def get_app(self):
class BufferedHandler(RequestHandler):
def put(self):
self.write(str(len(self.request.body)))
@stream_request_body
class StreamingHandler(RequestHandler):
def initialize(self):
self.bytes_read = 0
def prepare(self):
if 'expected_size' in self.request.arguments:
self.request.connection.set_max_body_size(
int(self.get_argument('expected_size')))
if 'body_timeout' in self.request.arguments:
self.request.connection.set_body_timeout(
float(self.get_argument('body_timeout')))
def data_received(self, data):
self.bytes_read += len(data)
def put(self):
self.write(str(self.bytes_read))
return Application([('/buffered', BufferedHandler),
('/streaming', StreamingHandler)])
def start_tornado_server(self):
class HelloHandler(RequestHandler):
def get(self):
self.write("Hello from tornado!")
app = Application([('/', HelloHandler)],
log_function=lambda x: None)
server = HTTPServer(app, io_loop=self.io_loop)
sock, self.tornado_port = bind_unused_port()
server.add_sockets([sock])
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def get_app(self):
class SmallHeaders(RequestHandler):
def get(self):
self.set_header("X-Filler", "a" * 100)
self.write("ok")
class LargeHeaders(RequestHandler):
def get(self):
self.set_header("X-Filler", "a" * 1000)
self.write("ok")
return Application([('/small', SmallHeaders),
('/large', LargeHeaders)])
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def get_app(self):
class LargeBody(RequestHandler):
def get(self):
self.write("a" * 1024 * 100)
return Application([('/large', LargeBody)])
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def get_app(self):
class ChunkedWithContentLength(RequestHandler):
def get(self):
# Add an invalid Transfer-Encoding to the response
self.set_header('Transfer-Encoding', 'chunked')
self.write("Hello world")
return Application([('/chunkwithcl', ChunkedWithContentLength)])
def get_app(self):
class ProcessHandler(RequestHandler):
def get(self):
if self.get_argument("exit", None):
# must use os._exit instead of sys.exit so unittest's
# exception handler doesn't catch it
os._exit(int(self.get_argument("exit")))
if self.get_argument("signal", None):
os.kill(os.getpid(),
int(self.get_argument("signal")))
self.write(str(os.getpid()))
return Application([("/", ProcessHandler)])
def open(self):
try:
# In a websocket context, many RequestHandler methods
# raise RuntimeErrors.
self.set_status(503)
raise Exception("did not get expected exception")
except RuntimeError:
pass
self.write_message(self.request.headers.get('X-Test', ''))
def get_app(self):
class BufferedHandler(RequestHandler):
def put(self):
self.write(str(len(self.request.body)))
@stream_request_body
class StreamingHandler(RequestHandler):
def initialize(self):
self.bytes_read = 0
def prepare(self):
if 'expected_size' in self.request.arguments:
self.request.connection.set_max_body_size(
int(self.get_argument('expected_size')))
if 'body_timeout' in self.request.arguments:
self.request.connection.set_body_timeout(
float(self.get_argument('body_timeout')))
def data_received(self, data):
self.bytes_read += len(data)
def put(self):
self.write(str(self.bytes_read))
return Application([('/buffered', BufferedHandler),
('/streaming', StreamingHandler)])
def start_tornado_server(self):
class HelloHandler(RequestHandler):
def get(self):
self.write("Hello from tornado!")
app = Application([('/', HelloHandler)],
log_function=lambda x: None)
server = HTTPServer(app, io_loop=self.io_loop)
sock, self.tornado_port = bind_unused_port()
server.add_sockets([sock])
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def get_app(self):
class SmallHeaders(RequestHandler):
def get(self):
self.set_header("X-Filler", "a" * 100)
self.write("ok")
class LargeHeaders(RequestHandler):
def get(self):
self.set_header("X-Filler", "a" * 1000)
self.write("ok")
return Application([('/small', SmallHeaders),
('/large', LargeHeaders)])
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 46
收藏 0
点赞 0
评论 0
def get_app(self):
class SmallBody(RequestHandler):
def get(self):
self.write("a" * 1024 * 64)
class LargeBody(RequestHandler):
def get(self):
self.write("a" * 1024 * 100)
return Application([('/small', SmallBody),
('/large', LargeBody)])