def get_column_specification(self, column, override_pk=False, **kwargs):
    """Build the column-definition fragment for ``column``.

    The fragment starts with the column name, followed by either ``SERIAL``
    or the type's own spec plus an optional ``DEFAULT`` clause, then any
    ``NOT NULL``, ``PRIMARY KEY`` and ``REFERENCES`` suffixes.

    :param column: column object exposing ``name``, ``type``, ``default``,
        ``primary_key``, ``nullable`` and ``foreign_key``.
    :param override_pk: when true, the ``PRIMARY KEY`` suffix is omitted.
    :param kwargs: accepted but not used by this implementation.
    :return: the assembled specification string.
    """
    pieces = [column.name]

    # An integer primary key with no default, or with only an optional
    # Sequence as its default, is rendered as SERIAL.
    render_serial = (
        column.primary_key
        and isinstance(column.type, types.Integer)
        and (
            column.default is None
            or (isinstance(column.default, schema.Sequence)
                and column.default.optional)
        )
    )
    if render_serial:
        pieces.append(" SERIAL")
    else:
        pieces.append(" " + column.type.get_col_spec())
        default_sql = self.get_column_default_string(column)
        if default_sql is not None:
            pieces.append(" DEFAULT " + default_sql)

    if not column.nullable:
        pieces.append(" NOT NULL")
    if column.primary_key and not override_pk:
        pieces.append(" PRIMARY KEY")
    if column.foreign_key:
        referenced = column.column.foreign_key.column
        pieces.append(
            " REFERENCES %s(%s)" % (referenced.table.fullname, referenced.name)
        )
    return "".join(pieces)
# Example source code using the Python class Integer()
def get_column_specification(self, column, override_pk=False, first_pk=False):
    """Build the column-definition fragment for ``column``.

    :param column: column object exposing ``name``, ``type``,
        ``primary_key``, ``nullable`` and ``foreign_key``.
    :param override_pk: when true, the ``PRIMARY KEY`` suffix is omitted.
    :param first_pk: when true and the column is an integer primary key,
        ``AUTO_INCREMENT`` is appended.
    :return: the assembled specification string.
    """
    pieces = [column.name + " " + column.type.get_col_spec()]

    default_sql = self.get_column_default_string(column)
    if default_sql is not None:
        pieces.append(" DEFAULT " + default_sql)

    if not column.nullable:
        pieces.append(" NOT NULL")

    if column.primary_key:
        if not override_pk:
            pieces.append(" PRIMARY KEY")
        if first_pk and isinstance(column.type, types.Integer):
            pieces.append(" AUTO_INCREMENT")

    if column.foreign_key:
        referenced = column.column.foreign_key.column
        pieces.append(
            ", FOREIGN KEY (%s) REFERENCES %s(%s)"
            % (column.name, referenced.table.name, referenced.name)
        )
    return "".join(pieces)
def url(self):
    """Return ``osm_api_base``, ``osm_type`` and ``osm_id`` joined by slashes."""
    segments = (osm_api_base, self.osm_type, self.osm_id)
    return '{}/{}/{}'.format(*segments)
# class ItemCandidateTag(Base):
# __tablename__ = 'item_candidate_tag'
# __table_args__ = (
# ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'],
# [ItemCandidate.item_id,
# ItemCandidate.osm_id,
# ItemCandidate.osm_type]),
# )
#
# item_id = Column(Integer, primary_key=True)
# osm_id = Column(BigInteger, primary_key=True)
# osm_type = Column(osm_type_enum, primary_key=True)
# k = Column(String, primary_key=True)
# v = Column(String, primary_key=True)
#
# item_candidate = relationship(ItemCandidate,
# backref=backref('tag_table', lazy='dynamic'))
def get_column_specification(self, column, **kw):
    """Build the column-definition fragment for ``column``.

    The first autoincrementing integer primary-key column without foreign
    keys is rendered as ``SERIAL``; every other column gets its compiled
    type plus an optional ``DEFAULT`` clause.  ``NOT NULL`` is appended for
    non-nullable columns.

    :param column: the column to describe.
    :param kw: accepted but not used by this implementation.
    :return: the assembled specification string.
    """
    spec = self.preparer.format_column(column)

    serial_column = None
    if column.primary_key and column.autoincrement:
        eligible = [
            pk_col for pk_col in column.table.primary_key.columns
            if (pk_col.autoincrement and
                isinstance(pk_col.type, sqltypes.Integer) and
                not pk_col.foreign_keys)
        ]
        if eligible:
            serial_column = eligible[0]

    if column is serial_column:
        spec += " SERIAL"
    else:
        spec += " " + self.dialect.type_compiler.process(column.type)
        default_sql = self.get_column_default_string(column)
        if default_sql is not None:
            spec += " DEFAULT " + default_sql

    if not column.nullable:
        spec += " NOT NULL"
    return spec
