def send_ws(self, event):
    """Serialize *event*, remember it in the message cache and broadcast it.

    The event's ``dict_`` is dumped to JSON, appended to ``cache['msg']``
    (trimmed to the newest ``cache['len']`` entries) and sent to every
    websocket in ``cs``. Log events are additionally printed and written to
    the logger; reboot events trigger ``reboot_ctp``.

    Any exception is caught and printed, so this handler never raises.
    """
    try:
        payload = json.dumps(event.dict_, ensure_ascii=False)
        # Keep a bounded history: only the newest cache['len'] messages survive.
        history = cache['msg'] + [payload]
        cache['msg'] = history[-1 * cache['len']:]
        if event.type_ == EVENT_LOG:
            print(event.dict_['log'])
            logger.error(event.dict_['log'])
        elif event.type_ == EVENT_REBOOT:
            reboot_ctp({})
        for _ws in cs:
            _ws.send(payload)
    except Exception as e:  # "as" form is valid on Python 2.6+ and Python 3
        print('ws.send_ws.error,', str(e))
# Example source code for Python's error() (original heading: "python类error()的实例源码")
def bridge_set(a, b, c):
    """Set one nested value of the instrument mapping held by ``bg``.

    Args:
        a: Dotted path to the entry, e.g. ``"outer.inner.key"``. Every
            component must already exist.
        b: New value; it is converted with the type named by *c*.
        c: Name of the target type: ``'int'``, ``'str'`` or ``'float'``.

    Returns:
        ``'error type'`` when *c* is not an allowed type name,
        ``'<component> not in correct place'`` when a path component is
        missing, otherwise ``str()`` of the updated instrument mapping.
    """
    if c not in ('int', 'str', 'float'):
        return 'error type'
    # Explicit converter table instead of eval(): only these callables can run.
    converters = {'int': int, 'str': str, 'float': float}
    root = bg.get_instrument()
    path = a.split('.')
    b = converters[c](b)

    # Walk down the path, remembering (key, parent) for every step.
    node = root
    trail = []
    for key in path:
        trail.append((key, node))
        if key in node:
            node = node[key]
        else:
            return '%s not in correct place' % key

    # Write the new leaf, then re-attach each level to its parent on the way up.
    value = b
    for key, parent in reversed(trail):
        parent[key] = value
        value = parent
    bg.set_instrument(value)
    return str(value)
def error404(error):
    """Render the honeyd index template with a "Non-existent page" message.

    The ``error`` argument is accepted but not used.
    """
    return template(
        'honeyd/utilities/http/index.html',
        data="""Non-existent page""",
    )
# css declaration
def raise_error():
    """Invoke ``abort`` with status code 404 and the text "error..."."""
    status_code, text = 404, "error..."
    abort(status_code, text)
def error404(error):
    """Return the fixed 404 message; the ``error`` argument is ignored."""
    body = '404 error !!!!!'
    return body
# Restart the accounts, at most once per clock hour.
# The hour of the last restart is kept in cache['reboot']; a call within the
# same hour only prints the trace line. The `act` argument is not used here.
# NOTE(review): the indentation of this block was lost, so the nesting below
# is inferred from the statements -- confirm against the upstream file.
def reboot_ctp(act):
global cache
global logger
# Whole hours since the epoch; changes exactly once per hour.
_now = int(time.time()/3600)
print('ws,reboot_ctp,reboot')
# Proceed only when the stored hour differs from the current one.
if cache.get('reboot',0)!=_now:
logger.error('ws.reboot')
print('reboot')
cache['reboot'] = _now
start_accounts(get_accounts())
# NOTE(review): presumably part of this function (it is the only assignment
# that would need `global logger`), but it could equally be a module-level
# statement; whether it sits inside the `if` is also unknown -- confirm.
logger = otherLogger().get_logger()
def test_401(self):
    """ WSGI: abort(401, '') (HTTP 401) """
    # Step 1: a route that aborts with 401 must produce a 401 response.
    @bottle.route('/')
    def aborting_route():
        bottle.abort(401)

    self.assertStatus(401, '/')

    # Step 2: a registered 401 handler may rewrite the status and the body.
    @bottle.error(401)
    def on_unauthorized(error):
        bottle.response.status = 200
        return str(type(error))

    self.assertStatus(200, '/')
    self.assertBody("<class 'bottle.HTTPError'>", '/')
