def _error(error, code):
    """Build the JSON error response for *error*.

    The traceback stored on ``error`` is printed with
    ``traceback.print_exception`` and the response Content-Type is set to
    ``application/vnd.api+json``.

    :param error: object exposing ``body`` and ``traceback`` attributes.
    :param code: status code; also the key used to look up the title in
        ``errors``.
    :return: the result of ``response_object`` called with a one-element
        ``errors`` list.
    """
    # Build the description first so a failing lookup happens before any
    # output is produced or any header is touched.
    description = dict(
        status=str(code),
        title=errors[code],
        detail=error.body,
    )
    traceback.print_exception(type(error), error, error.traceback)
    response.headers["Content-Type"] = "application/vnd.api+json"
    return response_object(errors=[description])
# Example source code for Python headers() (original heading: python类headers()的实例源码)
def get_resources():
    """Look up the resources belonging to the requesting user.

    Basic-auth credentials are checked against ``passwords``. Without them,
    the API key returned by ``get_api_key()`` is resolved through
    ``api_keys``. A request carrying neither is served the entry stored
    under ``None``. Wrong credentials or an unknown API key abort the
    request with 401 Unauthorized.
    """
    authorization = request.environ.get('HTTP_AUTHORIZATION', '')
    if authorization:
        # NOTE(review): this echoes the raw Authorization header to stdout.
        print("Authorization:", authorization)

    credentials = request.auth
    if not credentials:
        key = get_api_key()
        if key is None:
            # No authentication supplied at all.
            username = None
        else:
            username = api_keys.get(key)
            if username is None:
                abort(401, API_KEY_ERROR)
    else:
        username, password = credentials
        if passwords.get(username) != password:
            abort(401, BASIC_ERROR)
    return _resources[username]
def getStations(self, busca=''):
    """Return the stations as a GeoJSON ``FeatureCollection`` string.

    Sets the ``Access-Control-Allow-Origin`` and ``Content-Type`` response
    headers, then serialises every station as a ``Point`` feature.

    :param busca: search term; when empty, all stations from
        ``self._g._get()`` are used, otherwise the result of
        ``self._g._busca(busca)``.
    :return: JSON text, indented by two spaces with sorted keys.
    """
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Content-Type'] = 'application/json'

    stations = self._g._get() if busca == '' else self._g._busca(busca)

    features = []
    for station in stations:
        # Each station record is a 13-item sequence; the image entry is
        # unpacked but not exported.
        (name, lat, lng, address, line, online, operational,
         available_1, available_2, total, internal_status,
         _image, station_id) = station
        features.append({
            "type": "Feature",
            "geometry": {
                # Longitude comes before latitude in the emitted pair.
                "coordinates": [lng, lat],
                "type": "Point",
            },
            "properties": {
                "id": station_id,
                "nome": name,
                "endereco": address,
                "estacao": line,
                "status_online": online,
                "status_operacional": operational,
                "qtd_bikes_disp_1": available_1,
                "qtd_bikes_disp_2": available_2,
                "qtd_vagas_total": total,
                "statusInterno": internal_status,
            },
        })

    collection = {"features": features, "type": "FeatureCollection"}
    return json.dumps(collection, indent=2, sort_keys=True)
def enable_cors(func):
    """Decorate a route handler so its response carries CORS headers.

    The wrapper sets the ``Access-Control-Allow-*`` headers on every call.
    For any method other than ``OPTIONS`` it then calls ``func`` and returns
    its result; for ``OPTIONS`` it returns ``None`` without calling ``func``.

    :param func: the handler to wrap.
    :return: the wrapping callable, carrying ``func``'s name and docstring.
    """
    # Local import keeps this block self-contained.
    from functools import wraps

    # wraps() preserves __name__/__doc__ of the handler, which would
    # otherwise all be reported as "_enable_cors".
    @wraps(func)
    def _enable_cors(*args, **kwargs):
        hds = response.headers
        hds['Access-Control-Allow-Origin'] = '*'
        hds['Access-Control-Allow-Methods'] = ', '.join(
            ['PUT', 'GET', 'POST', 'DELETE', 'OPTIONS'])
        allow = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
        hds['Access-Control-Allow-Headers'] = allow
        if request.method != 'OPTIONS':
            return func(*args, **kwargs)

    return _enable_cors
