def get_file():
    """Serve a stored sample as a password-protected zip download.

    Reads the ``file_hash`` query parameter, passes it through
    ``clean_hash`` and requires the result to be 40 characters long (a
    SHA-1). The sample bytes are fetched through ``PackageController``,
    written to ``/tmp/mass_download/<hash>.codex``, zipped with the password
    ``codex`` and handed to ``static_file`` as a download.

    Returns:
        The ``static_file`` response on success. Otherwise a ``jsonize``d
        message, with status 400 when the hash is not 40 characters long or
        ``searchFile`` returns 1, and status 404 when ``searchFile`` returns
        None.
    """
    tmp_folder = "/tmp/mass_download"
    subprocess.call(["mkdir", "-p", tmp_folder])

    file_hash = clean_hash(request.query.file_hash)
    if len(file_hash) != 40:
        response.status = 400
        return jsonize({'message': 'Invalid hash format (use sha1)'})

    pc = PackageController()
    lookup = pc.searchFile(file_hash)
    if lookup is None:
        response.status = 404
        return jsonize({'message': 'File not found in the database'})
    if lookup == 1:
        response.status = 400
        return jsonize({'message': 'File not available for downloading'})

    content = pc.getFile(file_hash)
    zip_name = os.path.join(tmp_folder, str(file_hash) + '.zip')
    file_name = os.path.join(tmp_folder, str(file_hash) + '.codex')
    # The context manager closes the handle even if the write fails.
    with open(file_name, "wb") as fd:
        fd.write(content)

    # -j drops the directory part, -u updates an existing archive in place.
    subprocess.call(["zip", "-ju", "-P", "codex", zip_name, file_name])
    return static_file(str(file_hash) + ".zip", root=tmp_folder, download=True)
# Example source code for Python status()
def add_file():
    """Store an uploaded file and report the outcome as a JSON message.

    Reads the ``file`` upload and the ``file_date`` form field from the
    request. The date is checked with ``process_date``; when the field is
    absent the current time is used instead. The file id is the SHA-1 hex
    digest of the uploaded bytes. ``process_file`` and ``update_date`` run
    whatever ``upload_file`` reports.

    Returns:
        A ``jsonize``d ``{'message': ...}`` describing the result.
    """
    # tags = request.forms.get('name')
    upload = request.files.get('file')
    form_date = request.forms.get('file_date')

    # Validate the date before touching the upload.
    try:
        process_date(form_date)
    except ValueError:
        # No 422 status is set here on purpose: angular would not show the
        # message otherwise.
        return jsonize({'message': 'Invalid date format'})

    logging.debug("add_file(). date=" + str(form_date))
    if form_date is None:
        form_date = datetime.datetime.now()

    name = upload.filename
    data_bin = upload.file.read()
    file_id = hashlib.sha1(data_bin).hexdigest()
    logging.debug("add_file(): file_id=" + str(file_id))

    status = upload_file(data_bin)
    process_file(file_id)  # ToDo: add a redis job
    update_date(file_id, form_date)

    if status == "ok":
        text = 'Added with ' + str(file_id)
    elif status in ("already exists", "virustotal"):
        text = 'Already exists ' + str(file_id)
    else:
        text = 'Error'
    return jsonize({'message': text})
def get_result_from_av():
    """Fetch antivirus scan results for the hash given in ``file_hash``.

    A 40-character hash is used directly as the SHA-1. Any other valid hash
    is resolved through ``SearchModule.search_by_id`` and the ``sha1`` field
    of the first match is used. The scan is requested with
    ``get_av_result(sha1, 'high')`` once ``KeyManager`` confirms that keys
    are present.

    Returns:
        A ``jsonize``d dict holding either ``message`` on success or
        ``error`` / ``error_message`` on failure. Status 400 is set when the
        parameter is missing or the file is not found.
    """
    hash_id = request.query.file_hash
    if not len(hash_id):
        response.status = 400
        return jsonize({'error': 4, 'error_message': 'file_hash parameter is missing.'})

    hash_id = clean_hash(hash_id)
    if not valid_hash(hash_id):
        return jsonize({'error': 5, 'error_message': 'Invalid hash format.'})

    if len(hash_id) == 40:
        sha1 = hash_id
    else:
        # Not a SHA-1: look the file up to find its SHA-1.
        matches = SearchModule.search_by_id("1=" + str(hash_id), 1, [], True)
        if len(matches) == 0:
            response.status = 400
            return jsonize({'error': 6, 'error_message': 'File not found'})
        sha1 = matches[0]["sha1"]

    key_manager = KeyManager()
    if not key_manager.check_keys_in_secrets():
        return jsonize({'error': 7, "error_message": "Error: VirusTotal API key missing from secrets.py file"})

    av_result = get_av_result(sha1, 'high')
    outcome = av_result.get('status')
    if outcome == "added":
        return jsonize({"message": "AV scans downloaded."})
    if outcome == "already_had_it":
        return jsonize({"message": "File already have AV scans."})
    if outcome == "not_found":
        return jsonize({"error": 10, "error_message": "Not found on VT."})
    if outcome == "no_key_available":
        return jsonize({"error": 11, "error_message": "No key available right now. Please try again later."})

    # Unknown status: record what came back before reporting failure.
    logging.error("av_result for hash=" + str(sha1))
    logging.error("av_result=" + str(av_result))
    return jsonize({"error": 9, "error_message": "Cannot get analysis."})
