python类CreateTable()的实例源码

generate_data.py 文件源码 项目:aiohttp_admin 作者: aio-libs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def prepare_tables(pg):
    """Re-create the post, tag and comment tables.

    The function awaits and uses ``async with``, so it has to be a
    coroutine function; as a plain ``def`` the body is a SyntaxError.

    Args:
        pg: object whose ``acquire()`` returns an async context manager
            yielding a connection with an awaitable ``execute``.
    """
    tables = [db.post, db.tag, db.comment]
    # Clear out whatever delete_tables handles before issuing the DDL.
    await delete_tables(pg, tables)
    async with pg.acquire() as conn:
        for table in tables:
            create_expr = CreateTable(table)
            await conn.execute(create_expr)
test_core.py 文件源码 项目:python-sqlalchemy 作者: carlosalberto 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_traced_none(self):
        """A CREATE TABLE run after init_tracing(tracer, False, False) records no span."""
        dummy_tracer = DummyTracer()
        sqlalchemy_opentracing.init_tracing(dummy_tracer, False, False)
        sqlalchemy_opentracing.register_engine(self.engine)

        create_stmt = CreateTable(self.users_table)
        self.engine.execute(create_stmt)

        span_count = len(dummy_tracer.spans)
        self.assertEqual(0, span_count)
test_core.py 文件源码 项目:python-sqlalchemy 作者: carlosalberto 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_traced_all_queries(self):
        """With trace_all_queries=True a single CREATE TABLE records exactly one span."""
        dummy_tracer = DummyTracer()
        sqlalchemy_opentracing.init_tracing(
            dummy_tracer, False, trace_all_queries=True)
        sqlalchemy_opentracing.register_engine(self.engine)

        create_stmt = CreateTable(self.users_table)
        self.engine.execute(create_stmt)

        span_count = len(dummy_tracer.spans)
        self.assertEqual(1, span_count)
test_core.py 文件源码 项目:python-sqlalchemy 作者: carlosalberto 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_traced_error(self):
        """A traced statement that raises still yields one finished, error-tagged span."""
        dummy_tracer = DummyTracer()
        sqlalchemy_opentracing.init_tracing(dummy_tracer, False, False)
        sqlalchemy_opentracing.register_engine(self.engine)

        create_stmt = CreateTable(self.users_table)

        # First execution is not marked for tracing: nothing is recorded.
        self.engine.execute(create_stmt)
        self.assertEqual(0, len(dummy_tracer.spans))

        # Mark the statement and run it again; an OperationalError is
        # tolerated here because finishing the span is the tracing
        # layer's job, not the caller's.
        sqlalchemy_opentracing.set_traced(create_stmt)
        try:
            self.engine.execute(create_stmt)
        except OperationalError:
            pass

        expected_tags = {
            'component': 'sqlalchemy',
            'db.statement': 'CREATE TABLE users (id INTEGER NOT NULL, name VARCHAR, PRIMARY KEY (id))',
            'db.type': 'sql',
            'sqlalchemy.dialect': 'sqlite',
            'sqlalchemy.exception': 'table users already exists',
            'error': 'true',
        }

        self.assertEqual(1, len(dummy_tracer.spans))
        self.assertEqual(dummy_tracer.spans[0].is_finished, True)
        self.assertEqual(dummy_tracer.spans[0].tags, expected_tags)

        # The traced flag must read False once the statement has run.
        self.assertEqual(False, sqlalchemy_opentracing.get_traced(create_stmt))
