def make_request(method, path, content=None, headers=None):
s = streams.StreamReader()
if content is not None:
s.feed_data(content)
s.feed_eof()
return web.Request(Message(method, path, headers or {}), s, Protocol, None, None, None)
python类Request()的实例源码
def _authenticate_user(self, request : Request):
"""
If the area features membership, it invokes the methods of the underlying membership provider to authenticate
the user, supporting anonymous authentication.
:param request: request to authenticate.
"""
request.user = None
encryption_key = self.config.encryption_key
membership = self.membership
set_anonymous_session = False
if self.membership:
# does the request contains the session cookie for this area?
session_cookie_name = self.config.session_cookie_name
session_key = request.cookies.get(session_cookie_name)
if session_key:
# try to load the session
# decrypt the session key
success, session_guid = AesEncryptor.try_decrypt(session_key, encryption_key)
if success:
# try to perform login by session key
success, result = await membership.try_login_by_session_key(session_guid)
if success:
# result is a principal object
request.user = result.principal
request.session = result.session
else:
# the login by session cookie failed: the session could be expired
set_anonymous_session = True
else:
# session key decryption failed
set_anonymous_session = True
else:
# the request does not contain a session cookie for this area
set_anonymous_session = True
if set_anonymous_session:
# initialize an anonymous session
await self.initialize_anonymous_session(request)
return self
def _get_culture_for_request(self, request):
"""
Gets the culture to use for a given request.
"""
if "GET" == request.method:
culture = request.match_info.get("culture")
if culture:
if not self._is_supported_culture(culture):
# the given culture is not supported; the user could have changed a url by hand
# raise an exception to redirect to a proper url
raise InvalidCultureException()
return culture
user = request.user
if user and not user.anonymous and self._is_supported_culture(user.culture):
return user.culture
if "POST" == request.method:
# check custom headers
culture_header = request.headers.get("X-Request-Culture")
if self._is_supported_culture(culture_header):
return culture_header
culture_cookie = request.cookies.get("culture")
if self._is_supported_culture(culture_cookie):
return culture_cookie
def handler(create_client: Callable[[], aiohttp.ClientSession], server: ni_abc.ServerHost,
cla_records: ni_abc.CLAHost) -> Callable[[web.Request], Awaitable[web.Response]]:
"""Create a closure to handle requests from the contribution host."""
async def respond(request: web.Request) -> web.Response:
"""Handle a webhook trigger from the contribution host."""
async with create_client() as client:
try:
contribution = await ContribHost.process(server, request, client)
usernames = await contribution.usernames()
server.log("Usernames: " + str(usernames))
trusted_users = server.trusted_users()
usernames_to_check = usernames - trusted_users
cla_status = await cla_records.check(client, usernames_to_check)
server.log("CLA status: " + str(cla_status))
# With a work queue, one could make the updating of the
# contribution a work item and return an HTTP 202 response.
await contribution.update(cla_status)
return web.Response(status=http.HTTPStatus.OK)
except ni_abc.ResponseExit as exc:
return exc.response
except Exception as exc:
server.log_exception(exc)
return web.Response(
status=http.HTTPStatus.INTERNAL_SERVER_ERROR)
return respond
def process(cls, server: ni_abc.ServerHost,
request: web.Request, client: aiohttp.ClientSession) -> "Host":
"""Process the pull request."""
event = sansio.Event.from_http(request.headers,
await request.read(),
secret=server.contrib_secret())
if event.event == "ping":
# A ping event; nothing to do.
# https://developer.github.com/webhooks/#ping-event
raise ni_abc.ResponseExit(status=http.HTTPStatus.OK)
elif event.event != "pull_request":
# Only happens if GitHub is misconfigured to send the wrong events.
raise TypeError(f"don't know how to handle a {event.event!r} event")
elif event.data['action'] not in cls._useful_actions:
raise ni_abc.ResponseExit(status=http.HTTPStatus.NO_CONTENT)
elif event.data['action'] in {PullRequestEvent.opened.value, PullRequestEvent.synchronize.value}:
if event.data['action'] == PullRequestEvent.opened.value:
# GitHub is eventually consistent, so add a delay to wait for
# the API to digest the new pull request.
await asyncio.sleep(1)
return cls(server, client, PullRequestEvent(event.data['action']),
event.data)
elif event.data['action'] == PullRequestEvent.unlabeled.value:
label = event.data['label']['name']
if not label.startswith(LABEL_PREFIX):
raise ni_abc.ResponseExit(status=http.HTTPStatus.NO_CONTENT)
return cls(server, client, PullRequestEvent.unlabeled, event.data)
else: # pragma: no cover
# Should never happen.
raise TypeError(f"don't know how to handle a {event.data['action']!r} action")
def process(cls, server: ServerHost,
request: web.Request,
client: aiohttp.ClientSession) -> "ContribHost":
"""Process a request into a contribution."""
# This method exists because __init__() cannot be a coroutine.
raise ResponseExit(status=http.HTTPStatus.NOT_IMPLEMENTED) # pragma: no cover
def get_file(request: web.Request):
filename = request.match_info.get('name').strip()
filepath = os.path.join(config.args.storage, filename)
_, ext = os.path.splitext(filepath)
etag = hashlib.sha1(filename.encode('utf-8')).hexdigest()
if not os.path.exists(filepath):
raise web.HTTPNotFound()
if 'If-None-Match' in request.headers:
raise web.HTTPNotModified(headers={
'ETag': etag
})
stat = os.stat(filepath)
if request.method == 'HEAD':
resp = web.Response()
else:
resp = web.StreamResponse()
resp.headers['Content-Type'] = mimetypes.types_map.get(ext, 'application/octet-stream')
resp.headers['ETag'] = etag
resp.headers['Cache-Control'] = 'max-age=31536000'
resp.headers['X-Content-SHA1'] = get_hash_from_name(filename)
resp.content_length = stat.st_size
resp.last_modified = stat.st_mtime
if request.method == 'HEAD':
return resp
yield from resp.prepare(request)
with open(filepath, 'rb') as f:
for chunk in chunks(f):
resp.write(chunk)
yield from resp.drain()
yield from resp.write_eof()
resp.force_close()
return resp
def status(request: web.Request):
stat = {
'queues': {
n: {
'length': len(replication.get_queue_for_node(n))
} for n in replication.get_nodes()
}
}
return web.Response(text=json.dumps(stat), headers={
'Content-Type': 'application/json'
})
def render_html(request: web.Request):
# https://splash.readthedocs.io/en/stable/api.html#render-html
tab = await _go(request)
return web.Response(text=BS((await tab.html()).decode()).prettify())
def example(self, request: web.Request) -> str:
await asyncio.sleep(1)
return '??' # tomodachi
def example_with_id(self, request: web.Request, id: str) -> str:
return '?? (id: {})'.format(id)
def response_object(self, request: web.Request) -> Response:
return Response(body='{"data": true}', status=200, content_type='application/json')
def error_404(self, request: web.Request) -> str:
return 'error 404'
def static_request_handler(cls: Any, obj: Any, context: Dict, func: Any, path: str, base_url: str) -> Any:
if '?P<filename>' not in base_url:
pattern = r'^{}(?P<filename>.+?)$'.format(re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url)))
else:
pattern = r'^{}$'.format(re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url)))
compiled_pattern = re.compile(pattern)
if path.startswith('/'):
path = os.path.dirname(path)
else:
path = '{}/{}'.format(os.path.dirname(context.get('context', {}).get('_service_file_path')), path)
if not path.endswith('/'):
path = '{}/'.format(path)
async def handler(request: web.Request) -> web.Response:
result = compiled_pattern.match(request.path)
filename = result.groupdict()['filename']
filepath = '{}{}'.format(path, filename)
try:
if os.path.isdir(filepath) or not os.path.exists(filepath):
raise web.HTTPNotFound()
pathlib.Path(filepath).open('r')
return FileResponse(filepath)
except PermissionError as e:
raise web.HTTPForbidden()
context['_http_routes'] = context.get('_http_routes', [])
context['_http_routes'].append(('GET', pattern, handler))
start_func = cls.start_server(obj, context)
return (await start_func) if start_func else None
def test(self, request: web.Request) -> str:
return_value = 'test'
return return_value
def test_with_id(self, request: web.Request, id: str) -> str:
return 'test {}'.format(id)
def test_slow(self, request: web.Request) -> str:
await asyncio.sleep(2.0)
self.slow_request = True
return 'test'
def test_dict(self, request: web.Request) -> Dict:
return {
'status': 200,
'body': 'test dict',
'headers': {
'X-Dict': 'test'
}
}
def test_tuple(self, request: web.Request) -> Tuple:
return (200, 'test tuple', {
'X-Tuple': 'test'
})
def test_response_object(self, request: web.Request) -> Response:
return Response(body='test tomodachi response', status=200, headers={
'X-Tomodachi-Response': 'test'
})