def POST(self, **params):
    """Execute a command on one or more remote hosts.

    Reads ``targets`` and ``cmd`` from the JSON request body, hands them
    to ``self.handle`` with ``async`` set to ``True`` and wraps the
    returned job id in a ``status.CREATED`` response.

    Raises:
        cherrypy.HTTPError: ``status.BAD_REQUEST`` when ``targets`` is
            missing, empty or not a list, or when ``cmd`` is missing,
            empty or not a text string.
    """
    payload = cherrypy.request.json

    targets = payload.pop('targets', None)
    if not targets or not isinstance(targets, list):
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_NO_TARGET)

    cmd = payload.pop('cmd', None)
    if not cmd or not isinstance(cmd, six.text_type):
        # NOTE(review): ERR_BAD_ROLE is reported for a bad command; it
        # looks like a copy/paste from a role handler -- confirm.
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_ROLE)

    # ``async`` is a reserved word since Python 3.7, so ``async=True`` is
    # a SyntaxError there. Passing the keyword through a dict sends the
    # very same argument and parses on every interpreter version.
    jid = self.handle(targets, cmd, **{'async': True})
    return response(status.CREATED, dict(jid=jid))
# Example source code using the Python class HTTPError()
def _state_parse(state):
    """Map a service state constant to a ``(start, restart)`` pair.

    ``STATE_STARTED`` gives ``(True, False)``, ``STATE_STOPED`` gives
    ``(False, False)`` and ``STATE_RESTARTED`` gives ``(True, True)``.

    Raises:
        cherrypy.HTTPError: ``status.BAD_REQUEST`` for any other value.
    """
    if state == STATE_STARTED:
        return True, False
    if state == STATE_STOPED:
        return False, False
    if state == STATE_RESTARTED:
        return True, True
    raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_BAD_SERVPARAMS)
def params_check_target():
    """Ensure the current request parameters contain ``'target'``.

    Raises:
        cherrypy.HTTPError: ``status.BAD_REQUEST`` when the key is absent.
    """
    # ``dict.has_key`` no longer exists in Python 3; the ``in`` operator
    # performs the same membership test on both Python 2 and Python 3.
    if 'target' not in cherrypy.request.params:
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_NO_TARGET)
def parse_params_target(params):
    """Remove ``'target'`` from ``params`` and return its value.

    The mapping is modified in place: the key is popped.

    Raises:
        cherrypy.HTTPError: ``status.BAD_REQUEST`` when ``params`` has no
            ``'target'`` key.
    """
    try:
        target = params.pop('target')
    except KeyError:
        raise cherrypy.HTTPError(status.BAD_REQUEST, ERR_NO_TARGET)
    else:
        return target
def index(self):
    """Receive a JSON update and hand it to the bot.

    The request must carry both a ``content-length`` header and a
    ``content-type`` header equal to ``application/json``. The body is
    decoded as UTF-8, parsed with ``telebot.types.Update.de_json`` and
    passed to ``bot.process_new_updates``. An empty string is returned.

    Raises:
        cherrypy.HTTPError: 403 when the header requirements are not met.
    """
    headers = cherrypy.request.headers
    is_json_request = (
        "content-length" in headers
        and "content-type" in headers
        and headers["content-type"] == "application/json"
    )
    if not is_json_request:
        raise cherrypy.HTTPError(403)

    body_size = int(headers["content-length"])
    raw_body = cherrypy.request.body.read(body_size)
    update = telebot.types.Update.de_json(raw_body.decode("utf-8"))
    bot.process_new_updates([update])
    return ''
def login(self, username=None, password=None):
    """Authenticate a user and record the outcome in the session.

    Stores a ``User`` built from ``username`` under ``session['user']``
    and the result of ``authenticate(password)`` under
    ``session['auth']``.

    Returns:
        ``{'ok': <user.auth>}`` on success; ``{'ok': False, 'error': ...}``
        on ``InvalidUser`` / ``InvalidCredentials``; ``{'ok': False}`` on
        ``UserModelException``.

    Raises:
        cherrypy.HTTPError: 400 when ``username`` or ``password`` is None.
    """
    if username is None or password is None:
        raise cherrypy.HTTPError(400, 'Bad Request')

    session = cherrypy.session
    try:
        user = User(username)
        session['user'] = user
        session['auth'] = user.authenticate(password)
        return {'ok': user.auth}
    except (InvalidUser, InvalidCredentials):
        return {'ok': False, 'error': 'Invalid credentials. Try again.'}
    except UserModelException:
        return {'ok': False}
