def _render_server_default_for_compare(metadata_default,
                                       metadata_col, autogen_context):
    """Render a metadata server default as a string for comparison.

    A user-defined renderer is consulted first; any result other than
    ``False`` is returned as is.  Otherwise a ``DefaultClause`` is reduced
    to its ``arg`` when that is a string, or to ``str()`` of the ``arg``
    compiled against ``autogen_context.dialect``.

    :param metadata_default: the server default taken from the metadata.
    :param metadata_col: the column owning the default; only its type
        affinity is inspected.
    :param autogen_context: context handed to the user-defined renderer
        and used to obtain the dialect.
    :return: ``None`` when the default is not a string.  For columns whose
        type affinity is ``String``, the ``repr()`` of the text with one
        leading and one trailing single quote removed.  Otherwise the text
        unchanged.
    """
    custom = _user_defined_render(
        "server_default", metadata_default, autogen_context)
    if custom is not False:
        return custom

    default = metadata_default
    if isinstance(default, sa_schema.DefaultClause):
        if isinstance(default.arg, compat.string_types):
            default = default.arg
        else:
            compiled = default.arg.compile(dialect=autogen_context.dialect)
            default = str(compiled)

    if not isinstance(default, compat.string_types):
        return None
    if metadata_col.type._type_affinity is not sqltypes.String:
        return default
    # Strip one surrounding quote on each side before repr() re-quotes it.
    return repr(re.sub(r"^'|'$", "", default))
# Example source code for the Python class String()
def url(self):
    """Return ``osm_api_base``, ``self.osm_type`` and ``self.osm_id`` joined by ``/``."""
    segments = (osm_api_base, self.osm_type, self.osm_id)
    return '{}/{}/{}'.format(*segments)
# class ItemCandidateTag(Base):
# __tablename__ = 'item_candidate_tag'
# __table_args__ = (
# ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'],
# [ItemCandidate.item_id,
# ItemCandidate.osm_id,
# ItemCandidate.osm_type]),
# )
#
# item_id = Column(Integer, primary_key=True)
# osm_id = Column(BigInteger, primary_key=True)
# osm_type = Column(osm_type_enum, primary_key=True)
# k = Column(String, primary_key=True)
# v = Column(String, primary_key=True)
#
# item_candidate = relationship(ItemCandidate,
# backref=backref('tag_table', lazy='dynamic'))
def compare_server_default(self, inspector_column,
                           metadata_column,
                           rendered_metadata_default,
                           rendered_inspector_default):
    """Report whether the reflected and metadata server defaults differ.

    :param inspector_column: reflected column (not used here).
    :param metadata_column: the column as declared in the metadata.
    :param rendered_metadata_default: rendered default from the metadata,
        or ``None``.
    :param rendered_inspector_default: rendered default from the database,
        or ``None``.
    :return: ``False`` for a primary-key column that is its table's
        autoincrement column.  When either default is ``None``, whether
        the two values are unequal.  Otherwise the negation of the scalar
        result of a ``SELECT <db default> = <metadata default>`` statement
        run on ``self.connection``.
    """
    # don't do defaults for SERIAL columns
    is_serial = (
        metadata_column.primary_key and
        metadata_column is metadata_column.table._autoincrement_column
    )
    if is_serial:
        return False

    db_default = rendered_inspector_default
    model_default = rendered_metadata_default
    if None in (db_default, model_default):
        return db_default != model_default

    if metadata_column.type._type_affinity is not sqltypes.String:
        # Remove one leading and one trailing single quote.
        model_default = re.sub(r"^'|'$", "", model_default)

    statement = "SELECT %s = %s" % (db_default, model_default)
    return not self.connection.scalar(statement)
