def test_enum_detection(self):
Table(
'simple_items', self.metadata,
Column('enum', VARCHAR(255)),
CheckConstraint(r"simple_items.enum IN ('A', '\'B', 'C')")
)
assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, Enum, MetaData, Table
metadata = MetaData()
t_simple_items = Table(
'simple_items', metadata,
Column('enum', Enum('A', "\\\\'B", 'C'))
)
"""
python类VARCHAR的实例源码
def test_mysql_column_types(self):
Table(
'simple_items', self.metadata,
Column('id', mysql.INTEGER),
Column('name', mysql.VARCHAR(255))
)
assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, Integer, MetaData, String, Table
metadata = MetaData()
t_simple_items = Table(
'simple_items', metadata,
Column('id', Integer),
Column('name', String(255))
)
"""
def test_foreign_key_options(self):
Table(
'simple_items', self.metadata,
Column('name', VARCHAR, ForeignKey('simple_items.name', ondelete='CASCADE', onupdate='CASCADE',
deferrable=True, initially='DEFERRED'))
)
assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, ForeignKey, MetaData, String, Table
metadata = MetaData()
t_simple_items = Table(
'simple_items', metadata,
Column('name', String, ForeignKey('simple_items.name', ondelete='CASCADE', onupdate='CASCADE', \
deferrable=True, initially='DEFERRED'))
)
"""
def load_dialect_impl(self, dialect: dialects) -> DialectType:
"""
SQLAlchemy wraps all database-specific features into
dialects, which are then responsible for generating the SQL code
for a specific DB type when loading in data. ``load_dialect_impl``
is called when CRUD (create, update, delete operations) needs to be
done on the database. This method is responsible for telling
SQLAlchemy how to configure the dialect to write this type
:param dialect: The loaded dialect
:return: The type descriptor for this type.
"""
if dialect.name == 'postgresql':
return dialect.type_descriptor(postgresql.JSON())
elif dialect.name == 'mysql':
if 'JSON' in dialect.ischema_names:
return dialect.type_descriptor(mysql.JSON())
else:
return dialect.type_descriptor(
VARCHAR(self._MAX_VARCHAR_LIMIT)
)
else:
return dialect.type_descriptor(VARCHAR(self._MAX_VARCHAR_LIMIT))
def str_to_sqltype(expr):
import re
import sqlalchemy.types as sqltypes
norm_expr = expr.lower()
if norm_expr.startswith('integer'):
match_result = re.match(r'integer\((\d+)\)', norm_expr)
if match_result is not None:
return sqltypes.BIGINT() if int(match_result.group(1)) > 11 else sqltypes.INTEGER()
return sqltypes.BIGINT()
if norm_expr == 'decimal':
return sqltypes.DECIMAL()
if norm_expr == 'date':
return sqltypes.DATETIME()
if norm_expr == 'bool' or norm_expr == 'boolean':
return sqltypes.BOOLEAN()
if norm_expr.startswith('string'):
match_result = re.match(r'string\((\d+)\)', norm_expr)
if match_result is not None:
maxlen = int(match_result.group(1))
return sqltypes.VARCHAR(maxlen) if maxlen < 65536 else sqltypes.TEXT
return sqltypes.TEXT()
raise RuntimeError("Unsupported data type [" + expr + "]")
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
"""Construct a VARCHAR.
:param collation: Optional, a column-level collation for this string
value. Takes precedence to 'binary' short-hand.
:param binary: Defaults to False: short-hand, pick the binary
collation type that matches the column's character set. Generates
BINARY in schema. This does not affect the type of data stored,
only the collation of character data.
"""
super(VARCHAR, self).__init__(length=length, **kwargs)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
"""Construct a VARCHAR.
:param collation: Optional, a column-level collation for this string
value. Takes precedence to 'binary' short-hand.
:param binary: Defaults to False: short-hand, pick the binary
collation type that matches the column's character set. Generates
BINARY in schema. This does not affect the type of data stored,
only the collation of character data.
"""
super(VARCHAR, self).__init__(length=length, **kwargs)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def _get_column_info(self, name, type_, nullable,
default, primary_key):
match = re.match(r'(\w+)(\(.*?\))?', type_)
if match:
coltype = match.group(1)
args = match.group(2)
else:
coltype = "VARCHAR"
args = ''
try:
coltype = self.ischema_names[coltype]
if args is not None:
args = re.findall(r'(\d+)', args)
coltype = coltype(*[int(a) for a in args])
except KeyError:
util.warn("Did not recognize type '%s' of column '%s'" %
(coltype, name))
coltype = sqltypes.NullType()
if default is not None:
default = unicode(default)
return {
'name': name,
'type': coltype,
'nullable': nullable,
'default': default,
'autoincrement': default is None,
'primary_key': primary_key
}
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
"""Construct a VARCHAR.
:param collation: Optional, a column-level collation for this string
value. Takes precedence to 'binary' short-hand.
:param binary: Defaults to False: short-hand, pick the binary
collation type that matches the column's character set. Generates
BINARY in schema. This does not affect the type of data stored,
only the collation of character data.
"""
super(VARCHAR, self).__init__(length=length, **kwargs)
def test_indexes_class(self):
simple_items = Table(
'simple_items', self.metadata,
Column('id', INTEGER, primary_key=True),
Column('number', INTEGER),
Column('text', VARCHAR)
)
simple_items.indexes.add(Index('idx_number', simple_items.c.number))
simple_items.indexes.add(Index('idx_text_number', simple_items.c.text, simple_items.c.number))
simple_items.indexes.add(Index('idx_text', simple_items.c.text, unique=True))
assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, Index, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
metadata = Base.metadata
class SimpleItem(Base):
__tablename__ = 'simple_items'
__table_args__ = (
Index('idx_text_number', 'text', 'number'),
)
id = Column(Integer, primary_key=True)
number = Column(Integer, index=True)
text = Column(String, unique=True)
"""
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)