def get_column_specification(self, column, **kwargs):
    """Build the column-definition fragment for ``column``.

    ``PRIMARY KEY AUTOINCREMENT`` is appended only when the column is the
    sole primary-key column, has integer type affinity, has no foreign keys
    and the table was given a truthy ``sqlite_autoincrement`` keyword.

    :param column: the column to describe.
    :param kwargs: accepted but not used by this implementation.
    :return: the assembled specification string.
    """
    type_sql = self.dialect.type_compiler.process(column.type)
    spec = self.preparer.format_column(column) + " " + type_sql

    default_sql = self.get_column_default_string(column)
    if default_sql is not None:
        spec += " DEFAULT " + default_sql

    if not column.nullable:
        spec += " NOT NULL"

    wants_autoincrement = (
        column.primary_key
        and column.table.kwargs.get('sqlite_autoincrement', False)
        and len(column.table.primary_key.columns) == 1
        and issubclass(column.type._type_affinity, sqltypes.Integer)
        and not column.foreign_keys
    )
    if wants_autoincrement:
        spec += " PRIMARY KEY AUTOINCREMENT"
    return spec
def _expression_adaptations(self):
    """Return the operator -> {other type: result type} adaptation table.

    Covers ``operators.add`` and ``operators.sub``; entries that map to
    ``self.__class__`` keep the type of this object.
    """
    own_type = self.__class__

    when_adding = {
        Integer: own_type,
        Interval: DateTime,
        Time: DateTime,
    }

    when_subtracting = {
        # date - integer = date
        Integer: own_type,
        # date - date = integer
        Date: Integer,
        Interval: DateTime,
        # date - datetime = interval; noted by the original author as
        # absent from the PG docs but working in practice.
        DateTime: Interval,
    }

    return {
        operators.add: when_adding,
        operators.sub: when_subtracting,
    }
def visit_column(self, delta):
    """Emit an ``ALTER TABLE ... CHANGE COLUMN`` statement for ``delta``.

    ``AUTO_INCREMENT`` is appended to the new column spec when the column
    being changed is the first autoincrementing integer primary-key column
    (without foreign keys) of the table.

    :param delta: change description exposing ``table``, ``result_column``
        and ``current_name``.
    """
    table = delta.table
    new_column = delta.result_column
    colspec = self.get_column_specification(new_column)

    if new_column.autoincrement:
        candidates = [
            pk_col for pk_col in table.primary_key.columns
            if (pk_col.autoincrement and
                isinstance(pk_col.type, sqltypes.Integer) and
                not pk_col.foreign_keys)
        ]
        if candidates and candidates[0].name == delta.current_name:
            colspec += " AUTO_INCREMENT"

    quoted_old_name = self.preparer.quote(delta.current_name, table.quote)

    self.start_alter_table(table)
    self.append("CHANGE COLUMN %s " % quoted_old_name)
    self.append(colspec)
    self.execute()
def test_querying_table(metadata):
    """Build the ``Table`` object used by the querying tests.

    The table name is suffixed with the pytest-xdist worker id (``master``
    when the ``PYTEST_XDIST_WORKER`` variable is unset) so that parallel test
    processes do not create the same table concurrently.

    :param metadata: metadata object the table is attached to.
    :return: the constructed ``Table``.
    """
    worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
    table_name = 'test_querying_table_' + worker_id

    columns = [
        Column('id', types.Integer, autoincrement=True, primary_key=True),
        Column('t_string', types.String(60)),
        Column('t_list', types.ARRAY(types.String(60))),
        Column('t_enum', types.Enum(MyEnum)),
        Column('t_int_enum', types.Enum(MyIntEnum)),
        Column('t_datetime', types.DateTime()),
        Column('t_date', types.DateTime()),
        Column('t_interval', types.Interval()),
        Column('uniq_uuid', PG_UUID, nullable=False, unique=True, default=uuid4),
    ]
    return Table(table_name, metadata, *columns)