def get_status(jid):
    """Return the state of job *jid*, or ``'X'`` when no such job is found."""
    # These headers are required because of CORS.
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, PUT, OPTIONS'),
        ('Access-Control-Allow-Headers',
         'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'),
    )
    for header, value in cors_headers:
        response.headers[header] = value

    row = db(jobs.id == jid).select(jobs.state).first()
    return 'X' if row is None else row.state
def output():
    """Return the HTML-escaped contents of a run's ``<app>.out`` file.

    Reads ``app``, ``cid`` and ``user`` from the query string. A ``cid`` of
    the form ``"<user>/<case>"`` overrides the ``user`` parameter. The file
    is read from ``user_dir/<user>/<app>/<case>/<app>.out``.

    :return: the escaped file contents, or the string
        ``"ERROR: something went wrong!"`` if locating, reading or escaping
        the file fails.
    """
    # cgi.escape was removed in Python 3.8; html.escape replaces it.
    try:
        from html import escape
    except ImportError:  # Python 2
        from cgi import escape

    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'PUT, GET, POST, DELETE, OPTIONS'
    response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
    app = request.query.app
    cid = request.query.cid
    user = request.query.user
    try:
        if "/" in cid:
            (u, c) = cid.split("/")
        else:
            u = user
            c = cid
        # NOTE(review): these path components come straight from the query
        # string and are not checked for ".." -- confirm this is acceptable.
        run_dir = os.path.join(user_dir, u, app, c)
        fn = os.path.join(run_dir, app + '.out')
        contents = slurp_file(fn)
        # Convert HTML characters such as ">" to entities such as "&gt;".
        # This is needed so that XML input files will show parameter labels.
        # The second argument (quote=False) matches cgi.escape's old default.
        return escape(contents, False)
        # params = { 'cid': cid, 'contents': output, 'app': app,
        # 'user': u, 'fn': fn, 'apps': myapps.keys() }
        # return template('more', params)
    except Exception:
        # Best-effort endpoint: report any failure as a plain message, but
        # let SystemExit/KeyboardInterrupt propagate.
        return "ERROR: something went wrong!"
        # params = { 'app': app, 'apps': myapps.keys(),
        # 'err': "Couldn't read input file. Check casename." }
        # return template('error', params)
def output():
    """Return the HTML-escaped contents of a run's ``<app>.out`` file.

    Reads ``app``, ``cid`` and ``user`` from the query string. A ``cid`` of
    the form ``"<user>/<case>"`` overrides the ``user`` parameter. The file
    is read from ``user_dir/<user>/<app>/<case>/<app>.out``.

    :return: the escaped file contents, or the string
        ``"ERROR: something went wrong!"`` if locating, reading or escaping
        the file fails.
    """
    # cgi.escape was removed in Python 3.8; html.escape replaces it.
    try:
        from html import escape
    except ImportError:  # Python 2
        from cgi import escape

    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'PUT, GET, POST, DELETE, OPTIONS'
    response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
    app = request.query.app
    cid = request.query.cid
    user = request.query.user
    try:
        if "/" in cid:
            (u, c) = cid.split("/")
        else:
            u = user
            c = cid
        # NOTE(review): these path components come straight from the query
        # string and are not checked for ".." -- confirm this is acceptable.
        run_dir = os.path.join(user_dir, u, app, c)
        fn = os.path.join(run_dir, app + '.out')
        contents = slurp_file(fn)
        # Convert HTML characters such as ">" to entities such as "&gt;".
        # This is needed so that XML input files will show parameter labels.
        # The second argument (quote=False) matches cgi.escape's old default.
        return escape(contents, False)
        # params = { 'cid': cid, 'contents': output, 'app': app,
        # 'user': u, 'fn': fn, 'apps': myapps.keys() }
        # return template('more', params)
    except Exception:
        # Best-effort endpoint: report any failure as a plain message, but
        # let SystemExit/KeyboardInterrupt propagate.
        return "ERROR: something went wrong!"
        # params = { 'app': app, 'apps': myapps.keys(),
        # 'err': "Couldn't read input file. Check casename." }
        # return template('error', params)
