def create_lazy_clause(table, primaryjoin, secondaryjoin, foreignkey):
binds = {}
def visit_binary(binary):
circular = isinstance(binary.left, schema.Column) and isinstance(binary.right, schema.Column) and binary.left.table is binary.right.table
if isinstance(binary.left, schema.Column) and ((not circular and binary.left.table is table) or (circular and binary.right is foreignkey)):
binary.left = binds.setdefault(binary.left,
sql.BindParamClause(binary.right.table.name + "_" + binary.right.name, None, shortname = binary.left.name))
binary.swap()
if isinstance(binary.right, schema.Column) and ((not circular and binary.right.table is table) or (circular and binary.left is foreignkey)):
binary.right = binds.setdefault(binary.right,
sql.BindParamClause(binary.left.table.name + "_" + binary.left.name, None, shortname = binary.right.name))
if secondaryjoin is not None:
lazywhere = sql.and_(primaryjoin, secondaryjoin)
else:
lazywhere = primaryjoin
lazywhere = lazywhere.copy_container()
li = BinaryVisitor(visit_binary)
lazywhere.accept_visitor(li)
return (lazywhere, binds)
python类Column()的实例源码
def insert(table, values = None, **kwargs):
"""returns an INSERT clause element.
This can also be called from a table directly via the table's insert() method.
'table' is the table to be inserted into.
'values' is a dictionary which specifies the column specifications of the INSERT,
and is optional. If left as None, the column specifications are determined from the
bind parameters used during the compile phase of the INSERT statement. If the
bind parameters also are None during the compile phase, then the column
specifications will be generated from the full list of table columns.
If both 'values' and compile-time bind parameters are present, the compile-time
bind parameters override the information specified within 'values' on a per-key basis.
The keys within 'values' can be either Column objects or their string identifiers.
Each key may reference one of: a literal data value (i.e. string, number, etc.), a Column object,
or a SELECT statement. If a SELECT statement is specified which references this INSERT
statement's table, the statement will be correlated against the INSERT statement.
"""
return Insert(table, values, **kwargs)
def __init__(self, engine, statement, parameters):
"""constructs a new Compiled object.
engine - SQLEngine to compile against
statement - ClauseElement to be compiled
parameters - optional dictionary indicating a set of bind parameters
specified with this Compiled object. These parameters are the "default"
values corresponding to the ClauseElement's BindParamClauses when the Compiled
is executed. In the case of an INSERT or UPDATE statement, these parameters
will also result in the creation of new BindParamClause objects for each key
and will also affect the generated column list in an INSERT statement and the SET
clauses of an UPDATE statement. The keys of the parameter dictionary can
either be the string names of columns or actual sqlalchemy.schema.Column objects."""
self.engine = engine
self.parameters = parameters
self.statement = statement
def url(self):
return '{}/{}/{}'.format(osm_api_base, self.osm_type, self.osm_id)
# class ItemCandidateTag(Base):
# __tablename__ = 'item_candidate_tag'
# __table_args__ = (
# ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'],
# [ItemCandidate.item_id,
# ItemCandidate.osm_id,
# ItemCandidate.osm_type]),
# )
#
# item_id = Column(Integer, primary_key=True)
# osm_id = Column(BigInteger, primary_key=True)
# osm_type = Column(osm_type_enum, primary_key=True)
# k = Column(String, primary_key=True)
# v = Column(String, primary_key=True)
#
# item_candidate = relationship(ItemCandidate,
# backref=backref('tag_table', lazy='dynamic'))
def append_column(self, column):
"""Append a :class:`~.schema.Column` to this :class:`~.schema.Table`.
The "key" of the newly added :class:`~.schema.Column`, i.e. the
value of its ``.key`` attribute, will then be available
in the ``.c`` collection of this :class:`~.schema.Table`, and the
column definition will be included in any CREATE TABLE, SELECT,
UPDATE, etc. statements generated from this :class:`~.schema.Table`
construct.
Note that this does **not** change the definition of the table
as it exists within any underlying database, assuming that
table has already been created in the database. Relational
databases support the addition of columns to existing tables
using the SQL ALTER command, which would need to be
emitted for an already-existing table that doesn't contain
the newly added column.
"""
column._set_parent_with_dispatch(self)
def __repr__(self):
kwarg = []
if self.key != self.name:
kwarg.append('key')
if self.primary_key:
kwarg.append('primary_key')
if not self.nullable:
kwarg.append('nullable')
if self.onupdate:
kwarg.append('onupdate')
if self.default:
kwarg.append('default')
if self.server_default:
kwarg.append('server_default')
return "Column(%s)" % ', '.join(
[repr(self.name)] + [repr(self.type)] +
[repr(x) for x in self.foreign_keys if x is not None] +
[repr(x) for x in self.constraints] +
[(self.table is not None and "table=<%s>" %
self.table.description or "table=None")] +
["%s=%s" % (k, repr(getattr(self, k))) for k in kwarg])
def matchIndex(self, name=None):
# Get index of correct column
incols = [x for x in self.cols if x in self.collist]
if not any(incols):
return None
elif len(incols) == 1:
idx = self.cols.index(incols[0])
else:
if not name:
print('Multiple columns found. Column name must be specified!')
return None
elif name in self.collist:
idx = self.cols.index(name)
else:
return None
return idx
def _get_index_rendered_expressions(idx, autogen_context):
if compat.sqla_08:
return [repr(_ident(getattr(exp, "name", None)))
if isinstance(exp, sa_schema.Column)
else _render_potential_expr(exp, autogen_context)
for exp in idx.expressions]
else:
return [
repr(_ident(getattr(col, "name", None))) for col in idx.columns]
def _find_columns(clause):
"""locate Column objects within the given expression."""
cols = set()
traverse(clause, {}, {'column': cols.add})
return cols
def _textual_index_column(table, text_):
"""a workaround for the Index construct's severe lack of flexibility"""
if isinstance(text_, compat.string_types):
c = Column(text_, sqltypes.NULLTYPE)
table.append_column(c)
return c
elif isinstance(text_, TextClause):
return _textual_index_element(table, text_)
else:
raise ValueError("String or text() construct expected")
def __init__(self, table, text):
self.table = table
self.text = text
self.key = text.text
self.fake_column = schema.Column(self.text.text, sqltypes.NULLTYPE)
table.append_column(self.fake_column)
def primary_key_constraint(self, name, table_name, cols, schema=None):
m = self.metadata()
columns = [sa_schema.Column(n, NULLTYPE) for n in cols]
t = sa_schema.Table(
table_name, m,
*columns,
schema=schema)
p = sa_schema.PrimaryKeyConstraint(
*[t.c[n] for n in cols], name=name)
t.append_constraint(p)
return p
def unique_constraint(self, name, source, local_cols, schema=None, **kw):
t = sa_schema.Table(
source, self.metadata(),
*[sa_schema.Column(n, NULLTYPE) for n in local_cols],
schema=schema)
kw['name'] = name
uq = sa_schema.UniqueConstraint(*[t.c[n] for n in local_cols], **kw)
# TODO: need event tests to ensure the event
# is fired off here
t.append_constraint(uq)
return uq
def check_constraint(self, name, source, condition, schema=None, **kw):
t = sa_schema.Table(source, self.metadata(),
sa_schema.Column('x', Integer), schema=schema)
ck = sa_schema.CheckConstraint(condition, name=name, **kw)
t.append_constraint(ck)
return ck
def column(self, name, type_, **kw):
return sa_schema.Column(name, type_, **kw)
def _ensure_table_for_fk(self, metadata, fk):
"""create a placeholder Table object for the referent of a
ForeignKey.
"""
if isinstance(fk._colspec, string_types):
table_key, cname = fk._colspec.rsplit('.', 1)
sname, tname = self._parse_table_key(table_key)
if table_key not in metadata.tables:
rel_t = sa_schema.Table(tname, metadata, schema=sname)
else:
rel_t = metadata.tables[table_key]
if cname not in rel_t.c:
rel_t.append_column(sa_schema.Column(cname, NULLTYPE))
def columnimpl(self, column):
"""returns a new sql.ColumnImpl object to correspond to the given Column object.
A ColumnImpl provides SQL statement builder operations on a Column metadata object,
and a subclass of this object may be provided by a SQLEngine subclass to provide
database-specific behavior."""
return sql.ColumnImpl(column)
def _get_col(self, row, key):
if isinstance(key, schema.Column) or isinstance(key, sql.ColumnElement):
try:
rec = self.props[key._label.lower()]
except KeyError:
try:
rec = self.props[key.key.lower()]
except KeyError:
rec = self.props[key.name.lower()]
elif isinstance(key, str):
rec = self.props[key.lower()]
else:
rec = self.props[key]
return rec[0].convert_result_value(row[rec[1]], self.engine)
def reflecttable(self, table):
# to use information_schema:
#ischema.reflecttable(self, table, ischema_names, use_mysql=True)
tabletype, foreignkeyD = self.moretableinfo(table=table)
table._impl.mysql_engine = tabletype
c = self.execute("describe " + table.name, {})
while True:
row = c.fetchone()
if row is None:
break
#print "row! " + repr(row)
(name, type, nullable, primary_key, default) = (row[0], row[1], row[2] == 'YES', row[3] == 'PRI', row[4])
match = re.match(r'(\w+)(\(.*?\))?', type)
coltype = match.group(1)
args = match.group(2)
#print "coltype: " + repr(coltype) + " args: " + repr(args)
coltype = ischema_names.get(coltype, MSString)
if args is not None:
args = re.findall(r'(\d+)', args)
#print "args! " +repr(args)
coltype = coltype(*[int(a) for a in args])
arglist = []
fkey = foreignkeyD.get(name)
if fkey is not None:
arglist.append(schema.ForeignKey(fkey))
table.append_item(schema.Column(name, coltype, *arglist,
**dict(primary_key=primary_key,
nullable=nullable,
default=default
)))
def _find_dependent(self):
"""searches through the primary join condition to determine which side
has the primary key and which has the foreign key - from this we return
the "foreign key" for this property which helps determine one-to-many/many-to-one."""
# set as a reference to allow assignment from inside a first-class function
dependent = [None]
def foo(binary):
if binary.operator != '=':
return
if isinstance(binary.left, schema.Column) and binary.left.primary_key:
if dependent[0] is binary.left.table:
raise "bidirectional dependency not supported...specify foreignkey"
dependent[0] = binary.right.table
self.foreignkey= binary.right
elif isinstance(binary.right, schema.Column) and binary.right.primary_key:
if dependent[0] is binary.right.table:
raise "bidirectional dependency not supported...specify foreignkey"
dependent[0] = binary.left.table
self.foreignkey = binary.left
visitor = BinaryVisitor(foo)
self.primaryjoin.accept_visitor(visitor)
if dependent[0] is None:
raise "cant determine primary foreign key in the join relationship....specify foreignkey=<column> or foreignkey=[<columns>]"
else:
self.foreigntable = dependent[0]
def visit_compound(self, compound):
for i in range(0, len(compound.clauses)):
if isinstance(compound.clauses[i], schema.Column) and self.tables.has_key(compound.clauses[i].table):
compound.clauses[i] = self.get_alias(compound.clauses[i].table)._get_col_by_original(compound.clauses[i])
self.match = True
def visit_binary(self, binary):
if isinstance(binary.left, schema.Column) and self.tables.has_key(binary.left.table):
binary.left = self.get_alias(binary.left.table)._get_col_by_original(binary.left)
self.match = True
if isinstance(binary.right, schema.Column) and self.tables.has_key(binary.right.table):
binary.right = self.get_alias(binary.right.table)._get_col_by_original(binary.right)
self.match = True
def bindparam(key, value = None, type=None):
"""creates a bind parameter clause with the given key.
An optional default value can be specified by the value parameter, and the optional type parameter
is a sqlalchemy.types.TypeEngine object which indicates bind-parameter and result-set translation for
this bind parameter."""
if isinstance(key, schema.Column):
return BindParamClause(key.name, value, type=key.type)
else:
return BindParamClause(key, value, type=type)
def is_column(col):
return isinstance(col, schema.Column) or isinstance(col, ColumnElement)
def is_selectable(self):
"""returns True if this ClauseElement is Selectable, i.e. it contains a list of Column
objects and can be used as the target of a select statement."""
return False
def _compare_self(self):
"""allows ColumnImpl to return its Column object for usage in ClauseElements, all others to
just return self"""
return self
def _get_col_by_original(self, column):
"""given a column which is a schema.Column object attached to a schema.Table object
(i.e. an "original" column), return the Column object from this
Selectable which corresponds to that original Column, or None if this Selectable
does not contain the column."""
return self.original_columns.get(column.original, None)
def compare(self, other):
"""compares this ColumnImpl's column to the other given Column"""
return self.column is other
def _compare_self(self):
"""allows ColumnImpl to return its Column object for usage in ClauseElements, all others to
just return self"""
return self.column
def _oid_col(self):
# OID remains a little hackish so far
if not hasattr(self, '_oid_column'):
if self.table.engine.oid_column_name() is not None:
self._oid_column = schema.Column(self.table.engine.oid_column_name(), sqltypes.Integer, hidden=True)
self._oid_column._set_parent(self.table)
else:
self._oid_column = None
return self._oid_column