def as_mutable(cls, orig_sqltype):
    """Tag a column type so its values are handled as nested mutable values.

    Steps performed:

    * ``orig_sqltype`` - the type value given to ``sqlalchemy.Column()`` -
      is coerced to a type instance.
    * A marker attribute (``_column_value_id``) is stored on that instance.
    * A ``mapper_configured`` event handler is registered.
    * When the handler runs for a mapped class, it inspects the type of the
      first column of every column attribute and looks for the marker.
    * Each match is handed to ``cls.associate_with_attribute``. According to
      the original author's notes this sets up the
      ``MutableBase._listen_on_attribute`` handlers that wrap the raw dict
      coming out of the database in a NestedMutableDict.

    :param orig_sqltype: Usually websauna.system.model.column.JSONB instance
    :return: Marked and coerced type value
    """
    # The type is identified through a marker attribute rather than by
    # comparing type objects: per the original notes, the type value may be
    # mangled by dialect-specific implementations (never fully confirmed).
    marked_type = types.to_instance(orig_sqltype)
    marked_type._column_value_id = id(marked_type)

    def listen_for_type(mapper, class_):
        # A plain SQLAlchemy type comparison is not enough here because a
        # type alias is defined for the generic JSONB implementation.
        for column_prop in mapper.column_attrs:
            found_marker = getattr(
                column_prop.columns[0].type, "_column_value_id", None)
            if found_marker != marked_type._column_value_id:
                continue
            cls.associate_with_attribute(getattr(class_, column_prop.key))

    event.listen(mapper, 'mapper_configured', listen_for_type)
    return marked_type
# Python to_instance() usage examples (source code)
def __init__(self, name, column_name, type_, **kw):
    """Initialise the column-type change and keep the optional ``using`` value.

    ``using`` is removed from ``kw`` before the remaining keyword arguments
    are forwarded to the parent constructor.

    :param name: forwarded to the parent constructor
    :param column_name: forwarded to the parent constructor
    :param type_: new type; coerced with ``sqltypes.to_instance``
    :param kw: extra keyword arguments for the parent; may contain ``using``
    """
    using_value = kw.pop('using', None)
    super(PostgresqlColumnType, self).__init__(name, column_name, **kw)
    coerced_type = sqltypes.to_instance(type_)
    self.type_ = coerced_type
    self.using = using_value
def __init__(self, name, column_name, schema=None,
             existing_type=None,
             existing_nullable=None,
             existing_server_default=None):
    """Record the column being altered along with its known current state.

    :param name: forwarded to the parent constructor
    :param column_name: stored as ``self.column_name``
    :param schema: forwarded to the parent constructor as ``schema``
    :param existing_type: current type of the column, if known; coerced
        with ``sqltypes.to_instance`` unless it is ``None``
    :param existing_nullable: stored as ``self.existing_nullable``
    :param existing_server_default: stored as
        ``self.existing_server_default``
    """
    super(AlterColumn, self).__init__(name, schema=schema)
    self.column_name = column_name
    # Only coerce when a type was actually supplied; None stays None.
    if existing_type is None:
        self.existing_type = None
    else:
        self.existing_type = sqltypes.to_instance(existing_type)
    self.existing_nullable = existing_nullable
    self.existing_server_default = existing_server_default
def __init__(self, name, column_name, type_, **kw):
    """Initialise the column-type change with the new type.

    :param name: forwarded to the parent constructor
    :param column_name: forwarded to the parent constructor
    :param type_: new type; coerced with ``sqltypes.to_instance``
    :param kw: extra keyword arguments forwarded to the parent constructor
    """
    super(ColumnType, self).__init__(name, column_name, **kw)
    coerced_type = sqltypes.to_instance(type_)
    self.type_ = coerced_type
def __init__(self, name, column_name, type_, **kw):
    """Initialise the column-type change and keep the optional ``using`` value.

    ``using`` is removed from ``kw`` before the remaining keyword arguments
    are forwarded to the parent constructor.

    :param name: forwarded to the parent constructor
    :param column_name: forwarded to the parent constructor
    :param type_: new type; coerced with ``sqltypes.to_instance``
    :param kw: extra keyword arguments for the parent; may contain ``using``
    """
    using_value = kw.pop('using', None)
    super(PostgresqlColumnType, self).__init__(name, column_name, **kw)
    coerced_type = sqltypes.to_instance(type_)
    self.type_ = coerced_type
    self.using = using_value
