def _primary_key_constraint(self, name, table_name, cols, schema=None):
    """Build a PrimaryKeyConstraint bound to a placeholder Table.

    A fresh MetaData and a Table called ``table_name`` (in ``schema``)
    are created with one NULLTYPE column per name in ``cols``.  The
    constraint named ``name`` is built over those same Column objects,
    appended to the table, and returned.
    """
    metadata = sa_schema.MetaData()
    pk_columns = [sa_schema.Column(col_name, NULLTYPE) for col_name in cols]
    table = sa_schema.Table(table_name, metadata, *pk_columns, schema=schema)
    constraint = sa_schema.PrimaryKeyConstraint(*pk_columns, name=name)
    table.append_constraint(constraint)
    return constraint
# Example source code for the Python class Column()
def _foreign_key_constraint(self, name, source, referent,
                            local_cols, remote_cols,
                            onupdate=None, ondelete=None,
                            deferrable=None, source_schema=None,
                            referent_schema=None):
    """Build a ForeignKeyConstraint bound to placeholder Table objects.

    Placeholder tables with NULLTYPE columns are created in a fresh
    MetaData for ``source`` and, unless the constraint is
    self-referential, for ``referent``.  The constraint is appended to
    the source table and returned.

    :param name: name given to the constraint.
    :param source: name of the table holding the constraint.
    :param referent: name of the table being referred to.
    :param local_cols: column names on the source table.
    :param remote_cols: column names on the referent table.
    :param onupdate: passed through to ForeignKeyConstraint.
    :param ondelete: passed through to ForeignKeyConstraint.
    :param deferrable: passed through to ForeignKeyConstraint.
    :param source_schema: schema of the source table, if any.
    :param referent_schema: schema of the referent table, if any.
    :return: the new ForeignKeyConstraint.
    """
    m = sa_schema.MetaData()

    # Two tables are only the same table when both the name AND the
    # schema match; comparing names alone would skip creating the
    # referent placeholder for a same-named table in another schema.
    self_referential = (
        source == referent and source_schema == referent_schema
    )
    if self_referential:
        # list() lets callers mix lists and tuples without a TypeError.
        candidate_cols = list(local_cols) + list(remote_cols)
    else:
        candidate_cols = list(local_cols)
        sa_schema.Table(
            referent, m,
            *[sa_schema.Column(n, NULLTYPE) for n in remote_cols],
            schema=referent_schema)

    # Drop repeated names (order preserved) so a column that is both
    # local and remote in a self-referential key is only defined once
    # on the placeholder table.
    seen = set()
    t1_cols = []
    for n in candidate_cols:
        if n not in seen:
            seen.add(n)
            t1_cols.append(n)

    t1 = sa_schema.Table(
        source, m,
        *[sa_schema.Column(n, NULLTYPE) for n in t1_cols],
        schema=source_schema)

    # Remote column specs are "<schema>.<table>.<col>" or "<table>.<col>".
    if referent_schema:
        tname = "%s.%s" % (referent_schema, referent)
    else:
        tname = referent

    f = sa_schema.ForeignKeyConstraint(
        local_cols,
        ["%s.%s" % (tname, n) for n in remote_cols],
        name=name,
        onupdate=onupdate,
        ondelete=ondelete,
        deferrable=deferrable
    )
    t1.append_constraint(f)
    return f
def _unique_constraint(self, name, source, local_cols, schema=None, **kw):
    """Build a UniqueConstraint bound to a placeholder Table.

    A Table called ``source`` (in ``schema``) is created in a fresh
    MetaData with one NULLTYPE column per name in ``local_cols``.  The
    constraint receives ``name`` plus any extra keyword arguments, is
    appended to the table, and is returned.
    """
    placeholder_cols = [
        sa_schema.Column(col_name, NULLTYPE) for col_name in local_cols
    ]
    table = sa_schema.Table(
        source, sa_schema.MetaData(), *placeholder_cols, schema=schema)
    kw['name'] = name
    # Look the columns up on the table rather than reusing the list.
    members = [table.c[col_name] for col_name in local_cols]
    constraint = sa_schema.UniqueConstraint(*members, **kw)
    # TODO: need event tests to ensure the event
    # is fired off here
    table.append_constraint(constraint)
    return constraint
def _check_constraint(self, name, source, condition, schema=None, **kw):
    """Build a CheckConstraint bound to a placeholder Table.

    The table called ``source`` (in ``schema``) lives in a fresh
    MetaData and carries a single Integer column named ``'x'``.  The
    constraint is created from ``condition`` with ``name`` and any
    extra keyword arguments, appended to the table, and returned.
    """
    placeholder = sa_schema.Column('x', Integer)
    table = sa_schema.Table(
        source, sa_schema.MetaData(), placeholder, schema=schema)
    constraint = sa_schema.CheckConstraint(condition, name=name, **kw)
    table.append_constraint(constraint)
    return constraint
def _column(self, name, type_, **kw):
    """Return a new ``sa_schema.Column`` built from the given arguments.

    ``name`` and ``type_`` are passed positionally; ``kw`` is forwarded
    unchanged as keyword arguments.
    """
    new_column = sa_schema.Column(name, type_, **kw)
    return new_column
