def test_middleware_response():
    """Check that request and response middleware both see the traffic.

    The request middleware records the request; the response middleware
    records the request and then the response, so ``results`` is expected
    to hold ``[request, request, response]`` after one call to ``/``.
    """
    app = Sanic('test_middleware_response')

    results = []

    # Distinct name for the request middleware: it used to share the name
    # ``process_response`` with the response middleware defined below.
    @app.middleware('request')
    async def process_request(request):
        results.append(request)

    @app.middleware('response')
    async def process_response(request, response):
        results.append(request)
        results.append(response)

    @app.route('/')
    async def handler(request):
        return text('OK')

    request, response = sanic_endpoint_test(app)

    assert response.text == 'OK'
    assert type(results[0]) is Request
    assert type(results[1]) is Request
    assert isinstance(results[2], HTTPResponse)
# Example source code for the Python class Sanic()
def test_dynamic_route_int():
    """A ``<folder_id:int>`` segment is passed as ``int``; non-integers 404."""
    app = Sanic('test_dynamic_route_int')
    captured = []

    @app.route('/folder/<folder_id:int>')
    async def handler(request, folder_id):
        captured.append(folder_id)
        return text('OK')

    # Numeric segment: the handler runs and receives an int.
    _, matched = sanic_endpoint_test(app, uri='/folder/12345')
    assert matched.text == 'OK'
    assert type(captured[0]) is int

    # Non-numeric segment: no route matches.
    _, unmatched = sanic_endpoint_test(app, uri='/folder/asdf')
    assert unmatched.status == 404
def test_dynamic_route_number():
    """A ``<weight:number>`` segment is passed as ``float``.

    Integer-looking and decimal values must both match the route, while a
    value containing a dash must not match and yields a 404.
    """
    # The app name now matches this test; it was copy-pasted from
    # ``test_dynamic_route_int`` before.
    app = Sanic('test_dynamic_route_number')

    results = []

    @app.route('/weight/<weight:number>')
    async def handler(request, weight):
        results.append(weight)
        return text('OK')

    request, response = sanic_endpoint_test(app, uri='/weight/12345')
    assert response.text == 'OK'
    assert type(results[0]) is float

    request, response = sanic_endpoint_test(app, uri='/weight/1234.56')
    assert response.status == 200

    request, response = sanic_endpoint_test(app, uri='/weight/1234-56')
    assert response.status == 404
def test_several_bp_with_url_prefix():
    """Two blueprints with different URL prefixes each serve their own root."""
    app = Sanic('test_text')
    first_bp = Blueprint('test_text', url_prefix='/test1')
    second_bp = Blueprint('test_text2', url_prefix='/test2')

    @first_bp.route('/')
    def handler(request):
        return text('Hello')

    @second_bp.route('/')
    def handler2(request):
        return text('Hello2')

    app.register_blueprint(first_bp)
    app.register_blueprint(second_bp)

    _, first_response = sanic_endpoint_test(app, uri='/test1/')
    assert first_response.text == 'Hello'

    _, second_response = sanic_endpoint_test(app, uri='/test2/')
    assert second_response.text == 'Hello2'
def test_bp_middleware():
    """Response middleware registered on a blueprint replaces the response.

    The route handler answers ``FAIL``; the test expects the blueprint's
    response middleware to substitute ``OK``.
    """
    app = Sanic('test_middleware')
    bp = Blueprint('test_middleware')

    @bp.middleware('response')
    async def process_response(request, response):
        return text('OK')

    @app.route('/')
    async def handler(request):
        return text('FAIL')

    app.register_blueprint(bp)

    _, final_response = sanic_endpoint_test(app)

    assert final_response.status == 200
    assert final_response.text == 'OK'
def create_app(path='/graphql', **kwargs):
    """Build a debug Sanic app that serves a GraphQL view at ``path``.

    ``schema`` (falling back to the module-level ``Schema``) and
    ``async_executor`` are popped from ``kwargs``; the remaining keyword
    arguments are forwarded to ``GraphQLView.as_view``.

    With ``async_executor`` set, the route is added in a
    ``before_server_start`` listener using an ``AsyncioExecutor`` built from
    the server loop, and removed again in a ``before_server_stop`` listener.
    Otherwise the route is added immediately.

    Returns the app, with a ``SanicTestClient`` attached as ``app.client``.
    """
    app = Sanic(__name__)
    app.debug = True

    schema = kwargs.pop('schema', None) or Schema
    use_async_executor = kwargs.pop('async_executor', False)

    if not use_async_executor:
        app.add_route(GraphQLView.as_view(schema=schema, **kwargs), path)
    else:
        @app.listener('before_server_start')
        def init_async_executor(app, loop):
            # The executor needs the running server loop, hence the listener.
            executor = AsyncioExecutor(loop)
            view = GraphQLView.as_view(
                schema=schema, executor=executor, **kwargs)
            app.add_route(view, path)

        @app.listener('before_server_stop')
        def remove_graphql_endpoint(app, loop):
            app.remove_route(path)

    app.client = SanicTestClient(app)
    return app