def visit_column(self, delta):
    """Emit an ``ALTER TABLE ... CHANGE COLUMN`` statement for ``delta``.

    ``AUTO_INCREMENT`` is appended to the new column spec when the column
    being changed is the first autoincrementing integer primary-key column
    (without foreign keys) of the table.  The old column name is quoted
    using the value returned by ``util.safe_quote(table)``.

    :param delta: change description exposing ``table``, ``result_column``
        and ``current_name``.
    """
    table = delta.table
    new_column = delta.result_column
    colspec = self.get_column_specification(new_column)

    if new_column.autoincrement:
        candidates = [
            pk_col for pk_col in table.primary_key.columns
            if (pk_col.autoincrement and
                isinstance(pk_col.type, sqltypes.Integer) and
                not pk_col.foreign_keys)
        ]
        if candidates and candidates[0].name == delta.current_name:
            colspec += " AUTO_INCREMENT"

    quote_flag = util.safe_quote(table)
    quoted_old_name = self.preparer.quote(delta.current_name, quote_flag)

    self.start_alter_table(table)
    self.append("CHANGE COLUMN %s " % quoted_old_name)
    self.append(colspec)
    self.execute()
def define_tables(cls, metadata):
    """Register ``test_table`` (integer ``id`` key, 50-char ``data``) on ``metadata``."""
    columns = (
        Column('id', Integer, primary_key=True),
        Column('data', String(50)),
    )
    Table('test_table', metadata, *columns)
def test_nullable_reflection(self):
    """Check that reflected columns report the nullability they were declared with."""
    table = Table(
        't', self.metadata,
        Column('a', Integer, nullable=True),
        Column('b', Integer, nullable=False),
    )
    table.create()

    reflected = inspect(self.metadata.bind).get_columns('t')
    nullable_by_name = {col['name']: col['nullable'] for col in reflected}
    eq_(nullable_by_name, {"a": True, "b": False})
def compare_server_default(self, inspector_column,
                           metadata_column,
                           rendered_metadata_default,
                           rendered_inspector_default):
    """Report whether the reflected and declared server defaults differ.

    :return: ``False`` for the implicit-``'0'`` case described below,
        otherwise whether the two rendered defaults are unequal.
    """
    # Partial workaround for SQLAlchemy issue #3023: if the column was
    # created without "NOT NULL", MySQL may have added an implicit default
    # of '0', which must not be reported as a difference.
    is_implicit_zero = (
        metadata_column.type._type_affinity is sqltypes.Integer
        and inspector_column.primary_key
        and not inspector_column.autoincrement
        and not rendered_metadata_default
        and rendered_inspector_default == "'0'"
    )
    if is_implicit_zero:
        return False
    return rendered_inspector_default != rendered_metadata_default
def check_constraint(self, name, source, condition, schema=None, **kw):
    """Create a ``CheckConstraint`` attached to a throwaway table.

    :param name: constraint name.
    :param source: name of the table the constraint is attached to.
    :param condition: the check condition passed to ``CheckConstraint``.
    :param schema: optional schema for the table.
    :param kw: extra keyword arguments forwarded to ``CheckConstraint``.
    :return: the constraint, after it has been appended to the table.
    """
    metadata = self.metadata()
    # The single 'x' column only exists so the table can be constructed.
    placeholder = sa_schema.Column('x', Integer)
    table = sa_schema.Table(source, metadata, placeholder, schema=schema)

    constraint = sa_schema.CheckConstraint(condition, name=name, **kw)
    table.append_constraint(constraint)
    return constraint
def get_column_default(self, column):
    """Return the default value to use for ``column``.

    For an integer primary key with no default (or only an optional
    Sequence), the next value of the ``<table>_<column>_seq`` sequence is
    fetched; every other column is delegated to ``ANSIDefaultRunner``.
    """
    uses_implicit_sequence = (
        column.primary_key
        and isinstance(column.type, types.Integer)
        and (
            column.default is None
            or (isinstance(column.default, schema.Sequence)
                and column.default.optional)
        )
    )
    if not uses_implicit_sequence:
        return ansisql.ANSIDefaultRunner.get_column_default(self, column)

    statement = "select nextval('%s_%s_seq')" % (column.table.name, column.name)
    cursor = self.proxy(statement)
    return cursor.fetchone()[0]