def js_dynamic(path):
    """Serve a JavaScript file, rendering it as a template unless static.

    Sets ``Expires`` (two days ahead), ``Cache-control`` and a JavaScript
    ``Content-Type`` on the response. Paths containing ``"static"`` or
    ``"mootools"`` are served as plain files from ``media/js``; all others
    are rendered from the template ``js/<path>`` obtained from ``env``.

    :param path: path of the requested script, relative to the JS root.
    :return: rendered template text, the ``static_file`` result, or an
        ``HTTPError(404)`` if rendering or serving raised an exception.
    """
    two_days = 60 * 60 * 24 * 2
    response.headers['Expires'] = time.strftime(
        "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(time.time() + two_days))
    response.headers['Cache-control'] = "public"
    response.headers['Content-Type'] = "text/javascript; charset=UTF-8"

    try:
        # Static files are not rendered.
        if "static" not in path and "mootools" not in path:
            t = env.get_template("js/%s" % path)
            return t.render()
        return static_file(path, root=join(PROJECT_DIR, "media", "js"))
    except Exception:
        # Any failure is reported as "not found"; SystemExit and
        # KeyboardInterrupt are deliberately not intercepted.
        return HTTPError(404, "Not Found")
def server_static(path):
    """Serve *path* from the project's ``media`` directory.

    The response is marked ``Cache-control: public`` with an ``Expires``
    timestamp seven days in the future.
    """
    one_week = 60 * 60 * 24 * 7
    expiry = time.gmtime(time.time() + one_week)
    response.headers['Expires'] = time.strftime(
        "%a, %d %b %Y %H:%M:%S GMT", expiry)
    response.headers['Cache-control'] = "public"
    media_root = join(PROJECT_DIR, "media")
    return static_file(path, root=media_root)
def get_api_key():
    """Extract the API key from the ``Authorization`` header.

    Returns ``None`` when the header is absent or uses a scheme other than
    ``api-key`` (compared case-insensitively). A header that cannot be split
    or decoded aborts the request with 401.
    """
    authorization = request.headers.get('Authorization')
    if authorization:
        try:
            scheme, payload = authorization.split(None, 1)
            if scheme.lower() == 'api-key':
                # The first four characters of the payload are skipped
                # before base64-decoding -- presumably a "key=" prefix;
                # confirm against the client.
                encoded = tob(payload[4:])
                return touni(base64.b64decode(encoded))
        except (ValueError, TypeError):
            abort(401, HEADER_ERROR)
    return None
def get_endpoint_url():
    """Build the URL under which this server can be reached.

    The URL is ``http://`` followed by the request's ``Host`` header and
    ``BASE``.
    """
    host = request.headers["Host"]
    return "".join(("http://", host, BASE))
def test_jsonapi_header():
    """Make sure that the content type is set accordingly.

    http://jsonapi.org/format/#content-negotiation-clients

    Aborts with 415 when the request Content-Type starts with
    ``application/vnd.api+json`` but is not exactly that value, and with 406
    when no entry of the Accept header is one of ``*/*``,
    ``application/*`` or ``application/vnd.api+json``.
    """
    content_type = request.content_type
    content_type_expected = "application/vnd.api+json"
    # Only a JSON API media type carrying something extra (such as media
    # type parameters) is rejected here; unrelated content types pass.
    if content_type != content_type_expected and content_type.startswith(content_type_expected):
        abort(415, "The Content-Type header must be \"{}\", not \"{}\".".format(
            content_type_expected, content_type))

    accepts = request.headers.get("Accept", "*/*").split(",")
    expected_accept = ["*/*", "application/*", "application/vnd.api+json"]
    # Entries are commonly separated by ", "; strip the surrounding
    # whitespace so " */*" is recognised as "*/*".
    if not any(accept.strip() in expected_accept for accept in accepts):
        abort(406, "The Accept header must one of \"{}\", not \"{}\".".format(
            expected_accept, ",".join(accepts)))
def enable_cors():
    """Set the ``Access-Control-Allow-*`` headers on the response."""
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'PUT, GET, POST, DELETE, OPTIONS'),
        ('Access-Control-Allow-Headers',
         'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'),
    )
    for header, value in cors_headers:
        response.headers[header] = value
def _devices_info(self):
    """Return ``self._get_devices_info()`` wrapped as ``{"devices": ...}`` JSON.

    The response Content-Type is set to ``application/json`` first.
    """
    response.headers['Content-Type'] = 'application/json'
    payload = dict(devices=self._get_devices_info())
    return json.dumps(payload)
def _devices_value(self):
    """Return ``self._get_devices_value()`` wrapped as ``{"devices": ...}`` JSON.

    The response Content-Type is set to ``application/json`` first.
    """
    response.headers['Content-Type'] = 'application/json'
    payload = dict(devices=self._get_devices_value())
    return json.dumps(payload)