def _setup_sanic(self):
    """Create the Sanic app and register every service's routes on it.

    Registers a ``sanic_app`` singleton (a ``Sanic`` named ``omnic``),
    stores the app plus Sanic's ``response`` module and ``redirect`` helper
    on ``self``, then adds one route per entry in each service's ``urls``.
    """
    from sanic import Sanic
    from sanic import response as sanic_response
    from sanic.response import redirect as sanic_redirect

    singletons.register('sanic_app', lambda: Sanic('omnic'))
    self.app = singletons.sanic_app
    self.response = sanic_response
    self.redirect = sanic_redirect

    # Set up all routes for all services
    for service in singletons.settings.load_all('SERVICES'):
        prefix = service.SERVICE_NAME
        for partial_url, view in service.urls.items():
            # A string entry names an attribute of the service to use as view.
            handler = getattr(service, view) if isinstance(view, str) else view
            self.app.add_route(handler, '%s/%s' % (prefix, partial_url))
def __init__(self, ethpconnect='127.0.0.1', ethpport=9500,
             rpcconnect='127.0.0.1', rpcport=8545,
             ipcconnect=None, blocknotify=None, walletnotify=None,
             alertnotify=None, tls=False, *, loop=None):
    """Store the connection settings, build the Sanic app and set up routes.

    All arguments are kept on private attributes. ``loop`` defaults to
    ``asyncio.get_event_loop()`` when not supplied. The Sanic app is created
    with ``log_config=None`` and a ``SentryErrorHandler``. ``self.routes()``
    is called last, after every attribute has been assigned.
    """
    self._loop = loop if loop else asyncio.get_event_loop()

    error_handler = SentryErrorHandler()
    self._app = Sanic(__name__, log_config=None, error_handler=error_handler)

    # Endpoint settings, stored exactly as passed in.
    self._host, self._port = ethpconnect, ethpport
    self._rpc_host, self._rpc_port = rpcconnect, rpcport
    self._unix_socket = ipcconnect

    # Notification settings.
    self._blocknotify = blocknotify
    self._walletnotify = walletnotify
    self._alertnotify = alertnotify

    self._tls = tls
    self._log = logging.getLogger('rpc_server')

    self.routes()
def test_content_type():
    """``request.content_type`` uses the default unless a header sets it."""
    app = Sanic('test_content_type')

    @app.route('/')
    async def handler(request):
        return text(request.content_type)

    # No content-type header sent: the default applies.
    plain_request, plain_response = app.test_client.get('/')
    assert plain_request.content_type == DEFAULT_HTTP_CONTENT_TYPE
    assert plain_response.text == DEFAULT_HTTP_CONTENT_TYPE

    # Explicit header: the sent value is reported back.
    json_headers = {'content-type': 'application/json'}
    json_request, json_response = app.test_client.get(
        '/', headers=json_headers)
    assert json_request.content_type == 'application/json'
    assert json_response.text == 'application/json'
def test_remote_addr():
    """``request.remote_addr`` is derived from ``X-Forwarded-For``.

    Expectations exercised here: the first address of the header is used,
    a missing header gives an empty string, and empty entries in the list
    do not change the result.
    """
    # The app name now matches this test; it was copy-pasted from
    # ``test_content_type`` before.
    app = Sanic('test_remote_addr')

    @app.route('/')
    async def handler(request):
        return text(request.remote_addr)

    headers = {
        'X-Forwarded-For': '127.0.0.1, 127.0.1.2'
    }
    request, response = app.test_client.get('/', headers=headers)
    assert request.remote_addr == '127.0.0.1'
    assert response.text == '127.0.0.1'

    # No forwarding header at all.
    request, response = app.test_client.get('/')
    assert request.remote_addr == ''
    assert response.text == ''

    # Empty entries between the addresses.
    headers = {
        'X-Forwarded-For': '127.0.0.1, , ,,127.0.1.2'
    }
    request, response = app.test_client.get('/', headers=headers)
    assert request.remote_addr == '127.0.0.1'
    assert response.text == '127.0.0.1'
