def version(request):
    """Serve the parsed contents of ``VERSION_FILE`` as a JSON response.

    Args:
        request: the incoming request (not inspected).

    Returns:
        The ``web.json_response`` built from the JSON document in the file.

    Raises:
        web.HTTPNotFound: when opening/reading the file raises ``IOError``.
    """
    # Use the version.json file in the current dir.
    with suppress(IOError):
        with open(VERSION_FILE) as fd:
            return web.json_response(json.load(fd))
    # Raise instead of return: aiohttp deprecates returning HTTP exception
    # objects from handlers.
    raise web.HTTPNotFound()
# Example source code using the Python class HTTPNotFound()
def cli(loop, test_client):
    """Create a test client for an app exposing three failing endpoints.

    The app comes from ``get_app(loop=loop)`` and gets these GET routes:

    * ``/error``      - handler raises ``ValueError``
    * ``/error-403``  - handler raises ``web.HTTPForbidden``
    * ``/error-404``  - handler *returns* ``web.HTTPNotFound``

    Presumably used as a pytest fixture (the decorator is not visible here).

    Returns:
        Whatever ``test_client(app)`` resolves to when run on ``loop``.
    """
    async def error(request):
        raise ValueError()

    async def error403(request):
        raise web.HTTPForbidden()

    async def error404(request):
        # Returned, not raised, unlike the 403 handler above.
        return web.HTTPNotFound()

    application = get_app(loop=loop)
    failing_routes = (
        ('/error', error),
        ('/error-403', error403),
        ('/error-404', error404),
    )
    for url_path, route_handler in failing_routes:
        application.router.add_get(url_path, route_handler)

    client_coro = test_client(application)
    return loop.run_until_complete(client_coro)
def ajax(f):
    """
    Decorator restricting a coroutine handler to AJAX requests.

    The wrapped handler is awaited only when ``is_ajax(request)`` is truthy;
    any other request raises HTTPNotFound.
    """
    @wraps(f)
    async def ajax_only(request):
        if is_ajax(request):
            return await f(request)
        raise HTTPNotFound()

    return ajax_only
def browsers(self, request):
    """Return the stored data for the version named in the URL.

    The ``version`` path parameter is looked up in ``self.files``; the stored
    value is passed as the ``body`` of a JSON response.

    Raises:
        web.HTTPNotFound: when ``self.files`` has no entry for the version.
    """
    requested = request.match_info['version']
    if requested in self.files:
        return web.json_response(body=self.files[requested])

    message = 'No data was found for version {version}'.format(
        version=requested,
    )
    raise web.HTTPNotFound(text=message)
def _etag_listed(if_none_match, etag):
    """Return True if an ``If-None-Match`` header value names *etag* or is ``*``."""
    for candidate in if_none_match.split(','):
        candidate = candidate.strip()
        if candidate == '*':
            return True
        if candidate.startswith('W/'):
            # Weak validators compare equal to their strong counterpart here.
            candidate = candidate[2:]
        if candidate.strip('"') == etag:
            return True
    return False