def _ensure_table_for_fk(self, metadata, fk):
    """Ensure ``metadata`` has a placeholder Table for a ForeignKey target.

    Nothing happens unless ``fk._colspec`` is a string.  Otherwise the
    spec is split at its last ``'.'`` into a table key and a column
    name; the table is created in ``metadata`` if its key is missing,
    and a NULLTYPE column is appended if the table lacks that column.
    """
    colspec = fk._colspec
    if not isinstance(colspec, string_types):
        return

    table_key, cname = colspec.rsplit('.', 1)
    sname, tname = self._parse_table_key(table_key)

    known_tables = metadata.tables
    if table_key in known_tables:
        rel_t = known_tables[table_key]
    else:
        rel_t = sa_schema.Table(tname, metadata, schema=sname)

    if cname not in rel_t.c:
        rel_t.append_column(sa_schema.Column(cname, NULLTYPE))
def _compare_type(schema, tname, cname, conn_col,
                  metadata_col, diffs,
                  autogen_context):
    """Record a ``modify_type`` diff when the two columns' types differ.

    If either column's type has a NullType affinity, an info message is
    logged and no comparison is made.  Otherwise the decision is
    delegated to ``autogen_context['context']._compare_type``; a truthy
    result appends a ``("modify_type", ...)`` tuple to ``diffs`` and
    logs the detected change.
    """
    conn_type, metadata_type = conn_col.type, metadata_col.type

    if conn_type._type_affinity is sqltypes.NullType:
        log.info("Couldn't determine database type "
                 "for column '%s.%s'", tname, cname)
        return

    if metadata_type._type_affinity is sqltypes.NullType:
        log.info("Column '%s.%s' has no type within "
                 "the model; can't compare", tname, cname)
        return

    migration_context = autogen_context['context']
    if not migration_context._compare_type(conn_col, metadata_col):
        return

    # Carry the connection column's current settings with the diff.
    existing = {
        "existing_nullable": conn_col.nullable,
        "existing_server_default": conn_col.server_default,
    }
    diffs.append(
        ("modify_type", schema, tname, cname,
         existing, conn_type, metadata_type)
    )
    log.info("Detected type change from %r to %r on '%s.%s'",
             conn_type, metadata_type, tname, cname)
def _get_index_rendered_expressions(idx, autogen_context):
    """Return a list with one rendered entry per member of index ``idx``.

    When ``compat.sqla_08`` is falsy, each entry is the repr of the
    ``_ident`` of a column name from ``idx.columns``.  Otherwise
    ``idx.expressions`` is walked: Column members are rendered the same
    way, and anything else is handed to ``_render_potential_expr``.
    """
    if not compat.sqla_08:
        return [
            repr(_ident(getattr(col, "name", None)))
            for col in idx.columns
        ]

    rendered = []
    for exp in idx.expressions:
        if isinstance(exp, sa_schema.Column):
            rendered.append(repr(_ident(getattr(exp, "name", None))))
        else:
            rendered.append(_render_potential_expr(exp, autogen_context))
    return rendered
def _find_columns(clause):
    """Return the set of objects ``traverse`` visits as 'column'.

    ``traverse`` is run over ``clause`` with a visitor mapping whose
    ``'column'`` entry adds each visited object to the result set.
    """
    found = set()
    visitors = {'column': found.add}
    traverse(clause, {}, visitors)
    return found
def _textual_index_column(table, text_):
    """Turn a string or text() construct into something an Index accepts.

    A string becomes a new NULLTYPE Column appended to ``table``; a
    TextClause is wrapped in ``_textual_index_element``.  Any other
    input raises ValueError.  This works around the limited flexibility
    of the Index construct.
    """
    if isinstance(text_, compat.string_types):
        placeholder = Column(text_, sqltypes.NULLTYPE)
        table.append_column(placeholder)
        return placeholder

    if isinstance(text_, TextClause):
        return _textual_index_element(table, text_)

    raise ValueError("String or text() construct expected")
def __init__(self, table, text):
    """Store ``table`` and ``text`` and register a placeholder column.

    ``key`` is set to ``text.text``.  A NULLTYPE column named after
    that same text is kept as ``fake_column`` and appended to ``table``.
    """
    self.table = table
    self.text = text
    self.key = text.text
    placeholder = schema.Column(self.text.text, sqltypes.NULLTYPE)
    self.fake_column = placeholder
    table.append_column(self.fake_column)
def primary_key_constraint(self, name, table_name, cols, schema=None):
    """Build a PrimaryKeyConstraint bound to a placeholder Table.

    The table called ``table_name`` (in ``schema``) is created in the
    MetaData returned by ``self.metadata()`` with one NULLTYPE column
    per name in ``cols``.  The constraint named ``name`` is built from
    the table's columns, appended to the table, and returned.
    """
    metadata = self.metadata()
    placeholder_cols = [
        sa_schema.Column(col_name, NULLTYPE) for col_name in cols
    ]
    table = sa_schema.Table(
        table_name, metadata, *placeholder_cols, schema=schema)
    # Look the columns up on the table rather than reusing the list.
    members = [table.c[col_name] for col_name in cols]
    constraint = sa_schema.PrimaryKeyConstraint(*members, name=name)
    table.append_constraint(constraint)
    return constraint