test_core.py 文件源码 项目:python-sqlalchemy 作者: carlosalberto 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_traced_rollback(self):
        """Statements run in a failing transaction are both traced under the parent span.

        An INSERT followed by a CREATE TABLE is executed on a connection
        that has a parent span attached. The test expects two finished
        spans, both children of that parent, with only the second one
        tagged as an error.
        """
        tracer = DummyTracer()
        sqlalchemy_opentracing.init_tracing(tracer, False, False)
        sqlalchemy_opentracing.register_engine(self.engine)

        creat = CreateTable(self.users_table)
        ins = self.users_table.insert().values(name='John Doe')

        # Don't trace this.
        self.engine.execute(creat)

        parent_span = DummySpan('parent span')
        conn = self.engine.connect()
        try:
            with conn.begin():
                sqlalchemy_opentracing.set_parent_span(conn, parent_span)
                conn.execute(ins)
                conn.execute(creat)
        except OperationalError:
            pass

        spans = tracer.spans
        self.assertEqual(2, len(spans))
        self.assertEqual(True, all(span.is_finished for span in spans))
        self.assertEqual(True, all(span.child_of == parent_span for span in spans))
        # Build real lists: on Python 3 ``map`` returns an iterator, which
        # never compares equal to a list, so the old form always failed.
        self.assertEqual(['insert', 'create_table'],
                         [span.operation_name for span in spans])
        self.assertEqual(['false', 'true'],
                         [span.tags.get('error', 'false') for span in spans])
test_core.py 文件源码 项目:python-sqlalchemy 作者: carlosalberto 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_traced_after_rollback(self):
        """Work done on a connection after a traced, failed transaction is not traced."""
        dummy_tracer = DummyTracer()
        sqlalchemy_opentracing.init_tracing(dummy_tracer, False, False)
        sqlalchemy_opentracing.register_engine(self.engine)

        create_stmt = CreateTable(self.users_table)
        connection = self.engine.connect()

        # First creation runs without any tracing marker.
        with connection.begin():
            connection.execute(create_stmt)

        # Second creation runs with the connection marked as traced; an
        # OperationalError raised by it is swallowed.
        try:
            with connection.begin():
                sqlalchemy_opentracing.set_traced(connection)
                connection.execute(create_stmt)
        except OperationalError:
            pass

        self.assertEqual(1, len(dummy_tracer.spans))

        # Reuse the same connection right away: no new span may appear.
        dummy_tracer.clear()
        insert_stmt = self.users_table.insert().values(name='John Doe')
        with connection.begin():
            connection.execute(insert_stmt)

        self.assertEqual(0, len(dummy_tracer.spans))
test_api.py 文件源码 项目:python-sqlalchemy 作者: carlosalberto 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_traced_property(self):
        """get_traced returns True for a statement passed to set_traced."""
        statement = CreateTable(self.users_table)
        sqlalchemy_opentracing.set_traced(statement)

        is_traced = sqlalchemy_opentracing.get_traced(statement)
        self.assertEqual(True, is_traced)
test_api.py 文件源码 项目:python-sqlalchemy 作者: carlosalberto 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_has_parent(self):
        """A parent span set on a statement is reported and returned unchanged."""
        parent = DummySpan()
        statement = CreateTable(self.users_table)
        sqlalchemy_opentracing.set_parent_span(statement, parent)

        has_parent = sqlalchemy_opentracing.has_parent_span(statement)
        self.assertEqual(True, has_parent)

        stored_parent = sqlalchemy_opentracing.get_parent_span(statement)
        self.assertEqual(parent, stored_parent)
test_api.py 文件源码 项目:python-sqlalchemy 作者: carlosalberto 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_has_parent_none(self):
        """A statement that is only marked as traced has no parent span."""
        statement = CreateTable(self.users_table)
        sqlalchemy_opentracing.set_traced(statement)

        has_parent = sqlalchemy_opentracing.has_parent_span(statement)
        self.assertEqual(False, has_parent)

        stored_parent = sqlalchemy_opentracing.get_parent_span(statement)
        self.assertEqual(None, stored_parent)
sql.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def sql_schema(self):
        """Return the CREATE TABLE construct for ``self.table`` as a string.

        The construct is compiled against ``self.pd_sql.connectable``
        before being converted with ``str``.
        """
        from sqlalchemy.schema import CreateTable

        ddl = CreateTable(self.table)
        compiled = ddl.compile(self.pd_sql.connectable)
        return str(compiled)