def forgot(self, emailaddr, email_type):
    """Record the address and kind of a "forgot ..." e-mail on the request.

    ``emailaddr`` may be a full address or a bare local part. A bare
    local part is completed with the ``domain`` entry of the application's
    ``email`` config section, falling back to ``'localhost'``. The result
    is stored on ``cherrypy.request.email_address`` and ``email_type`` on
    ``cherrypy.request.email_type``.

    Returns:
        ``{'ok': True}``.

    Raises:
        cherrypy.HTTPError: 400 when ``email_type`` is neither
            ``'password'`` nor ``'username'``.
    """
    if '@' in emailaddr:
        # Split on the LAST '@' only. A plain ``split('@')`` yields more
        # than two parts for input such as ``a@b@c`` and the unpacking
        # then dies with an unhandled ValueError (HTTP 500).
        localpart, domain = emailaddr.rsplit('@', 1)
    else:
        localpart = emailaddr
        email_config = cherrypy.request.app.config.get('email', {})
        domain = email_config.get('domain', 'localhost')

    cherrypy.request.email_address = '@'.join([localpart, domain])

    if email_type not in ['password', 'username']:
        raise cherrypy.HTTPError(400)

    cherrypy.request.email_type = email_type
    return {'ok': True}
def sshkey(self, sshpubkey=None):
    """Add, remove or list SSH public keys of the logged-in session user.

    * ``POST`` builds an ``SSHKey`` from ``sshpubkey`` and adds it to the
      user; returns ``ok``, ``fingerprint`` and ``comment`` on success.
    * ``DELETE`` removes ``sshpubkey`` from the user.
    * ``GET`` returns the user's ``sshPublicKey`` attribute.

    Any other method falls through and returns ``None``.

    Raises:
        cherrypy.HTTPError: 403 when the session is not authenticated.
    """
    http_method = cherrypy.request.method.upper()

    if not cherrypy.session.get('auth', False):
        raise cherrypy.HTTPError(403)

    user = cherrypy.session['user']

    if http_method == 'POST':
        # Every path through this ``try`` returns, so the former ``else``
        # clause attached to it could never run and has been dropped.
        try:
            newkey = SSHKey(sshpubkey)
            user.add_key(newkey.key)
            return {
                'ok': True,
                'fingerprint': newkey.fingerprint,
                'comment': newkey.comment,
            }
        except UserModelException:
            return {'ok': False}
        except InvalidKey:
            return {
                'ok': False,
                'error': 'Not a valid SSH Public Key!',
            }

    if http_method == 'DELETE':
        try:
            user.delete_key(sshpubkey)
            return {'ok': True}
        except UserModelException:
            return {'ok': False}

    if http_method == 'GET':
        return user.sshPublicKey
def _check_method(cls, allowed_methods=VALID_METHODS):
    """Reject the request unless its method is one of ``allowed_methods``.

    The request method is upper-cased before the comparison. On a
    mismatch the ``Allow`` response header is set to the comma-separated
    allowed methods and a 405 (Method Not Allowed) is raised.
    """
    current_method = cherrypy.request.method.upper()
    if current_method in allowed_methods:
        return

    cherrypy.response.headers['Allow'] = ', '.join(allowed_methods)
    raise cherrypy.HTTPError(405)
def post(self):
    """Stem the ``message`` of an authenticated JSON request.

    The ``Http-Auth`` request header must equal the module-level
    ``token``. The ``message`` field of the JSON body is then passed to
    ``MessageStemmer`` and the result of ``perform()`` is returned.

    Raises:
        cherrypy.HTTPError: 401 when the header does not match ``token``;
            422 when ``message`` is missing or empty.
    """
    message = cherrypy.request.json.get('message', None)
    supplied_token = cherrypy.request.headers.get('Http-Auth', None)

    # NOTE(review): ``!=`` on a secret is not a constant-time comparison;
    # consider ``hmac.compare_digest`` once the type of ``token`` is
    # confirmed.
    if supplied_token != token:
        raise cherrypy.HTTPError(401, "Unauthorized")
    if not message:
        raise cherrypy.HTTPError(422, "Message is missing")

    return MessageStemmer(message).perform()