def _render_server_default_for_compare(metadata_default,
                                       metadata_col, autogen_context):
    """Render a metadata server default as a string for comparison.

    A user-defined renderer is consulted first; any result other than
    ``False`` is returned as is.  Otherwise a ``DefaultClause`` is reduced
    to its ``arg`` when that is a string, or to ``str()`` of the ``arg``
    compiled against ``autogen_context.dialect``.

    :param metadata_default: the server default taken from the metadata.
    :param metadata_col: the column owning the default; only its type
        affinity is inspected.
    :param autogen_context: context handed to the user-defined renderer
        and used to obtain the dialect.
    :return: ``None`` when the default is not a string.  For columns whose
        type affinity is ``String``, the ``repr()`` of the text with one
        leading and one trailing single quote removed.  Otherwise the text
        unchanged.
    """
    custom = _user_defined_render(
        "server_default", metadata_default, autogen_context)
    if custom is not False:
        return custom

    default = metadata_default
    if isinstance(default, sa_schema.DefaultClause):
        if isinstance(default.arg, compat.string_types):
            default = default.arg
        else:
            compiled = default.arg.compile(dialect=autogen_context.dialect)
            default = str(compiled)

    if not isinstance(default, compat.string_types):
        return None
    if metadata_col.type._type_affinity is not sqltypes.String:
        return default
    # Strip one surrounding quote on each side before repr() re-quotes it.
    return repr(re.sub(r"^'|'$", "", default))
def compare_server_default(self, inspector_column,
                           metadata_column,
                           rendered_metadata_default,
                           rendered_inspector_default):
    """Report whether the reflected and metadata server defaults differ.

    :param inspector_column: reflected column (not used here).
    :param metadata_column: the column as declared in the metadata.
    :param rendered_metadata_default: rendered default from the metadata,
        or ``None``.
    :param rendered_inspector_default: rendered default from the database,
        or ``None``.
    :return: ``False`` for a primary-key column that is its table's
        autoincrement column.  When either default is ``None``, whether
        the two values are unequal.  Otherwise the negation of the scalar
        result of a ``SELECT <db default> = <metadata default>`` statement
        run on ``self.connection``.
    """
    # don't do defaults for SERIAL columns
    is_serial = (
        metadata_column.primary_key and
        metadata_column is metadata_column.table._autoincrement_column
    )
    if is_serial:
        return False

    db_default = rendered_inspector_default
    model_default = rendered_metadata_default
    if None in (db_default, model_default):
        return db_default != model_default

    if metadata_column.type._type_affinity is not sqltypes.String:
        # Remove one leading and one trailing single quote.
        model_default = re.sub(r"^'|'$", "", model_default)

    statement = "SELECT %s = %s" % (db_default, model_default)
    return not self.connection.scalar(statement)
def test_varchar_reflection(self):
    """Check that ``String(52)`` round-trips as a ``String`` of length 52."""
    round_tripped = self._type_round_trip(sql_types.String(52))
    reflected_type = round_tripped[0]
    assert isinstance(reflected_type, sql_types.String)
    eq_(reflected_type.length, 52)
def _render_server_default_for_compare(metadata_default,
                                       metadata_col, autogen_context):
    """Render a metadata server default as a string for comparison.

    A user-defined renderer is consulted first; any result other than
    ``False`` is returned as is.  Otherwise a ``DefaultClause`` is reduced
    to its ``arg`` when that is a string, or to ``str()`` of the ``arg``
    compiled against ``autogen_context['dialect']``.

    :param metadata_default: the server default taken from the metadata.
    :param metadata_col: the column owning the default; only its type
        affinity is inspected.
    :param autogen_context: mapping handed to the user-defined renderer;
        its ``'dialect'`` entry is used for compilation.
    :return: ``None`` when the default is not a string.  For columns whose
        type affinity is ``String``, the ``repr()`` of the text with one
        leading and one trailing single quote removed.  Otherwise the text
        unchanged.
    """
    custom = _user_defined_render(
        "server_default", metadata_default, autogen_context)
    if custom is not False:
        return custom

    default = metadata_default
    if isinstance(default, sa_schema.DefaultClause):
        if isinstance(default.arg, compat.string_types):
            default = default.arg
        else:
            compiled = default.arg.compile(
                dialect=autogen_context['dialect'])
            default = str(compiled)

    if not isinstance(default, compat.string_types):
        return None
    if metadata_col.type._type_affinity is not sqltypes.String:
        return default
    # Strip one surrounding quote on each side before repr() re-quotes it.
    return repr(re.sub(r"^'|'$", "", default))