def test_module_shortcuts(self):
    # Every module-level shortcut must wrap the method of the same name on
    # the application object returned by bottle.app().
    shortcut_names = (
        'route', 'get', 'post', 'put', 'delete', 'error', 'mount',
        'hook', 'install', 'uninstall',
    )
    for shortcut_name in shortcut_names:
        module_level = getattr(bottle, shortcut_name)
        app_level = getattr(bottle.app(), shortcut_name)
        self.assertWraps(module_level, app_level)
def exception_plugin(func):
    """Decorate *func* so a ``TrackParseError`` renders the ``error`` template.

    The wrapper forwards all arguments to *func* and returns its result
    unchanged. When *func* raises ``TrackParseError``, the original exception
    info stored on it is formatted into a traceback string and passed to the
    ``error`` template together with the error message. Other exceptions
    propagate untouched.

    Args:
        func: The callable to protect.

    Returns:
        The wrapping callable, carrying *func*'s name and docstring.
    """
    # Local import keeps this block self-contained.
    import functools

    @functools.wraps(func)  # preserve __name__/__doc__ of the wrapped callback
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except TrackParseError as e:
            exception = ''.join(
                traceback.format_exception(*e.orig_exception_info)
            )
            return template('error', errormsg=e.msg, exception=exception)
    return wrapper
# Provide custom, simple error responses for 400 and 500 errors,
# suitable for direct inclusion in alert models on the client.
def customerror(error):
    """Render the ``error_simple`` template for *error*.

    ``DEBUG`` is imported from bottle at call time and handed to the template
    alongside the error object (as ``e``).
    """
    from bottle import DEBUG
    context = {"DEBUG": DEBUG, "e": error}
    return template("error_simple", **context)
def error404(error):
    """Return the fixed 404 message; the ``error`` argument is ignored."""
    message = "Error 404: Nothing here, sorry."
    return message
def handle_500_error(http_error):
    """Log the details of *http_error* and return a JSON error document.

    The exception and the traceback are logged when they are not ``None``;
    the body is logged when it is not ``None`` and not empty. The returned
    JSON string carries ``status`` 1, a fixed message and the body under
    ``details``.
    """
    for detail in (http_error.exception, http_error.traceback):
        if detail is not None:
            logger.error(detail)

    body = http_error.body
    if body is not None and len(body) > 0:
        logger.error(body)

    response = {
        'status': 1,
        'error': "The server encountered an internal error (see server logs for details)",
        "details": http_error.body,
    }
    return json.dumps(response)
# Per-request authentication check.
# Resets request.account, lets OPTIONS requests through untouched, applies the
# legacy X-Auth-Token comparison on the configured path prefixes, and otherwise
# resolves the X-Session-Token header to a user whose token was issued less
# than 30 minutes ago. Rejections go through abort(401) / abort(403).
# NOTE(review): the indentation of this block was lost, so nesting is inferred
# from the statements themselves -- confirm against the upstream file.
def check_auth():
request.account = None
# Allow preflight requests
if request.method == 'OPTIONS':
return
# FIXME: this is legacy authentication, remove this
# Path prefixes guarded by the legacy token; a falsy config value becomes [].
token_protected_prefix = default_app().config.get("general", "token_protected_paths") or []
if 'X-Auth-Token' in request.headers and \
any(request.path.startswith(prefix) for prefix in token_protected_prefix):
auth_token = request.headers['X-Auth-Token']
expected_token = default_app().config.get("general", "auth_token")
if auth_token != expected_token:
# NOTE(review): the rejected token is written to the log verbatim.
logger.error("Invalid Token:[%s]" % (auth_token,))
abort(401, "Invalid Token.")
# NOTE(review): presumably this return closes the outer X-Auth-Token branch
# (legacy-authenticated requests stop here with request.account left as None);
# if it is nested under the mismatch check instead, behaviour differs -- confirm.
return
# Get session token from request headers or cookies
# NOTE(review): only the X-Session-Token header is read in this block; no
# cookie is consulted here.
token = None
if 'X-Session-Token' in request.headers:
token = request.headers['X-Session-Token']
if token is None:
# Allow unprotected routes if no token is provided
return use_default_user()
# If a token is provided, check it
with database.session_scope() as session:
user = session.query(m.User).filter(m.User.session_token == token).one_or_none()
if user is None:
# NOTE(review): the presented session token is logged verbatim here and below.
logger.info("Unauthorized access attempt with token: {}".format(token))
abort(403)
# Tokens expire 30 minutes after issue.
# assumes token_issued_at is a naive UTC datetime -- TODO confirm
if user.token_issued_at + datetime.timedelta(minutes=30) < datetime.datetime.utcnow():
logger.info("Token expired: {}".format(token))
abort(403)
request.account = user
# Load roles, then remove them from the session so we can use them anywhere
_expunge_user(request.account, session)
def error404(error):
    """Return the fixed 404 message; the ``error`` argument is ignored."""
    message = "nothing here sorry"
    return message
