def add_woman():
    """Create a new record in EXTRAORDINARY_WOMEN from the JSON request body.

    Reads ``name``, ``origin`` and ``occupation`` from ``request.json`` and
    assigns the next integer id (highest existing id + 1, or 1 when the
    collection is empty).

    Returns:
        HTTPResponse: status 200 with the created record under the
        ``extraordinary_woman`` key, or status 400 with an ``error`` message
        when the record could not be built from the request.
    """
    try:
        # default=0 allows the very first record to be added; a plain max()
        # raises ValueError on an empty collection.
        next_id = max(
            (woman['id'] for woman in EXTRAORDINARY_WOMEN), default=0) + 1
        data = request.json
        new_woman = {
            "id": next_id,
            "name": data['name'],
            "origin": data['origin'],
            "occupation": data['occupation'],
        }
    except Exception:
        # Missing body or missing field. ``Exception`` rather than a bare
        # except so SystemExit / KeyboardInterrupt still propagate.
        return HTTPResponse(
            status=400,
            body=json.dumps({'error': 'error adding a woman'}))

    EXTRAORDINARY_WOMEN.append(new_woman)
    return HTTPResponse(
        status=200,
        body=json.dumps({"extraordinary_woman": new_woman}))
# Example source code for the Python class HTTPResponse()
def apply(self, callback, route):
    """Wrap ``callback`` so that dict results are serialised with ``dumps``.

    The returned wrapper calls ``callback`` and treats a raised
    ``HTTPResponse`` exactly like a returned one. A dict result is encoded
    and returned as a string with the global ``response`` content type set
    to ``application/json``; an ``HTTPResponse`` whose body is a dict gets
    its body encoded and its own content type set. Anything else is
    returned untouched.

    Args:
        callback: The route callback to wrap.
        route: Unused by this implementation.

    Returns:
        The wrapping function.
    """
    def wrapper(*a, **ka):
        try:
            result = callback(*a, **ka)
        except HTTPResponse as raised:
            result = raised

        if isinstance(result, dict):
            encoded = dumps(result)
            response.content_type = 'application/json'
            return encoded

        if isinstance(result, HTTPResponse) and isinstance(result.body, dict):
            result.body = dumps(result.body)
            result.content_type = 'application/json'
        return result

    return wrapper
def _assert_admin(self, cert):
    """Ensure the certificate's common name is listed in ``self.admin_hosts``.

    Args:
        cert: ``None``, raw certificate bytes (passed to
            ``load_certificate``) or an ``x509.Certificate``.

    Raises:
        HTTPResponse: status 401 when ``cert`` is ``None``; status 403 when
            the certificate's common name is not in ``self.admin_hosts``.
        TypeError: when ``cert`` is neither bytes nor an ``x509.Certificate``.
    """
    if cert is None:
        logger.warning('Admin command received by unauthentified host.')
        raise HTTPResponse(
            status=401,
            body={'message': 'Authentication required'}
        )

    if isinstance(cert, bytes):
        cert = load_certificate(cert_bytes=cert)
    if not isinstance(cert, x509.Certificate):
        raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')

    # The first common-name attribute of the subject identifies the host.
    common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    host = common_names[0].value

    if host not in self.admin_hosts:
        logger.warning('Host %s unauthorized for admin commands.', host)
        raise HTTPResponse(
            status=403,
            body={'message': 'Host {} unauthorized for admin commands'.format(host)}
        )
def assertRedirect(self, target, result, query=None, status=303, **args):
    """Check the response raised by ``bottle.redirect(target)``.

    Args:
        target: Value passed to ``bottle.redirect``.
        result: Expected ``Location`` header.
        query: Optional dict of extra keyword arguments for
            ``bottle.redirect``.
        status: Expected status code of the raised response.
        **args: Extra environ entries; names starting with ``wsgi`` have
            their first underscore turned into a dot.
    """
    environ = {'SERVER_PROTOCOL': 'HTTP/1.1'}
    for name in list(args):
        if not name.startswith('wsgi'):
            continue
        # e.g. wsgi_url_scheme -> wsgi.url_scheme (first underscore only).
        args[name.replace('_', '.', 1)] = args[name]
        del args[name]
    environ.update(args)

    request.bind(environ)
    bottle.response.bind()
    redirect_kwargs = query or {}
    try:
        bottle.redirect(target, **redirect_kwargs)
    except bottle.HTTPResponse:
        # _e() presumably returns the exception being handled -- verify
        # against its definition.
        raised = _e()
        self.assertEqual(status, raised.status_code)
        self.assertTrue(raised.headers)
        self.assertEqual(result, raised.headers['Location'])
