python类status()的实例源码

api2.py 文件源码 项目:codex-backend 作者: codexgigassys 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_file():
    tmp_folder = "/tmp/mass_download"
    subprocess.call(["mkdir", "-p", tmp_folder])
    file_hash = clean_hash(request.query.file_hash)
    key = ''
    if len(file_hash) == 40:
        key = 'sha1'
    else:
        response.status = 400
        return jsonize({'message': 'Invalid hash format (use sha1)'})

    pc = PackageController()
    res = pc.searchFile(file_hash)

    if res is None:
        response.status = 404
        return jsonize({'message': 'File not found in the database'})
    if res == 1:
        response.status = 400
        return jsonize({'message': 'File not available for downloading'})
    res = pc.getFile(file_hash)
    zip_name = os.path.join(tmp_folder, str(file_hash) + '.zip')
    file_name = os.path.join(tmp_folder, str(file_hash) + '.codex')
    fd = open(file_name, "wb")
    fd.write(res)
    fd.close()
    subprocess.call(["zip", "-ju", "-P", "codex", zip_name, file_name])
    return static_file(str(file_hash) + ".zip", root=tmp_folder, download=True)
api2.py 文件源码 项目:codex-backend 作者: codexgigassys 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def add_file():
    # tags = request.forms.get('name')
    upload = request.files.get('file')
    form_date = request.forms.get('file_date')
    try:  # validate
        process_date(form_date)
    except ValueError:
        # response.status = 422 #status can't be added because angular will not
        # show the message.
        return jsonize({'message': 'Invalid date format'})
    logging.debug("add_file(). date=" + str(form_date))
    if form_date is None:
        form_date = datetime.datetime.now()
    name = upload.filename
    data_bin = upload.file.read()
    file_id = hashlib.sha1(data_bin).hexdigest()
    logging.debug("add_file(): file_id=" + str(file_id))
    status = upload_file(data_bin)
    process_file(file_id)  # ToDo: add a redis job
    update_date(file_id, form_date)
    if(status == "ok"):
        return jsonize({'message': 'Added with ' + str(file_id)})
    elif(status == "already exists"):
        return jsonize({'message': 'Already exists ' + str(file_id)})
    elif(status == "virustotal"):
        return jsonize({'message': 'Already exists ' + str(file_id)})
    else:
        return jsonize({'message': 'Error'})
api2.py 文件源码 项目:codex-backend 作者: codexgigassys 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_result_from_av():
    hash_id = request.query.file_hash
    if len(hash_id) == 0:
        response.status = 400
        return jsonize({'error': 4, 'error_message': 'file_hash parameter is missing.'})
    hash_id = clean_hash(hash_id)
    if not valid_hash(hash_id):
        return jsonize({'error': 5, 'error_message': 'Invalid hash format.'})
    if(len(hash_id) != 40):
        data = "1=" + str(hash_id)
        res = SearchModule.search_by_id(data, 1, [], True)
        if(len(res) == 0):
            response.status = 400
            return jsonize({'error': 6, 'error_message': 'File not found'})
        else:
            sha1 = res[0]["sha1"]
    else:
        sha1 = hash_id
    key_manager = KeyManager()

    if(key_manager.check_keys_in_secrets()):
        av_result = get_av_result(sha1, 'high')
    else:
        return jsonize({'error': 7, "error_message": "Error: VirusTotal API key missing from secrets.py file"})
    if(av_result.get('status') == "added"):
        return jsonize({"message": "AV scans downloaded."})
    elif(av_result.get('status') == "already_had_it"):
        return jsonize({"message": "File already have AV scans."})
    elif(av_result.get('status') == "not_found"):
        return jsonize({"error": 10, "error_message": "Not found on VT."})
    elif(av_result.get('status') == "no_key_available"):
        return jsonize({"error": 11, "error_message": "No key available right now. Please try again later."})
    else:
        logging.error("av_result for hash=" + str(sha1))
        logging.error("av_result=" + str(av_result))
        return jsonize({"error": 9, "error_message": "Cannot get analysis."})