def ccqHubSub():
    """Validate the caller's credentials and submit a job to ccqHub.

    Reads the submission from ``request.json``, validates the credentials
    with ``validateCreds``, stores the job script with
    ``ccqHubMethods.saveJobScript`` and finally registers the job with
    ``ccqHubMethods.saveJob``.

    Returns:
        dict: ``{"status": ..., "payload": {"message": ..., "cert": ...}}``.
        ``status`` is ``"failure"`` when the credentials do not validate,
        ``"error"`` when saving the script or the job fails, and
        ``"success"`` otherwise. ``cert`` is the string ``"None"`` on every
        non-success path.

    A field missing from the request JSON propagates as the lookup error of
    that mapping (``KeyError`` for a plain dict).
    """
    VARS = request.json
    jobScriptLocation = VARS['jobScriptLocation']
    jobScriptText = VARS['jobScriptFile']
    ccOptionsParsed = VARS['ccOptionsCommandLine']
    jobName = VARS['jobName']
    jobMD5Hash = VARS["jobMD5Hash"]
    userName = VARS["userName"]
    password = VARS["password"]
    valKey = VARS['valKey']
    dateExpires = VARS['dateExpires']
    certLength = VARS['certLength']
    ccAccessKey = VARS['ccAccessKey']
    targetName = VARS['targetName']
    remoteUserName = VARS['remoteUserName']
    additionalActionsAndPermissionsRequired = VARS['additionalActionsAndPermissionsRequired']

    # The instance type is not calculated here; it is updated later on from
    # the ccq scheduler. If the job stays on a local scheduler this value is
    # never needed.
    # NOTE(review): the original comment said the value must be set to None,
    # but the code copies the requested instance type -- confirm the intent.
    ccOptionsParsed['instanceType'] = ccOptionsParsed['requestedInstanceType']

    values = validateCreds(userName, password, dateExpires, valKey, certLength, ccAccessKey, remoteUserName, additionalActionsAndPermissionsRequired)
    if values['status'] != "success":
        # Credentials failed to validate on the server: send back the error
        # message and a failure status.
        return {"status": "failure", "payload": {"message": str(values['payload']), "cert": str(None)}}

    identity = values['payload']['identity']
    userName = values['payload']['userName']
    cert = values['payload']['cert']

    # TODO implement pricing calls to the instances
    # The unique identification of the user who submitted the job is the
    # combination of the identity object and the username they provide for
    # the job. A username is required when submitting through ccqHubsub.
    obj = {
        "jobScriptLocation": str(jobScriptLocation),
        "jobScriptText": str(jobScriptText),
        "jobName": str(jobName),
        "ccOptionsCommandLine": ccOptionsParsed,
        "jobMD5Hash": jobMD5Hash,
        "userName": str(userName),
        "targetName": str(targetName),
        "identity": str(identity),
    }
    values = ccqHubMethods.saveJobScript(**obj)
    if values['status'] != "success":
        return {"status": "error", "payload": {"message": values['payload'], "cert": str(None)}}

    obj = {
        "jobScriptLocation": str(jobScriptLocation),
        "jobScriptText": str(jobScriptText),
        "jobName": str(jobName),
        "ccOptionsParsed": ccOptionsParsed,
        "userName": str(userName),
        "isRemoteSubmit": "True",
        "identity": str(identity),
        "targetName": str(targetName),
    }
    results = ccqHubMethods.saveJob(**obj)
    if results['status'] != "success":
        # Report the saveJob failure itself, not the earlier (successful)
        # saveJobScript payload.
        return {"status": "error", "payload": {"message": results['payload'], "cert": str(None)}}

    generatedJobId = results['payload']['jobId']
    return {"status": "success", "payload": {"message": "The job has successfully been submitted to ccqHub. The job id is: " + str(generatedJobId) + ".\n You may use this job id to lookup the job's status using the ccqstat utility.", "cert": str(cert)}}