def on_post(self, req, resp):
    """Handle a POST by forwarding the posted geometry to ``event.on_get``.

    The request body is read in full, decoded as UTF-8 and parsed as JSON;
    it should contain a GeoJSON Feature.
    """
    # Decode and parse the raw request body in a single expression.
    feature = json.loads(req.stream.read().decode('utf-8'))
    # Delegate to event.on_get, passing None as its third argument and the
    # feature's geometry as the fourth.
    event.on_get(req, resp, None, feature['geometry'])
# Falcon.API instances are callable WSGI apps.
# Python source code examples for the API class
def load():
    """Build and return a falcon API object that exposes the SimilarityService.

    The service is mounted at ``/{query}`` behind a fully permissive CORS
    middleware. Presumably constructing SimilarityService is what loads the
    sense2vec model -- confirm against its definition.
    """
    print("Loading")
    # Every CORS switch is opened up: any origin, method and header is
    # allowed, and credentials are allowed for all origins.
    cors_options = {
        'allow_all_origins': True,
        'allow_all_methods': True,
        'allow_all_headers': True,
        'allow_credentials_all_origins': True,
    }
    cors = CORS(**cors_options)
    app = falcon.API(middleware=[cors.middleware])
    app.add_route('/{query}', SimilarityService())
    print("Loaded!")
    return app
def create(middleware=CONF.middleware):
    """Create the falcon API and register the v1.0 routes.

    :param middleware: when truthy, the auth, logging and context
        middleware are installed; otherwise the API is created without
        any middleware. Defaults to ``CONF.middleware``, read once when
        this function is defined.
    :returns: the configured ``falcon.API`` instance.
    """
    policy.setup_policy()

    # Only pass the middleware argument when middleware is enabled.
    api_kwargs = {'request_type': ArmadaRequest}
    if middleware:
        api_kwargs['middleware'] = [
            AuthMiddleware(),
            LoggingMiddleware(),
            ContextMiddleware(),
        ]
    api = falcon.API(**api_kwargs)

    # Configure API routing: (relative path, resource) pairs, all mounted
    # under the /api/v1.0/ prefix.
    url_routes_v1 = (
        ('health', Health()),
        ('apply', Apply()),
        ('releases', Release()),
        ('status', Status()),
        ('tests', Tests()),
        ('test/{release}', Test()),
        ('validate', Validate()),
    )
    for relative_path, resource in url_routes_v1:
        api.add_route("/api/v1.0/" + relative_path, resource)

    return api
def create_app(image_store):
    """Return a falcon API serving a Resource built on ``image_store`` at ``/images``."""
    images = Resource(image_store)
    app = falcon.API()
    app.add_route('/images', images)
    return app
def test_custom_router_add_route_should_be_used():
    """A router handed to falcon.API must receive the add_route call."""
    seen_templates = []

    class CustomRouter(object):
        def add_route(self, uri_template, *args, **kwargs):
            # Record each template so the test can inspect it afterwards.
            seen_templates.append(uri_template)

        def find(self, uri):
            pass

    app = falcon.API(router=CustomRouter())
    app.add_route('/test', 'resource')

    # Exactly one route was registered, and it is the one added above.
    assert seen_templates == ['/test']
def test_custom_router_takes_req_positional_argument():
    """A custom router's ``find`` may take the request as a positional argument."""

    def responder(req, resp):
        resp.body = 'OK'

    class CustomRouter(object):
        def find(self, uri, req):
            matched = uri == '/test' and isinstance(req, falcon.Request)
            if not matched:
                return None
            return responder, {'GET': responder}, {}, None

    client = testing.TestClient(falcon.API(router=CustomRouter()))
    response = client.simulate_request(path='/test')
    assert response.content == b'OK'
def client():
    """Return a test client wrapping a falcon API with no routes added."""
    app = falcon.API()
    return testing.TestClient(app)
def test_not_str(uri_template):
    """Adding a route with the given ``uri_template`` must raise TypeError."""
    api = falcon.API()
    # The resource is built inside the raises-block, as in the original.
    with pytest.raises(TypeError):
        api.add_route(uri_template, ResourceWithId(-1))
def client():
    """Return a test client whose API serves ErroredClassResource at ``/``."""
    api = falcon.API()
    api.add_route('/', ErroredClassResource())
    test_client = testing.TestClient(api)
    return test_client
