def set_secure_cookie(self, name, value, expires_days=30, version=None,
                      **kwargs):
    """Store ``value`` in a signed, timestamped cookie that cannot be forged.

    The Application must define the ``cookie_secret`` setting for this
    to work; it should be a long, random sequence of bytes, which is
    used as the HMAC secret for the signature.

    Read the cookie back with `get_secure_cookie()`.

    ``expires_days`` controls how long the browser keeps the cookie and
    is independent of the ``max_age_days`` parameter accepted by
    `get_secure_cookie`.

    Unlike regular cookies, secure cookies may hold arbitrary byte
    values, not just unicode strings.

    .. versionchanged:: 3.2.1

       Added the ``version`` argument.  Introduced cookie version 2
       and made it the default.
    """
    # Sign first, then hand the signed payload to the plain cookie setter.
    signed_value = self.create_signed_value(name, value, version=version)
    self.set_cookie(name, signed_value, expires_days=expires_days, **kwargs)
# Example source code using the Python class Application()
def log_request(self, handler):
    """Log one finished HTTP request.

    When the application settings contain ``log_function``, that
    callable receives ``handler`` and nothing else is logged here.
    Otherwise the request is written to ``access_log``: at info level
    for status codes below 400, warning level below 500, and error
    level for everything else.  To customize logging, either supply
    ``log_function`` or subclass Application and override this method.
    """
    if "log_function" in self.settings:
        custom_logger = self.settings["log_function"]
        custom_logger(handler)
        return

    if handler.get_status() < 400:
        emit = access_log.info
    elif handler.get_status() < 500:
        emit = access_log.warning
    else:
        emit = access_log.error

    # request_time() is multiplied by 1000 to report milliseconds.
    elapsed_ms = 1000.0 * handler.request.request_time()
    emit(
        "%d %s %.2fms",
        handler.get_status(),
        handler._request_summary(),
        elapsed_ms,
    )
def get_application() -> web.Application:
    """Return the Application with its four routes and ``logging='error'``."""
    routes = [
        ('/', MainHandler),
        ('/1', LinkHandler1),
        ('/redirect1', RedirectHandler1),
        ('/redirect2', RedirectHandler2),
    ]
    app = web.Application(routes, logging='error')
    return app
def run():
    """Parse command-line options, set up the data stores, and serve HTTP.

    Defines the ``port`` (default 8090) and ``debug`` (default False)
    options, parses the command line, and copies ``options.debug`` into
    ``settings['debug']``.  A MongoDB database handle is stored in
    ``settings['connection']`` and a Redis client in
    ``settings['redis_conn']``.  Finally an ``Application`` is built from
    ``urlpattern`` and ``settings``, an ``HTTPServer`` listens on
    ``options.port``, and the IOLoop is started.

    If creating either client raises, a message is printed and the
    process exits with status 1 so the failure is visible to the caller.
    """
    define('port', default=8090, type=int, help='')
    define('debug', default=False, type=bool, help='')
    parse_command_line()

    settings['debug'] = options.debug
    if settings['debug']:
        # Parenthesized single-argument print works on Python 2 and 3.
        print('debug mode')

    # Connect to MongoDB.
    # NOTE(review): the client may connect lazily, in which case an
    # unreachable server would not raise here -- confirm.
    try:
        mongo_client = MotorClient(settings['database']['address'])
        settings['connection'] = mongo_client[settings['database']['db']]
    except Exception:
        # ``Exception`` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate normally.
        print('can not connect MongoDB')
        sys.exit(1)

    # Connect to Redis.
    # NOTE(review): same caveat as above about lazy connections -- confirm.
    try:
        redis_client = redis.Redis(host=settings['redis']['host'],
                                   port=settings['redis']['port'],
                                   db=settings['redis']['db'])
        settings['redis_conn'] = redis_client
    except Exception:
        print('can not connect redis')
        sys.exit(1)

    application = Application(
        handlers=urlpattern,
        **settings
    )
    http_server = HTTPServer(application, xheaders=True)
    http_server.listen(options.port)
    IOLoop.instance().start()