def test_mismatch_accept_header_true(self, app):
    """
    The Accept header matches no renderer, but the view declares a
    mismatch renderer; the request must succeed and the negotiation
    context must end up holding that renderer.
    """
    @app.routecv
    class ExampleView(JSONEchoView):
        mismatch_renderer_class = ExampleRenderer

        def dispatch(self):
            return HTTPResponse("wtf")

    accept_header = {'Accept': 'vnd/invalid'}
    reply = app.webtest.get('/echo', headers=accept_header)

    assert reply.status == '200 OK'
    assert reply.headers['Content-Type'] == 'vnd/example'
    assert reply.body == b'wtf'
    assert request.negotiation_context.renderer == ExampleRenderer
def __call__(self):
    """Run the parent view and render its response with the chosen renderer.

    A fresh ``ContentNegotiationContext`` and negotiator are attached to
    ``request`` and ``request.body_parsed`` is reset before the parent
    ``__call__`` runs. A raised ``HTTPResponse`` is re-raised unless
    ``self.render_errors`` is set and the context has a renderer; in that
    case the exception is applied onto a new ``HTTPResponse``. When the
    context has a renderer, the response body is rendered with it and the
    content type is taken from the context.

    Returns:
        The response of the parent call; an ``HTTPResponse`` whenever a
        renderer is set on the context.
    """
    context = ContentNegotiationContext()
    request.negotiation_context = context
    request.negotiator = self.content_negotiation_class()
    request.body_parsed = None

    try:
        response = super(ContentNegotiationViewMixin, self).__call__()
    except HTTPResponse:
        # Only render the error when that is enabled AND a renderer was
        # selected; otherwise let the original exception propagate.
        if not self.render_errors:
            raise
        if not context.renderer:
            raise
        response = HTTPResponse()
        get_exception().apply(response)

    if context.renderer:
        if not isinstance(response, HTTPResponse):
            response = HTTPResponse(response)
        response.body = context.renderer.render(response.body)
        response.content_type = context.response_content_type
    return response
def upload_image():
    """Store an uploaded image on disk and add it to the ``images`` queue.

    Expects the form field ``name`` and the file field ``data``. The file
    content must be identified as an image by ``magic.from_buffer``; it is
    written below the directory configured as ``storage.imagesdir``.

    Returns:
        HTTPResponse: 200 when the image was stored and queued, 400 when
        fields are missing or the content is not an image, 500 when
        queueing failed.
    """
    name = request.forms.get('name')
    data = request.files.get('data')
    if not (name and data and data.file):
        return HTTPResponse(status=400,body=json.dumps({'error' : 'missing fields'}))

    raw = data.file.read()
    # Validate the payload before touching the filesystem.
    if 'image' not in magic.from_buffer(raw):
        return HTTPResponse(status=400,body=json.dumps({'error' : 'file type is not allowed'}))

    # Keep only the last path component of the client-supplied name so the
    # upload cannot be written outside the images directory.
    filename = os.path.basename(data.filename)
    if not filename:
        return HTTPResponse(status=400,body=json.dumps({'error' : 'missing fields'}))

    images_dir = config.get('storage','imagesdir')
    save_path = "{path}/{file}".format(path=images_dir, file=filename)

    # Create the directory, not the file path: calling makedirs on the file
    # path left a directory where the file was about to be written.
    if not os.path.exists(images_dir):
        os.makedirs(images_dir)

    # Binary mode: ``raw`` holds the uploaded bytes unchanged.
    with open(save_path, 'wb') as open_file:
        open_file.write(raw)

    if queue.add_to_queue(queue_name='images',image=save_path):
        return HTTPResponse(status=200,body=json.dumps({'status' : 'Image Stored'}))
    return HTTPResponse(status=500,body=json.dumps({'error' : 'Internal Server Error'}))
