def test_demo_schemata_get_migrated(self):
    """Check that a CreateModel operation is applied to, and reverted in, a demo schema.

    A user and a demo schema (built from ``self.template``) are created, then
    a ``Pony`` model is migrated forwards and backwards while the presence of
    the ``tests_pony`` table is checked in the activated schema.
    """
    user = User.objects.create_user(**CREDENTIALS)
    schema = DemoSchema.objects.create(user=user, from_template=self.template)
    operation = migrations.CreateModel("Pony", [
        ('pony_id', models.AutoField(primary_key=True)),
        ('pink', models.IntegerField(default=1)),
    ])
    project_state = ProjectState()
    new_state = project_state.clone()
    operation.state_forwards('tests', new_state)

    # The table must not exist before the operation runs.
    schema.activate()
    self.assertNotIn('tests_pony', get_table_list())

    with connection.schema_editor() as editor:
        operation.database_forwards('tests', editor, project_state, new_state)
    # The schema is re-activated before every inspection, as in the original
    # (presumably the schema editor changes the active schema - confirm).
    schema.activate()
    self.assertIn('tests_pony', get_table_list())

    with connection.schema_editor() as editor:
        operation.database_backwards('tests', editor, new_state, project_state)
    schema.activate()
    self.assertNotIn('tests_pony', get_table_list())
python类schema_editor()的实例源码
def test_unique(self):
    """Add a unique partial index on RoomBooking, inspect it, then remove it."""
    baseline = len(self.get_constraints(RoomBooking))

    # Create the index.
    idx_name = 'roombooking_test_idx'
    partial = PartialIndex(
        fields=['user', 'room'],
        name=idx_name,
        unique=True,
        where='deleted_at IS NULL',
    )
    with self.schema_editor() as editor:
        editor.add_index(RoomBooking, partial)

    after_add = self.get_constraints(RoomBooking)
    self.assertEqual(len(after_add), baseline + 1)
    info = after_add[idx_name]
    self.assertEqual(info['columns'], ['user_id', 'room_id'])
    self.assertEqual(info['primary_key'], False)
    self.assertEqual(info['check'], False)
    self.assertEqual(info['index'], True)
    self.assertEqual(info['unique'], True)

    # Remove the index again; the constraint count must return to the baseline.
    with self.schema_editor() as editor:
        editor.remove_index(RoomBooking, partial)

    after_drop = self.get_constraints(RoomBooking)
    self.assertEqual(len(after_drop), baseline)
    self.assertNotIn(idx_name, after_drop)
def test_not_unique(self):
    """Add a non-unique, descending partial index on Job, inspect it, then remove it."""
    baseline = len(self.get_constraints(Job))

    # Create the index; the WHERE clause is given per database backend.
    idx_name = 'job_test_idx'
    partial = PartialIndex(
        fields=['-group'],
        name=idx_name,
        unique=False,
        where_postgresql='is_complete = false',
        where_sqlite='is_complete = 0',
    )
    with self.schema_editor() as editor:
        editor.add_index(Job, partial)

    after_add = self.get_constraints(Job)
    self.assertEqual(len(after_add), baseline + 1)
    info = after_add[idx_name]
    self.assertEqual(info['columns'], ['group'])
    self.assertEqual(info['orders'], ['DESC'])
    self.assertEqual(info['primary_key'], False)
    self.assertEqual(info['check'], False)
    self.assertEqual(info['index'], True)
    self.assertEqual(info['unique'], False)

    # Remove the index again; the constraint count must return to the baseline.
    with self.schema_editor() as editor:
        editor.remove_index(Job, partial)

    after_drop = self.get_constraints(Job)
    self.assertEqual(len(after_drop), baseline)
    self.assertNotIn(idx_name, after_drop)
