def create(self, first_name, last_name):
"""Create a new record in the database"""
query = text("""
INSERT INTO tblUser
(first_name, last_name)
VALUES
(:first_name, :last_name);
""")
result = self.execute(query, first_name=first_name, last_name=last_name)
db.session.commit()
return result.lastrowid
python类text()的实例源码
def update(self, id, first_name, last_name):
"""Update a record in the database with new values"""
query = text("""
UPDATE tblUser SET
first_name=:first_name,
last_name=:last_name
WHERE
id=:id
""")
result = self.execute(query,
id=id,
first_name=first_name,
last_name=last_name)
db.session.commit()
return result.rowcount > 0
def delete(self, id):
"""Delete a record from the database for an ID"""
query = text("""
DELETE FROM tblUser WHERE id=:id
""")
result = self.execute(query, id=id)
return result.rowcount > 0
def list_all(self):
"""Get all records from the database"""
query = text("""
SELECT id, first_name, last_name FROM tblUser;
""")
return self.fetchall(query)
def get_by_id(self, id):
"""Retrieve a record from the database by ID"""
query = text("""
SELECT id, user_id, ingredient_id, coefficient FROM tblUserPreference
WHERE id=:id;
""")
return self.fetchone(query, id=id)
def create(self, user_id, ingredient_id, coefficient):
"""Create a new record in the database"""
query = text("""
INSERT INTO tblUserPreference
(user_id, ingredient_id, coefficient)
VALUES
(:user_id, :ingredient_id, :coefficient);
""")
result = self.execute(query,
user_id=user_id,
ingredient_id=ingredient_id,
coefficient=coefficient)
db.session.commit()
return result.lastrowid
def delete(self, id):
"""Delete a record from the database for an ID"""
query = text("""
DELETE FROM tblUserPreference WHERE id=:id
""")
result = self.execute(query, id=id)
return result.rowcount > 0
def list_all_for_user(self, user_id):
"""Get all records from the database"""
query = text("""
SELECT id, user_id, ingredient_id, coefficient FROM tblUserPreference
WHERE user_id=:user_id;
""")
return self.fetchall(query, user_id=user_id)
def list_all(self):
"""Get all ingredients for all breakfasts"""
query = text("""
SELECT breakfast_id, ingredient_id, coefficient FROM tblBreakfastIngredient;
""")
return self.fetchall(query)
def get_all_breakfast_ingredients(self):
"""Get all ingredients for all breakfasts"""
query = text("""
SELECT breakfast_id, ingredient_id, coefficient FROM tblBreakfastIngredient;
""")
return self.fetchall(query)
def test_fetchall(self):
"""It gets all rows"""
query = text("""
SELECT id, name FROM tblExample
ORDER BY id
""")
# Expect multiple records to be returned
result = self.dao.fetchall(query)
self.assertEqual(len(result), 3)
self.assertDictEqual(result[0], {'id': 1, 'name': 'Foo'})
self.assertDictEqual(result[1], {'id': 2, 'name': 'Bar'})
self.assertDictEqual(result[2], {'id': 3, 'name': 'Baz'})
def has_table(self, connection, table_name, schema=None):
if schema is None:
schema=self.default_schema_name
stmt = select([column('tablename')],
from_obj=[text('dbc.tablesvx')]).where(
and_(text('DatabaseName=:schema'),
text('TableName=:table_name')))
res = connection.execute(stmt, schema=schema, table_name=table_name).fetchone()
return res is not None
def get_columns(self, connection, table_name, schema=None, **kw):
helpView=False
if schema is None:
schema = self.default_schema_name
if int(self.server_version_info.split('.')[0])<16:
dbc_columninfo='dbc.ColumnsV'
#Check if the object us a view
stmt = select([column('tablekind')],\
from_obj=[text('dbc.tablesV')]).where(\
and_(text('DatabaseName=:schema'),\
text('TableName=:table_name'),\
text("tablekind='V'")))
res = connection.execute(stmt, schema=schema, table_name=table_name).rowcount
helpView = (res==1)
else:
dbc_columninfo='dbc.ColumnsQV'
stmt = select([column('columnname'), column('columntype'),\
column('columnlength'), column('chartype'),\
column('decimaltotaldigits'), column('decimalfractionaldigits'),\
column('columnformat'),\
column('nullable'), column('defaultvalue'), column('idcoltype')],\
from_obj=[text(dbc_columninfo)]).where(\
and_(text('DatabaseName=:schema'),\
text('TableName=:table_name')))
res = connection.execute(stmt, schema=schema, table_name=table_name).fetchall()
#If this is a view in pre-16 version, get types for individual columns
if helpView:
res=[self._get_column_help(connection, schema,table_name,r['columnname']) for r in res]
return [self._get_column_info(row) for row in res]
def get_table_names(self, connection, schema=None, **kw):
if schema is None:
schema = self.default_schema_name
stmt = select([column('tablename')],
from_obj=[text('dbc.TablesVX')]).where(
and_(text('DatabaseName = :schema'),
or_(text('tablekind=\'T\''),
text('tablekind=\'O\''))))
res = connection.execute(stmt, schema=schema).fetchall()
return [self.normalize_name(name['tablename']) for name in res]
def get_schema_names(self, connection, **kw):
stmt = select([column('username')],
from_obj=[text('dbc.UsersV')],
order_by=[text('username')])
res = connection.execute(stmt).fetchall()
return [self.normalize_name(name['username']) for name in res]
def get_view_names(self, connection, schema=None, **kw):
if schema is None:
schema = self.default_schema_name
stmt = select([column('tablename')],
from_obj=[text('dbc.TablesVX')]).where(
and_(text('DatabaseName = :schema'),
text('tablekind=\'V\'')))
res = connection.execute(stmt, schema=schema).fetchall()
return [self.normalize_name(name['tablename']) for name in res]
def get_unique_constraints(self, connection, table_name, schema=None, **kw):
"""
Overrides base class method
"""
if schema is None:
schema = self.default_schema_name
stmt = select([column('ColumnName'), column('IndexName')], from_obj=[text('dbc.Indices')]) \
.where(and_(text('DatabaseName = :schema'),
text('TableName=:table'),
text('IndexType=:indextype'))) \
.order_by(asc(column('IndexName')))
# U for Unique
res = connection.execute(stmt, schema=schema, table=table_name, indextype='U').fetchall()
def grouper(fk_row):
return {
'name': self.normalize_name(fk_row['IndexName']),
}
unique_constraints = list()
for constraint_info, constraint_cols in groupby(res, grouper):
unique_constraint = {
'name': self.normalize_name(constraint_info['name']),
'column_names': list()
}
for constraint_col in constraint_cols:
unique_constraint['column_names'].append(self.normalize_name(constraint_col['ColumnName']))
unique_constraints.append(unique_constraint)
return unique_constraints
def get_indexes(self, connection, table_name, schema=None, **kw):
"""
Overrides base class method
"""
if schema is None:
schema = self.default_schema_name
stmt = select(["*"], from_obj=[text('dbc.Indices')]) \
.where(and_(text('DatabaseName = :schema'),
text('TableName=:table'))) \
.order_by(asc(column('IndexName')))
res = connection.execute(stmt, schema=schema, table=table_name).fetchall()
def grouper(fk_row):
return {
'name': fk_row.IndexName or fk_row.IndexNumber, # If IndexName is None TODO: Check what to do
'unique': True if fk_row.UniqueFlag == 'Y' else False
}
# TODO: Check if there's a better way
indices = list()
for index_info, index_cols in groupby(res, grouper):
index_dict = {
'name': index_info['name'],
'column_names': list(),
'unique': index_info['unique']
}
for index_col in index_cols:
index_dict['column_names'].append(self.normalize_name(index_col['ColumnName']))
indices.append(index_dict)
return indices
def get_transaction_mode(self, connection, **kw):
"""
Returns the transaction mode set for the current session.
T = TDBS
A = ANSI
"""
stmt = select([text('transaction_mode')],\
from_obj=[text('dbc.sessioninfov')]).\
where(text('sessionno=SESSION'))
res = connection.execute(stmt).scalar()
return res
def _get_server_version_info(self, connection, **kw):
"""
Returns the Teradata Database software version.
"""
stmt = select([text('InfoData')],\
from_obj=[text('dbc.dbcinfov')]).\
where(text('InfoKey=\'VERSION\''))
res = connection.execute(stmt).scalar()
return res