def maltego_response(trx, status_code=200, override=None):
    """Build the HTTP response for a Maltego transform.

    Args:
        trx: Object whose ``returnOutput()`` result becomes the body.
        status_code: HTTP status of the response.
        override: When truthy, used as the body instead of the transform
            output.

    Returns:
        HTTPResponse: with ``status_code`` and the selected body. If
        ``returnOutput()`` raises, the body is the result of
        ``custom_exception(681, ...)``.
    """
    try:
        payload = trx.returnOutput()
    except Exception:
        override = custom_exception(
            681, "Encountered errors in Maltegos Python library.")

    if override:
        payload = override
    return HTTPResponse(status=status_code, body=payload)
def list_women():
    """Return every record of EXTRAORDINARY_WOMEN as a JSON document.

    Returns:
        HTTPResponse: status 200 whose body holds the whole collection
        under the ``extraordinary_women`` key.
    """
    payload = json.dumps({"extraordinary_women": EXTRAORDINARY_WOMEN})
    return HTTPResponse(status=200, body=payload)
def get_woman(woman_id):
    """Look up one record of EXTRAORDINARY_WOMEN by id.

    Args:
        woman_id: The id to look for; converted with ``int()``.

    Returns:
        HTTPResponse: status 200 with the record under the
        ``extraordinary_woman`` key, or status 404 with an ``error``
        message when no record has that id or the id is not an integer.
    """
    not_found = HTTPResponse(
        status=404,
        body=json.dumps({'error': 'id not found'}))

    # Convert once, up front; a non-numeric id cannot match any record.
    try:
        wanted_id = int(woman_id)
    except (TypeError, ValueError):
        return not_found

    # Scan the whole collection before concluding the id is unknown.
    for woman in EXTRAORDINARY_WOMEN:
        if woman['id'] == wanted_id:
            return HTTPResponse(
                status=200,
                body=json.dumps({'extraordinary_woman': woman}))
    return not_found
def delete_woman(woman_id):
    """Remove the record with the given id from EXTRAORDINARY_WOMEN.

    Args:
        woman_id: The id to delete; converted with ``int()``.

    Returns:
        HTTPResponse: status 204 (no body) when a record was removed, or
        status 404 with an ``error`` message when no record has that id or
        the id is not an integer.
    """
    # 404, not 204: a 204 response must not carry a body, and the body here
    # reports an error.
    not_found = HTTPResponse(
        status=404,
        body=json.dumps({'error': 'id not found'}))

    try:
        wanted_id = int(woman_id)
    except (TypeError, ValueError):
        return not_found

    for woman in EXTRAORDINARY_WOMEN:
        if woman['id'] == wanted_id:
            # Safe to mutate here: we return right after removing.
            EXTRAORDINARY_WOMEN.remove(woman)
            return HTTPResponse(status=204)
    return not_found
def json_api_handler(handler):
    """Bottle handler decorator for JSON REST API handlers.

    The wrapped handler either returns a dict, which is sent back with
    ``'ok': True`` added, or raises/returns a non-2xx ``bottle.HTTPResponse``,
    which is sent back as ``{'ok': False, 'error': <body>}``. Any other
    exception is logged and reported as a 500 error. The result is always a
    ``bottle.HTTPResponse`` with an ``application/json`` content type.

    Args:
      handler: A Bottle handler function.

    Returns:
      A wrapped Bottle handler function.
    """
    @functools.wraps(handler)
    def wrapped_handler(*args, **kwargs):
        try:
            handler_result = handler(*args, **kwargs)
        except bottle.HTTPResponse as raised:
            # Rebind explicitly: Python 3 deletes the ``as`` target when the
            # except clause ends, so binding handler_result there directly
            # would leave it undefined below.
            handler_result = raised
        except Exception:
            logging.exception('Uncaught exception')
            handler_result = bottle.HTTPError(500, 'Internal Server Error')

        if isinstance(handler_result, bottle.HTTPResponse):
            # For now, we do not support raising successful HTTPResponse.
            assert handler_result.status_code // 100 != 2
            # Forcibly convert HTTPError to HTTPResponse to avoid formatting.
            response = handler_result.copy(cls=bottle.HTTPResponse)
            response_data = {'ok': False, 'error': handler_result.body}
        else:
            assert isinstance(handler_result, dict)
            response = bottle.response.copy(cls=bottle.HTTPResponse)
            response_data = handler_result
            response_data['ok'] = True

        response.body = ujson.dumps(response_data, double_precision=6)
        response.content_type = 'application/json'
        return response

    return wrapped_handler