def unique_constraint(self, name, source, local_cols, schema=None, **kw):
    """Build a UniqueConstraint bound to a placeholder Table.

    The table called ``source`` (in ``schema``) is created in the
    MetaData returned by ``self.metadata()`` with one NULLTYPE column
    per name in ``local_cols``.  The constraint receives ``name`` plus
    any extra keyword arguments, is appended to the table, and is
    returned.
    """
    metadata = self.metadata()
    placeholder_cols = [
        sa_schema.Column(col_name, NULLTYPE) for col_name in local_cols
    ]
    table = sa_schema.Table(
        source, metadata, *placeholder_cols, schema=schema)
    kw['name'] = name
    members = [table.c[col_name] for col_name in local_cols]
    constraint = sa_schema.UniqueConstraint(*members, **kw)
    # TODO: need event tests to ensure the event
    # is fired off here
    table.append_constraint(constraint)
    return constraint
def check_constraint(self, name, source, condition, schema=None, **kw):
    """Build a CheckConstraint bound to a placeholder Table.

    The table called ``source`` (in ``schema``) is created in the
    MetaData returned by ``self.metadata()`` and carries a single
    Integer column named ``'x'``.  The constraint is created from
    ``condition`` with ``name`` and any extra keyword arguments,
    appended to the table, and returned.
    """
    metadata = self.metadata()
    placeholder = sa_schema.Column('x', Integer)
    table = sa_schema.Table(source, metadata, placeholder, schema=schema)
    constraint = sa_schema.CheckConstraint(condition, name=name, **kw)
    table.append_constraint(constraint)
    return constraint
def column(self, name, type_, **kw):
    """Return a new ``sa_schema.Column`` built from the given arguments.

    ``name`` and ``type_`` are passed positionally; ``kw`` is forwarded
    unchanged as keyword arguments.
    """
    new_column = sa_schema.Column(name, type_, **kw)
    return new_column
def _ensure_table_for_fk(self, metadata, fk):
    """Ensure ``metadata`` has a placeholder Table for a ForeignKey target.

    Nothing happens unless ``fk._colspec`` is a string.  Otherwise the
    spec is split at its last ``'.'`` into a table key and a column
    name; the table is created in ``metadata`` if its key is missing,
    and a NULLTYPE column is appended if the table lacks that column.
    """
    colspec = fk._colspec
    if not isinstance(colspec, string_types):
        return

    table_key, cname = colspec.rsplit('.', 1)
    sname, tname = self._parse_table_key(table_key)

    known_tables = metadata.tables
    if table_key in known_tables:
        rel_t = known_tables[table_key]
    else:
        rel_t = sa_schema.Table(tname, metadata, schema=sname)

    if cname not in rel_t.c:
        rel_t.append_column(sa_schema.Column(cname, NULLTYPE))
def _get_index_rendered_expressions(idx, autogen_context):
    """Return a list with one rendered entry per member of index ``idx``.

    When ``util.sqla_08`` is falsy, each entry is the repr of the
    ``_ident`` of a column name from ``idx.columns``.  Otherwise
    ``idx.expressions`` is walked: Column members are rendered the same
    way, and anything else is handed to ``_render_potential_expr``.
    """
    if not util.sqla_08:
        return [
            repr(_ident(getattr(col, "name", None)))
            for col in idx.columns
        ]

    rendered = []
    for exp in idx.expressions:
        if isinstance(exp, sa_schema.Column):
            rendered.append(repr(_ident(getattr(exp, "name", None))))
        else:
            rendered.append(_render_potential_expr(exp, autogen_context))
    return rendered
def _find_columns(clause):
    """Return the set of objects ``traverse`` visits as 'column'.

    ``traverse`` is run over ``clause`` with a visitor mapping whose
    ``'column'`` entry adds each visited object to the result set.
    """
    found = set()
    visitors = {'column': found.add}
    traverse(clause, {}, visitors)
    return found
def _textual_index_column(table, text_):
    """Turn a string or text() construct into something an Index accepts.

    A string becomes a new NULLTYPE Column appended to ``table``; a
    TextClause is wrapped in ``_textual_index_element``.  Any other
    input raises ValueError.  This works around the limited flexibility
    of the Index construct.
    """
    if isinstance(text_, compat.string_types):
        placeholder = Column(text_, sqltypes.NULLTYPE)
        table.append_column(placeholder)
        return placeholder

    if isinstance(text_, TextClause):
        return _textual_index_element(table, text_)

    raise ValueError("String or text() construct expected")
def __init__(self, table, text):
    """Store ``table`` and ``text`` and register a placeholder column.

    ``key`` is set to ``text.text``.  A NULLTYPE column named after
    that same text is kept as ``fake_column`` and appended to ``table``.
    """
    self.table = table
    self.text = text
    self.key = text.text
    placeholder = schema.Column(self.text.text, sqltypes.NULLTYPE)
    self.fake_column = placeholder
    table.append_column(self.fake_column)