hook.py 文件源码 项目:green-button 作者: bernard357 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def web_index(token=None):
    """
    provides an index of buttons

    This function is called from far far away, over the Internet
    """

    logging.info('Serving index page')

    try:
        if 'key' not in settings['server']:
            pass

        elif decode_token(settings, token) != 'index':
            raise ValueError('Invalid label in token')

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to serve the index page")
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'

    items = []
    global buttons
    for button in buttons:
        items.append({
            'label': button,
            'delete-url': '/delete/'+settings['tokens'].get(button+'-delete'),
            'initialise-url': '/initialise/'+settings['tokens'].get(button+'-initialise'),
            'push-url': '/'+settings['tokens'].get(button),
            })
    logging.debug('Buttons: {}'.format(items))

    return template('views/list_items', prefix=settings['server']['url'], items=items)

#
# invoked from bt.tn
#
hook.py 文件源码 项目:green-button 作者: bernard357 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def web_press(button=None):
    """
    Processes the press of a bt.tn device

    This function is called from far far away, over the Internet
    """

    if button is None:
        button = settings['server']['default']

    try:

        button = decode_token(settings, button)

        context = load_button(settings, button)

        return handle_button(context)

    except socket.error as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to push '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 500
            return 'Internal error'

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to push '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
hook.py 文件源码 项目:green-button 作者: bernard357 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def web_initialise(button=None):
    """
    Initialises a room

    This function is called from far far away, over the Internet
    """

    if button is None:
        button = settings['server']['default']

    logging.info("Initialising button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='initialise')

        context = load_button(settings, button)
        delete_room(context)

        global buttons
        buttons.pop(button, None)

        context = load_button(settings, button)
        context['spark']['id'] = get_room(context)

        return 'OK'

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to initialise '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
hook.py 文件源码 项目:green-button 作者: bernard357 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def web_delete(button=None):
    """
    Deletes a room

    This function is called from far far away, over the Internet
    """

    if button is None:
        button = settings['server']['default']

    logging.info("Deleting button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='delete')

        context = load_button(settings, button)
        delete_room(context)

        global buttons
        buttons.pop(button, None)

        return 'OK'

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to delete '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
app.py 文件源码 项目:im-a-teapot 作者: vpetersson 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def raise_error(error_code, msg):
    response.status = error_code
    return msg
server.py 文件源码 项目:XDocs 作者: gaojiuli 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def destroy(resources, resource_id):
    """destroy the resource from app.data"""
    if resources not in app.resources or resource_id not in app.data[resources]:
        abort(404, "Not Found")
    del app.data[resources][resource_id]
    response.status = 204
    return ""
api.py 文件源码 项目:ray 作者: felipevolpone 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def dispatch(url):
    """
        This class is the beginning of all entrypoints in the Ray API. Here, each url
        will be redirect to the right handler
    """

    url = bottle_req.path
    log.info('request: %s', bottle_req.url)

    if url[-1] == '/':
        url = url[:-1]

    response_code = 200

    try:
        processed = process(url, bottle_req, bottle_resp)

        try:
            from_func, http_status = processed[0], processed[1]
            bottle_resp.status = http_status
            return from_func
        except:
            return processed

    except exceptions.RayException as e:
        log.exception('ray exception: ')
        response_code = e.http_code

    except:
        log.exception('exception:')
        raise

    bottle_resp.status = response_code
index.py 文件源码 项目:c-bastion 作者: ImmobilienScout24 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def status():
    return 'OK'
index.py 文件源码 项目:c-bastion 作者: ImmobilienScout24 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_user_with_key():
    """
    Create a user directory with a keyfile on the shared volume, data
    arriving in the payload of the request with a JSON payload.
    """
    username = username_from_request(request)
    if not username:
        response.status = 422
        return {'error': "Parameter 'username' not specified"}
    elif not username_valid(username):
        response.status = 400
        return {'error':
                "Invalid parameter 'username': '{0}' not allowed.".
                format(username)
                }

    pubkey = request.json.get('pubkey')
    if not pubkey:
        response.status = 422
        return {'error': "Parameter 'pubkey' not specified"}

    abs_home_path = normpath(os.path.join(HOME_PATH_PREFIX, username))

    username_was_added = check_and_add(username)

    # Do the actual creation
    store_pubkey(username, abs_home_path, pubkey)

    response.status = 201
    return {'response':
            'Successful creation of user {0} and/or upload of key.'
            .format(username)}
github.py 文件源码 项目:jerrybuild 作者: fboender 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def normalize(request, jobdef):
    if request.headers['X-Github-Event'] == 'ping':
        response.status = 200
        response.body = "pong"
        return False

    env = {}
    raw_body = request.body.read()
    body = json.load(request.body)
    secret = bytes(jobdef.custom_params['secret'])

    if request.headers['X-Github-Event'] != 'push':
        raise NotImplementedError("Only push and ping events are currently supported")

    # Verify hash signature
    hashtype, signature = request.headers['X-Hub-Signature'].split('=', 1)
    if hashtype != 'sha1':
        raise ValueError("Invalid hashtype received")

    res = hmac.new(secret, msg=raw_body, digestmod=hashlib.sha1)
    # Python <2.7 doesn't have a secure compare for hmac, so we just compare
    # manually. This leaves this code vulnerable to timing attacks,
    # unfortunately.
    if str(res.hexdigest()) != str(signature):
        raise ValueError("Invalid secret")

    env['provider'] = 'github'
    env["event"] = request.headers['X-Github-Event']
    env['repo_type'] = 'git'
    env['repo_url'] = body['repository']['clone_url']
    env['repo_url_ssh'] = body['repository']['ssh_url']
    env['repo_url_http'] = body['repository']['clone_url']
    env['ref'] = body['ref']
    env['commit'] = body['after']

    mail_to = []
    mail_to.append(body['repository']['owner']['email'])
    mail_to.append(body['pusher']['email'])
    env['mail_to'] = ', '.join(mail_to)

    return env
server.py 文件源码 项目:xFlow 作者: dsouzajude 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_app(engine):
    app = Bottle()

    @app.error()
    @app.error(404)
    def handle_error(error):
        if issubclass(type(error.exception), ApiException):
            response.status = error.exception.code
        else:
            response.status = error.status_code
        response.set_header('Content-type', 'application/json')
        resp = {
            'type': type(error.exception).__name__,
            'message': repr(error.exception) if error.exception else '',
            'traceback': error.traceback,
            'status_code': response.status
        }
        log.error('Exception, type=%s, message=%s, status_code=%s, traceback=%s'\
                    % (resp['type'], resp['message'], resp['status_code'], resp['traceback']))
        return '%s %s' % (resp['status_code'], resp['message'])

    @app.route('/ping', method=['GET'])
    def ping():
        return {'name': 'xFlow', 'version': '0.1' }

    @app.route('/publish', method=['POST'])
    def publish():
        data = json.loads(request.body.read())
        try:
            publish_schema.validate(data)
        except jsonschema.ValidationError as err:
            raise BadRequest(err)

        stream = data['stream']
        event = json.dumps(data['event'])
        try:
            engine.publish(stream, event)
        except core.KinesisStreamDoesNotExist as ex:
            raise NotFoundException(str(ex))
        return {}

    @app.route('/track/workflows/<workflow_id>/executions/<execution_id>', method=['GET'])
    def track(workflow_id, execution_id):
        try:
            tracking_info = engine.track(workflow_id, execution_id)
            return tracking_info
        except (core.CloudWatchStreamDoesNotExist,
                core.WorkflowDoesNotExist,
                core.CloudWatchLogDoesNotExist) as ex:
            raise NotFoundException(str(ex))
        raise Exception("Something went wrong!")

    return app
hook.py 文件源码 项目:green-button 作者: bernard357 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def web_inbound_call(button=None):
    """
    Handles an inbound phone call

    This function is called from twilio cloud back-end
    """

    if button is None:
        button = settings['server']['default']

    logging.info("Receiving inbound call for button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='call')

        context = load_button(settings, button)
        update, twilio_action = get_push_details(context)

        response.content_type = 'text/xml'
        behaviour = twilio.twiml.Response()

        say = None
        if 'call' in twilio_action:
            for line in twilio_action['call']:
                if line.keys()[0] == 'say':
                    say = line['say']
                    break

        if say is None:
            say = "What's up Doc?"

        behaviour.say(say)
        return str(behaviour)

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to handle inbound call for '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'

#
# the collection of buttons that we manage
#


问题


面经


文章

微信
公众号

扫码关注公众号