def ready(self):
    """
    Hook tsvector_field into the project in one of two ways:

    1. The project configures tsvector_field's DatabaseSchemaEditor directly,
       by creating its own DatabaseWrapper and referencing
       tsvector_field.DatabaseSchemaEditor in the SchemaEditorClass attribute.
       See: tsvector_field/schema.py for more info.
    2. The project only adds `tsvector_field` to INSTALLED_APPS, in which case
       the `pre_migrate` mechanism is used. Note: `pre_migrate` is not fired for
       ./manage.py migrate --run-syncdb. So apps built without migrations
       have to use the more reliable approach in option #1.
    """
    from django.db import connection
    from . import DatabaseSchemaEditor

    if isinstance(connection.schema_editor(), DatabaseSchemaEditor):
        # Option #1 is already in effect, so no signal receiver is needed.
        return

    # Option #2: register the receiver for pre_migrate.
    from django.db.models.signals import pre_migrate
    from .receivers import inject_trigger_operations
    pre_migrate.connect(inject_trigger_operations)
def migrate(self, ops, state=None):
    """Wrap ``ops`` in a one-off migration for the 'tests' app and apply it.

    ``inject_trigger_operations`` is run on the migration first. When
    ``state`` is falsy, the starting state is built from ``self.apps``.
    Returns whatever ``migration.apply`` returns.
    """
    class Migration(migrations.Migration):
        operations = ops

    one_off = Migration('name', 'tests')
    inject_trigger_operations([(one_off, False)])
    with connection.schema_editor() as editor:
        starting_state = state or ProjectState.from_apps(self.apps)
        return one_off.apply(starting_state, editor)
def test_add_check_constraint(self):
    """Altering ``pink`` to a PositiveIntegerField adds a check constraint; reverting removes it."""
    before = self.set_up_test_model()
    alter = migrations.AlterField(
        model_name='pony',
        name='pink',
        field=models.PositiveIntegerField(default=3)
    )
    after = before.clone()
    alter.state_forwards('tests', after)

    self.assertNoConstraint('tests_pony', ['pink'], 'check')

    # Forwards: the check constraint appears.
    with connection.schema_editor() as schema_editor:
        alter.database_forwards('tests', schema_editor, before, after)
    self.assertConstraint('tests_pony', ['pink'], 'check')

    # Backwards: the check constraint is gone again.
    with connection.schema_editor() as schema_editor:
        alter.database_backwards('tests', schema_editor, after, before)
    self.assertNoConstraint('tests_pony', ['pink'], 'check')
def test_add_unique_constraint(self):
    """Altering ``pink`` to ``unique=True`` adds a unique constraint; reverting removes it."""
    before = self.set_up_test_model()
    alter = migrations.AlterField(
        model_name='pony',
        name='pink',
        field=models.IntegerField(unique=True, default=3)
    )
    after = before.clone()
    alter.state_forwards('tests', after)

    self.assertNoConstraint('tests_pony', ['pink'], 'unique')

    # Forwards: the unique constraint appears.
    with connection.schema_editor() as schema_editor:
        alter.database_forwards('tests', schema_editor, before, after)
    self.assertConstraint('tests_pony', ['pink'], 'unique')

    # Backwards: the unique constraint is gone again.
    with connection.schema_editor() as schema_editor:
        alter.database_backwards('tests', schema_editor, after, before)
    self.assertNoConstraint('tests_pony', ['pink'], 'unique')
def test_custom_migration_operation(self):
    """The project's ``AddField`` operation adds the ``yellow`` column and removes it on reversal."""
    before = self.set_up_test_model()
    add_yellow = AddField(
        app_label='tests',
        model_name='pony',
        name='yellow',
        field=models.BooleanField(default=True)
    )
    after = before.clone()
    add_yellow.state_forwards('tests', after)

    self.assertColumnNotExists('tests_pony', 'yellow')

    # Forwards: the column is created.
    with connection.schema_editor() as schema_editor:
        add_yellow.database_forwards('tests', schema_editor, before, after)
    self.assertColumnExists('tests_pony', 'yellow')

    # Backwards: the column is dropped.
    with connection.schema_editor() as schema_editor:
        add_yellow.database_backwards('tests', schema_editor, after, before)
    self.assertColumnNotExists('tests_pony', 'yellow')