def test_querying_table(metadata):
    """
    Build the ``Table`` object for the querying test table on *metadata*.
    """
    # Under pytest-xdist each worker gets its own table name, so that test
    # processes never create the same table concurrently.
    worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
    table_name = 'test_querying_table_' + worker_id
    columns = [
        Column('id', types.Integer, autoincrement=True, primary_key=True),
        Column('t_string', types.String(60)),
        Column('t_list', types.ARRAY(types.String(60))),
        Column('t_enum', types.Enum(MyEnum)),
        Column('t_int_enum', types.Enum(MyIntEnum)),
        Column('t_datetime', types.DateTime()),
        Column('t_date', types.DateTime()),
        Column('t_interval', types.Interval()),
        Column('uniq_uuid', PG_UUID, nullable=False, unique=True,
               default=uuid4),
    ]
    return Table(table_name, metadata, *columns)
def __init__(self, type_in=None, key=None, engine=None, **kwargs):
    """Set up the encrypted type.

    :param type_in: underlying type, as an instance or as a class to be
        instantiated; ``String()`` is used when ``None``.
    :param key: value stored on ``self._key``.
    :param engine: engine class to instantiate; ``AesEngine`` when falsy.
    :param kwargs: forwarded to the parent initializer.
    :raises ImproperlyConfigured: when ``cryptography`` is falsy.
    """
    if not cryptography:
        raise ImproperlyConfigured(
            "'cryptography' is required to use EncryptedType"
        )
    super(EncryptedType, self).__init__(**kwargs)

    # Resolve the underlying type to an instance.
    if type_in is None:
        underlying = String()
    elif isinstance(type_in, type):
        underlying = type_in()
    else:
        underlying = type_in
    self.underlying_type = underlying

    self._key = key
    engine_cls = engine or AesEngine
    self.engine = engine_cls()
def bind_processor(self, dialect):
    """Return the bind-parameter processor for *dialect*.

    When ``dialect._cx_oracle_with_unicode`` is true, the returned callable
    passes ``None`` through and converts any other value with
    ``unicode()``.  Otherwise the processor comes from the
    ``bind_processor`` that follows ``_NativeUnicodeMixin`` in the MRO.
    """
    if not dialect._cx_oracle_with_unicode:
        parent = super(_NativeUnicodeMixin, self)
        return parent.bind_processor(dialect)

    def process(value):
        return value if value is None else unicode(value)

    return process
# we apply a connection output handler that returns
# unicode in all cases, so the "native_unicode" flag
# will be set for the default String.result_processor.
def define_tables(cls, metadata):
    """Declare ``test_table`` on *metadata*.

    The table has an integer primary key ``id`` and a ``String(50)``
    column ``data``.
    """
    id_column = Column('id', Integer, primary_key=True)
    data_column = Column('data', String(50))
    Table('test_table', metadata, id_column, data_column)
def assert_tables_equal(self, table, reflected_table, strict_types=False):
    """Assert that *reflected_table* matches *table*.

    Columns are compared pairwise in order: name, identity within the
    reflected table's column collection, primary-key flag, nullability,
    type, string length (for ``String`` columns) and the names of the
    columns their foreign keys point at.  A column with a server default
    must reflect a ``FetchedValue``.  Finally the primary keys must have
    the same size and every original primary-key column must be present
    in the reflected primary key.

    :param strict_types: when true, the reflected type must be an instance
        of the original column's type class; otherwise
        ``self.assert_types_base`` decides.
    """
    assert len(table.c) == len(reflected_table.c)

    for expected, actual in zip(table.c, reflected_table.c):
        eq_(expected.name, actual.name)
        assert actual is reflected_table.c[expected.name]
        eq_(expected.primary_key, actual.primary_key)
        eq_(expected.nullable, actual.nullable)

        if not strict_types:
            self.assert_types_base(actual, expected)
        else:
            assert isinstance(actual.type, type(expected.type)), (
                "Type '%s' doesn't correspond to type '%s'"
                % (actual.type, expected.type))

        if isinstance(expected.type, sqltypes.String):
            eq_(expected.type.length, actual.type.length)

        expected_targets = {fk.column.name for fk in expected.foreign_keys}
        actual_targets = {fk.column.name for fk in actual.foreign_keys}
        eq_(expected_targets, actual_targets)

        if expected.server_default:
            assert isinstance(actual.server_default, schema.FetchedValue)

    assert len(table.primary_key) == len(reflected_table.primary_key)
    reflected_pk_columns = reflected_table.primary_key.columns
    for pk_column in table.primary_key:
        assert reflected_pk_columns[pk_column.name] is not None