def _oid_col(self):
# OID remains a little hackish so far
if not hasattr(self, '_oid_column'):
if self.table.engine.oid_column_name() is not None:
self._oid_column = schema.Column(self.table.engine.oid_column_name(), sqltypes.Integer, hidden=True)
self._oid_column._set_parent(self.table)
else:
self._oid_column = None
return self._oid_column
def tag(name=None):
    """Render the page listing artists that carry the tag ``name``.

    The ``sort_by`` query-string argument selects the ordering:
    ``play_count``, ``local_play_count`` or ``tag_strength``; any other
    value orders by tag strength and then local play count.

    :param name: tag name; it is lower-cased before querying.  A missing
        name results in a 404 response instead of an ``AttributeError``.
    :return: the rendered ``meta/tag.html`` template.
    """
    if name is None:
        # The default of None cannot be lower-cased or looked up; answer
        # with "not found" rather than crashing with a server error.
        from flask import abort
        abort(404)

    sort_by = request.args.get('sort_by')
    if sort_by == 'play_count':
        sort_by = [Artist.playcount.desc()]
    elif sort_by == 'local_play_count':
        sort_by = [Artist.local_playcount.desc()]
    elif sort_by == 'tag_strength':
        sort_by = [desc('strength')]
    else:
        sort_by = [desc('strength'), Artist.local_playcount.desc()]

    name = name.lower()

    top_artists = (
        db.session.query(
            Artist.name,
            Artist.tags[name].cast(Integer).label('strength'),
            Artist.local_playcount,
            Artist.playcount,
        )
        .filter(Artist.tags.has_key(name))
        .order_by(*sort_by)
        .all()
    )
    # Number the rows from 1 for display.
    top_artists = enumerate(top_artists, start=1)

    return render_template(
        'meta/tag.html',
        # Pass the tag name; the original passed this view function itself.
        tag=name,
        top_artists=top_artists,
    )
def define_tables(cls, metadata):
    """Register ``test_table`` (integer ``id`` key, 50-char ``data``) on ``metadata``."""
    columns = (
        Column('id', Integer, primary_key=True),
        Column('data', String(50)),
    )
    Table('test_table', metadata, *columns)
def test_nullable_reflection(self):
    """Check that reflected columns report the nullability they were declared with."""
    table = Table(
        't', self.metadata,
        Column('a', Integer, nullable=True),
        Column('b', Integer, nullable=False),
    )
    table.create()

    reflected = inspect(self.metadata.bind).get_columns('t')
    nullable_by_name = {col['name']: col['nullable'] for col in reflected}
    eq_(nullable_by_name, {"a": True, "b": False})
def visit_typeclause(self, typeclause):
    """Render a type clause, forcing ``INTEGER`` for integer types.

    Any type whose dialect implementation is not an ``Integer`` is handled
    by the parent compiler.
    """
    impl_type = typeclause.type.dialect_impl(self.dialect)
    if not isinstance(impl_type, sqltypes.Integer):
        return super(DrizzleCompiler, self).visit_typeclause(typeclause)
    return 'INTEGER'
def define_tables(cls, metadata):
    """Register ``test_table`` (integer ``id`` key, 50-char ``data``) on ``metadata``."""
    columns = (
        Column('id', Integer, primary_key=True),
        Column('data', String(50)),
    )
    Table('test_table', metadata, *columns)
def test_nullable_reflection(self):
    """Check that reflected columns report the nullability they were declared with."""
    table = Table(
        't', self.metadata,
        Column('a', Integer, nullable=True),
        Column('b', Integer, nullable=False),
    )
    table.create()

    reflected = inspect(self.metadata.bind).get_columns('t')
    nullable_by_name = {col['name']: col['nullable'] for col in reflected}
    eq_(nullable_by_name, {"a": True, "b": False})
def _check_constraint(self, name, source, condition, schema=None, **kw):
    """Create a ``CheckConstraint`` attached to a throwaway table.

    :param name: constraint name.
    :param source: name of the table the constraint is attached to.
    :param condition: the check condition passed to ``CheckConstraint``.
    :param schema: optional schema for the table.
    :param kw: extra keyword arguments forwarded to ``CheckConstraint``.
    :return: the constraint, after it has been appended to the table.
    """
    metadata = sa_schema.MetaData()
    # The single 'x' column only exists so the table can be constructed.
    placeholder = sa_schema.Column('x', Integer)
    table = sa_schema.Table(source, metadata, placeholder, schema=schema)

    constraint = sa_schema.CheckConstraint(condition, name=name, **kw)
    table.append_constraint(constraint)
    return constraint