def test_migrations_apply_to_templates(self):
    """Check that a CreateModel operation is applied to, and reverted in, a schema template.

    A ``SchemaTemplate`` is created, then a ``Pony`` model is migrated
    forwards and backwards while the presence of the ``tests_pony`` table is
    checked in the activated template.
    """
    template = SchemaTemplate.objects.create(name='a')
    operation = migrations.CreateModel("Pony", [
        ('pony_id', models.AutoField(primary_key=True)),
        ('pink', models.IntegerField(default=1)),
    ])
    project_state = ProjectState()
    new_state = project_state.clone()
    operation.state_forwards('tests', new_state)

    # The table must not exist before the operation runs.
    template.activate()
    self.assertNotIn('tests_pony', get_table_list())

    with connection.schema_editor() as editor:
        operation.database_forwards('tests', editor, project_state, new_state)
    # The template is re-activated before every inspection, as in the original
    # (presumably the schema editor changes the active schema - confirm).
    template.activate()
    self.assertIn('tests_pony', get_table_list())

    with connection.schema_editor() as editor:
        operation.database_backwards('tests', editor, new_state, project_state)
    template.activate()
    self.assertNotIn('tests_pony', get_table_list())
def filtered_schema_editor(*filters: List[str]):
    """Gets a schema editor, but filters executed SQL
    statements based on the specified text filters.

    This is a generator that yields once. Other code in this file uses it in
    a ``with`` statement, so it is presumably wrapped with
    ``contextlib.contextmanager`` outside the visible lines - confirm.

    Arguments:
        filters:
            List of strings to filter SQL
            statements on.

    Yields:
        A ``(schema_editor, filter_results)`` tuple. ``filter_results`` is an
        empty dict at yield time; it is filled in only after the caller's
        block finishes, mapping each filter text to the recorded mock calls
        whose string form contains that text.
    """
    # NOTE(review): each filter is used as a plain string below
    # (`filter_text in str(call)`), so `str` looks like the more accurate
    # annotation for *filters - confirm.
    with connection.schema_editor() as schema_editor:
        # Wrap the editor's own execute so statements still run while the
        # mock records every call.
        wrapper_for = schema_editor.execute
        with mock.patch.object(BaseDatabaseSchemaEditor, 'execute', wraps=wrapper_for) as execute:
            filter_results = {}
            yield schema_editor, filter_results

    # NOTE(review): nesting was reconstructed from an unindented source; this
    # loop is assumed to run after both `with` blocks have exited - confirm.
    for filter_text in filters:
        filter_results[filter_text] = [
            call for call in execute.mock_calls
            if filter_text in str(call)
        ]
def create_drop_model(field, filters: List[str]):
    """Creates and drops a model with the specified field.

    This is a generator that yields once; it is presumably wrapped with
    ``contextlib.contextmanager`` outside the visible lines - confirm.

    Arguments:
        field:
            The field to include on the
            model to create and drop.
        filters:
            List of strings to filter
            SQL statements on.

    Yields:
        The ``calls`` object produced by ``filtered_schema_editor`` for the
        create/delete migration.
    """
    model = define_fake_model({'title': field})

    with filtered_schema_editor(*filters) as (schema_editor, calls):
        execute_migration(schema_editor, [
            migrations.CreateModel(
                model.__name__,
                fields=[
                    # A clone is used so the migration does not share the
                    # field instance already attached to the fake model.
                    ('title', field.clone())
                ]
            ),
            migrations.DeleteModel(
                model.__name__,
            )
        ])

    # Yield only after the `with` block has exited: filtered_schema_editor
    # fills in `calls` after its own yield returns.
    yield calls
def migrate(self, *filters: List[str]):
    """
    Apply every recorded migration, draining ``self.migrations``.

    Migrations are popped from the end of ``self.migrations`` one at a time
    and each is applied through its own filtered schema editor.

    Arguments:
        filters: List of strings to filter SQL statements on.

    Returns:
        The filtered calls of every migration
    """
    collected = []
    while self.migrations:
        pending = self.migrations.pop()
        with filtered_schema_editor(*filters) as (editor, calls):
            executor = MigrationExecutor(editor.connection)
            executor.apply_migration(self.project_state, pending)
        collected.append(calls)
    return collected