def get_file(request: web.Request):
    """Stream a stored file to the client (generator-based coroutine).

    The file name comes from the ``name`` URL parameter and is resolved inside
    ``config.args.storage``. The ETag is the SHA-1 of the file *name*.

    Args:
        request: the incoming GET or HEAD request.

    Returns:
        A header-only ``web.Response`` for HEAD, otherwise the
        ``web.StreamResponse`` after the file has been written to it.

    Raises:
        web.HTTPNotFound: the name is empty, resolves outside the storage
            directory, or does not refer to a regular file.
        web.HTTPNotModified: ``If-None-Match`` lists the current ETag.
    """
    # A missing parameter used to crash on ``None.strip()``.
    filename = (request.match_info.get('name') or '').strip()
    if not filename:
        raise web.HTTPNotFound()

    storage_root = os.path.abspath(config.args.storage)
    filepath = os.path.abspath(os.path.join(storage_root, filename))
    # The name is client-controlled: refuse anything that normalises to a
    # location outside the storage directory (e.g. via "..").
    if not filepath.startswith(os.path.join(storage_root, '')):
        raise web.HTTPNotFound()
    # isfile() rather than exists(): a directory cannot be streamed.
    if not os.path.isfile(filepath):
        raise web.HTTPNotFound()

    _, ext = os.path.splitext(filepath)
    etag = hashlib.sha1(filename.encode('utf-8')).hexdigest()

    # Only answer 304 when the client's validator actually matches; the mere
    # presence of the header is not enough.
    if_none_match = request.headers.get('If-None-Match')
    if if_none_match is not None and _etag_listed(if_none_match, etag):
        raise web.HTTPNotModified(headers={
            'ETag': etag
        })

    stat = os.stat(filepath)
    is_head = request.method == 'HEAD'
    resp = web.Response() if is_head else web.StreamResponse()

    resp.headers['Content-Type'] = mimetypes.types_map.get(ext, 'application/octet-stream')
    resp.headers['ETag'] = etag
    resp.headers['Cache-Control'] = 'max-age=31536000'
    resp.headers['X-Content-SHA1'] = get_hash_from_name(filename)
    resp.content_length = stat.st_size
    resp.last_modified = stat.st_mtime

    if is_head:
        return resp

    yield from resp.prepare(request)
    with open(filepath, 'rb') as f:
        for chunk in chunks(f):
            # NOTE(review): write() is called synchronously and followed by
            # drain(), as in the original - confirm against the aiohttp
            # version in use.
            resp.write(chunk)
            yield from resp.drain()
    yield from resp.write_eof()
    resp.force_close()
    return resp
async def poll(self, request):
    """Fetch a question and its choices for the poll page.

    Declared ``async`` because the body awaits ``db.get_question``.

    Returns:
        dict with keys ``'question'`` and ``'choices'`` (presumably consumed
        by a template decorator that is not visible here).

    Raises:
        web.HTTPNotFound: when ``db.get_question`` raises ``db.RecordNotFound``;
            the response text is the exception message.
    """
    question_id = request.match_info['question_id']
    try:
        question, choices = await db.get_question(self.postgres,
                                                  question_id)
    except db.RecordNotFound as e:
        raise web.HTTPNotFound(text=str(e)) from e
    return {
        'question': question,
        'choices': choices
    }
async def results(self, request):
    """Fetch a question and its choices for the results page.

    Declared ``async`` because the body awaits ``db.get_question``.

    Returns:
        dict with keys ``'question'`` and ``'choices'`` (presumably consumed
        by a template decorator that is not visible here).

    Raises:
        web.HTTPNotFound: when ``db.get_question`` raises ``db.RecordNotFound``;
            the response text is the exception message.
    """
    question_id = request.match_info['question_id']
    try:
        question, choices = await db.get_question(self.postgres,
                                                  question_id)
    except db.RecordNotFound as e:
        raise web.HTTPNotFound(text=str(e)) from e
    return {
        'question': question,
        'choices': choices
    }
async def vote(self, request):
    """Record a vote from the posted form and redirect to the results page.

    Declared ``async`` because the body awaits the form data and the database
    call.

    Raises:
        web.HTTPBadRequest: the form has no ``choice`` field or it is not an
            integer.
        web.HTTPNotFound: ``db.vote`` raised ``db.RecordNotFound``.
        web.HTTPFound: on success, redirecting to the ``results`` route for
            this question.
    """
    question_id = int(request.match_info['question_id'])
    data = await request.post()
    try:
        choice_id = int(data['choice'])
    except (KeyError, TypeError, ValueError) as e:
        raise web.HTTPBadRequest(
            text='You have not specified choice value') from e
    try:
        await db.vote(self.postgres, question_id, choice_id)
    except db.RecordNotFound as e:
        raise web.HTTPNotFound(text=str(e)) from e
    router = request.app.router
    url = router['results'].url(parts={'question_id': question_id})
    # Raise the redirect: aiohttp deprecates returning HTTP exception objects.
    raise web.HTTPFound(location=url)
