def define_user_notification_table():
global user_notification_table
user_notification_table = Table('ckanext_requestdata_user_notification',
metadata,
Column('id', types.UnicodeText,
primary_key=True,
default=make_uuid),
Column('package_maintainer_id',
types.UnicodeText,
nullable=False),
Column('seen', types.Boolean,
default=False),
Index('ckanext_requestdata_user_'
'notification_id_idx', 'id'))
mapper(
ckanextUserNotification,
user_notification_table
)
python类UnicodeText()的实例源码
def define_maintainers_table():
global maintainers_table
maintainers_table = Table('ckanext_requestdata_maintainers', metadata,
Column('id', types.UnicodeText,
primary_key=True, default=make_uuid),
Column('request_data_id', types.UnicodeText,
ForeignKey('ckanext_requestdata_'
'requests.id')),
Column('maintainer_id', types.UnicodeText),
Column('email', types.UnicodeText),
Index('ckanext_requestdata_maintainers_id_idx',
'id'))
mapper(
ckanextMaintainers,
maintainers_table
)
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def setup_spatial_table(package_extent_class, db_srid=None):
if legacy_geoalchemy:
package_extent_table = Table(
'package_extent', meta.metadata,
Column('package_id', types.UnicodeText, primary_key=True),
GeometryExtensionColumn('the_geom', Geometry(2, srid=db_srid))
)
meta.mapper(
package_extent_class,
package_extent_table,
properties={'the_geom':
GeometryColumn(package_extent_table.c.the_geom,
comparator=PGComparator)}
)
GeometryDDL(package_extent_table)
else:
# PostGIS 1.5 requires management=True when defining the Geometry
# field
management = (postgis_version()[:1] == '1')
package_extent_table = Table(
'package_extent', meta.metadata,
Column('package_id', types.UnicodeText, primary_key=True),
Column('the_geom', Geometry('GEOMETRY', srid=db_srid,
management=management)),
)
meta.mapper(package_extent_class, package_extent_table)
return package_extent_table
def upgrade(migrate_engine):
metadata = MetaData()
metadata.bind = migrate_engine
group_table = Table('group', metadata, autoload=True)
group_extra_table = Table('group_extra', metadata,
Column('id', UnicodeText, primary_key=True, default=make_uuid),
Column('group_id', UnicodeText, ForeignKey('group.id')),
Column('key', UnicodeText),
Column('value', JsonType),
)
group_extra_table.create()
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def define_request_data_table():
global request_data_table
request_data_table = Table('ckanext_requestdata_requests', metadata,
Column('id', types.UnicodeText,
primary_key=True,
default=make_uuid),
Column('sender_name', types.UnicodeText,
nullable=False),
Column('sender_user_id', types.UnicodeText,
nullable=False),
Column('email_address', types.UnicodeText,
nullable=False),
Column('message_content', types.UnicodeText,
nullable=False),
Column('package_id', types.UnicodeText,
nullable=False),
Column('state', types.UnicodeText,
default=u'new'),
Column('data_shared', types.Boolean,
default=False),
Column('rejected', types.Boolean,
default=False),
Column('created_at', types.DateTime,
default=datetime.datetime.now),
Column('modified_at', types.DateTime,
default=datetime.datetime.now),
Index('ckanext_requestdata_requests_id_idx',
'id'))
mapper(
ckanextRequestdata,
request_data_table
)
def define_request_data_counters_table():
global request_data_counters_table
request_data_counters_table = Table('ckanext_requestdata_counters',
metadata,
Column('id', types.UnicodeText,
primary_key=True,
default=make_uuid),
Column('package_id',
types.UnicodeText),
Column('org_id', types.UnicodeText),
Column('requests', types.Integer,
default=0),
Column('replied', types.Integer,
default=0),
Column('declined', types.Integer,
default=0),
Column('shared', types.Integer,
default=0),
Index('ckanext_requestdata_counters_'
'id_idx', 'id'))
mapper(
ckanextRequestDataCounters,
request_data_counters_table
)
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process
def result_processor(self, dialect, coltype):
lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
if lob_processor is None:
return None
string_processor = sqltypes.UnicodeText.result_processor(
self, dialect, coltype)
if string_processor is None:
return lob_processor
else:
def process(value):
return string_processor(lob_processor(value))
return process