impl.py 文件源码 项目:webapp 作者: superchilli 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_table(self, table):
        """Run CREATE TABLE for ``table``, then CREATE INDEX for each of its indexes.

        When ``util.sqla_07`` is truthy, the table's ``before_create`` and
        ``after_create`` dispatch hooks are invoked around the CREATE TABLE.
        All statements go through ``self._exec``.
        """
        # Keyword arguments shared by both dispatch hooks.
        hook_kwargs = {'checkfirst': False, '_ddl_runner': self}

        if util.sqla_07:
            table.dispatch.before_create(table, self.connection, **hook_kwargs)

        self._exec(schema.CreateTable(table))

        if util.sqla_07:
            table.dispatch.after_create(table, self.connection, **hook_kwargs)

        for idx in table.indexes:
            self._exec(schema.CreateIndex(idx))
impl.py 文件源码 项目:QualquerMerdaAPI 作者: tiagovizoto 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_table(self, table):
        """Pass a CreateTable construct, then one CreateIndex per index, to ``self._exec``.

        The ``before_create`` / ``after_create`` dispatch hooks of the table
        are called around the CREATE TABLE only when ``util.sqla_07`` is
        truthy.
        """
        def fire(event_name):
            # Look the hook up by name so both events share one call site.
            if util.sqla_07:
                hook = getattr(table.dispatch, event_name)
                hook(table, self.connection,
                     checkfirst=False,
                     _ddl_runner=self)

        fire('before_create')
        self._exec(schema.CreateTable(table))
        fire('after_create')

        for table_index in table.indexes:
            self._exec(schema.CreateIndex(table_index))
impl.py 文件源码 项目:gardenbot 作者: GoestaO 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_table(self, table):
        """Create ``table`` and its indexes via ``self._exec``.

        ``before_create`` and ``after_create`` are dispatched on the table
        around the CREATE TABLE when ``util.sqla_07`` is truthy.
        """
        if util.sqla_07:
            # Pre-create hook.
            table.dispatch.before_create(
                table, self.connection, checkfirst=False, _ddl_runner=self)

        create_stmt = schema.CreateTable(table)
        self._exec(create_stmt)

        if util.sqla_07:
            # Post-create hook.
            table.dispatch.after_create(
                table, self.connection, checkfirst=False, _ddl_runner=self)

        # Indexes are created one by one after the table itself.
        for ix in table.indexes:
            index_stmt = schema.CreateIndex(ix)
            self._exec(index_stmt)
impl.py 文件源码 项目:flask-zhenai-mongo-echarts 作者: Fretice 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def create_table(self, table):
        """Execute CREATE TABLE for ``table`` and CREATE INDEX for every index on it.

        If ``util.sqla_07`` is truthy, the table's ``before_create`` and
        ``after_create`` dispatch hooks run before and after the CREATE
        TABLE. Every statement is handed to ``self._exec``.
        """
        # Arguments common to both dispatch hooks.
        dispatch_options = dict(checkfirst=False, _ddl_runner=self)

        if util.sqla_07:
            table.dispatch.before_create(
                table, self.connection, **dispatch_options)

        self._exec(schema.CreateTable(table))

        if util.sqla_07:
            table.dispatch.after_create(
                table, self.connection, **dispatch_options)

        for current_index in table.indexes:
            self._exec(schema.CreateIndex(current_index))
