def __call__(self, form, field):
if isinstance(field.data, Model):
return
if isinstance(field.data, string_types) and UUID_REGEXP.search(field.data):
try:
obj = self.model.get(uuid=field.data)
except self.model.DoesNotExist:
raise ValidationError(self.message)
field.data = obj
return
try:
obj = self.model.get(id=field.data)
except self.model.DoesNotExist:
raise ValidationError(self.message)
field.data = obj
return
python类Model()的实例源码
def build_fake_model(self, name):
"""
Build a fake model with some defaults and the given table name.
We need this so we can perform operations that actually require a model class.
:param name: Name of database table.
:return: A new model class.
:rtype: peewee.Model
"""
class FakeModel(peewee.Model):
class Meta:
database = peewee.Proxy()
primary_key = False
indexes = []
constraints = []
db_table = name
return FakeModel
def _get_meta_db_class(db):
"""creating a declartive class model for db
"""
class _BlockedMeta(BaseModel):
def __new__(cls, name, bases, attrs):
_instance = super(_BlockedMeta, cls).__new__(cls, name, bases, attrs)
_instance.objects = AsyncManager(_instance, db)
return _instance
class _Base(Model, metaclass=_BlockedMeta):
def to_dict(self):
return self._data
class Meta:
database = db
return _Base
def models(self):
"""Return self.application models."""
Model_ = self.app.config['PEEWEE_MODELS_CLASS']
ignore = self.app.config['PEEWEE_MODELS_IGNORE']
models = []
if Model_ is not Model:
try:
mod = import_module(self.app.config['PEEWEE_MODELS_MODULE'])
for model in dir(mod):
models = getattr(mod, model)
if not isinstance(model, pw.Model):
continue
models.append(models)
except ImportError:
return models
elif isinstance(Model_, BaseSignalModel):
models = BaseSignalModel.models
return [m for m in models if m._meta.name not in ignore]
def test_add_fk_column(self):
class Person(pw.Model):
class Meta:
database = self.db
class Car(pw.Model):
class Meta:
database = self.db
self.evolve_and_check_noop()
peeweedbevolve.unregister(Car)
class Car(pw.Model):
owner = pw.ForeignKeyField(rel_model=Person, null=False)
class Meta:
database = self.db
self.evolve_and_check_noop()
person = Person.create()
car = Car.create(owner=person)
def test_drop_fk_column(self):
class Person(pw.Model):
class Meta:
database = self.db
class Car(pw.Model):
owner = pw.ForeignKeyField(rel_model=Person, null=False)
class Meta:
database = self.db
self.evolve_and_check_noop()
person = Person.create()
car = Car.create(owner=person)
peeweedbevolve.unregister(Car)
class Car(pw.Model):
class Meta:
database = self.db
self.evolve_and_check_noop()
def test_change_fk_column_to_int(self):
class Person(pw.Model):
class Meta:
database = self.db
class Car(pw.Model):
owner = pw.ForeignKeyField(rel_model=Person, null=False)
class Meta:
database = self.db
self.evolve_and_check_noop()
person = Person.create()
car = Car.create(owner=person)
peeweedbevolve.unregister(Car)
class Car(pw.Model):
owner_id = pw.IntegerField(null=False)
class Meta:
database = self.db
self.evolve_and_check_noop()
self.assertEqual(Car.select().first().owner_id, person.id)
Car.create(owner_id=-1) # this should not fail
def test_add_not_null_constraint_with_records_and_default_which_is_function(self):
class SomeModel(pw.Model):
some_field = pw.CharField(null=True)
class Meta:
database = self.db
self.evolve_and_check_noop()
SomeModel.create(some_field=None)
peeweedbevolve.clear()
def woot():
return 'woot'
class SomeModel(pw.Model):
some_field = pw.CharField(null=False, default=woot)
class Meta:
database = self.db
self.evolve_and_check_noop()
self.assertEqual(SomeModel.select().first().some_field, 'woot')
def test_reorder_multi_index(self):
class SomeModel(pw.Model):
some_field = pw.CharField(null=True)
class Meta:
database = self.db
indexes = (
(('id', 'some_field'), False),
)
self.evolve_and_check_noop()
self.assertEqual(sorted(peeweedbevolve.normalize_indexes(peeweedbevolve.get_indexes_by_table(self.db,'somemodel'))), [(u'somemodel', (u'id',), True), (u'somemodel', (u'id',u'some_field'), False)])
peeweedbevolve.clear()
class SomeModel(pw.Model):
some_field = pw.CharField(null=True)
class Meta:
database = self.db
indexes = (
(('some_field', 'id'), False),
)
self.evolve_and_check_noop()
self.assertEqual(sorted(peeweedbevolve.normalize_indexes(peeweedbevolve.get_indexes_by_table(self.db,'somemodel'))), [(u'somemodel', (u'id',), True), (u'somemodel', (u'some_field',u'id'), False)])
def test_change_integer_to_fake_fk_column(self):
class Person(pw.Model):
class Meta:
database = self.db
class Car(pw.Model):
owner_id = pw.IntegerField(null=False)
class Meta:
database = self.db
self.evolve_and_check_noop()
car = Car.create(owner_id=-1)
peeweedbevolve.unregister(Car)
class Car(pw.Model):
owner = pw.ForeignKeyField(rel_model=Person, null=False, fake=True)
class Meta:
database = self.db
self.evolve_and_check_noop()
person = Person.create()
car = Car.create(owner=-2)
self.assertEqual(Car.select().count(), 2)
def test_drop_column_default(self):
class SomeModel(pw.Model):
some_field = pw.CharField(null=True, default='woot2')
class Meta:
database = self.db
self.evolve_and_check_noop()
model = SomeModel.create()
self.assertEqual(model.some_field, 'woot2')
peeweedbevolve.clear()
class SomeModel(pw.Model):
some_field = pw.CharField(null=True)
class Meta:
database = self.db
self.evolve_and_check_noop()
model = SomeModel.create()
self.assertEqual(model.some_field, None)
self.assertEqual(SomeModel.get(SomeModel.id==model.id).some_field, None)
def test_dont_drop_table(self):
class SomeModel(pw.Model):
some_field = pw.CharField(null=True)
class Meta:
database = self.db
self.evolve_and_check_noop()
SomeModel.create(some_field='woot')
peeweedbevolve.clear()
class SomeModel(pw.Model):
some_field = pw.CharField(null=True)
class Meta:
database = self.db
evolve = False
self.evolve_and_check_noop()
# doesn't fail because table is still there
SomeModel.create(some_field='woot2')
def test_dont_add_column(self):
class SomeModel(pw.Model):
some_field = pw.CharField(null=True)
class Meta:
database = self.db
self.evolve_and_check_noop()
peeweedbevolve.clear()
class SomeModel(pw.Model):
some_field = pw.CharField(null=True)
some_other_field = pw.CharField(null=False)
class Meta:
database = self.db
evolve = False
self.evolve_and_check_noop()
# should not fail because the not-null column wasn't added
SomeModel.create(some_field='woot')
def test_change_decimal_precision(self):
class SomeModel(pw.Model):
some_field = pw.DecimalField(max_digits=4, decimal_places=0)
class Meta:
database = self.db
self.evolve_and_check_noop()
model = SomeModel.create(some_field=1234)
self.assertEqual(model.some_field, 1234)
peeweedbevolve.clear()
class SomeModel(pw.Model):
some_field = pw.DecimalField(max_digits=8, decimal_places=2)
class Meta:
database = self.db
self.evolve_and_check_noop()
self.assertEqual(SomeModel.select().first().some_field, 1234)
model.some_field = 123456.78
model.save()
self.assertEqual(SomeModel.select().first().some_field, decimal.Decimal('123456.78'))
def coerce_single_instance(lookup_field, value):
"""
Convert from whatever value is given to a scalar value for lookup_field.
If value is a dict, then lookup_field.name is used to get the value from the dict. Example:
lookup_field.name = 'id'
value = {'id': 123, 'name': 'tim'}
returns = 123
If value is a model, then lookup_field.name is extracted from the model. Example:
lookup_field.name = 'id'
value = <User id=123 name='tim'>
returns = 123
Otherwise the value is returned as-is.
:param lookup_field: Peewee model field used for getting name from value.
:param value: Some kind of value (usually a dict, Model instance, or scalar).
"""
if isinstance(value, dict):
return value.get(lookup_field.name)
if isinstance(value, peewee.Model):
return getattr(value, lookup_field.name)
return value
def test_json_field(database, value, kwargs, dict_type):
"""Ensure that the JSONField can load and dump dicts."""
class JSONModel(peewee.Model):
data = JSONField(**kwargs)
JSONModel._meta.database = database.database
JSONModel.create_table(True)
instance = JSONModel(data=value)
instance.save()
# Should be the same dict upon save.
assert instance.data is value
# Should be the same dict when queried.
queried_instance = JSONModel.select().first()
assert isinstance(queried_instance.data, dict_type)
assert queried_instance.data == value
def __init__(self, formdata=None, obj=None, data=None, *args, **kwargs):
super(PeeweeForm, self).__init__(formdata=formdata, obj=obj, data=data, *args, **kwargs)
if obj is not None and not isinstance(obj, Model):
raise PeeweeFormException(u'obj must None or peewee model instance.')
self.obj = obj
self._validate = False
def validate(model, field):
"""Decorator to add validation methods to validation_fields dictionary.
Populate validation_fields dictionary. The key/value pair is made up of
the model (table) name and tuples field name and the method to run.
For example,
{'table_1': [('field_1', 'method_1'), ('field_2', 'method_2'), ...],
'table_A': [('field_a', 'method_a'), ('field_b', 'method_b'), ...],
}
Parameters
----------
field : str
model : str
"""
def _validate(validation_method):
try:
validate.validation_fields[model].append(
(field, validation_method.__name__))
except AttributeError:
validate.validation_fields = defaultdict(list)
validate.validation_fields[model].append(
(field, validation_method.__name__))
@wraps(validation_method)
def wrapped(self):
return validation_method(self)
return wrapped
return _validate
#####################
# Extend peewee Model
#####################
def addSong(self, fid, filename, pid=None,artist=None, name=None):
if pid == '':
pid = 'None'
# query = 'INSERT INTO `tracks` (`id`,`filename`,`playlistID`) VALUES ("{}","{}","{}");'.format(fid, filename, pid)
#
self.CURSOR.execute("""INSERT INTO tracks (id,filename,playlistID,artist,name) VALUES(%s,%s,%s,%s,%s)""", (fid,filename,pid,artist,name))
self.db_conn.commit()
result = self.CURSOR.fetchone()
print("Adding new track to tracks table...")
#print(query)
print(result)
#self.CURSOR.close()
return 1
# db = MySQLDatabase('jonhydb', user='john',passwd='megajonhy')
# class Book(peewee.Model):
# author = peewee.CharField()
# title = peewee.TextField()
# class Meta:
# database = db
# Book.create_table()
# book = Book(author="me", title='Peewee is cool')
# book.save()
# for book in Book.filter(author="me"):
# print book.title
def test_initialize():
tc = TableCreator('awesome')
assert issubclass(tc.model, peewee.Model)
def foreign_key(self, coltype, name, references, **kwargs):
"""
Add a foreign key to the model.
This has some special cases, which is why it's not handled like all the other column types.
:param name: Name of the foreign key.
:param references: Table name in the format of "table.column" or just
"table" (and id will be default column).
:param kwargs: Additional kwargs to pass to the column instance.
You can also provide "on_delete" and "on_update" to add constraints.
:return: None
"""
try:
rel_table, rel_column = references.split('.', 1)
except ValueError:
rel_table, rel_column = references, 'id'
# Create a dummy model that we can relate this field to.
# Add the foreign key as a local field on the dummy model.
# We only do this so that Peewee can generate the nice foreign key constraint for us.
class DummyRelated(peewee.Model):
class Meta:
primary_key = False
database = peewee.Proxy()
db_table = rel_table
rel_field_class = FIELD_TO_PEEWEE.get(coltype, peewee.IntegerField)
rel_field = rel_field_class()
rel_field.add_to_class(DummyRelated, rel_column)
field = peewee.ForeignKeyField(DummyRelated, db_column=name, to_field=rel_column, **kwargs)
field.add_to_class(self.model, name)
def startup_library_service():
wamp = autobahn_sync.AutobahnSync()
wamp.run()
db = pw.SqliteDatabase('books.db')
class Book(pw.Model):
title = pw.CharField()
author = pw.CharField()
class Meta:
database = db
try:
db.create_table(Book)
except pw.OperationalError:
pass
@wamp.register('com.library.get_book')
def get_book(id):
try:
b = Book.get(id=id)
except pw.DoesNotExist:
return {'_error': "Doesn't exist"}
return {'id': id, 'title': b.title, 'author': b.author}
@wamp.register('com.library.new_book')
def new_book(title, author):
book = Book(title=title, author=author)
book.save()
wamp.session.publish('com.library.book_created', book.id)
return {'id': book.id}
def get_model_class(self):
if self.database is None:
raise RuntimeError('Database must be initialized.')
class BaseModel(Model):
class Meta:
database = self.database
return BaseModel
def Model(self):
if self._app is None:
database = getattr(self, 'database', None)
if database is None:
self.database = Proxy()
if not hasattr(self, '_model_class'):
self._model_class = self.get_model_class()
return self._model_class
def _create_table(model, fail_silently=True):
"""Utility method for creating a database table and indexes that is a
work around for unexpected behavior by the peewee ORM module. Specifically,
peewee create_table doesn't create compound indexes as expected.
Args:
db: an active peewee database connection
model: subclass of peewee.Model
indexes: a tuple of indexes that would normally be specified in
the peewee.Mode's class Meta. Example:
indexes = (
(('chrom', 'start', 'end'), True), # True means unique index
)
fail_silently: if True, no error will be raised if the table already exists
"""
# create table as a compressed TokuDB table
db = model._meta.database
indexes = model._meta.indexes
raw_query = db.compiler().create_table(model, safe=fail_silently)
raw_query = list(raw_query)
raw_query[0] = raw_query[0] + " engine=TokuDB, compression='tokudb_zlib', charset=latin1"
db.execute_sql(*raw_query)
logging.debug(raw_query[0])
# create indexes
safe_str = "IF NOT EXISTS" if fail_silently else ""
model_name = model.__name__.lower()
for i, (columns, unique) in enumerate(indexes):
columns_str = ",".join(map(lambda c: "`"+c+"`", columns))
unique_str = "UNIQUE" if unique else ""
q = "ALTER TABLE `%(model_name)s` ADD %(unique_str)s KEY %(safe_str)s `index%(i)s`(%(columns_str)s);" % locals()
logging.debug(q)
db.execute_sql(q)
def get_model_class(self):
if self.database is None:
raise RuntimeError('Database must be initialized.')
class BaseModel(Model):
class Meta:
database = self.database
return BaseModel
def Model(self):
if self._app is None:
database = getattr(self, 'database', None)
if database is None:
self.database = Proxy()
if not hasattr(self, '_model_class'):
self._model_class = self.get_model_class()
return self._model_class
def run(self, result=None):
model_classes = [
m[1] for m in inspect.getmembers(sys.modules['models'], inspect.isclass)
if issubclass(m[1], Model) and m[1] != Model
]
with test_database(test_db, model_classes):
super(TestCaseDB, self).run(result)
def test_pd_field():
db = SqliteDatabase(":memory:")
class TestModel(Model):
pf = PartialDateField(null=True)
class Meta:
database = db
TestModel.create_table()
TestModel(pf=PartialDate()).save()
TestModel(pf=None).save()
res = [r[0] for r in db.execute_sql("SELECT pf FROM testmodel").fetchall()]
assert res[0] is None and res[1] is None
TestModel(pf=PartialDate(1997)).save()
TestModel(pf=PartialDate(1996, 4)).save()
TestModel(pf=PartialDate(1995, 5, 13)).save()
res = [r[0] for r in db.execute_sql("SELECT pf FROM testmodel").fetchall()]
assert '1995-05-13' in res
assert '1996-04-**' in res
assert '1997-**-**' in res
res = [r.pf for r in TestModel.select().order_by(TestModel.pf)]
assert res[0] is None
assert res[1] is None
assert res[2] == PartialDate(1995, 5, 13)
assert res[3] == PartialDate(1996, 4)
assert res[4] == PartialDate(1997)
def _get_model_handler(self, model):
if peewee and issubclass(model, peewee.Model):
return PeeweeHandler(model)
if django and issubclass(model, DjangoModel):
return DjangoHandler(model)