def __init__(self, name, column_name, schema=None,
             existing_type=None,
             existing_nullable=None,
             existing_server_default=None):
    """Record the column being altered along with its known current state.

    :param name: forwarded to the parent constructor
    :param column_name: stored as ``self.column_name``
    :param schema: forwarded to the parent constructor as ``schema``
    :param existing_type: current type of the column, if known; coerced
        with ``sqltypes.to_instance`` unless it is ``None``
    :param existing_nullable: stored as ``self.existing_nullable``
    :param existing_server_default: stored as
        ``self.existing_server_default``
    """
    super(AlterColumn, self).__init__(name, schema=schema)
    self.column_name = column_name
    # Only coerce when a type was actually supplied; None stays None.
    if existing_type is None:
        self.existing_type = None
    else:
        self.existing_type = sqltypes.to_instance(existing_type)
    self.existing_nullable = existing_nullable
    self.existing_server_default = existing_server_default
def __init__(self, name, column_name, type_, **kw):
    """Initialise the column-type change with the new type.

    :param name: forwarded to the parent constructor
    :param column_name: forwarded to the parent constructor
    :param type_: new type; coerced with ``sqltypes.to_instance``
    :param kw: extra keyword arguments forwarded to the parent constructor
    """
    super(ColumnType, self).__init__(name, column_name, **kw)
    coerced_type = sqltypes.to_instance(type_)
    self.type_ = coerced_type
def __init__(self, name, column_name, type_, **kw):
    """Initialise the column-type change and keep the optional ``using`` value.

    ``using`` is removed from ``kw`` before the remaining keyword arguments
    are forwarded to the parent constructor.

    :param name: forwarded to the parent constructor
    :param column_name: forwarded to the parent constructor
    :param type_: new type; coerced with ``sqltypes.to_instance``
    :param kw: extra keyword arguments for the parent; may contain ``using``
    """
    using_value = kw.pop('using', None)
    super(PostgresqlColumnType, self).__init__(name, column_name, **kw)
    coerced_type = sqltypes.to_instance(type_)
    self.type_ = coerced_type
    self.using = using_value
def __init__(self, name, column_name, schema=None,
             existing_type=None,
             existing_nullable=None,
             existing_server_default=None):
    """Record the column being altered along with its known current state.

    :param name: forwarded to the parent constructor
    :param column_name: stored as ``self.column_name``
    :param schema: forwarded to the parent constructor as ``schema``
    :param existing_type: current type of the column, if known; coerced
        with ``sqltypes.to_instance`` unless it is ``None``
    :param existing_nullable: stored as ``self.existing_nullable``
    :param existing_server_default: stored as
        ``self.existing_server_default``
    """
    super(AlterColumn, self).__init__(name, schema=schema)
    self.column_name = column_name
    # Only coerce when a type was actually supplied; None stays None.
    if existing_type is None:
        self.existing_type = None
    else:
        self.existing_type = sqltypes.to_instance(existing_type)
    self.existing_nullable = existing_nullable
    self.existing_server_default = existing_server_default
def __init__(self, name, column_name, type_, **kw):
    """Initialise the column-type change with the new type.

    :param name: forwarded to the parent constructor
    :param column_name: forwarded to the parent constructor
    :param type_: new type; coerced with ``sqltypes.to_instance``
    :param kw: extra keyword arguments forwarded to the parent constructor
    """
    super(ColumnType, self).__init__(name, column_name, **kw)
    coerced_type = sqltypes.to_instance(type_)
    self.type_ = coerced_type
def __init__(self, name, column_name, type_, **kw):
    """Initialise the column-type change and keep the optional ``using`` value.

    ``using`` is removed from ``kw`` before the remaining keyword arguments
    are forwarded to the parent constructor.

    :param name: forwarded to the parent constructor
    :param column_name: forwarded to the parent constructor
    :param type_: new type; coerced with ``sqltypes.to_instance``
    :param kw: extra keyword arguments for the parent; may contain ``using``
    """
    using_value = kw.pop('using', None)
    super(PostgresqlColumnType, self).__init__(name, column_name, **kw)
    coerced_type = sqltypes.to_instance(type_)
    self.type_ = coerced_type
    self.using = using_value
def __init__(self, name, column_name, schema=None,
             existing_type=None,
             existing_nullable=None,
             existing_server_default=None):
    """Record the column being altered along with its known current state.

    :param name: forwarded to the parent constructor
    :param column_name: stored as ``self.column_name``
    :param schema: forwarded to the parent constructor as ``schema``
    :param existing_type: current type of the column, if known; coerced
        with ``sqltypes.to_instance`` unless it is ``None``
    :param existing_nullable: stored as ``self.existing_nullable``
    :param existing_server_default: stored as
        ``self.existing_server_default``
    """
    super(AlterColumn, self).__init__(name, schema=schema)
    self.column_name = column_name
    # Only coerce when a type was actually supplied; None stays None.
    if existing_type is None:
        self.existing_type = None
    else:
        self.existing_type = sqltypes.to_instance(existing_type)
    self.existing_nullable = existing_nullable
    self.existing_server_default = existing_server_default
