Python 类 text() 的实例源码

daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def create(self, first_name, last_name):
        """Insert a row into tblUser and return the new row's ID.

        Runs a parameterized INSERT through ``self.execute``, commits the
        session, and returns ``lastrowid`` from the execution result.
        """
        insert_sql = """
        INSERT INTO tblUser
            (first_name, last_name)
        VALUES
            (:first_name, :last_name);
        """
        params = dict(first_name=first_name, last_name=last_name)
        outcome = self.execute(text(insert_sql), **params)
        db.session.commit()
        return outcome.lastrowid
daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def update(self, id, first_name, last_name):
        """Overwrite the name fields of the tblUser row matching ``id``.

        The session is committed after the statement runs.  Returns True when
        the execution result reports at least one affected row, else False.
        """
        update_sql = """
        UPDATE tblUser SET
            first_name=:first_name,
            last_name=:last_name
        WHERE
            id=:id
        """
        bound = dict(id=id, first_name=first_name, last_name=last_name)
        outcome = self.execute(text(update_sql), **bound)
        db.session.commit()
        affected = outcome.rowcount
        return affected > 0
daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def delete(self, id):
        """Delete the tblUser row matching ``id`` and commit the change.

        Returns True when the execution result reports at least one deleted
        row, else False.
        """
        query = text("""
        DELETE FROM tblUser WHERE id=:id
        """)
        result = self.execute(query, id=id)
        # Capture the affected-row count before committing.
        was_deleted = result.rowcount > 0
        # Commit like the sibling create/update methods do; without it the
        # DELETE is left in an open transaction.
        # NOTE(review): assumes self.execute runs on db.session -- confirm.
        db.session.commit()
        return was_deleted
daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def list_all(self):
        """Return every row of tblUser (id, first_name, last_name)."""
        select_sql = """
        SELECT id, first_name, last_name FROM tblUser;
        """
        return self.fetchall(text(select_sql))
daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_by_id(self, id):
        """Fetch one tblUserPreference row whose ID equals ``id``."""
        lookup_sql = """
        SELECT id, user_id, ingredient_id, coefficient FROM tblUserPreference
        WHERE id=:id;
        """
        statement = text(lookup_sql)
        return self.fetchone(statement, id=id)
daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def create(self, user_id, ingredient_id, coefficient):
        """Insert a row into tblUserPreference and return the new row's ID.

        Runs a parameterized INSERT through ``self.execute``, commits the
        session, and returns ``lastrowid`` from the execution result.
        """
        insert_sql = """
        INSERT INTO tblUserPreference
            (user_id, ingredient_id, coefficient)
        VALUES
            (:user_id, :ingredient_id, :coefficient);
        """
        bound = dict(user_id=user_id,
                     ingredient_id=ingredient_id,
                     coefficient=coefficient)
        outcome = self.execute(text(insert_sql), **bound)
        db.session.commit()
        return outcome.lastrowid
daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def delete(self, id):
        """Delete the tblUserPreference row matching ``id`` and commit.

        Returns True when the execution result reports at least one deleted
        row, else False.
        """
        query = text("""
        DELETE FROM tblUserPreference WHERE id=:id
        """)
        result = self.execute(query, id=id)
        # Capture the affected-row count before committing.
        was_deleted = result.rowcount > 0
        # Commit like the sibling create method does; without it the DELETE
        # is left in an open transaction.
        # NOTE(review): assumes self.execute runs on db.session -- confirm.
        db.session.commit()
        return was_deleted
daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def list_all_for_user(self, user_id):
        """Return all tblUserPreference rows whose user_id equals ``user_id``."""
        select_sql = """
        SELECT id, user_id, ingredient_id, coefficient FROM tblUserPreference
        WHERE user_id=:user_id;
        """
        statement = text(select_sql)
        return self.fetchall(statement, user_id=user_id)
daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def list_all(self):
        """Return every row of tblBreakfastIngredient."""
        select_sql = """
        SELECT breakfast_id, ingredient_id, coefficient FROM tblBreakfastIngredient;
        """
        return self.fetchall(text(select_sql))
daos.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_all_breakfast_ingredients(self):
        """Return the ingredient rows of all breakfasts from tblBreakfastIngredient."""
        ingredients_sql = """
        SELECT breakfast_id, ingredient_id, coefficient FROM tblBreakfastIngredient;
        """
        statement = text(ingredients_sql)
        return self.fetchall(statement)