def get_app(self):
    """Return an app whose ``/`` handler can exit or signal this process.

    A GET with an ``exit`` argument terminates the process with that
    status; a ``signal`` argument sends that signal to the current
    process.  Otherwise (or afterwards) the handler writes the pid.
    """
    class ProcessHandler(RequestHandler):
        def get(self):
            exit_status = self.get_argument("exit", None)
            if exit_status:
                # must use os._exit instead of sys.exit so unittest's
                # exception handler doesn't catch it
                os._exit(int(exit_status))

            signal_number = self.get_argument("signal", None)
            if signal_number:
                os.kill(os.getpid(), int(signal_number))

            self.write(str(os.getpid()))

    routes = [("/", ProcessHandler)]
    return Application(routes)
def get_app(self):
    """Create ``self.close_future`` and return the app with the test routes.

    Every route except ``/non_ws`` passes ``close_future`` to its handler.
    """
    self.close_future = Future()

    def with_future(pattern, handler_cls):
        # Each route gets its own kwargs dict, all referring to one future.
        return (pattern, handler_cls, dict(close_future=self.close_future))

    routes = [
        with_future('/echo', EchoHandler),
        ('/non_ws', NonWebSocketHandler),
        with_future('/header', HeaderHandler),
        with_future('/close_reason', CloseReasonHandler),
        with_future('/error_in_on_message', ErrorInOnMessageHandler),
        with_future('/async_prepare', AsyncPrepareHandler),
    ]
    return Application(routes)
def get_app(self):
    """Create ``self.close_future`` and return an app serving ``/echo``.

    The echo handler receives the close future and the options returned by
    ``self.get_server_compression_options()``.
    """
    self.close_future = Future()
    echo_kwargs = dict(
        close_future=self.close_future,
        compression_options=self.get_server_compression_options(),
    )
    echo_route = ('/echo', EchoHandler, echo_kwargs)
    return Application([echo_route])
def wrap_web_tests_adapter():
    """Return a dict of subclasses of each ``web_test.wsgi_safe_tests`` class.

    Each subclass overrides ``get_app`` to build the Application, store it
    on ``self.app``, and return it wrapped in
    ``WSGIContainer(validator(WSGIAdapter(...)))``.  Keys are
    ``"WSGIAdapter_"`` followed by the original class name.
    """
    wrapped_tests = {}
    for base_case in web_test.wsgi_safe_tests:
        class WSGIAdapterWrappedTest(base_case):
            def get_app(self):
                handlers = self.get_handlers()
                app_kwargs = self.get_app_kwargs()
                self.app = Application(handlers, **app_kwargs)
                adapter = WSGIAdapter(self.app)
                return WSGIContainer(validator(adapter))

        key = "WSGIAdapter_" + base_case.__name__
        wrapped_tests[key] = WSGIAdapterWrappedTest
    return wrapped_tests
def get_app(self):
    """Return an app that serves TestRequestHandler at ``/``.

    The handler is given this test's ``io_loop``.
    """
    handler_kwargs = dict(io_loop=self.io_loop)
    root_route = ('/', TestRequestHandler, handler_kwargs)
    return Application([root_route])
def get_app(self):
    """Return an app serving HelloWorldRequestHandler with protocol "https"."""
    handler_kwargs = {"protocol": "https"}
    root_route = ('/', HelloWorldRequestHandler, handler_kwargs)
    return Application([root_route])
def test_missing_arguments(self):
    """HTTPServer must raise KeyError for ssl_options holding only a keyfile."""
    application = Application()
    incomplete_ssl_options = {
        "keyfile": "/__missing__.crt",
    }
    with self.assertRaises(KeyError):
        HTTPServer(application, ssl_options=incomplete_ssl_options)
def get_app(self):
    """Return an Application built from ``self.get_handlers()``."""
    handlers = self.get_handlers()
    return Application(handlers)