def get_certificate():
    """Generate a certificate and return the OpenVPN config as a download.

    Reads ``username``, ``password`` and ``days`` through ``post_get``,
    runs ``CertificateGenerator.generate`` and returns the generated
    OpenVPN configuration as an attachment named
    ``softfire-vpn_<username>.ovpn``.

    Returns:
        bottle.HTTPResponse: status 200 with the configuration as body.

    Raises:
        bottle.HTTPError: status 500 when ``username`` is missing or when
            ``days`` is missing or not an integer.
    """
    username = post_get('username')
    if not username:
        raise bottle.HTTPError(500, "Username missing")
    password = post_get('password', default=None)

    # ``days`` defaults to None, and int(None) raises TypeError; report the
    # problem explicitly. The status stays 500, matching the username check
    # above and what clients received from the uncaught error before.
    try:
        days = int(post_get('days', default=None))
    except (TypeError, ValueError):
        raise bottle.HTTPError(500, "Days missing or invalid")

    cert_gen = CertificateGenerator()
    cert_gen.generate(password, username, days)
    openvpn_config = cert_gen.get_openvpn_config()
    headers = {
        'Content-Type': 'text/plain;charset=UTF-8',
        'Content-Disposition': 'attachment; filename="softfire-vpn_%s.ovpn"' % username,
        "Content-Length": len(openvpn_config),
    }
    return bottle.HTTPResponse(openvpn_config, 200, **headers)
def __text_to_speech(self):
    """Speak the ``text`` query parameter ``times`` times.

    Runs ``self.__check_auth()`` first, then hands the ``text`` query
    parameter and the integer value of ``times`` to ``self.__tts.say``.

    Returns:
        HTTPResponse: status 200 with an empty body.
    """
    self.__check_auth()
    query = request.query
    text = query.text
    times = int(query.times)
    self.__tts.say(text, times)
    return HTTPResponse(body='', status=200)
def _get_cert_config_if_allowed(self, domain, cert):
    """Return the certificate config for ``domain`` if the host may use it.

    The host is the common name of ``cert``. Access is granted when the
    host is in ``self.admin_hosts`` or in the matching config's
    ``allowed_hosts``.

    Args:
        domain: Domain looked up with ``self.certificates_config.match``.
        cert: ``None``, raw certificate bytes (passed to
            ``load_certificate``) or an ``x509.Certificate``.

    Returns:
        The matching certificate configuration.

    Raises:
        HTTPResponse: status 401 when ``cert`` is ``None``, 404 when no
            configuration matches ``domain``, 403 when the host is not
            allowed for it.
        TypeError: when ``cert`` is neither bytes nor an ``x509.Certificate``.
    """
    if cert is None:
        logger.warning('Request received for domain %s by unauthentified host.', domain)
        raise HTTPResponse(
            status=401,
            body={'message': 'Authentication required'}
        )

    if isinstance(cert, bytes):
        cert = load_certificate(cert_bytes=cert)
    if not isinstance(cert, x509.Certificate):
        raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')
    host = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value

    certconfig = self.certificates_config.match(domain)
    if not certconfig:
        logger.warning('No config matching domain %s found.', domain)
        raise HTTPResponse(
            status=404,
            body={'message': 'No configuration found for domain {}'.format(domain)}
        )

    logger.debug('Domain %s matches pattern %s', domain, certconfig.pattern)
    if host in self.admin_hosts or host in certconfig.allowed_hosts:
        return certconfig

    logger.warning('Host %s unauthorized for domain %s.', host, domain)
    raise HTTPResponse(
        status=403,
        body={'message': 'Host {} unauthorized for domain {}'.format(host, domain)}
    )