def bulk_insert(self, table, rows):
    """Issue a "bulk insert" operation using the current migration context.

    This provides a means of representing an INSERT of multiple rows
    which works equally well in the context of executing on a live
    connection as well as that of generating a SQL script.  In the
    case of a SQL script, the values are rendered inline into the
    statement.

    e.g.::

        from alembic import op
        from datetime import date
        from sqlalchemy.sql import table, column
        from sqlalchemy import String, Integer, Date

        # Create an ad-hoc table to use for the insert statement.
        accounts_table = table('account',
            column('id', Integer),
            column('name', String),
            column('create_date', Date)
        )

        op.bulk_insert(accounts_table,
            [
                {'id':1, 'name':'John Smith',
                        'create_date':date(2010, 10, 5)},
                {'id':2, 'name':'Ed Williams',
                        'create_date':date(2007, 5, 27)},
                {'id':3, 'name':'Wendy Jones',
                        'create_date':date(2008, 8, 15)},
            ]
        )

    :param table: the table object to insert into.
    :param rows: a list of dictionaries, one per row.
    """
    # All the work is delegated to the implementation object.
    backend = self.impl
    backend.bulk_insert(table, rows)
def visit_typeclause(self, typeclause):
    """Render a type clause, forcing ``INTEGER`` for integer types.

    Any type whose dialect implementation is not an ``Integer`` is handled
    by the parent compiler.
    """
    impl_type = typeclause.type.dialect_impl(self.dialect)
    if not isinstance(impl_type, sqltypes.Integer):
        return super(DrizzleCompiler, self).visit_typeclause(typeclause)
    return 'INTEGER'
def define_tables(cls, metadata):
    """Register ``test_table`` (integer ``id`` key, 50-char ``data``) on ``metadata``."""
    columns = (
        Column('id', Integer, primary_key=True),
        Column('data', String(50)),
    )
    Table('test_table', metadata, *columns)
def test_nullable_reflection(self):
    """Check that reflected columns report the nullability they were declared with."""
    table = Table(
        't', self.metadata,
        Column('a', Integer, nullable=True),
        Column('b', Integer, nullable=False),
    )
    table.create()

    reflected = inspect(self.metadata.bind).get_columns('t')
    nullable_by_name = {col['name']: col['nullable'] for col in reflected}
    eq_(nullable_by_name, {"a": True, "b": False})
def define_tables(cls, metadata):
    """Register ``test_table`` (integer ``id`` key, 50-char ``data``) on ``metadata``."""
    columns = (
        Column('id', Integer, primary_key=True),
        Column('data', String(50)),
    )
    Table('test_table', metadata, *columns)
def test_nullable_reflection(self):
    """Check that reflected columns report the nullability they were declared with."""
    table = Table(
        't', self.metadata,
        Column('a', Integer, nullable=True),
        Column('b', Integer, nullable=False),
    )
    table.create()

    reflected = inspect(self.metadata.bind).get_columns('t')
    nullable_by_name = {col['name']: col['nullable'] for col in reflected}
    eq_(nullable_by_name, {"a": True, "b": False})
def compare_server_default(self, inspector_column,
                           metadata_column,
                           rendered_metadata_default,
                           rendered_inspector_default):
    """Report whether the reflected and declared server defaults differ.

    :return: ``False`` for the implicit-``'0'`` case described below,
        otherwise whether the two rendered defaults are unequal.
    """
    # Partial workaround for SQLAlchemy issue #3023: if the column was
    # created without "NOT NULL", MySQL may have added an implicit default
    # of '0', which must not be reported as a difference.
    is_implicit_zero = (
        metadata_column.type._type_affinity is sqltypes.Integer
        and inspector_column.primary_key
        and not inspector_column.autoincrement
        and not rendered_metadata_default
        and rendered_inspector_default == "'0'"
    )
    if is_implicit_zero:
        return False
    return rendered_inspector_default != rendered_metadata_default
def check_constraint(self, name, source, condition, schema=None, **kw):
    """Create a ``CheckConstraint`` attached to a throwaway table.

    :param name: constraint name.
    :param source: name of the table the constraint is attached to.
    :param condition: the check condition passed to ``CheckConstraint``.
    :param schema: optional schema for the table.
    :param kw: extra keyword arguments forwarded to ``CheckConstraint``.
    :return: the constraint, after it has been appended to the table.
    """
    metadata = self.metadata()
    # The single 'x' column only exists so the table can be constructed.
    placeholder = sa_schema.Column('x', Integer)
    table = sa_schema.Table(source, metadata, placeholder, schema=schema)

    constraint = sa_schema.CheckConstraint(condition, name=name, **kw)
    table.append_constraint(constraint)
    return constraint