# test_wsgiref_inputwrapper_with_size.py -- file source
# Project: deb-python-falcon
# Author: openstack
# Project source
# File source
# Reads: 16
# Favorites: 0
# Likes: 0
# Comments: 0
def test_resources_can_read_request_stream_during_tests(self):
    """Make sure a simple request can be performed during testing.

    Originally, testing would fail after performing a request because no
    size was specified when calling `wsgiref.validate.InputWrapper.read()`
    via `req.stream.read()`.
    """
    route = '/type'
    api = falcon.API()
    api.add_route(route, TypeResource())

    result = testing.TestClient(api).simulate_post(path=route, body='hello')

    assert result.status == falcon.HTTP_200
    assert result.json == {'data': 'hello'}
def test_raise_status_in_before_hook(self):
    """Make sure a GET returns the 200 raised by the before hook."""
    api = falcon.API()
    api.add_route('/status', TestStatusResource())

    response = testing.TestClient(api).simulate_request(
        path='/status', method='GET')

    assert response.status == falcon.HTTP_200
    assert response.headers['x-failed'] == 'False'
    assert response.text == 'Pass'
def test_raise_status_runs_after_hooks(self):
    """Make sure after hooks still run (exercised with a PUT request)."""
    api = falcon.API()
    api.add_route('/status', TestStatusResource())

    response = testing.TestClient(api).simulate_request(
        path='/status', method='PUT')

    assert response.status == falcon.HTTP_200
    assert response.headers['x-failed'] == 'False'
    assert response.text == 'Pass'
def test_raise_status_survives_after_hooks(self):
    """Make sure the after hook does not overwrite the raised status (DELETE)."""
    api = falcon.API()
    api.add_route('/status', TestStatusResource())

    response = testing.TestClient(api).simulate_request(
        path='/status', method='DELETE')

    assert response.status == falcon.HTTP_200
    assert response.headers['x-failed'] == 'False'
    assert response.text == 'Pass'
def test_raise_status_empty_body(self):
    """Make sure passing None as the body results in an empty body (PATCH)."""
    api = falcon.API()
    api.add_route('/status', TestStatusResource())

    response = testing.TestClient(api).simulate_request(
        path='/status', method='PATCH')

    assert response.text == ''
def client():
    """Return a test client whose API serves WrappedRespondersResource at ``/``."""
    api = falcon.API()
    api.add_route('/', WrappedRespondersResource())
    return testing.TestClient(api)
# --------------------------------------------------------------------
# Hooks
# --------------------------------------------------------------------
def client():
    """Return a test client for an API with form-urlencoded auto-parsing enabled."""
    api = falcon.API()
    # Opt in to parsing application/x-www-form-urlencoded bodies.
    api.req_options.auto_parse_form_urlencoded = True
    test_client = testing.TestClient(api)
    return test_client
def test_dont_auto_parse_by_default(self):
    """A form-encoded body is not exposed via ``get_param`` by default."""
    api = falcon.API()
    resource = testing.SimpleTestResource()
    api.add_route('/', resource)

    form_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    testing.TestClient(api).simulate_request(
        path='/', body='q=42', headers=form_headers)

    # The resource captured the request it served; 'q' must be absent.
    assert resource.captured_req.get_param('q') is None
def process_request(self, req, resp):
    """Reject requests that cannot be served as JSON.

    Args:
        req: the incoming request; ``client_accepts_json``, ``method`` and
            ``content_type`` are read from it.
        resp: the response object (unused).

    Raises:
        falcon.HTTPNotAcceptable: the client does not accept JSON responses.
        falcon.HTTPUnsupportedMediaType: a POST or PUT request has a
            missing or non-JSON Content-Type.
    """
    if not req.client_accepts_json:
        raise falcon.HTTPNotAcceptable(
            'This API only supports responses encoded as JSON.',
            href='http://docs.examples.com/api/json')

    # Only methods that carry a request body are checked for media type.
    if req.method not in ('POST', 'PUT'):
        return

    # content_type is None when the header is absent; treat that like a
    # non-JSON type instead of failing with a TypeError on the `in` test.
    content_type = req.content_type or ''
    if 'application/json' not in content_type:
        raise falcon.HTTPUnsupportedMediaType(
            'This API only supports requests encoded as JSON.',
            href='http://docs.examples.com/api/json')
def test_decode_empty_result(self):
    """A request to an API with no routes added yields empty response text."""
    bare_client = testing.TestClient(falcon.API())
    response = bare_client.simulate_request(path='/')
    assert response.text == ''
def test_status(self):
    """The status configured on SimpleTestResource is returned by a simulated GET."""
    api = falcon.API()
    resource = testing.SimpleTestResource(status=falcon.HTTP_702)
    api.add_route('/', resource)

    result = testing.TestClient(api).simulate_get()

    assert result.status == falcon.HTTP_702