def generate_cf_dynamo_schema(target):
    """Write the DynamoDB table schema of every model under ``target`` to YAML.

    An ``Engine`` is wired to stand-in connection objects, so each
    table-creation request issued by ``engine.create_schema()`` ends up as a
    call to ``write_schema_to_yaml(target, **kwargs)`` rather than going
    through the connection's original client.

    Models are discovered by importing every ``*.py`` file (other than
    ``__init__.py``) found in ``<target>/app/models``. Each module must expose
    an attribute with the same name as the module; that attribute is what is
    registered with the engine.

    Args:
        target: Project root directory. Models are read from
            ``<target>/app/models`` and the same value is forwarded to
            ``write_schema_to_yaml``.

    Raises:
        ImportError: If a model module cannot be imported.
        AttributeError: If a module has no attribute named after itself.

    Note:
        ``<target>/app/models`` is prepended to ``sys.path`` and left there,
        so the directory remains importable after this function returns.
    """
    # NOTE(review): "cf" in the name presumably stands for CloudFormation -
    # confirm against write_schema_to_yaml.
    models_dir = '{}/app/models'.format(target)

    # Built once here instead of on every describe_table() call.
    status_type = namedtuple('Status', 'status')

    dynamo_connection = DynamoDBConnection()

    class FakeClient(object):
        """Client stand-in that dumps the requested schema instead of creating it."""

        def create_table(self, *args, **kwargs):
            # Positional arguments are ignored; only the keyword arguments
            # are forwarded to the YAML writer.
            write_schema_to_yaml(target, **kwargs)
            return {}

    # Swap the connection's client so create_table() is routed to FakeClient.
    dynamo_connection.client = FakeClient()

    class FakeDynamo(object):
        """Connection stand-in handed to the engine."""

        def list_tables(self):
            # Always report that no tables exist.
            return []

        def create_table(self, *args):
            # Delegate to the connection, whose client has been replaced
            # above. The result is intentionally discarded.
            dynamo_connection.create_table(*args)

        def describe_table(self, *args):
            # Always report the table as ACTIVE, whatever was asked for.
            return status_type(status='ACTIVE')

    engine = Engine()
    engine.dynamo = FakeDynamo()

    # Make the model modules importable by their bare file names.
    sys.path.insert(0, models_dir)

    module_names = [
        basename(path)[:-3]  # strip the trailing ".py"
        for path in glob.glob(models_dir + '/*.py')
        if isfile(path)
    ]
    for module_name in module_names:
        if module_name == '__init__':
            continue
        # Each model module is expected to define a class named after itself.
        module = importlib.import_module(module_name)
        engine.register(getattr(module, module_name))

    engine.create_schema()
# (Web-page residue, not part of the code: "Comment list" / "Table of contents".)