def setUp(self):
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
test_user = User()
test_user.id = 123456
test_user.username = 'Thinkwhere'
self.task_stub = Task()
self.task_stub.id = 1
self.task_stub.project_id = 1
self.task_stub.task_status = 0
self.task_stub.locked_by = 123456
self.task_stub.lock_holder = test_user
self.lock_task_dto = LockTaskDTO()
self.lock_task_dto.user_id = 123456
self.mapped_task_dto = MappedTaskDTO()
self.mapped_task_dto.status = TaskStatus.MAPPED.name
self.mapped_task_dto.user_id = 123456
python类create_app()的实例源码
def start_app():
"""
Run in development mode, never used in production.
"""
port = int(os.getenv("PORT", 5000))
try:
app = create_app()
app.run(host='0.0.0.0', port=port)
except APIException as e:
print ("Application failed to start")
def setUp(self):
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
def setUp(self):
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
self.unlock_task_stub = Task()
self.unlock_task_stub.task_status = TaskStatus.MAPPED.value
self.unlock_task_stub.lock_holder_id = 123456
def setUp(self):
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
def setUp(self):
"""
Setup test context so we can connect to database
"""
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
if self.skip_tests:
return
def setUp(self):
if self.skip_tests:
return
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
def setUp(self):
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
def setUp(self):
if self.skip_tests:
return
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
self.test_user = create_canned_user()
def setUp(self):
if self.skip_tests:
return
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
self.test_project, self.test_user = create_canned_project()
def setUp(self):
if self.skip_tests:
return
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
def setUp(self):
if self.skip_tests:
return
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
def setUp(self):
if self.skip_tests:
return
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
self.test_project, self.test_user = create_canned_project()
def setUp(self):
if self.skip_tests:
return
self.app = create_app()
self.ctx = self.app.app_context()
self.ctx.push()
def create_app(self):
"""
create app
Returns: flask app object
"""
self.app = create_app("sqlite:///", True)
self.app.config["TESTING"] = True
self.app.config['PRESERVE_CONTEXT_ON_EXCEPTION'] = False
return self.app
def main():
args = _get_args()
level = args['log_level'].upper()
level = log_levels.get(level, logging.INFO)
log = setup_logging(log_level=level)
import core, utils, server
config_file = args['CONFIG']
if not utils.file_exists(config_file):
log.error('File %s does not exist' % (config_file))
sys.exit(1)
log.info('Validating config')
try:
core.Engine.validate_config(config_file)
except core.ConfigValidationError as ex:
log.error('Invalid config. %s' % (str(ex)))
sys.exit(1)
log.info('Initializing xFlow engine')
engine = core.Engine(config_file)
log.info('Config is valid')
# Run as server
if args['s']:
logging.info('Configuring xFlow Engine')
engine.configure()
app = server.create_app(engine)
logging.info('Running as server')
app.run(host='0.0.0.0', port=80, server='waitress', loglevel='warning')
# Configure the lambdas, streams and subscriptions
if args['c']:
logging.info('Configuring xFlow Engine')
engine.configure()
logging.info('xFlow Engine configured')
# Publish json data to stream
if args['p']:
stream = args['p'][0]
data = args['p'][1]
log.info('\n\n\nPublishing to stream: %s\n\nData: %s' % (stream, data))
try:
engine.publish(stream, data)
log.info('Published')
except core.KinesisStreamDoesNotExist:
sys.exit(1)
# Track a workflow
if args['t']:
workflow_id = args['t'][0]
execution_id = args['t'][1]
log.info("\n\n\nTracking workflow, workflow_id=%s, execution_id=%s" % (workflow_id, execution_id))
try:
tracking_info = engine.track(workflow_id, execution_id)
print json.dumps(tracking_info, indent=4)
except (core.CloudWatchStreamDoesNotExist,
core.WorkflowDoesNotExist,
core.CloudWatchLogDoesNotExist):
sys.exit(1)