# ==================================
# Cherrypy configuration and routing
# ==================================
def index(self):
    """Receive a JSON update and forward its message to the bot.

    The request must carry both a ``content-length`` header and a
    ``content-type`` header equal to ``application/json``. The body is
    decoded as UTF-8 and parsed with ``telebot.types.Update.de_json``;
    the update's ``message`` is handed to ``bot.process_new_messages``.
    An empty string is returned.

    Raises:
        cherrypy.HTTPError: 403 when the header requirements are not met.
    """
    headers = cherrypy.request.headers
    if not ('content-length' in headers and
            'content-type' in headers and
            headers['content-type'] == 'application/json'):
        raise cherrypy.HTTPError(403)

    length = int(headers['content-length'])
    json_string = cherrypy.request.body.read(length).decode("utf-8")
    update = telebot.types.Update.de_json(json_string)

    # An update that is not a plain message (edited message, callback
    # query, ...) has ``message`` set to None. Forwarding ``[None]`` would
    # make the message handlers fail, so such updates are acknowledged
    # without processing.
    if update.message is not None:
        bot.process_new_messages([update.message])
    return ''
# Handle '/start' and '/help'
def conv_response(resp):
    """Translate a ``Response`` object into CherryPy terms.

    Anything that is not a ``Response`` instance is returned through
    ``as_bytes`` unchanged. For a ``Response``:

    * every ``Set-Cookie`` header is parsed and copied, with its
      attributes, into ``cherrypy.response.cookie``;
    * a status below 300 sets the CherryPy response status and headers
      and returns the message as bytes;
    * a 3xx status raises ``cherrypy.HTTPRedirect`` to ``resp.message``;
    * any other status raises ``cherrypy.HTTPError`` with the message.
    """
    if not isinstance(resp, Response):
        return as_bytes(resp)

    # 'httponly' is included so that an HttpOnly flag set by the provider
    # is not silently dropped while the cookie is copied.
    cookie_attrs = ('expires', 'path', 'comment', 'domain', 'max-age',
                    'secure', 'httponly', 'version')

    cookie = cherrypy.response.cookie
    for header, value in resp.headers:
        if header != 'Set-Cookie':
            continue
        cookie_obj = SimpleCookie(value)
        for name in cookie_obj:
            morsel = cookie_obj[name]
            cookie[name] = morsel.value
            for key in cookie_attrs:
                if morsel[key]:
                    cookie[name][key] = morsel[key]

    # ``_status`` is a status line such as "200 OK"; keep the code only.
    _stat = int(resp._status.split(' ')[0])

    if _stat < 300:
        cherrypy.response.status = _stat
        for key, val in resp.headers:
            cherrypy.response.headers[key] = val
        return as_bytes(resp.message)
    elif 300 <= _stat < 400:
        raise cherrypy.HTTPRedirect(resp.message, status=_stat)
    else:
        raise cherrypy.HTTPError(_stat, message=resp.message)