def test_match_info():
    """``request.match_info`` holds the matched URL parameters."""
    app = Sanic('test_match_info')

    @app.route('/api/v1/user/<user_id>/')
    async def handler(request, user_id):
        return json(request.match_info)

    expected = {"user_id": "sanic_user"}
    matched_request, matched_response = app.test_client.get(
        '/api/v1/user/sanic_user/')

    assert matched_request.match_info == expected
    assert json_loads(matched_response.text) == expected
# ------------------------------------------------------------ #
# POST
# ------------------------------------------------------------ #
def test_url_attributes_no_ssl(path, query, expected_url):
    """``request.url`` and its parts agree with ``urlparse`` over plain HTTP.

    ``expected_url`` is a format string filled with ``HOST`` and ``PORT``.
    NOTE(review): the arguments presumably come from a pytest
    parametrization that is not visible here -- confirm.
    """
    app = Sanic('test_url_attrs_no_ssl')

    async def handler(request):
        return text('OK')

    app.add_route(handler, path)

    target = path + '?{}'.format(query)
    seen_request, _ = app.test_client.get(target)
    assert seen_request.url == expected_url.format(HOST, PORT)

    # Each request attribute must match the corresponding parsed component.
    parts = urlparse(seen_request.url)
    assert parts.scheme == seen_request.scheme
    assert parts.path == seen_request.path
    assert parts.query == seen_request.query_string
    assert parts.netloc == seen_request.host
def test_url_attributes_with_ssl(path, query, expected_url):
    """``request.url`` and its parts agree with ``urlparse`` over HTTPS.

    The server is started with an SSL context loaded from the self-signed
    certificate and key under ``certs/`` next to this file.
    ``expected_url`` is a format string filled with ``HOST`` and ``PORT``.
    NOTE(review): the arguments presumably come from a pytest
    parametrization that is not visible here -- confirm.
    """
    app = Sanic('test_url_attrs_with_ssl')

    here = os.path.dirname(os.path.realpath(__file__))
    context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    cert_file = os.path.join(here, 'certs/selfsigned.cert')
    key_file = os.path.join(here, 'certs/selfsigned.key')
    context.load_cert_chain(cert_file, keyfile=key_file)

    async def handler(request):
        return text('OK')

    app.add_route(handler, path)

    target = 'https://{}:{}'.format(HOST, PORT) + path + '?{}'.format(query)
    seen_request, _ = app.test_client.get(
        target, server_kwargs={'ssl': context})
    assert seen_request.url == expected_url.format(HOST, PORT)

    # Each request attribute must match the corresponding parsed component.
    parts = urlparse(seen_request.url)
    assert parts.scheme == seen_request.scheme
    assert parts.path == seen_request.path
    assert parts.query == seen_request.query_string
    assert parts.netloc == seen_request.host
def test_multiprocessing():
    """Tests that the number of children we produce is correct"""
    # Selects a number at random so we can spot check
    upper_bound = multiprocessing.cpu_count() * 2 + 1
    num_workers = random.choice(range(2, upper_bound))

    app = Sanic('test_multiprocessing')
    seen_pids = set()

    def stop_on_alarm(*args):
        # Record each live child's pid, then terminate it.
        for child in multiprocessing.active_children():
            seen_pids.add(child.pid)
            child.terminate()

    # The alarm fires after one second and runs the handler above.
    signal.signal(signal.SIGALRM, stop_on_alarm)
    signal.alarm(1)
    app.run(HOST, PORT, workers=num_workers)

    assert len(seen_pids) == num_workers
def test_static_content_range_correct(file_name, static_file_directory):
    """A bounded ``Range`` request on a static file returns the expected slice.

    NOTE(review): HTTP byte ranges are inclusive, so ``bytes=12-19`` would
    normally cover eight bytes (``[12:20]``); this test expects ``[12:19]``
    and a 200 status. Confirm against the Sanic version under test before
    changing either side.
    """
    app = Sanic('test_static')
    served_path = get_file_path(static_file_directory, file_name)
    app.static('/testing.file', served_path, use_content_range=True)

    range_headers = {'Range': 'bytes=12-19'}
    _, response = app.test_client.get('/testing.file', headers=range_headers)

    assert response.status == 200
    assert 'Content-Length' in response.headers
    assert 'Content-Range' in response.headers

    whole_file = bytes(get_file_content(static_file_directory, file_name))
    expected = whole_file[12:19]
    assert int(response.headers['Content-Length']) == len(expected)
    assert response.body == expected
def test_static_content_range_front(file_name, static_file_directory):
    """An open-ended ``Range`` (``bytes=12-``) returns the file from byte 12 on."""
    app = Sanic('test_static')
    served_path = get_file_path(static_file_directory, file_name)
    app.static('/testing.file', served_path, use_content_range=True)

    range_headers = {'Range': 'bytes=12-'}
    _, response = app.test_client.get('/testing.file', headers=range_headers)

    assert response.status == 200
    assert 'Content-Length' in response.headers
    assert 'Content-Range' in response.headers

    whole_file = bytes(get_file_content(static_file_directory, file_name))
    expected = whole_file[12:]
    assert int(response.headers['Content-Length']) == len(expected)
    assert response.body == expected