def with_variant(self, type_, dialect_name):
    """Return a :class:`.Variant` of this type that substitutes ``type_``
    when the dialect named ``dialect_name`` is in use.

    e.g.::

        from sqlalchemy.types import String
        from sqlalchemy.dialects import mysql

        s = String()

        s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

    :meth:`.TypeEngine.with_variant` is always built starting from the
    "fallback" type and moving to the dialect-specific one.  The
    :class:`.Variant` that comes back offers its own
    :meth:`.Variant.with_variant`, so further variants can be chained
    with repeated calls.

    :param type_: a :class:`.TypeEngine` to be selected as the variant of
     the originating type when a dialect of the given name is in use.
    :param dialect_name: base name of the dialect which uses
     this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

    .. versionadded:: 0.7.2

    """
    variant_mapping = {dialect_name: to_instance(type_)}
    return Variant(self, variant_mapping)
def binary_operator_string(self, binary):
    """Return the operator text for *binary*.

    A ``+`` on an expression whose type is a ``String`` is rendered as
    ``||``; everything else is delegated to
    ``ansisql.ANSICompiler.binary_operator_string``.
    """
    is_string_concat = (
        isinstance(binary.type, sqltypes.String) and binary.operator == '+'
    )
    if is_string_concat:
        return '||'
    return ansisql.ANSICompiler.binary_operator_string(self, binary)
def binary_operator_string(self, binary):
    """Return the operator text for *binary*.

    A ``+`` on an expression whose type is a ``String`` is rendered as
    ``||``; everything else is delegated to
    ``ansisql.ANSICompiler.binary_operator_string``.
    """
    is_string_concat = (
        isinstance(binary.type, sqltypes.String) and binary.operator == '+'
    )
    if is_string_concat:
        return '||'
    return ansisql.ANSICompiler.binary_operator_string(self, binary)
def bind_processor(self, dialect):
    """Return the bind-parameter processor for *dialect*.

    When ``dialect._cx_oracle_with_unicode`` is true, the returned callable
    passes ``None`` through and converts any other value with
    ``unicode()``.  Otherwise the processor comes from the
    ``bind_processor`` that follows ``_NativeUnicodeMixin`` in the MRO.
    """
    if not dialect._cx_oracle_with_unicode:
        parent = super(_NativeUnicodeMixin, self)
        return parent.bind_processor(dialect)

    def process(value):
        return value if value is None else unicode(value)

    return process
# we apply a connection output handler that returns
# unicode in all cases, so the "native_unicode" flag
# will be set for the default String.result_processor.
def define_tables(cls, metadata):
    """Declare ``test_table`` on *metadata*.

    The table has an integer primary key ``id`` and a ``String(50)``
    column ``data``.
    """
    id_column = Column('id', Integer, primary_key=True)
    data_column = Column('data', String(50))
    Table('test_table', metadata, id_column, data_column)
def assert_tables_equal(self, table, reflected_table, strict_types=False):
    """Assert that *reflected_table* matches *table*.

    Columns are compared pairwise in order: name, identity within the
    reflected table's column collection, primary-key flag, nullability,
    type, string length (for ``String`` columns) and the names of the
    columns their foreign keys point at.  A column with a server default
    must reflect a ``FetchedValue``.  Finally the primary keys must have
    the same size and every original primary-key column must be present
    in the reflected primary key.

    :param strict_types: when true, the reflected type must be an instance
        of the original column's type class; otherwise
        ``self.assert_types_base`` decides.
    """
    assert len(table.c) == len(reflected_table.c)

    for expected, actual in zip(table.c, reflected_table.c):
        eq_(expected.name, actual.name)
        assert actual is reflected_table.c[expected.name]
        eq_(expected.primary_key, actual.primary_key)
        eq_(expected.nullable, actual.nullable)

        if not strict_types:
            self.assert_types_base(actual, expected)
        else:
            assert isinstance(actual.type, type(expected.type)), (
                "Type '%s' doesn't correspond to type '%s'"
                % (actual.type, expected.type))

        if isinstance(expected.type, sqltypes.String):
            eq_(expected.type.length, actual.type.length)

        expected_targets = {fk.column.name for fk in expected.foreign_keys}
        actual_targets = {fk.column.name for fk in actual.foreign_keys}
        eq_(expected_targets, actual_targets)

        if expected.server_default:
            assert isinstance(actual.server_default, schema.FetchedValue)

    assert len(table.primary_key) == len(reflected_table.primary_key)
    reflected_pk_columns = reflected_table.primary_key.columns
    for pk_column in table.primary_key:
        assert reflected_pk_columns[pk_column.name] is not None