def web_index(token=None):
    """
    Renders the index page listing every managed button

    The token is only checked when the server settings contain a 'key'
    entry. It must then decode to the label 'index'.

    This function is called from far far away, over the Internet
    """
    logging.info('Serving index page')

    try:
        token_required = 'key' in settings['server']
        if token_required and decode_token(settings, token) != 'index':
            raise ValueError('Invalid label in token')

    except Exception as feedback:
        debugging = logging.getLogger().getEffectiveLevel() == logging.DEBUG
        if debugging:
            logging.error("Unable to serve the index page")
            raise

        logging.error(str(feedback))
        response.status = 400
        return 'Invalid request'

    # One entry per button, each with its delete, initialise and push links.
    items = [
        {
            'label': button,
            'delete-url': '/delete/'+settings['tokens'].get(button+'-delete'),
            'initialise-url': '/initialise/'+settings['tokens'].get(button+'-initialise'),
            'push-url': '/'+settings['tokens'].get(button),
        }
        for button in buttons
    ]

    logging.debug('Buttons: {}'.format(items))
    return template('views/list_items', prefix=settings['server']['url'], items=items)
#
# invoked from bt.tn
#
def web_press(button=None):
    """
    Handles the press of a bt.tn device

    Falls back to the default button from the server settings when none is
    given. Socket errors are answered with status 500, any other failure
    with status 400. At DEBUG log level the exception is re-raised instead.

    This function is called from far far away, over the Internet
    """
    if button is None:
        button = settings['server']['default']

    try:
        button = decode_token(settings, button)
        return handle_button(load_button(settings, button))

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to push '{}'".format(button))
            raise

        logging.error(str(feedback))
        # Network trouble is a server-side fault; everything else is
        # blamed on the request.
        if isinstance(feedback, socket.error):
            response.status = 500
            return 'Internal error'

        response.status = 400
        return 'Invalid request'