def __init__(self, name, column_name, type_, **kw):
    """Initialise the column-type change with the new type.

    :param name: forwarded to the parent constructor
    :param column_name: forwarded to the parent constructor
    :param type_: new type; coerced with ``sqltypes.to_instance``
    :param kw: extra keyword arguments forwarded to the parent constructor
    """
    super(ColumnType, self).__init__(name, column_name, **kw)
    coerced_type = sqltypes.to_instance(type_)
    self.type_ = coerced_type
def __init__(self, name, column_name, schema=None,
             existing_type=None,
             existing_nullable=None,
             existing_server_default=None):
    """Record the column being altered along with its known current state.

    :param name: forwarded to the parent constructor
    :param column_name: stored as ``self.column_name``
    :param schema: forwarded to the parent constructor as ``schema``
    :param existing_type: current type of the column, if known; coerced
        with ``sqltypes.to_instance`` unless it is ``None``
    :param existing_nullable: stored as ``self.existing_nullable``
    :param existing_server_default: stored as
        ``self.existing_server_default``
    """
    super(AlterColumn, self).__init__(name, schema=schema)
    self.column_name = column_name
    # Only coerce when a type was actually supplied; None stays None.
    if existing_type is None:
        self.existing_type = None
    else:
        self.existing_type = sqltypes.to_instance(existing_type)
    self.existing_nullable = existing_nullable
    self.existing_server_default = existing_server_default
def __init__(self, name, column_name, type_, **kw):
    """Initialise the column-type change with the new type.

    :param name: forwarded to the parent constructor
    :param column_name: forwarded to the parent constructor
    :param type_: new type; coerced with ``sqltypes.to_instance``
    :param kw: extra keyword arguments forwarded to the parent constructor
    """
    super(ColumnType, self).__init__(name, column_name, **kw)
    coerced_type = sqltypes.to_instance(type_)
    self.type_ = coerced_type
def alter_column(self, table_name, column_name,
                 nullable=None,
                 server_default=False,
                 name=None,
                 type_=None,
                 autoincrement=None,
                 **kw
                 ):
    """Apply an ALTER COLUMN request to the tracked copy of a column.

    The column object in ``self.columns`` and its entry in
    ``self.column_transfers`` are modified in place. ``table_name`` and
    ``**kw`` are accepted but not used by this method.

    :param table_name: not used here
    :param column_name: key of the column in ``self.columns`` and
        ``self.column_transfers``
    :param nullable: when not ``None``, assigned to the column's
        ``nullable`` attribute
    :param server_default: ``False`` (the default) leaves the server default
        alone, ``None`` clears it, any other value is wrapped in a
        ``DefaultClause`` and attached to the column
    :param name: new column name; applied when not ``None`` and different
        from ``column_name``
    :param type_: new column type; coerced with ``sqltypes.to_instance``
    :param autoincrement: when not ``None``, stored on the column as a bool
    """
    existing = self.columns[column_name]
    existing_transfer = self.column_transfers[column_name]
    if name is not None and name != column_name:
        # note that we don't change '.key' - we keep referring
        # to the renamed column by its old key in _create().  neat!
        existing.name = name
        existing_transfer["name"] = name

    if type_ is not None:
        type_ = sqltypes.to_instance(type_)
        # old type is being discarded so turn off eventing
        # rules. Alternatively we can
        # erase the events set up by this type, but this is simpler.
        # we also ignore the drop_constraint that will come here from
        # Operations.implementation_for(alter_column)
        if isinstance(existing.type, SchemaEventTarget):
            existing.type._create_events = \
                existing.type.create_constraint = False

        # Only CAST the transferred expression when the type affinity
        # actually changes. Casting to a type of the same affinity is
        # redundant, and some backends alter the data when asked to do so.
        # The comparison must happen before the old type is replaced.
        if existing.type._type_affinity is not type_._type_affinity:
            existing_transfer["expr"] = cast(
                existing_transfer["expr"], type_)

        existing.type = type_

        # we *dont* however set events for the new type, because
        # alter_column is invoked from
        # Operations.implementation_for(alter_column) which already
        # will emit an add_constraint()

    if nullable is not None:
        existing.nullable = nullable
    if server_default is not False:
        if server_default is None:
            existing.server_default = None
        else:
            sql_schema.DefaultClause(server_default)._set_parent(existing)
    if autoincrement is not None:
        existing.autoincrement = bool(autoincrement)