def _handle_auth(self):
    """Process a certificate signing request posted as JSON.

    The request body must contain a PEM CSR under ``csr``. The CSR's common
    name is used as the host name. If ``<crt_path>/<host>.crt`` exists and
    its public key matches the CSR, the certificate is returned; otherwise
    the CSR is saved to ``<csr_path>/<host>.csr`` for later handling.

    Returns:
        dict: ``{'status': 'authorized', 'crt': <PEM text>}`` when a
        matching certificate exists, or ``{'status': 'pending'}`` (with the
        response status set to 202) after the CSR has been stored.

    Raises:
        HTTPResponse: status 400 when the CSR signature is invalid or its
            common name is not usable as a file name; status 409 when an
            existing certificate does not match the CSR's public key.
    """
    request_data = request.json
    csr = x509.load_pem_x509_csr(data=request_data['csr'].encode(), backend=default_backend())  # pylint: disable=unsubscriptable-object
    if not csr.is_signature_valid:
        raise HTTPResponse(
            status=400,
            body={'message': 'The certificate signing request signature is invalid.'}
        )
    host = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value

    # The common name is supplied by the client and becomes part of the file
    # paths below; refuse values that could point outside csr_path/crt_path.
    if host in ('', '.', '..') or any(ch in host for ch in ('/', '\\', '\x00')):
        raise HTTPResponse(
            status=400,
            body={'message': 'The certificate signing request common name is invalid.'}
        )

    csr_file = os.path.join(self.csr_path, "%s.csr" % (host))
    crt_file = os.path.join(self.crt_path, "%s.crt" % (host))

    if os.path.isfile(crt_file):
        crt = load_certificate(crt_file)
        if crt.public_key().public_numbers() == csr.public_key().public_numbers():
            return {
                'status': 'authorized',
                'crt': dump_pem(crt).decode()
            }
        raise HTTPResponse(
            status=409,
            body={'message': 'Mismatch between the certificate signing request and the certificate.'}
        )

    # Save CSR
    with open(csr_file, 'w') as f:
        f.write(csr.public_bytes(serialization.Encoding.PEM).decode())
    response.status = 202
    return {
        'status': 'pending'
    }
def test_redirect_preserve_cookies(self):
    """A cookie set before ``bottle.redirect`` is present on the raised response."""
    environ = {'SERVER_PROTOCOL': 'HTTP/1.1'}
    request.bind(environ)
    bottle.response.bind()
    try:
        bottle.response.set_cookie('xxx', 'yyy')
        bottle.redirect('...')
    except bottle.HTTPResponse:
        cookies = []
        for header, value in _e().headerlist:
            if header == 'Set-Cookie':
                cookies.append(value)
        self.assertEqual(cookies, ['xxx=yyy'])
def test_json_HTTPResponse(self):
    """A dict body inside an HTTPResponse is served as JSON."""
    @self.app.route('/')
    def handler():
        return bottle.HTTPResponse({'a': 1}, 500)

    try:
        expected_body = bottle.json_dumps({'a': 1})
        self.assertBody(expected_body)
        self.assertHeader('Content-Type', 'application/json')
    except ImportError:
        warn("Skipping JSON tests.")
def test_httpresponse_in_generator_callback(self):
    """Assert the body is 'test' when a generator callback yields an HTTPResponse."""
    def test():
        yield bottle.HTTPResponse('test')

    self.app.route('/')(test)
    self.assertBody('test')
def dispatch(self):
    """Return an HTTPResponse whose body is the list ``[1, 2, 3]``."""
    payload = [1, 2, 3]
    return HTTPResponse(payload)
def __http_index(self, user=None, key=None):
    """Serve this mapping, one user's entry, or one key of it, as JSON.

    Args:
        user: Optional user to look up in ``self``.
        key: Optional key to look up in that user's entry.

    Returns:
        A JSON string: the whole object when ``user`` is falsy, the user's
        entry when ``key`` is falsy, otherwise the value stored under
        ``key``. An unknown user or key yields a ``bottle.HTTPResponse``
        with status 404 and a JSON ``null`` body.
    """
    bottle.response.content_type = 'application/json'

    if not user:
        return json.dumps(self)
    if user not in self:
        return bottle.HTTPResponse(status=404, body=json.dumps(None))

    if not key:
        return json.dumps(self.get(user))
    if key not in self.get(user):
        return bottle.HTTPResponse(status=404, body=json.dumps(None))
    return json.dumps(self.get(user).get(key))
def _build_http_response(self, request_der: bytes) -> HTTPResponse:
    """Wrap the OCSP response for ``request_der`` in an HTTP response.

    Args:
        request_der: Request bytes handed to ``self._build_ocsp_response``
            (DER-encoded, judging by the name -- confirm with the caller).

    Returns:
        HTTPResponse: status 200, content type
        ``application/ocsp-response``, body set to the ``dump()`` of the
        built OCSP response.
    """
    ocsp_response = self._build_ocsp_response(request_der)
    encoded = ocsp_response.dump()
    return HTTPResponse(
        body=encoded,
        status=200,
        content_type='application/ocsp-response',
    )