async def user_timeline(self, request):
    """Build the context for one user's timeline page.

    Declared ``async`` because the body awaits the MongoDB and session calls.

    Returns:
        dict with the profile user's 30 most recent messages (sorted by
        ``pub_date`` descending), whether the logged-in user follows the
        profile user, the profile user document (``_id`` converted to ``str``),
        the logged-in user document or ``None``, and the matched route name.

    Raises:
        web.HTTPNotFound: no user document matches the ``username`` URL part.
    """
    username = request.match_info['username']
    profile_user = await self.mongo.user.find_one({'username': username})
    if profile_user is None:
        raise web.HTTPNotFound()

    session = await get_session(request)
    user_id = session.get('user_id')
    user = None
    followed = False
    if user_id:
        user = await self.mongo.user.find_one({'_id': ObjectId(user_id)})
        follow_record = await self.mongo.follower.find_one(
            {'who_id': ObjectId(user_id),
             'whom_id': {'$in': [ObjectId(profile_user['_id'])]}})
        followed = follow_record is not None

    messages = await (self.mongo.message
                      .find({'author_id': ObjectId(profile_user['_id'])})
                      .sort('pub_date', -1)
                      .to_list(30))

    # Stringify only after the ObjectId has been used in the queries above.
    profile_user['_id'] = str(profile_user['_id'])
    return {"messages": messages,
            "followed": followed,
            "profile_user": profile_user,
            "user": user,
            "endpoint": request.match_info.route.name}
def get_pet(request):
    """Return a canned pet record for the ``petId`` URL parameter.

    Returns:
        JSON response with the integer id, the fixed name ``'Lili'`` and an
        empty ``photoUrls`` list.

    Raises:
        web.HTTPNotFound: when ``petId`` is the string ``'5'``.
    """
    pet_id = request.match_info['petId']
    if pet_id == '5':
        # Raise instead of return: aiohttp deprecates returning HTTP
        # exception objects from handlers.
        raise web.HTTPNotFound()
    return web.json_response({
        'id': int(pet_id),
        'name': 'Lili',
        'photoUrls': [],
    })
async def update_pet_formdata(request):
    """Accept exactly one expected form submission and reject everything else.

    Declared ``async`` because the body awaits ``request.post()``.

    Returns:
        An empty JSON object when ``petId`` is ``'12'``, the form fields are
        ``name == 'Vivi'`` and ``status == 'sold'``, and the ``userId`` header
        is ``'42'``.

    Raises:
        web.HTTPNotFound: when any of those values differs.
    """
    post_data = await request.post()
    matches_expected = (
        request.match_info['petId'] == '12'
        and post_data.get('name') == 'Vivi'
        and post_data.get('status') == 'sold'
        and request.headers.get('userId') == '42'
    )
    if not matches_expected:
        # Raise instead of return: aiohttp deprecates returning HTTP
        # exception objects from handlers.
        raise web.HTTPNotFound()
    return web.json_response({})
async def static_request_handler(cls: Any, obj: Any, context: Dict, func: Any, path: str, base_url: str) -> Any:
    """Register a GET route that serves files from a directory.

    Declared ``async`` because the body awaits the result of
    ``cls.start_server``.

    Args:
        cls: class providing ``start_server``.
        obj: passed through to ``cls.start_server``.
        context: service context; the route is appended to
            ``context['_http_routes']``.
        func: unused here.
        path: directory to serve. A value starting with ``/`` is reduced with
            ``os.path.dirname``; any other value is taken relative to the
            directory of ``context['context']['_service_file_path']``.
        base_url: regex-style URL prefix. A ``(?P<filename>...)`` group is
            appended unless ``base_url`` already contains one.

    Returns:
        The awaited result of ``cls.start_server(obj, context)``, or ``None``
        when that call returns something falsy.
    """
    # Drop a leading '^' and a trailing '$' so the pattern can be re-anchored.
    url_body = re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url))
    if '?P<filename>' not in base_url:
        pattern = r'^{}(?P<filename>.+?)$'.format(url_body)
    else:
        pattern = r'^{}$'.format(url_body)
    compiled_pattern = re.compile(pattern)

    if path.startswith('/'):
        path = os.path.dirname(path)
    else:
        path = '{}/{}'.format(os.path.dirname(context.get('context', {}).get('_service_file_path')), path)

    if not path.endswith('/'):
        path = '{}/'.format(path)

    async def handler(request: web.Request) -> web.Response:
        result = compiled_pattern.match(request.path)
        if result is None:
            raise web.HTTPNotFound()
        filename = result.groupdict()['filename']
        filepath = '{}{}'.format(path, filename)

        # The file name comes from the URL: refuse anything that normalises
        # to a location outside the served directory (e.g. via "..").
        served_root = os.path.join(os.path.abspath(path), '')
        if not os.path.abspath(filepath).startswith(served_root):
            raise web.HTTPNotFound()

        try:
            if os.path.isdir(filepath) or not os.path.exists(filepath):
                raise web.HTTPNotFound()

            # Open only to surface PermissionError early, and close the
            # handle straight away instead of leaking it.
            with pathlib.Path(filepath).open('r'):
                pass

            return FileResponse(filepath)
        except PermissionError:
            raise web.HTTPForbidden()

    context['_http_routes'] = context.get('_http_routes', [])
    context['_http_routes'].append(('GET', pattern, handler))

    start_func = cls.start_server(obj, context)
    return (await start_func) if start_func else None