def alter_column(self, table_name, column_name,
                 nullable=None,
                 server_default=False,
                 name=None,
                 type_=None,
                 autoincrement=None,
                 **kw
                 ):
    """Apply an ALTER COLUMN request to the tracked copy of a column.

    The column object in ``self.columns`` and its entry in
    ``self.column_transfers`` are modified in place. ``table_name`` and
    ``**kw`` are accepted but not used by this method.

    :param table_name: not used here
    :param column_name: key of the column in ``self.columns`` and
        ``self.column_transfers``
    :param nullable: when not ``None``, assigned to the column's
        ``nullable`` attribute
    :param server_default: ``False`` (the default) leaves the server default
        alone, ``None`` clears it, any other value is wrapped in a
        ``DefaultClause`` and attached to the column
    :param name: new column name; applied when not ``None`` and different
        from ``column_name``
    :param type_: new column type; coerced with ``sqltypes.to_instance``
    :param autoincrement: when not ``None``, stored on the column as a bool
    """
    column = self.columns[column_name]
    transfer = self.column_transfers[column_name]

    is_rename = name is not None and name != column_name
    if is_rename:
        # '.key' is intentionally left unchanged: the renamed column keeps
        # being referred to by its old key in _create().
        column.name = name
        transfer["name"] = name

    if type_ is not None:
        new_type = sqltypes.to_instance(type_)

        # The old type is about to be discarded, so its eventing rules are
        # switched off (simpler than erasing the events it set up). The
        # drop_constraint coming from
        # Operations.implementation_for(alter_column) is ignored as well.
        if isinstance(column.type, SchemaEventTarget):
            column.type._create_events = False
            column.type.create_constraint = False

        # CAST the transferred expression only when the affinity changes.
        if column.type._type_affinity is not new_type._type_affinity:
            transfer["expr"] = cast(transfer["expr"], new_type)

        column.type = new_type
        # No events are set up for the new type: alter_column is invoked
        # from Operations.implementation_for(alter_column), which already
        # emits an add_constraint().

    if nullable is not None:
        column.nullable = nullable

    # False is the "leave untouched" sentinel; None means "remove".
    if server_default is None:
        column.server_default = None
    elif server_default is not False:
        sql_schema.DefaultClause(server_default)._set_parent(column)

    if autoincrement is not None:
        column.autoincrement = bool(autoincrement)
def alter_column(self, table_name, column_name,
                 nullable=None,
                 server_default=False,
                 name=None,
                 type_=None,
                 autoincrement=None,
                 **kw
                 ):
    """Apply an ALTER COLUMN request to the tracked copy of a column.

    The column object in ``self.columns`` and its entry in
    ``self.column_transfers`` are modified in place. ``table_name`` and
    ``**kw`` are accepted but not used by this method.

    :param table_name: not used here
    :param column_name: key of the column in ``self.columns`` and
        ``self.column_transfers``
    :param nullable: when not ``None``, assigned to the column's
        ``nullable`` attribute
    :param server_default: ``False`` (the default) leaves the server default
        alone, ``None`` clears it, any other value is wrapped in a
        ``DefaultClause`` and attached to the column
    :param name: new column name; applied when not ``None`` and different
        from ``column_name``
    :param type_: new column type; coerced with ``sqltypes.to_instance``
    :param autoincrement: when not ``None``, stored on the column as a bool
    """
    existing = self.columns[column_name]
    existing_transfer = self.column_transfers[column_name]
    if name is not None and name != column_name:
        # note that we don't change '.key' - we keep referring
        # to the renamed column by its old key in _create().  neat!
        existing.name = name
        existing_transfer["name"] = name

    if type_ is not None:
        type_ = sqltypes.to_instance(type_)
        # old type is being discarded so turn off eventing
        # rules. Alternatively we can
        # erase the events set up by this type, but this is simpler.
        # we also ignore the drop_constraint that will come here from
        # Operations.implementation_for(alter_column)
        if isinstance(existing.type, SchemaEventTarget):
            existing.type._create_events = \
                existing.type.create_constraint = False

        # Only CAST the transferred expression when the type affinity
        # actually changes. Casting to a type of the same affinity is
        # redundant, and some backends alter the data when asked to do so.
        # The comparison must happen before the old type is replaced.
        if existing.type._type_affinity is not type_._type_affinity:
            existing_transfer["expr"] = cast(
                existing_transfer["expr"], type_)

        existing.type = type_

        # we *dont* however set events for the new type, because
        # alter_column is invoked from
        # Operations.implementation_for(alter_column) which already
        # will emit an add_constraint()

    if nullable is not None:
        existing.nullable = nullable
    if server_default is not False:
        if server_default is None:
            existing.server_default = None
        else:
            sql_schema.DefaultClause(server_default)._set_parent(existing)
    if autoincrement is not None:
        existing.autoincrement = bool(autoincrement)