def with_variant(self, type_, dialect_name):
    """Return a :class:`.Variant` of this type that substitutes ``type_``
    when the dialect named ``dialect_name`` is in use.

    e.g.::

        from sqlalchemy.types import String
        from sqlalchemy.dialects import mysql

        s = String()

        s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

    :meth:`.TypeEngine.with_variant` is always built starting from the
    "fallback" type and moving to the dialect-specific one.  The
    :class:`.Variant` that comes back offers its own
    :meth:`.Variant.with_variant`, so further variants can be chained
    with repeated calls.

    :param type_: a :class:`.TypeEngine` to be selected as the variant of
     the originating type when a dialect of the given name is in use.
    :param dialect_name: base name of the dialect which uses
     this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

    .. versionadded:: 0.7.2

    """
    variant_mapping = {dialect_name: to_instance(type_)}
    return Variant(self, variant_mapping)
def bind_processor(self, dialect):
    """Return the bind-parameter processor for *dialect*.

    When ``dialect._cx_oracle_with_unicode`` is true, the returned callable
    passes ``None`` through and converts any other value with
    ``unicode()``.  Otherwise the processor comes from the
    ``bind_processor`` that follows ``_NativeUnicodeMixin`` in the MRO.
    """
    if not dialect._cx_oracle_with_unicode:
        parent = super(_NativeUnicodeMixin, self)
        return parent.bind_processor(dialect)

    def process(value):
        return value if value is None else unicode(value)

    return process
# we apply a connection output handler that returns
# unicode in all cases, so the "native_unicode" flag
# will be set for the default String.result_processor.
def define_tables(cls, metadata):
    """Declare ``test_table`` on *metadata*.

    The table has an integer primary key ``id`` and a ``String(50)``
    column ``data``.
    """
    id_column = Column('id', Integer, primary_key=True)
    data_column = Column('data', String(50))
    Table('test_table', metadata, id_column, data_column)