async def repo(request):
    """Render the pull requests and branches of the repository for a prefix.

    Declared ``async`` because the body awaits ``gh_api``.

    The ``prefix`` URL parameter selects an attribute of ``scopes`` whose
    ``repo`` attribute names the GitHub repository. Up to 100 pull requests
    and 100 branches are requested from the API.

    Returns:
        The result of ``render_tpl(repo_tpl, ...)`` with two groups of items,
        titled ``'Pull requests'`` and ``'Branches'``.

    Raises:
        web.HTTPNotFound: ``scopes`` has no usable entry for the prefix.
    """
    prefix = request.match_info['prefix']
    try:
        repo_name = getattr(scopes, prefix).repo
    except AttributeError:
        # Raise instead of return: aiohttp deprecates returning HTTP
        # exception objects from handlers.
        raise web.HTTPNotFound()

    def info(name, pr=False):
        """Describe one pull request (``pr=True``) or branch as a dict."""
        ref = '%s/%s' % ('pull' if pr else 'heads', name)
        if pr:
            lxc = '%spr-%s' % (prefix, name)
            gh_url = 'https://github.com/%s/pull/%s' % (repo_name, name)
        else:
            # Only lowercase letters and digits survive in the container name.
            name_cleaned = re.sub('[^a-z0-9]', '', name.lower())
            lxc = '%s-%s' % (prefix, name_cleaned)
            gh_url = 'https://github.com/%s/commits/%s' % (repo_name, ref)

        return {
            'protected_db': lxc in conf['protected_dbs'],
            'name': name,
            'lxc': lxc,
            'gh_url': gh_url,
            'url': 'http://%s.%s' % (lxc, conf['domain']),
            'restart_url': get_restart_url(prefix, ref),
            'logs_url': '%slatest/%s/' % (conf['log_url'], lxc),
        }

    _, body = await gh_api('repos/%s/pulls?per_page=100' % repo_name)
    pulls = [info(i['number'], True) for i in body]

    _, body = await gh_api('repos/%s/branches?per_page=100' % repo_name)
    branches = [info(i['name']) for i in body]

    refs = [
        {'title': 'Pull requests', 'items': pulls},
        {'title': 'Branches', 'items': branches}
    ]
    return render_tpl(repo_tpl, {'refs': refs})
def simple(request):
    """Answer with the ``zenlines`` entry selected by the ``q`` query parameter.

    Raises:
        web.HTTPBadRequest: the query string has no ``q`` parameter.
        web.HTTPNotFound: ``zenlines`` has no entry for the given value.
    """
    try:
        key = request.url.query['q']
    except KeyError:
        raise web.HTTPBadRequest()
    else:
        try:
            line = zenlines[key]
        except KeyError:
            raise web.HTTPNotFound()
        return web.Response(text=line)