carto.py 文件源码 项目:the-el 作者: CityOfPhiladelphia 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def create_table(table_name, load_postgis, json_table_schema, if_not_exists, indexes_fields, connection_string):
    """Build a PostgreSQL CREATE TABLE statement and send it through carto_sql_call.

    Args:
        table_name: name passed to ``get_table`` and ``create_indexes``.
        load_postgis: when truthy, ``load_postgis_support()`` is called first.
        json_table_schema: schema passed to ``get_table``.
        if_not_exists: when truthy, the statement is rewritten to
            ``CREATE TABLE IF NOT EXISTS``.
        indexes_fields: when truthy, forwarded to ``create_indexes`` after
            the table statement has been sent.
        connection_string: string matched against
            ``carto_connection_string_regex`` to obtain the credentials.

    Raises:
        ValueError: if ``connection_string`` does not match
            ``carto_connection_string_regex``.
    """
    if load_postgis:
        load_postgis_support()

    match = re.match(carto_connection_string_regex, connection_string)
    if match is None:
        # Keep the connection string out of the message: it may contain
        # credentials.
        raise ValueError('Invalid Carto connection string')
    creds = match.groups()

    statement = CreateTable(get_table(table_name, json_table_schema))
    # Always hand a plain string downstream, whichever branch is taken.
    str_statement = str(statement.compile(dialect=postgresql.dialect()))

    if if_not_exists:
        # Only the leading keyword should be rewritten, not any later
        # occurrence of the same text inside the statement.
        str_statement = str_statement.replace(
            'CREATE TABLE', 'CREATE TABLE IF NOT EXISTS', 1)

    carto_sql_call(creds, str_statement)

    if indexes_fields:
        create_indexes(creds, table_name, indexes_fields)
impl.py 文件源码 项目:ngx_status 作者: YoYoAdorkable 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def create_table(self, table):
        """Hand CREATE TABLE for ``table`` and CREATE INDEX for its indexes to ``self._exec``.

        The table's ``before_create`` and ``after_create`` dispatch hooks
        are called around the CREATE TABLE when ``util.sqla_07`` is truthy.
        """
        def notify(hook):
            # Both hooks take the same arguments.
            hook(table, self.connection, checkfirst=False, _ddl_runner=self)

        if util.sqla_07:
            notify(table.dispatch.before_create)

        self._exec(schema.CreateTable(table))

        if util.sqla_07:
            notify(table.dispatch.after_create)

        for index_obj in table.indexes:
            self._exec(schema.CreateIndex(index_obj))
database.py 文件源码 项目:infoset-ng 作者: PalisadoesFoundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
        """Setup database.

        Alters the character set of the configured database, writes the
        CREATE TABLE statement of every mapped table to ``infoset.sql``
        (also printing each one) and then inserts the initial database
        entries.

        Args:
            None

        Returns:
            None

        """
        # Initialize key variables
        use_mysql = True
        pool_size = 25
        max_overflow = 25
        config = self.config

        mappings = [
            Agent, Department, Device, Billcode,
            DeviceAgent, Datapoint, AgentName]

        # Create DB connection pool
        if use_mysql is True:
            # Add MySQL to the pool
            engine = create_engine(
                URL, echo=False,
                encoding='utf8',
                max_overflow=max_overflow,
                pool_size=pool_size, pool_recycle=3600)

            # Try to create the database
            shared.print_ok('Attempting to create database tables')
            try:
                sql_string = (
                    'ALTER DATABASE %s CHARACTER SET utf8mb4 '
                    'COLLATE utf8mb4_general_ci') % (config.db_name())
                engine.execute(sql_string)
            except Exception:
                # Catch Exception rather than everything, so that Ctrl-C
                # and SystemExit are not reported as a database
                # connection failure.
                log_message = (
                    'Cannot connect to database %s. '
                    'Verify database server is started. '
                    'Verify database is created. '
                    'Verify that the configured database authentication '
                    'is correct.') % (config.db_name())
                log.log2die(1046, log_message)

            # Apply schemas
            shared.print_ok('Generating Schemas.')

            # The ``with`` block closes the file; no explicit close needed.
            with open('infoset.sql', 'w') as infoset_mysql:
                for mapping in mappings:
                    # Render the DDL once and reuse it for both outputs.
                    ddl = str(CreateTable(mapping.__table__))
                    print(ddl)
                    infoset_mysql.write(ddl)

            # Insert database entries
            self._insert_agent_device()
            self._insert_billcode()
            self._insert_department()
            self._insert_datapoint()
            self._insert_config()


问题


面经


文章

微信
公众号

扫码关注公众号