def get_images():
    """Serve the file named by the ``path`` query parameter as a JPEG.

    Returns:
        The raw file bytes, with the ``Content-type`` response header set
        to ``image/jpeg``, or an HTTPResponse with status 404 and a JSON
        error when ``path`` is missing or does not name an existing file.
    """
    image_path = request.query.get('path',None)
    # NOTE(review): ``path`` comes straight from the query string, so any
    # file readable by the server process can be requested. Consider
    # restricting it to the configured images directory.
    if not image_path or not os.path.isfile(image_path):
        # The 404 must be returned: building it without ``return`` sends an
        # empty 200 instead.
        return HTTPResponse(status=404,body=json.dumps({'error' : 'image not found'}))

    # Context manager so the file handle is closed once the data is read.
    with open(image_path,'rb') as image_file:
        data = image_file.read()
    response.set_header('Content-type', 'image/jpeg')
    return data
#------------------
# MAIN
#------------------
def return_analysis_result():
    """
    Example of an analyzer API (GET).

    Passes the ``input_text`` query parameter to ``your_tool.lowercase``
    and returns the JSON-encoded result with status 200 and an
    ``application/json`` content type.
    """
    submitted = request.query.input_text
    analyzed = your_tool.lowercase(submitted)

    reply = HTTPResponse(status=200, body=json.dumps(analyzed))
    reply.set_header('Content-Type', 'application/json')
    return reply
def enable_cors(self):
    '''Enables Cross Origin Resource Sharing.

    Registers an ``after_request`` hook that sets the CORS headers on
    every response, and replaces the 405 error handler so that
    ``OPTIONS`` requests are answered with the headers the client asked
    for.

    :rtype: :class:`WebBuilder`
    '''
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods',
         'GET, POST, PUT, DELETE, OPTIONS'),
        ('Access-Control-Allow-Headers',
         'Origin, X-Requested-With, Content-Type, Accept, Authorization'),
    )

    def access_control_headers():
        for header, value in cors_headers:
            bottle.response.headers[header] = value

    def options_response(res):
        if bottle.request.method != 'OPTIONS':
            # A real 405: advertise OPTIONS and use the default handler.
            res.headers['Allow'] += ', OPTIONS'
            return bottle.request.app.default_error_handler(res)

        # Echo back whatever method/headers the client asked to use.
        asked = bottle.request.headers
        new_res = bottle.HTTPResponse()
        new_res.headers['Access-Control-Allow-Origin'] = '*'
        new_res.headers['Access-Control-Allow-Methods'] = asked.get(
            'Access-Control-Request-Method', '')
        new_res.headers['Access-Control-Allow-Headers'] = asked.get(
            'Access-Control-Request-Headers', '')
        return new_res

    self.app.add_hook('after_request', access_control_headers)
    self.app.error_handler[405] = options_response
    return self
def transistor():
    """Set one of the transistor outputs high or low.

    Reads the form fields ``transistorname`` (``transistor1`` to
    ``transistor3``), ``state`` (``high`` or ``low``) and ``token``.

    Returns:
        HTTPResponse: 200 with ``{'200': <name>}`` after switching, 412 for
        an unknown transistor name or state, 401 when the token is not
        accepted by ``verify_auth_token``.
    """
    transistor_name = request.forms.get('transistorname')
    state = request.forms.get('state')
    token = request.forms.get('token')

    # Kept as an explicit comparison with True so that only the exact
    # result the original check accepted grants access.
    authorized = verify_auth_token(token) == True  # noqa: E712
    if not authorized:
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    precondition_failed = json.dumps({'412': "Precondition Failed"})
    if transistor_name not in ('transistor1', 'transistor2', 'transistor3'):
        return HTTPResponse(status=412, body=precondition_failed)

    if state == "high":
        # print() with a single argument behaves the same on Python 2 and 3;
        # the former print statements were a SyntaxError on Python 3.
        print("aqui")
        high_rele(dic_pins[transistor_name], "b")
        print(dic_pins[transistor_name])
    elif state == "low":
        low_rele(dic_pins[transistor_name], "b")
    else:
        return HTTPResponse(status=412, body=precondition_failed)

    return HTTPResponse(status=200, body=json.dumps({'200': transistor_name}))
