def define_user_notification_table():
    """Define the user-notification table and map its model class.

    Builds the ``ckanext_requestdata_user_notification`` table on the
    module's ``metadata``, stores it in the module-level
    ``user_notification_table`` global, and maps
    ``ckanextUserNotification`` onto it.
    """
    global user_notification_table

    id_column = Column(
        'id',
        types.UnicodeText,
        primary_key=True,
        default=make_uuid,  # callable default: invoked per insert
    )
    maintainer_column = Column(
        'package_maintainer_id',
        types.UnicodeText,
        nullable=False,
    )
    seen_column = Column('seen', types.Boolean, default=False)
    id_index = Index('ckanext_requestdata_user_notification_id_idx', 'id')

    user_notification_table = Table(
        'ckanext_requestdata_user_notification',
        metadata,
        id_column,
        maintainer_column,
        seen_column,
        id_index
    )

    mapper(ckanextUserNotification, user_notification_table)
# Example source code using the Python class Boolean()
def get_schema(model, analyzer):
    """Build a Whoosh schema from a model's table columns.

    :param model: class exposing ``__table__`` and, optionally,
        ``__searchable__`` (an iterable of column names to index).
    :param analyzer: analyzer passed to every TEXT field.
    :returns: ``(schema, primary)`` where ``primary`` is the name of the
        last primary-key column seen, or ``None`` if there is none.
    :raises WhooshAlchemyError: when a searchable column's type is not one
        of the text, date or boolean types handled here.
    """
    fields = {}
    primary_name = None
    searchable_names = set(getattr(model, '__searchable__', []))

    for column in model.__table__.columns:
        name = column.name

        # Primary keys are always part of the schema as a stored ID field.
        if column.primary_key:
            fields[name] = whoosh_fields.ID(
                stored=True, unique=True, sortable=True)
            primary_name = name

        if name in searchable_names:
            # NOTE: a searchable primary key has its ID entry replaced by
            # the type-specific field chosen below.
            column_type = column.type
            if isinstance(column_type, TEXT_TYPES):
                fields[name] = whoosh_fields.TEXT(analyzer=analyzer)
            elif isinstance(column_type, DATE_TYPES):
                unique_flag = getattr(column, 'unique', False)
                fields[name] = whoosh_fields.DATETIME(unique=unique_flag)
            elif isinstance(column_type, sql_types.Boolean):
                fields[name] = whoosh_fields.BOOLEAN()
            else:
                raise WhooshAlchemyError(
                    'cannot index column of type %s' % column_type)

    return whoosh_fields.Schema(**fields), primary_name
def define_request_data_table():
    """Define the data-request table and map its model class.

    Builds the ``ckanext_requestdata_requests`` table on the module's
    ``metadata``, stores it in the module-level ``request_data_table``
    global, and maps ``ckanextRequestdata`` onto it.
    """
    global request_data_table

    # Required free-text columns, in the order they appear in the table.
    required_text_names = (
        'sender_name',
        'sender_user_id',
        'email_address',
        'message_content',
        'package_id',
    )

    table_items = [
        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
    ]
    table_items.extend(
        Column(column_name, types.UnicodeText, nullable=False)
        for column_name in required_text_names
    )
    table_items.extend([
        Column('state', types.UnicodeText, default=u'new'),
        Column('data_shared', types.Boolean, default=False),
        Column('rejected', types.Boolean, default=False),
        # Both timestamps default to the callable, evaluated per insert.
        Column('created_at', types.DateTime, default=datetime.datetime.now),
        Column('modified_at', types.DateTime, default=datetime.datetime.now),
        Index('ckanext_requestdata_requests_id_idx', 'id'),
    ])

    request_data_table = Table(
        'ckanext_requestdata_requests', metadata, *table_items)

    mapper(ckanextRequestdata, request_data_table)