def get_fake_model(fields=None, model_base=PostgresModel, meta_options=None):
    """Creates a fake model to use during unit tests.

    The model is defined through ``define_fake_model``, the ``HStoreExtension``
    operation is applied through a throwaway migration, and the model's table
    is created with the schema editor.

    Arguments:
        fields: Passed through to ``define_fake_model``.
        model_base: Base class passed through to ``define_fake_model``.
        meta_options: Meta options passed through to ``define_fake_model``.
            ``None`` (the default) means a fresh empty dict, so no dict is
            shared between calls.

    Returns:
        The newly defined model.
    """
    if meta_options is None:
        meta_options = {}

    model = define_fake_model(fields, model_base, meta_options)

    class TestProject:
        # Minimal stand-in passed to ``apply_migration`` as the project
        # state: both ``clone()`` and ``apps`` hand back the same object.
        def clone(self, *_args, **_kwargs):
            return self

        @property
        def apps(self):
            return self

    class TestMigration(migrations.Migration):
        operations = [HStoreExtension()]

    with connection.schema_editor() as schema_editor:
        migration_executor = MigrationExecutor(schema_editor.connection)
        migration_executor.apply_migration(
            TestProject(), TestMigration('eh', 'postgres_extra'))
        schema_editor.create_model(model)

    return model
def get_fake_model(fields=None, model_base=PostgresModel, meta_options=None):
    """Creates a fake model to use during unit tests.

    The model is defined through ``define_fake_model``, the ``HStoreExtension``
    operation is applied through a throwaway migration, and the model's table
    is created with the schema editor.

    Arguments:
        fields: Passed through to ``define_fake_model``.
        model_base: Base class passed through to ``define_fake_model``.
        meta_options: Meta options passed through to ``define_fake_model``.
            ``None`` (the default) means a fresh empty dict, so no dict is
            shared between calls.

    Returns:
        The newly defined model.
    """
    if meta_options is None:
        meta_options = {}

    model = define_fake_model(fields, model_base, meta_options)

    class TestProject:
        # Minimal stand-in passed to ``apply_migration`` as the project
        # state: both ``clone()`` and ``apps`` hand back the same object.
        def clone(self, *_args, **_kwargs):
            return self

        @property
        def apps(self):
            return self

    class TestMigration(migrations.Migration):
        operations = [HStoreExtension()]

    with connection.schema_editor() as schema_editor:
        migration_executor = MigrationExecutor(schema_editor.connection)
        migration_executor.apply_migration(
            TestProject(), TestMigration('eh', 'postgres_extra'))
        schema_editor.create_model(model)

    return model
def get_fake_model(fields=None, model_base=LocalizedModel, meta_options=None):
    """Creates a fake model to use during unit tests.

    The model is defined through ``define_fake_model``, the ``HStoreExtension``
    operation is applied through a throwaway migration, and the model's table
    is created with the schema editor.

    Arguments:
        fields: Passed through to ``define_fake_model``.
        model_base: Base class passed through to ``define_fake_model``.
        meta_options: Meta options passed through to ``define_fake_model``.
            ``None`` (the default) means a fresh empty dict, so no dict is
            shared between calls.

    Returns:
        The newly defined model.
    """
    if meta_options is None:
        meta_options = {}

    model = define_fake_model(fields, model_base, meta_options)

    class TestProject:
        # Minimal stand-in passed to ``apply_migration`` as the project
        # state: both ``clone()`` and ``apps`` hand back the same object.
        def clone(self, *_args, **_kwargs):
            return self

        @property
        def apps(self):
            return self

    class TestMigration(migrations.Migration):
        operations = [HStoreExtension()]

    with connection.schema_editor() as schema_editor:
        migration_executor = MigrationExecutor(schema_editor.connection)
        migration_executor.apply_migration(
            TestProject(), TestMigration('eh', 'postgres_extra'))
        schema_editor.create_model(model)

    return model