def toogle():
    """Toggle the relay named by the ``relayname`` form field.

    ``relay1`` to ``relay8`` are toggled with bank argument ``"a"``,
    ``relay9`` and ``relay10`` with ``"b"``.

    Returns:
        HTTPResponse: 200 with ``{'200': <name>}`` after toggling, 412 for
        an unknown or missing relay name, 401 when the token is not
        accepted by ``verify_auth_token``.
    """
    relay_name = request.forms.get('relayname')
    token = request.forms.get('token')

    # Explicit comparison with True kept from the original check.
    authorized = verify_auth_token(token) == True  # noqa: E712
    if not authorized:
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    if relay_name in ('relay1', 'relay2', 'relay3', 'relay4',
                      'relay5', 'relay6', 'relay7', 'relay8'):
        bank = "a"
    elif relay_name in ('relay9', 'relay10'):
        bank = "b"
    else:
        # Nothing to toggle: report it instead of answering 200 as if the
        # request had succeeded.
        return HTTPResponse(
            status=412, body=json.dumps({'412': "Precondition Failed"}))

    tooglerelay_func(dic_pins[relay_name], bank)
    return HTTPResponse(status=200, body=json.dumps({'200': relay_name}))
def on_off():
    """Switch the relay named by ``relayname`` to the requested ``state``.

    ``relay1`` to ``relay8`` are driven with bank argument ``"a"``,
    ``relay9`` and ``relay10`` with ``"b"``; ``state`` must be ``high`` or
    ``low``.

    Returns:
        HTTPResponse: 200 with ``{'200': <name>}`` after switching, 412 for
        an unknown relay name or state, 401 when the token is not accepted
        by ``verify_auth_token``.
    """
    relay_name = request.forms.get('relayname')
    token = request.forms.get('token')
    state = request.forms.get('state')

    # Explicit comparison with True kept from the original check.
    authorized = verify_auth_token(token) == True  # noqa: E712
    if not authorized:
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    precondition_failed = json.dumps({'412': "Precondition Failed"})

    if relay_name in ('relay1', 'relay2', 'relay3', 'relay4',
                      'relay5', 'relay6', 'relay7', 'relay8'):
        bank = "a"
    elif relay_name in ('relay9', 'relay10'):
        bank = "b"
    else:
        # Unknown relay: nothing was switched, so do not answer 200.
        return HTTPResponse(status=412, body=precondition_failed)

    if state == "high":
        high_rele(dic_pins[relay_name], bank)
    elif state == "low":
        low_rele(dic_pins[relay_name], bank)
    else:
        return HTTPResponse(status=412, body=precondition_failed)

    return HTTPResponse(status=200, body=json.dumps({'200': relay_name}))
def estado_dos_reles():
    """Return the relay state produced by ``come_back_reles``.

    Returns:
        HTTPResponse: 200 with the ``come_back_reles()`` result as body, or
        401 with a JSON error when the ``token`` form field is not accepted
        by ``verify_auth_token``.
    """
    token = request.forms.get('token')
    authorized = verify_auth_token(token) == True  # noqa: E712
    if not authorized:
        denied = json.dumps({'401': 'Unauthorized'})
        return HTTPResponse(status=401, body=denied)
    return HTTPResponse(status=200, body=come_back_reles())
def sensores_staus():
    """Return the sensor input state produced by ``return_state_inputs``.

    Returns:
        HTTPResponse: 200 with the ``return_state_inputs()`` result as
        body, 500 with a JSON error when reading the inputs fails, or 401
        when the ``token`` form field is not accepted by
        ``verify_auth_token``.
    """
    token = request.forms.get('token')

    # Explicit comparison with True kept from the original check.
    authorized = verify_auth_token(token) == True  # noqa: E712
    if not authorized:
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    try:
        sensores = return_state_inputs()
    except Exception:
        # Report the failure with a real 500 status; answering 200 made the
        # error look like a success to clients checking only the status.
        return HTTPResponse(status=500, body=json.dumps({'500': 'Error'}))
    return HTTPResponse(status=200, body=sensores)