def _device_set(self):
    """Set a device value from the JSON body of a PUT request.

    The body must be a JSON object with ``"name"`` and ``"value"`` fields;
    both are passed to ``self.set_device_value``.

    :return: the JSON text ``{}`` on success. When ``self._debug`` is false,
        any failure is reported as ``{"error": "<message>"}`` instead.
    :raises ValueError: (debug mode only) the body is not decodable JSON or
        is JSON ``null``.
    :raises KeyError: (debug mode only) ``"name"`` or ``"value"`` is missing.
    """
    response.headers['Content-Type'] = 'application/json'
    retval = {}

    def verify_and_set(client):
        # Validate the request body, then forward it to the client.
        try:
            # JSON is exchanged as UTF-8; ASCII is a subset, so bodies that
            # were accepted before are still accepted.
            data = json.loads(request.body.read().decode('utf-8'))
        except Exception:
            raise ValueError('Unable to decode json data from PUT request')
        if data is None:
            raise ValueError('No data set in body of PUT request')
        if 'name' not in data:
            raise KeyError('No "name" field in body of PUT request')
        if 'value' not in data:
            raise KeyError('No "value" field in body of PUT request')
        client.set_device_value(data['name'], data['value'])

    if self._debug:
        # In debug mode, let exceptions propagate to the caller.
        verify_and_set(self)
    else:
        try:
            verify_and_set(self)
        except Exception as e:
            retval['error'] = str(e)
    return json.dumps(retval)
def custom_headers():
    """Set some custom headers across all HTTP responses."""
    fixed_headers = (
        ("Server", "Machete Server"),
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("X-XSS-Protection", "1; mode=block"),
        # The remaining three disable caching.
        ("Pragma", "no-cache"),
        ("Cache-Control", "no-cache"),
        ("Expires", "0"),
    )
    for header, value in fixed_headers:
        response.headers[header] = value
def custom_headers():
    """Set some custom headers across all HTTP responses."""
    names = ("Server", "X-Content-Type-Options", "X-Frame-Options",
             "X-XSS-Protection", "Pragma", "Cache-Control", "Expires")
    values = ("Machete Server", "nosniff", "DENY",
              "1; mode=block", "no-cache", "no-cache", "0")
    # Names and values are paired positionally, in the order they are set.
    for name, value in zip(names, values):
        response.headers[name] = value
def enable_cors(fn):
    """Decorate a route handler so its response carries CORS headers.

    The wrapper sets the ``Access-Control-Allow-*`` headers on every call.
    For any method other than ``OPTIONS`` it then calls ``fn`` and returns
    its result; for ``OPTIONS`` it returns ``None`` without calling ``fn``.

    :param fn: the handler to wrap.
    :return: the wrapping callable, carrying ``fn``'s name and docstring.
    """
    # Local import keeps this block self-contained.
    from functools import wraps

    # wraps() preserves __name__/__doc__ of the handler, which would
    # otherwise all be reported as "_enable_cors".
    @wraps(fn)
    def _enable_cors(*args, **kwargs):
        # set CORS headers
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
        response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
        if bottle.request.method != 'OPTIONS':
            # actual request; reply with the actual response
            return fn(*args, **kwargs)

    return _enable_cors
def _add_cors_headers(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
response.headers['Access-Control-Allow-Headers'] = \
'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
def to_json(fnc):
    """Decorate *fnc* so its return value is sent as ``{"result": ...}`` JSON.

    The wrapper sets the response Content-Type to ``application/json``
    before calling ``fnc``. A ``None`` result is passed through as ``None``
    instead of being serialised.
    """
    @wraps(fnc)
    def inner(*args, **kwargs):
        bottle_resp.headers['Content-Type'] = 'application/json'
        result = fnc(*args, **kwargs)
        if result is None:
            return None
        return json.dumps(dict(result=result))

    return inner
def enable_cors():
    """Set the ``Access-Control-Allow-*`` headers on the response."""
    allowed_headers = ', '.join((
        'Origin', 'Accept', 'Content-Type', 'X-Requested-With', 'X-CSRF-Token',
    ))
    allowed_methods = ', '.join(('GET', 'POST', 'OPTIONS'))
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = allowed_methods
    response.headers['Access-Control-Allow-Headers'] = allowed_headers
# Enable SSL w Self Signed Cert
# Source: https://github.com/mfm24/miscpython/blob/master/bottle_ssl.py
# Generate a cert: openssl req -new -x509 -keyout mfa_slipstream.pem -out mfa_slipstream.pem -days 365 -nodes