def web_initialise(button=None):
    """
    Resets a room: deletes it, then fetches it again

    Falls back to the default button from the server settings when none is
    given. Any failure is answered with status 400, unless the log level is
    DEBUG, in which case the exception is re-raised.

    This function is called from far far away, over the Internet
    """
    if button is None:
        button = settings['server']['default']

    logging.info("Initialising button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='initialise')

        delete_room(load_button(settings, button))
        buttons.pop(button, None)

        # Load the button again and record the room id on the new context.
        fresh = load_button(settings, button)
        fresh['spark']['id'] = get_room(fresh)

    except Exception as feedback:
        debugging = logging.getLogger().getEffectiveLevel() == logging.DEBUG
        if debugging:
            logging.error("Unable to initialise '{}'".format(button))
            raise

        logging.error(str(feedback))
        response.status = 400
        return 'Invalid request'

    return 'OK'
def web_delete(button=None):
    """
    Removes a room and forgets the related button

    Falls back to the default button from the server settings when none is
    given. Any failure is answered with status 400, unless the log level is
    DEBUG, in which case the exception is re-raised.

    This function is called from far far away, over the Internet
    """
    if button is None:
        button = settings['server']['default']

    logging.info("Deleting button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='delete')
        delete_room(load_button(settings, button))
        buttons.pop(button, None)

    except Exception as feedback:
        debugging = logging.getLogger().getEffectiveLevel() == logging.DEBUG
        if debugging:
            logging.error("Unable to delete '{}'".format(button))
            raise

        logging.error(str(feedback))
        response.status = 400
        return 'Invalid request'

    return 'OK'
def raise_error(error_code, msg):
    """Set the response status to ``error_code`` and return ``msg``.

    Despite the name, nothing is raised: the caller is expected to return
    the message as the response body.
    """
    response.status = error_code
    return msg
def destroy(resources, resource_id):
    """Remove one entry from app.data and answer with status 204.

    ``abort(404, "Not Found")`` is called when ``resources`` is not listed
    in ``app.resources`` or ``resource_id`` is not present in that
    collection.
    """
    exists = resources in app.resources and resource_id in app.data[resources]
    if not exists:
        abort(404, "Not Found")

    collection = app.data[resources]
    del collection[resource_id]
    response.status = 204
    return ""
def dispatch(url):
    """
    Entry point for every route of the Ray API: hands the request over to
    ``process`` and turns its result into a response.

    The ``url`` argument is ignored. The path is always re-read from
    ``bottle_req.path`` and one trailing slash is stripped.

    Returns:
        When ``process`` returns something indexable as ``(body, status)``
        and the status is accepted by ``bottle_resp``, the body alone. In
        every other case the result of ``process`` is returned unchanged.
        When ``process`` raises a ``RayException``, the response status is
        set from its ``http_code`` and None is returned.

    Raises:
        Any other exception from ``process`` is logged and re-raised.
    """
    url = bottle_req.path
    log.info('request: %s', bottle_req.url)

    # endswith() is safe on an empty path, unlike url[-1].
    if url.endswith('/'):
        url = url[:-1]

    try:
        processed = process(url, bottle_req, bottle_resp)
    except exceptions.RayException as e:
        log.exception('ray exception: ')
        bottle_resp.status = e.http_code
        return None
    except BaseException:
        # Log everything, then let it propagate untouched.
        log.exception('exception:')
        raise

    try:
        from_func, http_status = processed[0], processed[1]
        bottle_resp.status = http_status
    except Exception:
        # Best effort: the result is not a usable (body, status) pair, so
        # hand it back as is. Interpreter-level signals such as
        # KeyboardInterrupt are no longer swallowed here.
        return processed
    return from_func
def status():
    """Return the fixed string 'OK'."""
    return 'OK'
def create_user_with_key():
    """
    Create a user directory with a keyfile on the shared volume, data
    arriving in the payload of the request with a JSON payload.

    Returns:
        A dict with ``response`` and status 201 on success. A dict with
        ``error`` and status 422 when ``username`` or ``pubkey`` is missing,
        or status 400 when ``username_valid`` rejects the username.
    """
    username = username_from_request(request)
    if not username:
        response.status = 422
        return {'error': "Parameter 'username' not specified"}
    if not username_valid(username):
        response.status = 400
        return {'error':
                "Invalid parameter 'username': '{0}' not allowed.".
                format(username)
                }

    # request.json may be None or a non-object value; treat both as a
    # missing pubkey instead of failing with AttributeError.
    payload = request.json
    pubkey = payload.get('pubkey') if isinstance(payload, dict) else None
    if not pubkey:
        response.status = 422
        return {'error': "Parameter 'pubkey' not specified"}

    abs_home_path = normpath(os.path.join(HOME_PATH_PREFIX, username))

    # Called for its effect only; the return value is not needed here.
    check_and_add(username)

    # Do the actual creation
    store_pubkey(username, abs_home_path, pubkey)

    response.status = 201
    return {'response':
            'Successful creation of user {0} and/or upload of key.'
            .format(username)}
def _signatures_match(expected, received):
    """Compare two hex digests, in constant time when the runtime allows it."""
    compare = getattr(hmac, 'compare_digest', None)
    if compare is None:
        # Interpreter predates hmac.compare_digest: plain comparison is the
        # only option and stays timing-sensitive.
        return expected == received
    try:
        return compare(expected, received)
    except TypeError:
        # compare_digest rejects non-ASCII text and mixed types; such a
        # value can never equal a hex digest anyway.
        return False


def normalize(request, jobdef):
    """Turn a GitHub webhook request into a flat environment dict.

    ``ping`` events are answered directly on the global ``response`` with
    ``pong``. ``push`` events are authenticated against the HMAC-SHA1
    signature in ``X-Hub-Signature``, keyed with
    ``jobdef.custom_params['secret']``.

    Args:
        request: Object exposing ``headers`` (mapping) and ``body`` (stream
            with ``read()``).
        jobdef: Object exposing ``custom_params['secret']``.

    Returns:
        False for a ``ping`` event, otherwise a dict with the keys
        ``provider``, ``event``, ``repo_type``, ``repo_url``,
        ``repo_url_ssh``, ``repo_url_http``, ``ref``, ``commit`` and
        ``mail_to``.

    Raises:
        NotImplementedError: The event is neither ``ping`` nor ``push``.
        ValueError: The hash type is not ``sha1`` or the signature does not
            match.
    """
    event = request.headers['X-Github-Event']
    if event == 'ping':
        response.status = 200
        response.body = "pong"
        return False

    # Read the stream once and parse from the same bytes that get signed,
    # so this does not depend on the framework rewinding the body.
    raw_body = request.body.read()
    body = json.loads(raw_body)
    secret = bytes(jobdef.custom_params['secret'])

    if event != 'push':
        raise NotImplementedError("Only push and ping events are currently supported")

    # Verify hash signature
    hashtype, signature = request.headers['X-Hub-Signature'].split('=', 1)
    if hashtype != 'sha1':
        raise ValueError("Invalid hashtype received")

    res = hmac.new(secret, msg=raw_body, digestmod=hashlib.sha1)
    if not _signatures_match(str(res.hexdigest()), str(signature)):
        raise ValueError("Invalid secret")

    repository = body['repository']
    mail_to = [repository['owner']['email'], body['pusher']['email']]
    return {
        'provider': 'github',
        'event': event,
        'repo_type': 'git',
        'repo_url': repository['clone_url'],
        'repo_url_ssh': repository['ssh_url'],
        'repo_url_http': repository['clone_url'],
        'ref': body['ref'],
        'commit': body['after'],
        'mail_to': ', '.join(mail_to),
    }
def create_app(engine):
    """Build the Bottle application that exposes the xFlow HTTP API.

    Args:
        engine: Object providing ``publish(stream, event)`` and
            ``track(workflow_id, execution_id)``.

    Returns:
        The ``Bottle`` instance with the ``/ping``, ``/publish`` and
        ``/track/...`` routes and a shared error handler registered.
    """
    app = Bottle()

    @app.error()
    @app.error(404)
    def handle_error(error):
        """Log the failure and answer with '<status> <message>'."""
        if isinstance(error.exception, ApiException):
            response.status = error.exception.code
        else:
            response.status = error.status_code
        # NOTE(review): the header announces JSON but the body returned
        # below is plain text - confirm what clients expect.
        response.set_header('Content-type', 'application/json')
        resp = {
            'type': type(error.exception).__name__,
            'message': repr(error.exception) if error.exception else '',
            'traceback': error.traceback,
            'status_code': response.status
        }
        log.error('Exception, type=%s, message=%s, status_code=%s, traceback=%s',
                  resp['type'], resp['message'], resp['status_code'], resp['traceback'])
        return '%s %s' % (resp['status_code'], resp['message'])

    @app.route('/ping', method=['GET'])
    def ping():
        """Return the service name and version."""
        return {'name': 'xFlow', 'version': '0.1' }

    @app.route('/publish', method=['POST'])
    def publish():
        """Validate the JSON body and publish its event to its stream."""
        data = json.loads(request.body.read())
        try:
            publish_schema.validate(data)
        except jsonschema.ValidationError as err:
            raise BadRequest(err)

        stream = data['stream']
        event = json.dumps(data['event'])

        try:
            engine.publish(stream, event)
        except core.KinesisStreamDoesNotExist as ex:
            raise NotFoundException(str(ex))

        return {}

    @app.route('/track/workflows/<workflow_id>/executions/<execution_id>', method=['GET'])
    def track(workflow_id, execution_id):
        """Return the engine's tracking information for one execution."""
        # The try either returns or raises, so nothing can follow it.
        try:
            return engine.track(workflow_id, execution_id)
        except (core.CloudWatchStreamDoesNotExist,
                core.WorkflowDoesNotExist,
                core.CloudWatchLogDoesNotExist) as ex:
            raise NotFoundException(str(ex))

    return app
def web_inbound_call(button=None):
    """
    Handles an inbound phone call

    Falls back to the default button from the server settings when none is
    given. The spoken text is the 'say' value of the first entry, in the
    'call' list of the push details, whose first key is 'say'. A default
    sentence is used when there is none.

    Returns the TwiML document as a string, or 'Invalid request' with
    status 400 on failure. At DEBUG log level the exception is re-raised.

    This function is called from twilio cloud back-end
    """
    if button is None:
        button = settings['server']['default']

    logging.info("Receiving inbound call for button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='call')
        context = load_button(settings, button)
        _, twilio_action = get_push_details(context)

        response.content_type = 'text/xml'
        behaviour = twilio.twiml.Response()

        say = None
        if 'call' in twilio_action:
            for line in twilio_action['call']:
                # list() keeps this working on Python 3, where dict views
                # cannot be indexed.
                if list(line.keys())[0] == 'say':
                    say = line['say']
                    break

        if say is None:
            say = "What's up Doc?"

        behaviour.say(say)
        return str(behaviour)

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to handle inbound call for '{}'".format(button))
            raise

        logging.error(str(feedback))
        response.status = 400
        return 'Invalid request'
#
# the collection of buttons that we manage
#