def test_reflect_select(engine, table):
    """Check the reflected table's column types and its row count."""
    columns = table.c
    assert len(columns) == 9
    assert isinstance(columns.integer, Column)

    # (column name, expected reflected type), checked in this order.
    expected_types = (
        ('integer', types.Integer),
        ('timestamp', types.TIMESTAMP),
        ('string', types.String),
        ('float', types.Float),
        ('boolean', types.Boolean),
        ('date', types.DATE),
        ('datetime', types.DATETIME),
        ('time', types.TIME),
        ('bytes', types.BINARY),
    )
    for column_name, expected_type in expected_types:
        assert isinstance(getattr(columns, column_name).type, expected_type)

    result = table.select().execute()
    assert len(result.fetchall()) == 1000
def psql_map_type(t):
    """Map a column type object to a ``{'type': <label>}`` dict.

    :param t: type object tested with ``isinstance`` against the known
        type classes, in the order listed below.
    :returns: a dict with a single ``'type'`` key, or ``None`` when ``t``
        matches none of the known classes.
    """
    # Order matters: the first matching entry wins.
    label_by_types = (
        ((String,), 'string'),
        ((Numeric, Integer), 'number'),
        ((Boolean,), 'boolean'),
        ((Date,), 'date'),
        ((Time,), 'time'),
        ((DateTime,), 'datetime'),
    )
    for candidates, label in label_by_types:
        if isinstance(t, candidates):
            return {'type': label}
    return None
# Python operator and function signature tables
def _schema(self, model):
    """Build the Whoosh ``Schema`` for ``model``'s searchable fields.

    Every name in ``model.__searchable__`` becomes one schema field. A
    dotted name (``relation.attribute``) is resolved through the related
    mapped class. Hybrid properties default to text unless they carry a
    ``type_hint`` attribute (a type class or one of the strings ``date``,
    ``datetime``, ``boolean``, ``integer``, ``float``).

    :param model: mapped class exposing ``__searchable__`` and, optionally,
        ``__whoosh_analyzer__``.
    :returns: a ``Schema`` that always contains a stored, unique ``id``
        field plus one field per searchable name.
    """
    schema_fields = {'id': ID(stored=True, unique=True)}
    analyzer = getattr(model, '__whoosh_analyzer__', self.analyzer)
    primary_keys = [key.name for key in inspect(model).primary_key]

    # Loop-invariant, so built once rather than per hybrid property.
    type_hint_map = {
        'date': Date,
        'datetime': DateTime,
        'boolean': Boolean,
        'integer': Integer,
        'float': Float,
    }

    for field in set(model.__searchable__):
        if '.' in field:
            # Only the first two segments are used: relation, attribute.
            parts = field.split('.')
            related_class = getattr(model, parts[0]).property.mapper.class_
            field_attr = getattr(related_class, parts[1])
        else:
            field_attr = getattr(model, field)

        if hasattr(field_attr, 'descriptor') and isinstance(
                field_attr.descriptor, hybrid_property):
            field_type = Text
            type_hint = getattr(field_attr, 'type_hint', None)
            if type_hint is not None:
                if isclass(type_hint):
                    field_type = type_hint
                else:
                    field_type = type_hint_map.get(type_hint.lower(), Text)
        else:
            field_type = field_attr.property.columns[0].type

        # A mapped column normally carries a type *instance* (for example
        # ``Integer()``), which never compares equal to the type class.
        # Normalise to a class so column-backed fields are dispatched by
        # their real type instead of always falling through to TEXT.
        if not isclass(field_type):
            field_type = field_type.__class__

        if field in primary_keys:
            schema_fields[field] = ID(stored=True, unique=True)
        elif issubclass(field_type, (DateTime, Date)):
            schema_fields[field] = DATETIME(stored=True, sortable=True)
        elif issubclass(field_type, Integer):
            schema_fields[field] = NUMERIC(stored=True, numtype=int)
        elif issubclass(field_type, Float):
            schema_fields[field] = NUMERIC(stored=True, numtype=float)
        elif issubclass(field_type, Boolean):
            schema_fields[field] = BOOLEAN(stored=True)
        else:
            schema_fields[field] = TEXT(
                stored=True, analyzer=analyzer, sortable=False)

    return Schema(**schema_fields)