test_common.py 文件源码 项目:eggsnspam 作者: wayfair 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_fetchall(self):
        """fetchall returns all three tblExample rows as dicts, ordered by ID."""
        select_sql = """
        SELECT id, name FROM tblExample
        ORDER BY id
        """
        rows = self.dao.fetchall(text(select_sql))

        # Three records are expected, in ascending ID order.
        expected = [
            {'id': 1, 'name': 'Foo'},
            {'id': 2, 'name': 'Bar'},
            {'id': 3, 'name': 'Baz'},
        ]
        self.assertEqual(len(rows), 3)
        for position, wanted in enumerate(expected):
            self.assertDictEqual(rows[position], wanted)
dialect.py 文件源码 项目:sqlalchemy-teradata 作者: Teradata 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def has_table(self, connection, table_name, schema=None):
        """Report whether ``table_name`` is listed in dbc.tablesvx for a schema.

        Falls back to ``self.default_schema_name`` when ``schema`` is None.
        Returns True if the lookup query yields a row, otherwise False.
        """
        effective_schema = self.default_schema_name if schema is None else schema

        criteria = and_(text('DatabaseName=:schema'),
                        text('TableName=:table_name'))
        lookup = select([column('tablename')],
                        from_obj=[text('dbc.tablesvx')]).where(criteria)

        row = connection.execute(
            lookup, schema=effective_schema, table_name=table_name).fetchone()
        return row is not None
dialect.py 文件源码 项目:sqlalchemy-teradata 作者: Teradata 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_columns(self, connection, table_name, schema=None, **kw):
        """Return column metadata for a table or view.

        Column rows are read from dbc.ColumnsV on servers whose major version
        is below 16 and from dbc.ColumnsQV otherwise.  On pre-16 servers, when
        the object is a view, each column row is replaced by the result of
        ``self._get_column_help`` before conversion.

        :param connection: connection used to execute the dictionary queries.
        :param table_name: name of the table or view to describe.
        :param schema: database name; defaults to ``self.default_schema_name``.
        :param kw: extra keyword arguments; not used by this method.
        :return: list of ``self._get_column_info(row)`` results, one per row.
        """
        help_view = False

        if schema is None:
            schema = self.default_schema_name

        major_version = int(self.server_version_info.split('.')[0])
        if major_version < 16:
            dbc_columninfo = 'dbc.ColumnsV'

            # Check if the object is a view
            stmt = select([column('tablekind')],
                          from_obj=[text('dbc.tablesV')]).where(
                              and_(text('DatabaseName=:schema'),
                                   text('TableName=:table_name'),
                                   text("tablekind='V'")))
            # Count the fetched rows instead of reading ``rowcount``: rowcount
            # is only meaningful for UPDATE/DELETE and is not a reliable row
            # count for a SELECT.
            view_rows = connection.execute(
                stmt, schema=schema, table_name=table_name).fetchall()
            help_view = (len(view_rows) == 1)

        else:
            dbc_columninfo = 'dbc.ColumnsQV'

        stmt = select([column('columnname'), column('columntype'),
                       column('columnlength'), column('chartype'),
                       column('decimaltotaldigits'), column('decimalfractionaldigits'),
                       column('columnformat'),
                       column('nullable'), column('defaultvalue'), column('idcoltype')],
                      from_obj=[text(dbc_columninfo)]).where(
                          and_(text('DatabaseName=:schema'),
                               text('TableName=:table_name')))

        res = connection.execute(stmt, schema=schema, table_name=table_name).fetchall()

        # If this is a view in pre-16 version, get types for individual columns
        if help_view:
            res = [self._get_column_help(connection, schema, table_name, r['columnname'])
                   for r in res]

        return [self._get_column_info(row) for row in res]
dialect.py 文件源码 项目:sqlalchemy-teradata 作者: Teradata 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_table_names(self, connection, schema=None, **kw):
        """List normalized names of objects with tablekind 'T' or 'O' in a schema.

        Rows come from dbc.TablesVX; ``schema`` defaults to
        ``self.default_schema_name`` when None.
        """
        target_schema = self.default_schema_name if schema is None else schema

        kind_filter = or_(text("tablekind='T'"), text("tablekind='O'"))
        listing = select(
            [column('tablename')],
            from_obj=[text('dbc.TablesVX')],
        ).where(and_(text('DatabaseName = :schema'), kind_filter))

        rows = connection.execute(listing, schema=target_schema).fetchall()
        return [self.normalize_name(row['tablename']) for row in rows]