def registration(self, **kwargs):
    """Client registration endpoint.

    * ``OPTIONS`` answers the CORS preflight.
    * ``GET`` looks up ``kwargs['client_id']`` in ``self.op.cdb`` and
      returns the stored client information as a JSON-encoded
      ``RegistrationResponse``.
    * Any other method passes the request body (or, when there is none,
      the JSON-encoded keyword arguments) to
      ``self.op.registration_endpoint`` and converts the result with
      ``conv_response``.

    Raises:
        cherrypy.HTTPError: when ``registration_endpoint`` raises; the
            error text is used as the message.
    """
    logger.debug('Request headers: %s', cherrypy.request.headers)

    if cherrypy.request.method == "OPTIONS":
        cherrypy_cors.preflight(
            allowed_methods=["POST", "GET"], origins='*',
            allowed_headers=['Authorization', 'content-type'])
    elif cherrypy.request.method == "GET":
        # Work on a shallow copy: the redirect URIs are rewritten below
        # and that must not leak back into the record kept in the client
        # database, or a later request would see already-unpacked values.
        _cinfo = dict(self.op.cdb[kwargs['client_id']])
        for attr in ['redirect_uris', 'post_logout_redirect_uris']:
            try:
                _cinfo[attr] = unpack_redirect_uri(_cinfo[attr])
            except KeyError:
                pass
        rr = RegistrationResponse(**_cinfo)
        cherrypy.response.headers['Content-Type'] = 'application/json'
        return as_bytes(json.dumps(rr.to_dict()))
    else:
        logger.debug('ClientRegistration kwargs: %s', kwargs)
        _request = None
        if cherrypy.request.process_request_body is True:
            _request = as_unicode(cherrypy.request.body.read())
            logger.debug('request_body: %s', _request)

        try:
            if _request:
                resp = self.op.registration_endpoint(_request)
            else:
                resp = self.op.registration_endpoint(json.dumps(kwargs))
        except Exception as err:
            logger.error(err)
            raise cherrypy.HTTPError(message=str(err))

        return conv_response(resp)
def claims(self, **kwargs):
    """Exchange a one-time bearer token for its claims, as a signed JWT.

    ``OPTIONS`` answers the CORS preflight. Otherwise the token following
    the ``Bearer`` prefix of the ``Authorization`` header is looked up in
    ``self.op.claim_access_token``, removed from it, and the associated
    claims are returned as an RS256-signed JWT with content type
    ``application/jwt``.

    Raises:
        cherrypy.HTTPError: 400 when the ``Authorization`` header is
            missing, does not start with ``Bearer``, or carries an
            unknown token.
    """
    if cherrypy.request.method == "OPTIONS":
        cherrypy_cors.preflight(
            allowed_methods=["GET"], origins='*',
            allowed_headers='Authorization')
        return None

    try:
        authz = cherrypy.request.headers['Authorization']
    except KeyError:
        authz = None

    # Explicit check instead of ``assert``: asserts vanish under ``-O``
    # and a missing header used to crash with AttributeError on None.
    if authz is None or not authz.startswith("Bearer"):
        logger.error("Bad authorization token")
        # The error must be raised; merely constructing it let the
        # request carry on with an invalid token.
        raise cherrypy.HTTPError(400, "Bad authorization token")

    # Skip the 7 leading characters ("Bearer" plus one separator).
    tok = authz[7:]
    try:
        _claims = self.op.claim_access_token[tok]
    except KeyError:
        logger.error("Bad authorization token")
        raise cherrypy.HTTPError(400, "Bad authorization token")

    # one time token
    del self.op.claim_access_token[tok]
    _info = Message(**_claims)
    jwt_key = self.op.keyjar.get_signing_key()
    logger.error(_info.to_dict())
    cherrypy.response.headers["content-type"] = 'application/jwt'
    return as_bytes(_info.to_jwt(key=jwt_key, algorithm="RS256"))
def json_processor(entity):
    """Decode an ``application/json`` body into ``request.json``.

    Raises:
        cherrypy.HTTPError: 411 when the entity has no (or an empty)
            ``Content-Length`` header; 400 when decoding the body raises
            ``ValueError``.
    """
    content_length = entity.headers.get(ntou('Content-Length'), ntou(''))
    if not content_length:
        raise cherrypy.HTTPError(411)

    raw_body = entity.fp.read()
    # Both the UTF-8 decoding and the JSON parsing happen inside the
    # handler so a ValueError from either becomes a 400 response.
    with cherrypy.HTTPError.handle(ValueError, 400, 'Invalid JSON document'):
        document = json_decode(raw_body.decode('utf-8'))
        cherrypy.serving.request.json = document