def __enter__(self):
    '''
    Create the tables in our database.

    One table is created for every model in ``self.models``, in order.
    Returns ``self`` so that ``with ... as ctx`` binds this object rather
    than ``None``.
    '''
    with connection.schema_editor() as editor:
        for Model in self.models:
            editor.create_model(Model)
    return self
def __exit__(self, *args):
    '''
    Drop the tables from our database.

    The models in ``self.models`` are deleted in reverse order. Exception
    details passed in ``args`` are ignored and nothing is returned.
    '''
    with connection.schema_editor() as schema_editor:
        for model_cls in reversed(self.models):
            schema_editor.delete_model(model_cls)
managers.py 文件源码
项目:django-calaccess-processed-data
作者: california-civic-data-coalition
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def add_constraints_and_indexes(self):
    """
    Re-create constraints and indexes on the model and its fields.

    ``unique_together`` and ``index_together`` are altered from empty to the
    values declared on the model's ``_meta``. Each constrained or indexed
    field is then altered from a copy with the flag switched off back to the
    real field.
    """
    model = self.model
    with connection.schema_editor() as editor:
        editor.alter_unique_together(model, (), model._meta.unique_together)
        editor.alter_index_together(model, (), model._meta.index_together)

        for field in model.objects.constrained_fields:
            # "Old" field definition: same field, without the db constraint.
            without_constraint = field.__copy__()
            without_constraint.db_constraint = False
            editor.alter_field(model, without_constraint, field)

        for field in model.objects.indexed_fields:
            # "Old" field definition: same field, without the db index.
            without_index = field.__copy__()
            without_index.db_index = False
            editor.alter_field(model, without_index, field)
managers.py 文件源码
项目:django-calaccess-processed-data
作者: california-civic-data-coalition
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def drop_constraints_and_indexes(self):
    """
    Temporarily drop constraints and indexes on the model and its fields.

    ``unique_together`` and ``index_together`` are altered from the values
    declared on the model's ``_meta`` to empty. Each constrained or indexed
    field is then altered from the real field to a copy with the flag
    switched off.
    """
    model = self.model
    with connection.schema_editor() as editor:
        editor.alter_unique_together(model, model._meta.unique_together, ())
        editor.alter_index_together(model, model._meta.index_together, ())

        for field in model.objects.constrained_fields:
            # "New" field definition: same field, without the db constraint.
            without_constraint = field.__copy__()
            without_constraint.db_constraint = False
            editor.alter_field(model, field, without_constraint)

        for field in model.objects.indexed_fields:
            # "New" field definition: same field, without the db index.
            without_index = field.__copy__()
            without_index.db_index = False
            editor.alter_field(model, field, without_index)
def schema_editor(self):
    """Return a schema editor that collects SQL instead of executing it."""
    # collect_sql=True means the statements are gathered, not run.
    collecting_editor = connection.schema_editor(collect_sql=True)
    return collecting_editor
def test_roombooking_createsql(self):
    """The SQL from RoomBooking's first index matches ROOMBOOKING_SQL."""
    first_index = RoomBooking._meta.indexes[0]
    with self.schema_editor() as schema_editor:
        statement = first_index.create_sql(RoomBooking, schema_editor)
    self.assertRegex(statement, ROOMBOOKING_SQL)
def test_roombooking_create_model(self):
    """Creating the RoomBooking model collects SQL matching ROOMBOOKING_SQL."""
    with self.schema_editor() as schema_editor:
        schema_editor.create_model(RoomBooking)
    # Checked after the editor block has exited.
    collected = schema_editor.collected_sql
    self.assertContainsMatch(collected, ROOMBOOKING_SQL)