dialect.py 文件源码 项目:sqlalchemy-teradata 作者: Teradata 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_schema_names(self, connection, **kw):
        """List the normalized usernames from dbc.UsersV, ordered by username."""
        listing = select(
            [column('username')],
            from_obj=[text('dbc.UsersV')],
            order_by=[text('username')],
        )
        rows = connection.execute(listing).fetchall()
        return [self.normalize_name(row['username']) for row in rows]
dialect.py 文件源码 项目:sqlalchemy-teradata 作者: Teradata 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_view_names(self, connection, schema=None, **kw):
        """List normalized names of objects with tablekind 'V' in a schema.

        Rows come from dbc.TablesVX; ``schema`` defaults to
        ``self.default_schema_name`` when None.
        """
        target_schema = self.default_schema_name if schema is None else schema

        view_filter = and_(text('DatabaseName = :schema'),
                           text("tablekind='V'"))
        listing = select(
            [column('tablename')],
            from_obj=[text('dbc.TablesVX')],
        ).where(view_filter)

        rows = connection.execute(listing, schema=target_schema).fetchall()
        return [self.normalize_name(row['tablename']) for row in rows]
dialect.py 文件源码 项目:sqlalchemy-teradata 作者: Teradata 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_unique_constraints(self, connection, table_name, schema=None, **kw):
        """
        Overrides base class method.

        Reads dbc.Indices rows of IndexType 'U' for the table, ordered by
        IndexName, and groups consecutive rows by normalized index name.
        Returns a list of dicts with keys 'name' and 'column_names'.
        """
        target_schema = self.default_schema_name if schema is None else schema

        row_filter = and_(text('DatabaseName = :schema'),
                          text('TableName=:table'),
                          text('IndexType=:indextype'))
        query = select([column('ColumnName'), column('IndexName')],
                       from_obj=[text('dbc.Indices')])
        query = query.where(row_filter).order_by(asc(column('IndexName')))

        # 'U' selects unique indexes.
        rows = connection.execute(
            query, schema=target_schema, table=table_name, indextype='U').fetchall()

        def constraint_key(row):
            # Grouping key: rows sharing a normalized IndexName form one constraint.
            return {'name': self.normalize_name(row['IndexName'])}

        return [
            {
                'name': self.normalize_name(key['name']),
                'column_names': [self.normalize_name(member['ColumnName'])
                                 for member in members],
            }
            for key, members in groupby(rows, constraint_key)
        ]
dialect.py 文件源码 项目:sqlalchemy-teradata 作者: Teradata 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_indexes(self, connection, table_name, schema=None, **kw):
        """
        Overrides base class method.

        Reads all dbc.Indices rows for the table, ordered by IndexName, and
        groups consecutive rows by (name, unique).  Returns a list of dicts
        with keys 'name', 'column_names' and 'unique'.
        """
        target_schema = self.default_schema_name if schema is None else schema

        row_filter = and_(text('DatabaseName = :schema'),
                          text('TableName=:table'))
        query = select(["*"], from_obj=[text('dbc.Indices')])
        query = query.where(row_filter).order_by(asc(column('IndexName')))

        rows = connection.execute(
            query, schema=target_schema, table=table_name).fetchall()

        def index_key(row):
            return {
                # Falls back to IndexNumber when IndexName is empty/None.
                # TODO: Check what to do in that case
                'name': row.IndexName or row.IndexNumber,
                'unique': row.UniqueFlag == 'Y',
            }

        # TODO: Check if there's a better way
        return [
            {
                'name': key['name'],
                'column_names': [self.normalize_name(member['ColumnName'])
                                 for member in members],
                'unique': key['unique'],
            }
            for key, members in groupby(rows, index_key)
        ]
dialect.py 文件源码 项目:sqlalchemy-teradata 作者: Teradata 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_transaction_mode(self, connection, **kw):
        """
        Return the transaction mode reported for the current session.

        T = TDBS
        A = ANSI
        """
        mode_query = select(
            [text('transaction_mode')],
            from_obj=[text('dbc.sessioninfov')],
        ).where(text('sessionno=SESSION'))
        return connection.execute(mode_query).scalar()
dialect.py 文件源码 项目:sqlalchemy-teradata 作者: Teradata 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_server_version_info(self, connection, **kw):
        """
        Return the Teradata Database software version.

        The value is the InfoData column of the dbc.dbcinfov row whose
        InfoKey is 'VERSION'.
        """
        version_query = select(
            [text('InfoData')],
            from_obj=[text('dbc.dbcinfov')],
        ).where(text("InfoKey='VERSION'"))
        return connection.execute(version_query).scalar()


问题


面经


文章

微信
公众号

扫码关注公众号