# sql.py file source
# Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
# Author: SignalMedia
# Project source
# File source
# Views: 25
# Favorites: 0
# Likes: 0
# Comments: 0
def to_sql(self, frame, name, if_exists='fail', index=True,
           index_label=None, schema=None, chunksize=None, dtype=None):
    """
    Store the records of a DataFrame in a SQL database table.

    Parameters
    ----------
    frame : DataFrame
    name : string
        Name of SQL table
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, do nothing.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    index : boolean, default True
        Write DataFrame index as a column
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If specified, this overwrites the default
        schema of the SQLDatabase object.
    chunksize : int, default None
        If not None, then rows will be written in batches of this size at a
        time.  If None, all rows will be written at once.
    dtype : dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a SQLAlchemy type.

    Raises
    ------
    ValueError
        If a value in `dtype` does not resolve to a SQLAlchemy type.
    """
    if dtype is not None:
        from sqlalchemy.types import to_instance, TypeEngine

        # Every requested column type must resolve to a SQLAlchemy type.
        for col, my_type in dtype.items():
            if isinstance(to_instance(my_type), TypeEngine):
                continue
            raise ValueError('The type of %s is not a SQLAlchemy '
                             'type ' % col)

    sql_table = SQLTable(name, self, frame=frame, index=index,
                         if_exists=if_exists, index_label=index_label,
                         schema=schema, dtype=dtype)
    sql_table.create()
    sql_table.insert(chunksize)

    # check for potentially case sensitivity issues (GH7815)
    engine = self.connectable.engine
    with self.connectable.connect() as conn:
        known_tables = engine.table_names(
            schema=schema or self.meta.schema,
            connection=conn,
        )
    if name in known_tables:
        return

    # The table was written but cannot be found under the exact name given.
    warnings.warn("The provided table name '{0}' is not found exactly "
                  "as such in the database after writing the table, "
                  "possibly due to case sensitivity issues. Consider "
                  "using lower case table names.".format(name),
                  UserWarning)
def alter_column(self, table_name, column_name,
                 nullable=None,
                 server_default=False,
                 name=None,
                 type_=None,
                 autoincrement=None,
                 **kw
                 ):
    """Apply an ALTER COLUMN request to the tracked copy of a column.

    The column object in ``self.columns`` and its entry in
    ``self.column_transfers`` are modified in place. ``table_name`` and
    ``**kw`` are accepted but not used by this method.

    :param table_name: not used here
    :param column_name: key of the column in ``self.columns`` and
        ``self.column_transfers``
    :param nullable: when not ``None``, assigned to the column's
        ``nullable`` attribute
    :param server_default: ``False`` (the default) leaves the server default
        alone, ``None`` clears it, any other value is wrapped in a
        ``DefaultClause`` and attached to the column
    :param name: new column name; applied when not ``None`` and different
        from ``column_name``
    :param type_: new column type; coerced with ``sqltypes.to_instance``
    :param autoincrement: when not ``None``, stored on the column as a bool
    """
    column = self.columns[column_name]
    transfer = self.column_transfers[column_name]

    is_rename = name is not None and name != column_name
    if is_rename:
        # '.key' is intentionally left unchanged: the renamed column keeps
        # being referred to by its old key in _create().
        column.name = name
        transfer["name"] = name

    if type_ is not None:
        new_type = sqltypes.to_instance(type_)

        # The old type is about to be discarded, so its eventing rules are
        # switched off (simpler than erasing the events it set up). The
        # drop_constraint coming from
        # Operations.implementation_for(alter_column) is ignored as well.
        if isinstance(column.type, SchemaEventTarget):
            column.type._create_events = False
            column.type.create_constraint = False

        # CAST the transferred expression only when the affinity changes.
        if column.type._type_affinity is not new_type._type_affinity:
            transfer["expr"] = cast(transfer["expr"], new_type)

        column.type = new_type
        # No events are set up for the new type: alter_column is invoked
        # from Operations.implementation_for(alter_column), which already
        # emits an add_constraint().

    if nullable is not None:
        column.nullable = nullable

    # False is the "leave untouched" sentinel; None means "remove".
    if server_default is None:
        column.server_default = None
    elif server_default is not False:
        sql_schema.DefaultClause(server_default)._set_parent(column)

    if autoincrement is not None:
        column.autoincrement = bool(autoincrement)