def get_app(self):
    """Return an app with the echo, typecheck and double-slash routes."""
    routes = [
        ("/echo", EchoHandler),
        ("/typecheck", TypeCheckHandler),
        ("//doubleslash", EchoHandler),
    ]
    return Application(routes)
def get_app(self):
    """Return an app that serves ``XHeaderTest.Handler`` at ``/``."""
    routes = [('/', XHeaderTest.Handler)]
    return Application(routes)
def setUp(self):
    """Start an HTTPServer on a unix socket and connect a client stream.

    The socket file ``test.sock`` lives in a fresh temporary directory;
    the method waits until the client stream's connect callback
    (``self.stop``) has fired.
    """
    super(UnixSocketTest, self).setUp()

    # Server side: bind the unix socket and attach it to the server.
    self.tmpdir = tempfile.mkdtemp()
    self.sockfile = os.path.join(self.tmpdir, "test.sock")
    listener = netutil.bind_unix_socket(self.sockfile)
    routes = [("/hello", HelloWorldRequestHandler)]
    self.server = HTTPServer(Application(routes), io_loop=self.io_loop)
    self.server.add_socket(listener)

    # Client side: connect a stream to the same socket file.
    client_socket = socket.socket(socket.AF_UNIX)
    self.stream = IOStream(client_socket, io_loop=self.io_loop)
    self.stream.connect(self.sockfile, self.stop)
    self.wait()
def get_app(self):
    """Return an app with hello, large-response and finish-on-close routes."""
    class HelloHandler(RequestHandler):
        def get(self):
            self.finish('Hello world')

        def post(self):
            self.finish('Hello world')

    class LargeHandler(RequestHandler):
        def get(self):
            # 512KB should be bigger than the socket buffers so it will
            # be written out in chunks.
            kilobyte_runs = [chr(i % 256) * 1024 for i in range(512)]
            self.write(''.join(kilobyte_runs))

    class FinishOnCloseHandler(RequestHandler):
        @asynchronous
        def get(self):
            self.flush()

        def on_connection_close(self):
            # This is not very realistic, but finishing the request
            # from the close callback has the right timing to mimic
            # some errors seen in the wild.
            self.finish('closed')

    routes = [
        ('/', HelloHandler),
        ('/large', LargeHandler),
        ('/finish_on_close', FinishOnCloseHandler),
    ]
    return Application(routes)
def get_app(self):
    """Return an app that serves EchoHandler at ``/``."""
    routes = [('/', EchoHandler)]
    return Application(routes)
def get_app(self):
    """Return an app that serves HelloWorldRequestHandler at ``/``."""
    routes = [('/', HelloWorldRequestHandler)]
    return Application(routes)
def get_app(self):
    """Return an app with a buffered and a streaming PUT handler.

    Both handlers respond with the number of body bytes: the buffered
    one from ``len(self.request.body)``, the streaming one from a
    counter updated in ``data_received``.
    """
    class BufferedHandler(RequestHandler):
        def put(self):
            body_length = len(self.request.body)
            self.write(str(body_length))

    @stream_request_body
    class StreamingHandler(RequestHandler):
        def initialize(self):
            self.bytes_read = 0

        def prepare(self):
            # Optional query arguments adjust the connection's limits
            # for this request.
            request = self.request
            if 'expected_size' in request.arguments:
                max_size = int(self.get_argument('expected_size'))
                request.connection.set_max_body_size(max_size)
            if 'body_timeout' in request.arguments:
                timeout = float(self.get_argument('body_timeout'))
                request.connection.set_body_timeout(timeout)

        def data_received(self, data):
            self.bytes_read = self.bytes_read + len(data)

        def put(self):
            self.write(str(self.bytes_read))

    routes = [
        ('/buffered', BufferedHandler),
        ('/streaming', StreamingHandler),
    ]
    return Application(routes)
def get_app(self):
    """Return an app with the digest and custom-reason test routes."""
    routes = [
        ('/digest', DigestAuthHandler),
        ('/custom_reason', CustomReasonHandler),
        ('/custom_fail_reason', CustomFailReasonHandler),
    ]
    return Application(routes)