def exception_control(func):
''' ???????
'''
@functools.wraps(func)
def wrapper(self):
''' ????
'''
try:
code, msg, body = E_SUCC, "OK", func(self)
except (MissingArgumentError, AssertionError) as ex:
code, msg, body = E_PARAM, str(ex), None
except tornado.web.HTTPError:
raise
except Exception as ex:
code, msg, body = E_INTER, str(ex), None
log_msg = self.request.uri \
if self.request.files else \
"%s %s" % (self.request.uri, self.request.body)
logging.error(log_msg, exc_info=True)
self.send_json(body, code, msg)
return wrapper
python类HTTPError()的实例源码
def process_module(self, module):
'''
??????
'''
module = module or ''
if self.module_prefix:
module = '%s/%s' % (self.module_prefix, module)
module = '__'.join([i for i in module.split('/') if i])
method = getattr(self, module or 'index', None)
if method and module not in ('get', 'post'):
try:
result = method()
if result:
self.send_json(result)
except Exception as ex:
logging.error('%s\n%s\n', self.request, str(ex), exc_info=True)
self.send_json(None, E_INTER, str(ex))
else:
raise tornado.web.HTTPError(404)
def post(self):
# ???? TODO: ?????
mobile = self.get_argument('mobile')
pwd = self.get_argument('pwd')
users = yield User.objects.filter(mobile=mobile).find_all()
if len(users) != 0:
# ?????????
raise HTTPError(**errors.status_21)
school_id = self.get_argument('school_id')
self.vaildate_id(school_id)
school = yield School.objects.get(self.get_argument('school_id'))
self.check_none(school)
# ????
user = User(mobile=mobile, password=pwd, nickname='test', school_id=ObjectId(school_id))
yield user.save()
user = user.to_dict()
user['token'] = token_manager.create_token(str(user['id']))
self.write_json(user)
def put(self, section_name):
if not self.settings_dir:
raise web.HTTPError(404, "No current settings directory")
raw = self.request.body.strip().decode(u"utf-8")
# Validate the data against the schema.
schema = _get_schema(self.schemas_dir, section_name, self.overrides)
validator = Validator(schema)
try:
validator.validate(json.loads(json_minify(raw)))
except ValidationError as e:
raise web.HTTPError(400, str(e))
# Write the raw data (comments included) to a file.
path = _path(self.settings_dir, section_name, _file_extension, True)
with open(path, "w") as fid:
fid.write(raw)
self.set_status(204)
def _path(root_dir, section_name, file_extension = ".json", make_dirs = False):
"""Parse the URL section name and find the local file system path."""
parent_dir = root_dir
# Attempt to parse path, e.g. @jupyterlab/apputils-extension:themes.
try:
package_dir, plugin = section_name.split(":")
parent_dir = os.path.join(root_dir, package_dir)
path = os.path.join(parent_dir, plugin + file_extension)
# This is deprecated and exists to support the older URL scheme.
except:
path = os.path.join(root_dir, section_name + file_extension)
if make_dirs and not os.path.exists(parent_dir):
try:
os.makedirs(parent_dir)
except Exception as e:
name = section_name
message = "Failed writing settings ({}): {}".format(name, str(e))
raise web.HTTPError(500, message)
return path
def get(self, provider_prefix, spec):
try:
provider = self.get_provider(provider_prefix, spec=spec)
except web.HTTPError:
raise
except Exception as e:
app_log.error("Failed to construct provider for %s/%s")
# FIXME: 400 assumes it's the user's fault (?)
# maybe we should catch a special InvalidSpecError here
raise web.HTTPError(400, str(e))
self.render_template(
"index.html",
base_url=self.settings['base_url'],
url=provider.get_repo_url(),
ref=provider.unresolved_ref,
filepath=self.get_argument('filepath', None),
urlpath=self.get_argument('urlpath', None),
submit=True,
google_analytics_code=self.settings['google_analytics_code'],
)
def put(self, bucket, object_name):
object_name = urllib.unquote(object_name)
bucket_dir = os.path.abspath(os.path.join(
self.application.directory, bucket))
if not bucket_dir.startswith(self.application.directory) or \
not os.path.isdir(bucket_dir):
raise web.HTTPError(404)
path = self._object_path(bucket, object_name)
if not path.startswith(bucket_dir) or os.path.isdir(path):
raise web.HTTPError(403)
directory = os.path.dirname(path)
if not os.path.exists(directory):
os.makedirs(directory)
object_file = open(path, "w")
object_file.write(self.request.body)
object_file.close()
self.finish()
def post(self):
# url and filename are sent in a JSON encoded blob
post_data = tornado.escape.json_decode(self.request.body)
nb_url = post_data['nb_url']
nb_name = base64.b64decode(post_data['nb_name']).decode('utf-8')
force_download = post_data['force_download']
file_path = os.path.join(os.getcwd(), nb_name)
if os.path.isfile(file_path):
if not force_download:
raise HTTPError(409, "ERROR: File already exists.")
response = request_session.get(nb_url, stream=True)
with open(file_path, 'wb') as fd:
# TODO: check if this is a good chunk size
for chunk in response.iter_content(1024):
fd.write(chunk)
self.write(nb_name)
self.flush()
def get(self):
keywords = self.get_argument('keywords', None)
if keywords:
article_ids = KeywordArticle.query_by_keyword(keywords)
if article_ids:
articles = Article.get_by_ids(article_ids, public_only=True)
article_ids = [article.id for article in articles]
hit_counts = ArticleHitCount.get_by_ids(article_ids)
replies_dict = ArticleComments.get_comment_count_of_articles(article_ids)
else:
articles = []
hit_counts = replies_dict = {}
self.set_cache(CONFIG.DEFAULT_CACHE_TIME, is_public=True)
self.render('web/search.html', {
'title': u'???%s?' % keywords,
'page': 'search',
'keywords': keywords,
'articles': articles,
'hit_counts': hit_counts,
'replies_dict': replies_dict
})
else:
raise HTTPError(400)
def get(self, tag_name):
if not Tag.exists(tag_name):
raise HTTPError(404)
articles, next_cursor = TagArticle.get_articles(tag_name, self.cursor)
if articles:
article_ids = [article.id for article in articles]
hit_counts = ArticleHitCount.get_by_ids(article_ids)
replies_dict = ArticleComments.get_comment_count_of_articles(article_ids)
else:
hit_counts = replies_dict = {}
next_cursor = None
self.set_cache(CONFIG.DEFAULT_CACHE_TIME, is_public=True)
self.render('web/tag_articles.html', {
'title': u'???%s?' % tag_name,
'page': 'tag_articles',
'next_cursor': next_cursor,
'tag_name': tag_name,
'articles': articles,
'hit_counts': hit_counts,
'replies_dict': replies_dict
})
def get(self, category_name):
if not Category.exists(category_name):
raise HTTPError(404)
articles, next_cursor = CategoryArticles.get_articles(category_name, self.cursor)
if articles:
article_ids = [article.id for article in articles]
hit_counts = ArticleHitCount.get_by_ids(article_ids)
replies_dict = ArticleComments.get_comment_count_of_articles(article_ids)
else:
hit_counts = replies_dict = {}
next_cursor = None
self.set_cache(CONFIG.DEFAULT_CACHE_TIME, is_public=True)
self.render('web/category_articles.html', {
'title': u'???%s?' % category_name,
'page': 'category_articles',
'next_cursor': next_cursor,
'category_name': category_name,
'articles': articles,
'hit_counts': hit_counts,
'replies_dict': replies_dict
})
def prepare(self):
super(AdminHandler, self).prepare()
self.set_cache(is_public=False)
if CONFIG.ENABLE_HTTPS and not self.is_https:
request = self.request
if request.version == 'HTTP/1.0':
if request.method in ('GET', 'HEAD'):
self.redirect('https://%s%s' % (request.host, request.uri))
else:
raise HTTPError(403)
else:
self.redirect('https://%s%s' % (request.host, request.uri), status=307)
return
if not self.is_admin:
if not self.current_user_id:
request = self.request
if request.method in ('GET', 'HEAD'):
state = Auth.generate(request.uri)
self.set_state_cookie(state)
self.redirect(self.get_login_url(), status=302 if request.version == 'HTTP/1.0' else 303)
return
self.set_session_time_cookie() # force check user status
raise HTTPError(403)
def authorized(admin_only=False):
def wrap(user_handler):
@wraps(user_handler)
def authorized_handler(self, *args, **kwargs):
self.set_cache(is_public=False)
request = self.request
if request.method == 'GET':
if not self.current_user_id:
state = Auth.generate(request.uri)
self.set_state_cookie(state)
self.redirect(self.get_login_url(), status=302 if request.version == 'HTTP/1.0' else 303)
elif admin_only and not self.is_admin:
raise HTTPError(403)
else:
user_handler(self, *args, **kwargs)
elif not self.current_user_id:
self.set_session_time_cookie() # force check user status
raise HTTPError(403)
elif admin_only and not self.is_admin:
raise HTTPError(403)
else:
user_handler(self, *args, **kwargs)
return authorized_handler
return wrap
def get(self, article_id):
article_id = int(article_id)
if not article_id:
raise HTTPError(404)
article = Article.get_by_id(article_id)
if not article:
raise HTTPError(404)
categories = Category.get_all_names_with_paths()
tags = Tag.get_all()
self.render('admin/edit_article.html', {
'title': u'???%s?' % article.title,
'page': 'edit_article',
'article': article,
'categories': categories,
'tags': sorted(tags)
})
def session(self):
session_id = self.get_secure_cookie('session_id')
hmac_key = self.get_secure_cookie('verification')
if session_id and hmac_key:
check_hmac = self._generate_hmac(session_id)
if hmac_key != check_hmac:
raise HTTPError(403)
else:
session_id = self._generate_id()
hmac_key = self._generate_hmac(session_id)
self.set_secure_cookie('session_id', session_id)
self.set_secure_cookie('verification', hmac_key)
return session_id
def get(self, operation):
if not operation:
operation = 'get'
operations = {'get': self.get_object, 'list': self.list_objects}
if operation not in operations.keys():
raise HTTPError(404)
method = operation.get(operation)
ret = yield method()
self.write(ret)
def put(self, operation):
if not operation:
operation = 'putpost'
operations = {'putpost': self.put_post,
'putcomment': self.put_comment}
if operation not in operations:
raise HTTPError(404)
method = operations.get(operation)
ret = yield method()
self.write(ret)
def get(self, operation):
if not operation:
operation = 'get'
operations = {'get': self.get_object, 'list': self.list_objects}
if operation not in operations:
raise HTTPError(404)
method = operations.get(operation)
ret = yield method()
self.write(ret)
def error_status(exception):
if isinstance(exception, web.HTTPError):
return exception.status_code
elif isinstance(exception, (ExecutionError, GraphQLError)):
return 400
else:
return 500
def error_format(exception):
if isinstance(exception, ExecutionError):
return [{'message': e} for e in exception.errors]
elif isinstance(exception, GraphQLError):
return [format_graphql_error(exception)]
elif isinstance(exception, web.HTTPError):
return [{'message': exception.log_message,
'reason': exception.reason}]
else:
return [{'message': 'Unknown server error'}]