def thumbs(request, crop=True):
    """
    Return an image/jpeg image that's a thumbnail of the encoded request.

    The ``encoded`` URL parameter doubles as the cache file name inside
    ``args.cache``. On a cache miss it is decoded with ``CIPHER_KEY`` into
    ``<ignored>:<width>x<height>:<path>``, the image at
    ``args.STORAGE_DIR + path`` is thumbnailed (after ``crop_1`` when ``crop``
    is true) and stored in the cache.

    Raises:
        web.HTTPNotFound: the token cannot be decoded/split, or the source
            image path does not exist or is a directory.
    """
    encoded = request.match_info['encoded']
    cached_thumb = os.path.join(args.cache, encoded)
    if not os.path.isfile(cached_thumb):
        try:
            # maxsplit=2 yields at most three parts, so a path that itself
            # contains ':' still unpacks (maxsplit=3 made it fail).
            __, w_x_h, path = decode(CIPHER_KEY, encoded).split(':', 2)
        except (binascii.Error, UnicodeDecodeError, ValueError):
            raise web.HTTPNotFound()
        # WISHLIST add as extra context to the aiohttp.access logger
        logger.info('Decoded as %s %s', path, w_x_h)
        abspath = args.STORAGE_DIR + path
        try:
            im = Image.open(abspath)
        except (FileNotFoundError, IsADirectoryError):
            raise web.HTTPNotFound()

        thumb_dimension = [int(x) for x in w_x_h.split('x')]

        # Render into a scratch file and move it into place only on success;
        # otherwise a failed render would leave an empty or truncated cache
        # entry that every later request would be served.
        partial_thumb = '{}.{}.part'.format(cached_thumb, os.getpid())
        try:
            with open(partial_thumb, 'wb') as fh:
                source = crop_1(im) if crop else im
                source.thumbnail(thumb_dimension)
                source.save(fh, 'jpeg')
            os.replace(partial_thumb, cached_thumb)
        except Exception:
            try:
                os.remove(partial_thumb)
            except OSError:
                # Best-effort cleanup; the original error is what matters.
                pass
            raise

    with open(cached_thumb, 'rb') as fh:
        return web.Response(
            status=200, body=fh.read(), content_type='image/jpeg',
            headers={
                'Cache-Control': 'max-age=86400',
            })
async def call(self, request):
    """Look up a single message in Elasticsearch and return its details.

    Declared ``async`` because the body awaits the search request and its
    JSON body.

    The search is filtered by the ``id`` URL parameter and, unless
    ``self.session.company`` is ``'__all__'``, by that company. The ``method``
    URL parameter selects the document type under ``messages/``.

    Returns:
        For methods starting with ``'sms'``: a dict with ``from``, ``to``,
        ``status``, ``message`` and ``extra``. Otherwise ``{'raw': body}``.

    Raises:
        HTTPNotFound: the search did not report exactly one hit.
    """
    company = self.session.company
    if company == '__all__':
        company_filter = {'match_all': {}}
    else:
        company_filter = {'term': {'company': company}}
    es_query = {
        'bool': {
            'filter': [
                company_filter,
                {'term': {'_id': request.match_info['id']}},
            ]
        }
    }
    method = request.match_info['method']
    r = await self.app['es'].get(
        f'messages/{method}/_search?filter_path=hits', query=es_query
    )
    data = await r.json()
    if data['hits']['total'] != 1:
        raise HTTPNotFound(text='message not found')
    source = data['hits']['hits'][0]['_source']
    body = source['body']
    if method.startswith('sms'):
        # need to render the sms so it makes sense to users
        return {
            'from': source['from_name'],
            'to': source['to_last_name'] or source['to_address'],
            'status': source['status'],
            'message': body,
            'extra': source.get('extra') or {},
        }
    return {'raw': body}
def get_flag(base_url, cc):  # <2>
    """Download the GIF flag for country code ``cc`` (generator coroutine).

    The URL is ``<base_url>/<cc>/<cc>.gif`` with ``cc`` lower-cased.

    Returns:
        The response body when the status is 200.

    Raises:
        web.HTTPNotFound: the server answered 404.
        aiohttp.HttpProcessingError: any other status, carrying the status
            code, reason and headers of the response.
    """
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 200:
            return (yield from resp.read())
        if resp.status == 404:
            raise web.HTTPNotFound()
        raise aiohttp.HttpProcessingError(
            code=resp.status, message=resp.reason,
            headers=resp.headers)
