server.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:xFlow 作者: dsouzajude 项目源码 文件源码
def create_app(engine):
    """Build the Bottle application that exposes the HTTP API.

    Routes registered on the returned app:

    * ``GET /ping`` -- static name/version payload.
    * ``POST /publish`` -- validate the JSON body against ``publish_schema``
      and hand its ``event`` to ``engine.publish`` for the given ``stream``.
    * ``GET /track/workflows/<workflow_id>/executions/<execution_id>`` --
      return whatever ``engine.track`` reports for that execution.

    :param engine: object providing ``publish(stream, event)`` and
        ``track(workflow_id, execution_id)``.
    :returns: the configured ``Bottle`` application.
    """
    app = Bottle()

    @app.error()
    @app.error(404)
    def handle_error(error):
        """Log an error and render it as a short ``"<status> <message>"`` body.

        If the wrapped exception is an ``ApiException`` its ``code`` is used
        as the response status; otherwise the status carried by ``error``
        itself is used.
        """
        exc = error.exception
        if isinstance(exc, ApiException):
            response.status = exc.code
        else:
            response.status = error.status_code
        # NOTE(review): the header announces JSON but the body returned below
        # is plain text; left as-is because clients may rely on the current
        # body -- confirm which of the two is intended.
        response.set_header('Content-type', 'application/json')
        resp = {
            'type': type(exc).__name__,
            'message': repr(exc) if exc else '',
            'traceback': error.traceback,
            'status_code': response.status
        }
        # Pass the values as logger arguments so formatting is deferred to
        # the logging framework.
        log.error(
            'Exception, type=%s, message=%s, status_code=%s, traceback=%s',
            resp['type'], resp['message'], resp['status_code'],
            resp['traceback'])
        return '%s %s' % (resp['status_code'], resp['message'])

    @app.route('/ping', method=['GET'])
    def ping():
        """Return a static payload identifying the service."""
        return {'name': 'xFlow', 'version': '0.1' }

    @app.route('/publish', method=['POST'])
    def publish():
        """Validate the request body and publish its event to the stream.

        Raises ``BadRequest`` when the body is not valid JSON or does not
        satisfy ``publish_schema``, and ``NotFoundException`` when the engine
        reports that the target stream does not exist.
        """
        # A body that is not parseable JSON is a client error, not a server
        # fault, so report it the same way as a schema violation.
        try:
            data = json.loads(request.body.read())
        except ValueError as err:
            raise BadRequest(err)

        try:
            publish_schema.validate(data)
        except jsonschema.ValidationError as err:
            raise BadRequest(err)

        stream = data['stream']
        event = json.dumps(data['event'])
        try:
            engine.publish(stream, event)
        except core.KinesisStreamDoesNotExist as ex:
            raise NotFoundException(str(ex))
        return {}

    @app.route('/track/workflows/<workflow_id>/executions/<execution_id>', method=['GET'])
    def track(workflow_id, execution_id):
        """Return the engine's tracking info for one workflow execution.

        Raises ``NotFoundException`` when the engine reports that the
        workflow, its CloudWatch stream, or its CloudWatch log does not
        exist. Any other exception propagates to the error handler.
        """
        try:
            return engine.track(workflow_id, execution_id)
        except (core.CloudWatchStreamDoesNotExist,
                core.WorkflowDoesNotExist,
                core.CloudWatchLogDoesNotExist) as ex:
            raise NotFoundException(str(ex))

    return app
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号