def test_job_createsql(self):
    """The SQL from Job's first index matches the non-unique pattern."""
    first_index = Job._meta.indexes[0]
    with self.schema_editor() as schema_editor:
        statement = first_index.create_sql(Job, schema_editor)
    # The pattern is completed with the value self.false() gives for this editor.
    expected_pattern = JOB_NONUNIQUE_SQL % self.false(schema_editor)
    self.assertRegex(statement, expected_pattern)
def test_job_create_model(self):
    """Creating the Job model collects SQL matching both Job index patterns."""
    with self.schema_editor() as schema_editor:
        schema_editor.create_model(Job)
    # The patterns are completed with the value self.false() gives for this editor.
    false_literal = self.false(schema_editor)
    for pattern in (JOB_NONUNIQUE_SQL, JOB_UNIQUE_SQL):
        self.assertContainsMatch(schema_editor.collected_sql, pattern % false_literal)
def trigger_editor(self):
    """Return a DatabaseTriggerEditor wrapping a DatabaseSchemaEditor for ``connection``."""
    # The return happens inside the block, so the schema editor's context
    # has already exited by the time the caller receives the result.
    with DatabaseSchemaEditor(connection) as editor:
        wrapped = DatabaseTriggerEditor(editor)
        return wrapped
def test_create_model_no_function(self):
    """A search vector field with no weighted columns defers only a GIN index statement."""
    class NoWeightedColumns(models.Model):
        search = SearchVectorField()

    with DatabaseSchemaEditor(connection) as editor:
        editor.create_model(NoWeightedColumns)
        # Layout reconstructed: the deferred SQL is inspected inside the block.
        deferred = editor.deferred_sql
        self.assertEqual(len(deferred), 1)
        self.assertIn('CREATE INDEX', deferred[0])
        self.assertIn('GIN', deferred[0])
def test_create_model(self):
    """A weighted search vector field defers index, function and trigger statements, in that order."""
    class TextDocument(models.Model):
        title = models.CharField(max_length=128)
        search = SearchVectorField([
            WeightedColumn('title', 'A'),
        ], 'english')

    with DatabaseSchemaEditor(connection) as editor:
        editor.create_model(TextDocument)
        # Layout reconstructed: the deferred SQL is inspected inside the block.
        deferred = editor.deferred_sql
        self.assertEqual(len(deferred), 3)
        self.assertIn('CREATE INDEX', deferred[0])
        self.assertIn('GIN', deferred[0])
        self.assertIn('CREATE FUNCTION', deferred[1])
        self.assertIn('CREATE TRIGGER', deferred[2])
def apply_operations(self, app_label, project_state, operations):
    """Apply ``operations`` as one migration for ``app_label``.

    Returns whatever ``Migration.apply`` returns for ``project_state``.
    """
    one_off = Migration('name', app_label)
    one_off.operations = operations
    with connection.schema_editor() as schema_editor:
        outcome = one_off.apply(project_state, schema_editor)
        return outcome
def unapply_operations(self, app_label, project_state, operations):
    """Reverse ``operations`` as one migration for ``app_label``.

    Returns whatever ``Migration.unapply`` returns for ``project_state``.
    """
    one_off = Migration('name', app_label)
    one_off.operations = operations
    with connection.schema_editor() as schema_editor:
        outcome = one_off.unapply(project_state, schema_editor)
        return outcome
def test_create_model(self):
    """CreateModel creates the ``tests_pony`` table and its reversal removes it."""
    create_pony = migrations.CreateModel("Pony", [
        ('pony_id', models.AutoField(primary_key=True)),
        ('pink', models.IntegerField(default=1)),
    ])
    before = ProjectState()
    after = before.clone()
    create_pony.state_forwards('tests', after)

    self.assertTableNotExists('tests_pony')

    # Forwards: the table is created.
    with connection.schema_editor() as schema_editor:
        create_pony.database_forwards('tests', schema_editor, before, after)
    self.assertTableExists('tests_pony')

    # Backwards: the table is dropped.
    with connection.schema_editor() as schema_editor:
        create_pony.database_backwards('tests', schema_editor, after, before)
    self.assertTableNotExists('tests_pony')