def create_app(engine):
    """Build the Bottle application that exposes ``engine`` over HTTP.

    Routes registered on the returned app:

    * ``GET /ping`` -- static name/version payload.
    * ``POST /publish`` -- validate the JSON request body against
      ``publish_schema`` and hand its event to ``engine.publish``.
    * ``GET /track/workflows/<workflow_id>/executions/<execution_id>`` --
      return whatever ``engine.track`` reports for that execution.

    Args:
        engine: Object providing ``publish(stream, event)`` and
            ``track(workflow_id, execution_id)``.

    Returns:
        The configured ``Bottle`` application.
    """
    app = Bottle()

    @app.error()
    @app.error(404)
    def handle_error(error):
        """Log a failed request and return a short textual summary.

        Registered for 404 and for the ``app.error()`` decorator's default
        code. When the wrapped exception is an ``ApiException`` its ``code``
        becomes the response status; otherwise the status carried by
        ``error`` is used.
        """
        exc = error.exception
        if isinstance(exc, ApiException):
            response.status = exc.code
        else:
            response.status = error.status_code
        # NOTE(review): the header announces JSON but the body returned below
        # is a plain string. Left as-is because changing either one alters
        # what existing clients receive -- confirm the intended format.
        response.set_header('Content-type', 'application/json')
        resp = {
            'type': type(exc).__name__,
            # `is not None` so an exception object that happens to be falsy
            # still gets its message reported.
            'message': repr(exc) if exc is not None else '',
            'traceback': error.traceback,
            'status_code': response.status,
        }
        log.error(
            'Exception, type=%s, message=%s, status_code=%s, traceback=%s'
            % (resp['type'], resp['message'], resp['status_code'], resp['traceback'])
        )
        # The traceback is logged only; it is not included in the reply.
        return '%s %s' % (resp['status_code'], resp['message'])

    @app.route('/ping', method=['GET'])
    def ping():
        """Return the service name and version."""
        return {'name': 'xFlow', 'version': '0.1'}

    @app.route('/publish', method=['POST'])
    def publish():
        """Publish the event contained in the JSON request body.

        The body must parse as JSON and pass ``publish_schema``; its
        ``stream`` value and JSON-encoded ``event`` value are passed to
        ``engine.publish``.

        Raises:
            BadRequest: The body is not valid JSON or fails schema validation.
            NotFoundException: ``engine.publish`` raised
                ``core.KinesisStreamDoesNotExist``.
        """
        raw_body = request.body.read()
        try:
            data = json.loads(raw_body)
        except ValueError as err:
            # Malformed or empty JSON is a client mistake; report it the same
            # way as a schema violation instead of surfacing a server error.
            raise BadRequest(err)
        try:
            publish_schema.validate(data)
        except jsonschema.ValidationError as err:
            raise BadRequest(err)
        stream = data['stream']
        event = json.dumps(data['event'])
        try:
            engine.publish(stream, event)
        except core.KinesisStreamDoesNotExist as ex:
            raise NotFoundException(str(ex))
        return {}

    @app.route('/track/workflows/<workflow_id>/executions/<execution_id>', method=['GET'])
    def track(workflow_id, execution_id):
        """Return the engine's tracking information for one execution.

        Raises:
            NotFoundException: ``engine.track`` raised
                ``core.CloudWatchStreamDoesNotExist``,
                ``core.WorkflowDoesNotExist`` or
                ``core.CloudWatchLogDoesNotExist``.
        """
        try:
            return engine.track(workflow_id, execution_id)
        except (core.CloudWatchStreamDoesNotExist,
                core.WorkflowDoesNotExist,
                core.CloudWatchLogDoesNotExist) as ex:
            raise NotFoundException(str(ex))

    return app
评论列表
文章目录