def bind_processor(self, dialect):
datetime_date = datetime.date
format = self._storage_format
def process(value):
if value is None:
return None
elif isinstance(value, datetime_date):
return format % {
'year': value.year,
'month': value.month,
'day': value.day,
}
else:
raise TypeError("SQLite Date type only accepts Python "
"date objects as input.")
return process
python类Date()的实例源码
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def psql_map_type(t):
if isinstance(t,String):
return {'type':'string'}
elif isinstance(t,Numeric) or isinstance(t,Integer):
return {'type':'number'}
elif isinstance(t,Boolean):
return {'type':'boolean'}
elif isinstance(t,Date):
return {'type':'date'}
elif isinstance(t,Time):
return {'type':'time'}
elif isinstance(t,DateTime):
return {'type':'datetime'}
# Python operator and function signature tables
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
def _test_get_columns(self, schema=None, table_type='table'):
meta = MetaData(testing.db)
users, addresses, dingalings = self.tables.users, \
self.tables.email_addresses, self.tables.dingalings
table_names = ['users', 'email_addresses']
if table_type == 'view':
table_names = ['users_v', 'email_addresses_v']
insp = inspect(meta.bind)
for table_name, table in zip(table_names, (users,
addresses)):
schema_name = schema
cols = insp.get_columns(table_name, schema=schema_name)
self.assert_(len(cols) > 0, len(cols))
# should be in order
for i, col in enumerate(table.columns):
eq_(col.name, cols[i]['name'])
ctype = cols[i]['type'].__class__
ctype_def = col.type
if isinstance(ctype_def, sa.types.TypeEngine):
ctype_def = ctype_def.__class__
# Oracle returns Date for DateTime.
if testing.against('oracle') and ctype_def \
in (sql_types.Date, sql_types.DateTime):
ctype_def = sql_types.Date
# assert that the desired type and return type share
# a base within one of the generic types.
self.assert_(len(set(ctype.__mro__).
intersection(ctype_def.__mro__).
intersection([
sql_types.Integer,
sql_types.Numeric,
sql_types.DateTime,
sql_types.Date,
sql_types.Time,
sql_types.String,
sql_types._Binary,
])) > 0, '%s(%s), %s(%s)' %
(col.name, col.type, cols[i]['name'], ctype))
if not col.primary_key:
assert cols[i]['default'] is None
def _test_get_columns(self, schema=None, table_type='table'):
meta = MetaData(testing.db)
users, addresses, dingalings = self.tables.users, \
self.tables.email_addresses, self.tables.dingalings
table_names = ['users', 'email_addresses']
if table_type == 'view':
table_names = ['users_v', 'email_addresses_v']
insp = inspect(meta.bind)
for table_name, table in zip(table_names, (users,
addresses)):
schema_name = schema
cols = insp.get_columns(table_name, schema=schema_name)
self.assert_(len(cols) > 0, len(cols))
# should be in order
for i, col in enumerate(table.columns):
eq_(col.name, cols[i]['name'])
ctype = cols[i]['type'].__class__
ctype_def = col.type
if isinstance(ctype_def, sa.types.TypeEngine):
ctype_def = ctype_def.__class__
# Oracle returns Date for DateTime.
if testing.against('oracle') and ctype_def \
in (sql_types.Date, sql_types.DateTime):
ctype_def = sql_types.Date
# assert that the desired type and return type share
# a base within one of the generic types.
self.assert_(len(set(ctype.__mro__).
intersection(ctype_def.__mro__).
intersection([
sql_types.Integer,
sql_types.Numeric,
sql_types.DateTime,
sql_types.Date,
sql_types.Time,
sql_types.String,
sql_types._Binary,
])) > 0, '%s(%s), %s(%s)' %
(col.name, col.type, cols[i]['name'], ctype))
if not col.primary_key:
assert cols[i]['default'] is None
def _schema(self, model):
schema_fields = {'id': ID(stored=True, unique=True)}
searchable = set(model.__searchable__)
analyzer = getattr(model, '__whoosh_analyzer__') if hasattr(
model, '__whoosh_analyzer__') else self.analyzer
primary_keys = [key.name for key in inspect(model).primary_key]
for field in searchable:
if '.' in field:
fields = field.split('.')
field_attr = getattr(
getattr(model, fields[0]).property.mapper.class_,
fields[1])
else:
field_attr = getattr(model, field)
if hasattr(field_attr, 'descriptor') and isinstance(
field_attr.descriptor, hybrid_property):
field_type = Text
type_hint = getattr(field_attr, 'type_hint', None)
if type_hint is not None:
type_hint_map = {
'date': Date,
'datetime': DateTime,
'boolean': Boolean,
'integer': Integer,
'float': Float
}
field_type = type_hint if isclass(
type_hint) else type_hint_map.get(type_hint.lower(),
Text)
else:
field_type = field_attr.property.columns[0].type
if field in primary_keys:
schema_fields[field] = ID(stored=True, unique=True)
elif field_type in (DateTime, Date):
schema_fields[field] = DATETIME(stored=True, sortable=True)
elif field_type == Integer:
schema_fields[field] = NUMERIC(stored=True, numtype=int)
elif field_type == Float:
schema_fields[field] = NUMERIC(stored=True, numtype=float)
elif field_type == Boolean:
schema_fields[field] = BOOLEAN(stored=True)
else:
schema_fields[field] = TEXT(
stored=True, analyzer=analyzer, sortable=False)
return Schema(**schema_fields)