def _get_file_path(self):
f = os.path.join(self.storage_path, self.SESSION_PREFIX + self.id)
if not os.path.abspath(f).startswith(self.storage_path):
raise cherrypy.HTTPError(400, 'Invalid session id in cookie.')
return f
def basic_auth(realm, users, encrypt=None, debug=False):
    """Require HTTP basic authentication; raise 401 when it fails.

    realm
        A string containing the authentication realm.

    users
        A dict of the form: {username: password} or a callable returning
        a dict.

    encrypt
        Callable used to encrypt the password returned from the
        user-agent. If None it defaults to a md5 encryption.

    On failure the ``www-authenticate`` response header is set for the
    given realm before the 401 is raised.
    """
    if not check_auth(users, encrypt):
        # inform the user-agent this path is protected
        challenge = httpauth.basicAuth(realm)
        cherrypy.serving.response.headers['www-authenticate'] = challenge
        raise cherrypy.HTTPError(
            401, 'You are not authorized to access that resource')

    if debug:
        cherrypy.log('Auth successful', 'TOOLS.BASIC_AUTH')
def validate_since():
    """Check Last-Modified against the If-(Un)Modified-Since headers.

    Nothing happens unless a ``Last-Modified`` response header has been
    set. Otherwise:

    * ``If-Unmodified-Since`` differing from it raises 412 when the
      current status is 2xx or 412;
    * ``If-Modified-Since`` equal to it, with a current status of 2xx or
      304, raises a 304 redirect for GET/HEAD and 412 for other methods.
    """
    response = cherrypy.serving.response
    last_modified = response.headers.get('Last-Modified')
    if not last_modified:
        return

    code, _reason, _msg = _httputil.valid_status(response.status)
    request = cherrypy.serving.request
    is_success = 200 <= code <= 299

    unmodified_since = request.headers.get('If-Unmodified-Since')
    if unmodified_since and unmodified_since != last_modified:
        if is_success or code == 412:
            raise cherrypy.HTTPError(412)

    modified_since = request.headers.get('If-Modified-Since')
    if modified_since and modified_since == last_modified:
        if is_success or code == 304:
            if request.method in ('GET', 'HEAD'):
                raise cherrypy.HTTPRedirect([], 304)
            raise cherrypy.HTTPError(412)
# Tool code #
def allow(methods=None, debug=False):
    """Raise 405 unless request.method is allowed (default GET and HEAD).

    ``methods`` may be a single string or a list/tuple of strings and is
    compared case-insensitively; HEAD is added whenever GET is allowed.

    The ``Allow`` response header is always set to the resulting list,
    whether or not the current method is permitted.
    """
    if isinstance(methods, (tuple, list)):
        requested = methods
    else:
        requested = [methods]

    allowed = [name.upper() for name in requested if name]
    if not allowed:
        allowed = ['GET', 'HEAD']
    elif 'GET' in allowed and 'HEAD' not in allowed:
        allowed.append('HEAD')

    cherrypy.response.headers['Allow'] = ', '.join(allowed)

    permitted = cherrypy.request.method in allowed
    if debug:
        if permitted:
            cherrypy.log('request.method %r in methods %r' %
                         (cherrypy.request.method, allowed), 'TOOLS.ALLOW')
        else:
            cherrypy.log('request.method %r not in methods %r' %
                         (cherrypy.request.method, allowed), 'TOOLS.ALLOW')

    if not permitted:
        raise cherrypy.HTTPError(405)
def referer(pattern, accept=True, accept_missing=False, error=403,
            message='Forbidden Referer header.', debug=False):
    """Raise HTTPError when the Referer header fails the pattern policy.

    pattern
        A regular expression pattern to test against the Referer.

    accept
        If True, the Referer must match the pattern; if False,
        the Referer must NOT match the pattern.

    accept_missing
        If True, permit requests with no Referer header.

    error
        The HTTP error code to return to the client on failure.

    message
        A string to include in the response body on failure.
    """
    request_headers = cherrypy.serving.request.headers
    try:
        ref = request_headers['Referer']
    except KeyError:
        if debug:
            cherrypy.log('No Referer header', 'TOOLS.REFERER')
        if accept_missing:
            return
    else:
        matched = bool(re.match(pattern, ref))
        if debug:
            cherrypy.log('Referer %r matches %r' % (ref, pattern),
                         'TOOLS.REFERER')
        if accept == matched:
            return

    raise cherrypy.HTTPError(error, message)