def _test_get_unique_constraints(self, schema=None):
    """Create ``testtbl`` with four unique constraints and compare them
    with what the inspector reflects.

    Expected and reflected constraints are both sorted by name and then
    compared pairwise.

    :param schema: optional schema name used for the table and for the
        reflection call.
    """
    by_name = operator.itemgetter('name')
    expected_specs = [
        {'name': 'unique_a', 'column_names': ['a']},
        {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
        {'name': 'unique_c_a_b', 'column_names': ['c', 'a', 'b']},
        {'name': 'unique_asc_key', 'column_names': ['asc', 'key']},
    ]
    uniques = sorted(expected_specs, key=by_name)

    orig_meta = self.metadata
    table = Table(
        'testtbl', orig_meta,
        Column('a', sa.String(20)),
        Column('b', sa.String(30)),
        Column('c', sa.Integer),
        # reserved identifiers
        Column('asc', sa.String(30)),
        Column('key', sa.String(30)),
        schema=schema
    )
    for spec in uniques:
        constraint = sa.UniqueConstraint(
            *spec['column_names'], name=spec['name'])
        table.append_constraint(constraint)
    orig_meta.create_all()

    inspector = inspect(orig_meta.bind)
    found = inspector.get_unique_constraints('testtbl', schema=schema)
    reflected = sorted(found, key=by_name)

    for expected, actual in zip(uniques, reflected):
        eq_(expected, actual)
def assert_tables_equal(self, table, reflected_table, strict_types=False):
    """Assert that *reflected_table* matches *table*.

    Columns are compared pairwise in order: name, identity within the
    reflected table's column collection, primary-key flag, nullability,
    type, string length (for ``String`` columns) and the names of the
    columns their foreign keys point at.  A column with a server default
    must reflect a ``FetchedValue``.  Finally the primary keys must have
    the same size and every original primary-key column must be present
    in the reflected primary key.

    :param strict_types: when true, the reflected type must be an instance
        of the original column's type class; otherwise
        ``self.assert_types_base`` decides.
    """
    assert len(table.c) == len(reflected_table.c)

    for expected, actual in zip(table.c, reflected_table.c):
        eq_(expected.name, actual.name)
        assert actual is reflected_table.c[expected.name]
        eq_(expected.primary_key, actual.primary_key)
        eq_(expected.nullable, actual.nullable)

        if not strict_types:
            self.assert_types_base(actual, expected)
        else:
            assert isinstance(actual.type, type(expected.type)), (
                "Type '%s' doesn't correspond to type '%s'"
                % (actual.type, expected.type))

        if isinstance(expected.type, sqltypes.String):
            eq_(expected.type.length, actual.type.length)

        expected_targets = {fk.column.name for fk in expected.foreign_keys}
        actual_targets = {fk.column.name for fk in actual.foreign_keys}
        eq_(expected_targets, actual_targets)

        if expected.server_default:
            assert isinstance(actual.server_default, schema.FetchedValue)

    assert len(table.primary_key) == len(reflected_table.primary_key)
    reflected_pk_columns = reflected_table.primary_key.columns
    for pk_column in table.primary_key:
        assert reflected_pk_columns[pk_column.name] is not None
def with_variant(self, type_, dialect_name):
    """Return a :class:`.Variant` of this type that substitutes ``type_``
    when the dialect named ``dialect_name`` is in use.

    e.g.::

        from sqlalchemy.types import String
        from sqlalchemy.dialects import mysql

        s = String()

        s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

    :meth:`.TypeEngine.with_variant` is always built starting from the
    "fallback" type and moving to the dialect-specific one.  The
    :class:`.Variant` that comes back offers its own
    :meth:`.Variant.with_variant`, so further variants can be chained
    with repeated calls.

    :param type_: a :class:`.TypeEngine` to be selected as the variant of
     the originating type when a dialect of the given name is in use.
    :param dialect_name: base name of the dialect which uses
     this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

    .. versionadded:: 0.7.2

    """
    variant_mapping = {dialect_name: to_instance(type_)}
    return Variant(self, variant_mapping)
def bind_processor(self, dialect):
    """Return the bind-parameter processor for *dialect*.

    When ``dialect._cx_oracle_with_unicode`` is true, the returned callable
    passes ``None`` through and converts any other value with
    ``unicode()``.  Otherwise the processor comes from the
    ``bind_processor`` that follows ``_NativeUnicodeMixin`` in the MRO.
    """
    if not dialect._cx_oracle_with_unicode:
        parent = super(_NativeUnicodeMixin, self)
        return parent.bind_processor(dialect)

    def process(value):
        return value if value is None else unicode(value)

    return process
# we apply a connection output handler that returns
# unicode in all cases, so the "native_unicode" flag
# will be set for the default String.result_processor.
def define_tables(cls, metadata):
    """Declare ``test_table`` on *metadata*.

    The table has an integer primary key ``id`` and a ``String(50)``
    column ``data``.
    """
    id_column = Column('id', Integer, primary_key=True)
    data_column = Column('data', String(50))
    Table('test_table', metadata, id_column, data_column)
def _test_get_unique_constraints(self, schema=None):
    """Create ``testtbl`` with four unique constraints and compare them
    with what the inspector reflects.

    Expected and reflected constraints are both sorted by name and then
    compared pairwise.

    :param schema: optional schema name used for the table and for the
        reflection call.
    """
    by_name = operator.itemgetter('name')
    expected_specs = [
        {'name': 'unique_a', 'column_names': ['a']},
        {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
        {'name': 'unique_c_a_b', 'column_names': ['c', 'a', 'b']},
        {'name': 'unique_asc_key', 'column_names': ['asc', 'key']},
    ]
    uniques = sorted(expected_specs, key=by_name)

    orig_meta = self.metadata
    table = Table(
        'testtbl', orig_meta,
        Column('a', sa.String(20)),
        Column('b', sa.String(30)),
        Column('c', sa.Integer),
        # reserved identifiers
        Column('asc', sa.String(30)),
        Column('key', sa.String(30)),
        schema=schema
    )
    for spec in uniques:
        constraint = sa.UniqueConstraint(
            *spec['column_names'], name=spec['name'])
        table.append_constraint(constraint)
    orig_meta.create_all()

    inspector = inspect(orig_meta.bind)
    found = inspector.get_unique_constraints('testtbl', schema=schema)
    reflected = sorted(found, key=by_name)

    for expected, actual in zip(uniques, reflected):
        eq_(expected, actual)
def assert_tables_equal(self, table, reflected_table, strict_types=False):
    """Assert that *reflected_table* matches *table*.

    Columns are compared pairwise in order: name, identity within the
    reflected table's column collection, primary-key flag, nullability,
    type, string length (for ``String`` columns) and the names of the
    columns their foreign keys point at.  A column with a server default
    must reflect a ``FetchedValue``.  Finally the primary keys must have
    the same size and every original primary-key column must be present
    in the reflected primary key.

    :param strict_types: when true, the reflected type must be an instance
        of the original column's type class; otherwise
        ``self.assert_types_base`` decides.
    """
    assert len(table.c) == len(reflected_table.c)

    for expected, actual in zip(table.c, reflected_table.c):
        eq_(expected.name, actual.name)
        assert actual is reflected_table.c[expected.name]
        eq_(expected.primary_key, actual.primary_key)
        eq_(expected.nullable, actual.nullable)

        if not strict_types:
            self.assert_types_base(actual, expected)
        else:
            assert isinstance(actual.type, type(expected.type)), (
                "Type '%s' doesn't correspond to type '%s'"
                % (actual.type, expected.type))

        if isinstance(expected.type, sqltypes.String):
            eq_(expected.type.length, actual.type.length)

        expected_targets = {fk.column.name for fk in expected.foreign_keys}
        actual_targets = {fk.column.name for fk in actual.foreign_keys}
        eq_(expected_targets, actual_targets)

        if expected.server_default:
            assert isinstance(actual.server_default, schema.FetchedValue)

    assert len(table.primary_key) == len(reflected_table.primary_key)
    reflected_pk_columns = reflected_table.primary_key.columns
    for pk_column in table.primary_key:
        assert reflected_pk_columns[pk_column.name] is not None
def with_variant(self, type_, dialect_name):
    """Return a :class:`.Variant` of this type that substitutes ``type_``
    when the dialect named ``dialect_name`` is in use.

    e.g.::

        from sqlalchemy.types import String
        from sqlalchemy.dialects import mysql

        s = String()

        s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

    :meth:`.TypeEngine.with_variant` is always built starting from the
    "fallback" type and moving to the dialect-specific one.  The
    :class:`.Variant` that comes back offers its own
    :meth:`~sqlalchemy.types.Variant.with_variant`, so further variants
    can be chained with repeated calls.

    :param type_: a :class:`.TypeEngine` to be selected as the variant of
     the originating type when a dialect of the given name is in use.
    :param dialect_name: base name of the dialect which uses
     this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

    .. versionadded:: 0.7.2

    """
    variant_mapping = {dialect_name: to_instance(type_)}
    return Variant(self, variant_mapping)
def bind_processor(self, dialect):
    """Return the bind-parameter processor for *dialect*.

    When ``dialect._cx_oracle_with_unicode`` is true, the returned callable
    passes ``None`` through and converts any other value with
    ``unicode()``.  Otherwise the processor comes from the
    ``bind_processor`` that follows ``_NativeUnicodeMixin`` in the MRO.
    """
    if not dialect._cx_oracle_with_unicode:
        parent = super(_NativeUnicodeMixin, self)
        return parent.bind_processor(dialect)

    def process(value):
        return value if value is None else unicode(value)

    return process
# we apply a connection output handler that returns
# unicode in all cases, so the "native_unicode" flag
# will be set for the default String.result_processor.