async def get_flag(base_url, cc):  # <2>
    """Download the GIF flag for country code ``cc``.

    Declared ``async`` because the body uses ``async with`` and ``await``.
    The URL is ``<base_url>/<cc>/<cc>.gif`` with ``cc`` lower-cased; the
    request runs inside ``async_timeout.timeout(10)``.

    Returns:
        The response body when the status is 200.

    Raises:
        web.HTTPNotFound: the server answered 404.
        aiohttp.HttpProcessingError: any other status, carrying the status
            code, reason and headers of the response.
    """
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(10):
            async with session.get(url) as resp:
                if resp.status == 200:
                    image = await resp.read()  # <5>
                    return image
                elif resp.status == 404:
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.HttpProcessingError(
                        code=resp.status, message=resp.reason,
                        headers=resp.headers)
async def get(self):
    """Render the public page for one article.

    Declared ``async`` because the body awaits ``self.redis.get``.

    The article id comes from ``self.match['id']`` and must consist of digits.
    The stored record is extended with formatted dates, a ``citations`` list
    and a ``tags`` list before being handed to ``geass`` together with the
    ``public/article.html`` template.

    Raises:
        web.HTTPNotFound: the id is not all digits, or no article is stored
            under it.
    """
    article_id = self.match['id']
    if not article_id.isdigit():
        raise web.HTTPNotFound()
    data = await self.redis.get('Article', article_id)
    if data is None:
        raise web.HTTPNotFound()

    # Human-readable timestamps.
    data['created_date'] = todate(data['created_time'], '%Y-%m-%d %H:%M:%S')
    data['updated_date'] = todate(data['updated_time'], '%Y-%m-%d %H:%M:%S')

    # Citations are '|'-separated; [3:-5] trims the rendered output
    # (presumably a wrapping paragraph tag - confirm against render()).
    try:
        data['citations'] = [render(item)[3:-5] for item in data.get('citation').split('|')]
    except AttributeError:
        data['citations'] = []

    # A record without a 'tag' field used to crash here; treat it like a
    # missing 'citation' and fall back to an empty list.
    try:
        data['tags'] = data.get('tag').split('|')
    except AttributeError:
        data['tags'] = []

    # Any '$' in the text switches math rendering on.
    has_math = re.search('[$]{1,2}', data['text']) is not None

    return geass({
        'article': data,
        'math': has_math,
        'PAGE_IDENTIFIER': self.request.app.router['article'].url(
            parts={'id': article_id}
        ),
        'dev': not config.dev,
        'comment': True
    }, self.request, 'public/article.html')
async def set_configuration_item(request):
    """Set the value of one configuration item and return the updated item.

    Declared ``async`` because the body awaits the request body and the
    configuration manager calls.

    Args:
        request: must carry the ``category_name`` and ``config_item`` URL
            parameters and a JSON body of the form ``{"value": <some value>}``.

    Returns:
        JSON response with the item as read back from the configuration
        manager after the update.

    Raises:
        web.HTTPBadRequest: a ``KeyError`` occurred, e.g. the body has no
            ``value`` key.
        web.HTTPNotFound: no item exists for the category/item pair.

    :Example:
        curl -X PUT -H "Content-Type: application/json" -d '{"value": <some value> }' http://localhost:8081/foglamp/category/{category_name}/{config_item}

        For {category_name}=>PURGE update value for {config_item}=>age
        curl -X PUT -H "Content-Type: application/json" -d '{"value": 24}' http://localhost:8081/foglamp/category/PURGE/age
    """
    category_name = request.match_info.get('category_name', None)
    config_item = request.match_info.get('config_item', None)

    data = await request.json()
    # TODO: make it optimized and elegant
    cf_mgr = ConfigurationManager(connect.get_storage())

    try:
        value = data['value']
        await cf_mgr.set_category_item_value_entry(category_name, config_item, value)
        result = await cf_mgr.get_category_item(category_name, config_item)

        if result is None:
            raise web.HTTPNotFound(reason="No detail found for the category_name: {} and config_item: {}".format(category_name, config_item))
    except KeyError:
        # Also reached when one of the manager calls raises KeyError.
        raise web.HTTPBadRequest(reason='Missing required value for {}'.format(config_item))

    return web.json_response(result)