def new_access():
    """Refresh an OAuth access token for a caller that presents a known secret key.

    Rebuilds ``bottle_app.oauth`` through ``setup_oauth``, reads the JSON list
    stored under ``'secret_keys'`` in ``r_c`` and compares the POSTed
    ``diycrate_secret_key`` field against it.

    Returns:
        A JSON array string made from the items yielded by
        ``bottle_app.oauth.refresh`` for the POSTed ``access_token``; any
        ``bytes`` item is decoded as UTF-8 first.

    Raises:
        ValueError: if the POSTed secret key is not in the stored list.
    """
    bottle_app.oauth = setup_oauth(r_c, conf_obj, store_tokens_callback)

    # NOTE(review): the stored value is decoded with str(bytes, ...), so r_c.get
    # presumably returns bytes (or a falsy value when the key is absent) -- confirm.
    raw_keys = r_c.get('secret_keys')
    known_keys = (
        json.loads(str(raw_keys, encoding='utf-8', errors='strict'))
        if raw_keys
        else []
    )

    supplied_key = str(bottle.request.POST.get('diycrate_secret_key'))
    if supplied_key not in known_keys:
        raise ValueError('No matching secret key; we are being secure!')

    refreshed = bottle_app.oauth.refresh(bottle.request.POST.get('access_token'))
    payload = []
    for item in refreshed:
        # json.dumps cannot serialise bytes, so decode them before dumping.
        if isinstance(item, bytes):
            item = item.decode(encoding='utf-8', errors='strict')
        payload.append(item)
    return json.dumps(payload)
# Create our own sub-class of Bottle's ServerAdapter
# so that we can specify SSL. Using just server='cherrypy'
# uses the default cherrypy server, which doesn't use SSL
# Example source code for the Python class ServerAdapter()
def get_context(self):
    """Return the parent class's SSL context with legacy protocols switched off.

    Starts from the context produced by the superclass and sets the
    ``OP_NO_*`` options for SSLv2, SSLv3, TLSv1 and TLSv1.1 on it, one
    option per ``set_options`` call.
    """
    context = super(SecuredSSLServer, self).get_context()
    disabled_protocol_flags = (
        'OP_NO_SSLv2',
        'OP_NO_SSLv3',
        'OP_NO_TLSv1',
        'OP_NO_TLSv1_1',
    )
    for flag_name in disabled_protocol_flags:
        # Resolve each flag right before applying it, in the listed order.
        context.set_options(getattr(SSL, flag_name))
    return context
# Create our own sub-class of Bottle's ServerAdapter
# so that we can specify SSL. Using just server='cherrypy'
# uses the default cherrypy server, which doesn't use SSL
def _start_server(self):
    """Start a Bottle application on a daemon thread and return that thread.

    The thread first applies ``self._ctx_patcher`` to ``self.ctx``, then
    serves a Bottle app whose only route is ``POST /`` (handled by
    ``self._request_handler``) on ``localhost`` at ``self.port``, using a
    wsgiref-based server adapter defined here.

    While starting up, the adapter stores the underlying WSGI server in
    ``self.server`` and puts ``True`` on ``self._started``.

    Returns:
        The started ``threading.Thread`` running the server.
    """

    class BottleServerAdapter(bottle.ServerAdapter):
        """wsgiref-based Bottle adapter that reports back to the enclosing object."""

        # The object whose _start_server created this adapter class.
        proxy = self

        def close_session(self):
            """Remove the session reached through the proxy's context."""
            self.proxy.ctx.model.log._session.remove()

        def run(self, app):
            """Build the WSGI server for ``app``, publish it, and serve until stopped."""
            # Explicit alias: the nested classes below close over the adapter.
            adapter = self

            class Server(wsgiref.simple_server.WSGIServer):
                allow_reuse_address = True
                bottle_server = adapter

                def handle_error(self, request, client_address):
                    # Request errors are deliberately ignored.
                    pass

                def serve_forever(self, poll_interval=0.5):
                    try:
                        wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
                    finally:
                        # After shutdown the session has to be closed; leaving
                        # it open might raise warnings or even lock the
                        # database.
                        self.bottle_server.close_session()

            class Handler(wsgiref.simple_server.WSGIRequestHandler):
                def address_string(self):
                    # Report the raw client address as-is.
                    return self.client_address[0]

                def log_request(*args, **kwargs):  # pylint: disable=no-method-argument
                    # No explicit ``self`` parameter: the handler instance
                    # arrives in ``args`` and is forwarded untouched, while
                    # ``quiet`` is read from the enclosing adapter.
                    if adapter.quiet:
                        return None
                    return wsgiref.simple_server.WSGIRequestHandler.log_request(
                        *args, **kwargs)

            wsgi_server = wsgiref.simple_server.make_server(
                host=adapter.host,
                port=adapter.port,
                app=app,
                server_class=Server,
                handler_class=Handler)
            # Publish the server, then signal startup before blocking.
            adapter.proxy.server = wsgi_server
            adapter.proxy._started.put(True)
            wsgi_server.serve_forever(poll_interval=0.1)

    def serve():
        # Since task is a thread_local object, it has to be patched from
        # inside the server thread.
        self._ctx_patcher(self.ctx)

        application = bottle.Bottle()
        application.post('/', callback=self._request_handler)
        bottle.run(
            app=application,
            host='localhost',
            port=self.port,
            quiet=True,
            server=BottleServerAdapter)

    server_thread = threading.Thread(target=serve)
    server_thread.daemon = True
    server_thread.start()
    return server_thread