def test_static_content_range_back(file_name, static_file_directory):
    """A suffix ``Range`` (``bytes=-12``) returns the last 12 bytes of the file."""
    app = Sanic('test_static')
    served_path = get_file_path(static_file_directory, file_name)
    app.static('/testing.file', served_path, use_content_range=True)

    range_headers = {'Range': 'bytes=-12'}
    _, response = app.test_client.get('/testing.file', headers=range_headers)

    assert response.status == 200
    assert 'Content-Length' in response.headers
    assert 'Content-Range' in response.headers

    whole_file = bytes(get_file_content(static_file_directory, file_name))
    expected = whole_file[-12:]
    assert int(response.headers['Content-Length']) == len(expected)
    assert response.body == expected
def test_method_not_allowed():
    """A HEAD request to a GET/POST-only route is rejected with ``Allow`` set.

    With only a GET handler registered, ``Allow`` must be ``GET``. After a
    POST handler is added for the same path, the response must be a 405
    whose ``Allow`` header lists both methods and whose body is empty.
    """
    # Imported locally under an alias: the handlers previously called
    # ``response.json``, which resolved to this function's local ``response``
    # (the test-client result unpacked below) rather than a response module.
    from sanic.response import json as json_response

    app = Sanic('method_not_allowed')

    @app.get('/')
    async def get_handler(request):
        return json_response({'hello': 'world'})

    request, response = app.test_client.head('/')
    assert response.headers['Allow'] == 'GET'

    # Separate name so the second handler does not redefine the first.
    @app.post('/')
    async def post_handler(request):
        return json_response({'hello': 'world'})

    request, response = app.test_client.head('/')
    assert response.status == 405
    # Order of the methods in the header is not significant.
    assert set(response.headers['Allow'].split(', ')) == {'GET', 'POST'}
    assert response.headers['Content-Length'] == '0'
def test_file_head_response(file_name, static_file_directory):
    """A HEAD request to a file route returns size headers for the file.

    The route answers HEAD with a bare ``HTTPResponse`` carrying the
    headers, and any other method with the file itself.
    """
    app = Sanic('test_file_helper')

    @app.route('/files/<filename>', methods=['GET', 'HEAD'])
    async def file_route(request, filename):
        joined = os.path.join(static_file_directory, filename)
        file_path = os.path.abspath(unquote(joined))
        stats = await async_os.stat(file_path)
        headers = {
            'Accept-Ranges': 'bytes',
            'Content-Length': str(stats.st_size),
        }
        # Fall back to plain text when the type cannot be guessed.
        mime = guess_type(file_path)[0] or 'text/plain'
        if request.method != "HEAD":
            return file(file_path, headers=headers, mime_type=mime)
        return HTTPResponse(headers=headers, content_type=mime)

    _, head_response = app.test_client.head('/files/{}'.format(file_name))

    assert head_response.status == 200
    assert 'Accept-Ranges' in head_response.headers
    assert 'Content-Length' in head_response.headers

    expected_size = len(get_file_content(static_file_directory, file_name))
    assert int(head_response.headers['Content-Length']) == expected_size
def test_unauthorized_exception(exception_app):
    """Test the built-in Unauthorized exception"""
    # Plain 401.
    _, plain = exception_app.test_client.get('/401')
    assert plain.status == 401

    # Basic scheme: the challenge header carries the realm.
    _, basic = exception_app.test_client.get('/401/basic')
    assert basic.status == 401
    assert basic.headers.get('WWW-Authenticate') is not None
    assert basic.headers.get('WWW-Authenticate') == "Basic realm='Sanic'"

    # Digest scheme: every challenge parameter must appear in the header.
    _, digest = exception_app.test_client.get('/401/digest')
    assert digest.status == 401

    auth_header = digest.headers.get('WWW-Authenticate')
    assert auth_header is not None
    assert auth_header.startswith('Digest')
    expected_fragments = (
        "qop='auth, auth-int'",
        "algorithm='MD5'",
        "nonce='abcdef'",
        "opaque='zyxwvu'",
    )
    for fragment in expected_fragments:
        assert fragment in auth_header

    # Bearer scheme: the header is just the scheme name.
    _, bearer = exception_app.test_client.get('/401/bearer')
    assert bearer.status == 401
    assert bearer.headers.get('WWW-Authenticate') == "Bearer"