cli.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:tight-cli 作者: Lululemon 项目源码 文件源码
def rundb(target):
    """
    Start a local DynamoDB instance and create tables for the app models.

    Loads the environment for ``target``, forces ``AWS_REGION`` to
    ``us-west-2``, removes any existing shared local database file, launches
    ``DynamoDBLocal.jar`` through ``java`` and then registers every model
    module found in ``./app/models`` with the engine before creating the
    schema. When the process was started, the call blocks until it exits.

    If the DynamoDB process cannot be launched, a message is printed and the
    table setup is still attempted (best effort); in that case there is no
    process to kill or wait for.

    :param target: passed through to ``load_env``.
    :return: None
    :raises Exception: any error raised during engine setup is re-raised
        after the spawned DynamoDB process (if any) has been killed.
    """
    load_env(target)
    os.environ['AWS_REGION'] = 'us-west-2'

    # Start from a clean slate: drop the database file left by a prior run.
    shared_db = './dynamo_db/shared-local-instance.db'
    if os.path.exists(shared_db):
        os.remove(shared_db)

    dynamo_command = [
        'java',
        '-Djava.library.path={}/dynamo_db/DynamoDBLocal_lib'.format(CWD),
        '-jar',
        '{}/dynamo_db/DynamoDBLocal.jar'.format(CWD),
        '-sharedDb',
        '-dbPath',
        './dynamo_db',
    ]

    # Keep the handle bound even when the launch fails so the cleanup code
    # below never trips over an undefined name and hides the real error.
    dynamo_process = None
    try:
        dynamo_process = subprocess.Popen(
            dynamo_command,
            stdin=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
    except OSError as e:
        # Best effort: report the failure and still attempt the table setup.
        print("Unable to start local DynamoDB process: " + str(e))

    try:
        # Connect to DynamoDB and register and create tables for
        # application models.
        engine = Engine()
        engine.connect(os.environ['AWS_REGION'], host='localhost',
                       port=8000,
                       access_key='anything',
                       secret_key='anything',
                       is_secure=False)

        # Make the model modules importable by their bare file name.
        sys.path.insert(0, './app/models')
        model_files = glob.glob('./app/models' + "/*.py")
        model_names = [basename(f)[:-3] for f in model_files if isfile(f)]
        for model_name in model_names:
            if model_name == '__init__':
                continue
            # Each module must expose an attribute named after the module.
            module = __import__(model_name)
            engine.register(getattr(module, model_name))

        engine.create_schema()
        tables = list(engine.dynamo.list_tables())
        print("This engine has the following tables " + str(tables))
        for table in tables:
            # Result is discarded; presumably this just verifies that every
            # table is reachable -- confirm against the engine's API.
            engine.dynamo.describe_table(table)
    except Exception:
        # If anything goes wrong, then we self-destruct.
        if dynamo_process is not None:
            dynamo_process.kill()
        raise

    # Wait for process to finish.
    if dynamo_process is not None:
        dynamo_process.wait()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号