def test_delete_model(self):
project_state = self.set_up_test_model()
operation = migrations.DeleteModel("Pony")
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertTableExists('tests_pony')
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
self.assertTableNotExists('tests_pony')
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertTableExists('tests_pony')
python类schema_editor()的实例源码
def test_rename_model(self):
project_state = self.set_up_test_model()
operation = migrations.RenameModel("Pony", "Horse")
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertTableExists('tests_pony')
self.assertTableNotExists('tests_horse')
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
self.assertTableExists('tests_horse')
self.assertTableNotExists('tests_pony')
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertTableExists('tests_pony')
self.assertTableNotExists('tests_horse')
def test_remove_field(self):
project_state = self.set_up_test_model()
operation = migrations.RemoveField('Pony', 'pink')
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertColumnExists('tests_pony', 'pink')
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
self.assertColumnNotExists('tests_pony', 'pink')
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertColumnExists('tests_pony', 'pink')
def test_remove_foreign_key_field(self):
project_state = self.set_up_test_model()
operation = migrations.RemoveField('Rider', 'pony')
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertColumnExists('tests_rider', 'pony_id')
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
self.assertColumnNotExists('tests_rider', 'pony_id')
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertColumnExists('tests_rider', 'pony_id')
def test_alter_model_table(self):
project_state = self.set_up_test_model()
operation = migrations.AlterModelTable('Pony', 'tests_pony_2')
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertTableExists('tests_pony')
self.assertTableNotExists('tests_pony_2')
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
self.assertTableExists('tests_pony_2')
self.assertTableNotExists('tests_pony')
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertTableExists('tests_pony')
self.assertTableNotExists('tests_pony_2')
def test_alter_field(self):
project_state = self.set_up_test_model()
operation = migrations.AlterField('Pony', 'pink', models.IntegerField(null=True))
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertColumnNotNull('tests_pony', 'pink')
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
self.assertColumnNull('tests_pony', 'pink')
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertColumnNotNull('tests_pony', 'pink')
def test_rename_field(self):
project_state = self.set_up_test_model()
operation = migrations.RenameField('Pony', 'pink', 'rosa')
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertColumnExists('tests_pony', 'pink')
self.assertColumnNotExists('tests_pony', 'rosa')
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
self.assertColumnNotExists('tests_pony', 'pink')
self.assertColumnExists('tests_pony', 'rosa')
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertColumnExists('tests_pony', 'pink')
self.assertColumnNotExists('tests_pony', 'rosa')
def test_alter_index_together(self):
project_state = self.set_up_test_model()
operation = migrations.AlterIndexTogether('Pony', [('pink', 'weight')])
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertIndexNotExists('tests_pony', ['pink', 'weight'])
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
self.assertIndexExists('tests_pony', ['pink', 'weight'])
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertIndexNotExists('tests_pony', ['pink', 'weight'])
def test_alter_order_with_respect_to(self):
project_state = self.set_up_test_model()
operation = migrations.AlterOrderWithRespectTo('Rider', 'pony')
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertColumnNotExists('tests_rider', '_order')
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
self.assertColumnExists('tests_rider', '_order')
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertColumnNotExists('tests_rider', '_order')
def test_constraint_name_method(self):
from ..models import AwareModel, NaiveModel, SelfReferentialModel
with connection.schema_editor() as editor:
self.assertEqual(3, len(editor._constraint_names(AwareModel, index=True)))
six.assertCountEqual(
self,
['tests_awaremodel_pkey'],
editor._constraint_names(AwareModel, primary_key=True)
)
six.assertCountEqual(self, [
'tests_awaremodel_pkey',
'tests_awaremodel_name_key'
], editor._constraint_names(AwareModel, unique=True))
six.assertCountEqual(self, [
'tests_awaremodel_name_key'
], editor._constraint_names(AwareModel, unique=True, primary_key=False))
six.assertCountEqual(self, ['tests_awaremodel_pkey'], editor._constraint_names(AwareModel, primary_key=True, unique=True))
six.assertCountEqual(self, [], editor._constraint_names(AwareModel, foreign_key=True))
six.assertCountEqual(self, [], editor._constraint_names(AwareModel, foreign_key=True, primary_key=True))
six.assertCountEqual(self, ['tests_awaremodel_factor_check'], editor._constraint_names(AwareModel, check=True))
with connection.schema_editor() as editor:
six.assertCountEqual(self, ['tests_naivemodel_pkey'], editor._constraint_names(NaiveModel, primary_key=True))
six.assertCountEqual(self, ['tests_naivemodel_name_key'], editor._constraint_names(NaiveModel, unique=True, primary_key=False))
with connection.schema_editor() as editor:
# These constraint names appear to change between different versions of django or python?
self.assertEqual(1, len(editor._constraint_names(SelfReferentialModel, foreign_key=True)))
def test_0002_patch_admin_reverse(self):
Schema.objects.mass_create('a', 'b', 'c')
remove_all_schemata = import_module('boardinghouse.migrations.0002_patch_admin').remove_all_schemata
remove_all_schemata(apps, connection.schema_editor())
self.assertEqual(0, Schema.objects.count())
def test_0004_change_sequence_owners(self):
Schema.objects.mass_create('a', 'b', 'c')
module = import_module('boardinghouse.migrations.0004_change_sequence_owners')
module.change_existing_sequence_owners(apps, connection.schema_editor())
# How can I assert that this was executed, and did what it says on the box?
module.noop(apps, connection.schema_editor())
test_clone_schema_db_function.py 文件源码
项目:django-boardinghouse
作者: schinckel
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_clone_schema_with_trigger(self):
TRIGGER_FUNCTION = '''CREATE OR REPLACE FUNCTION trigger_this() RETURNS TRIGGER AS $$
BEGIN
RAISE EXCEPTION 'Trigger fired correctly';
END;
$$ LANGUAGE plpgsql'''
TRIGGER_TRIGGER = '''CREATE TRIGGER "test_trigger_this"
BEFORE INSERT ON tests_awaremodel
FOR EACH STATEMENT
EXECUTE PROCEDURE trigger_this()'''
project_state = ProjectState()
new_state = project_state.clone()
trigger_function_op = migrations.RunSQL(
sql=TRIGGER_FUNCTION,
reverse_sql='DROP FUNCTION trigger_this()'
)
trigger_op = migrations.RunSQL(
sql=TRIGGER_TRIGGER,
reverse_sql='DROP TRIGGER "test_trigger_this" BEFORE EACH UPDATE ON tests_awaremodel'
)
trigger_function_op.state_forwards('tests', new_state)
trigger_op.state_forwards('tests', new_state)
with connection.schema_editor() as editor:
trigger_function_op.database_forwards('tests', editor, project_state, new_state)
trigger_op.database_forwards('tests', editor, project_state, new_state)
Schema.objects.mass_create('a', 'b', 'c')
for schema in Schema.objects.all():
schema.activate()
with transaction.atomic():
with self.assertRaises(Exception) as exc:
self.assertTrue('Trigger fired correctly' == exc.args[0])
AwareModel.objects.create(name='FAILCREATE')
def execute_migration(schema_editor, operations, project=None):
"""Executes the specified migration operations
using the specified schema editor.
Arguments:
schema_editor:
The schema editor to use to
execute the migrations.
operations:
The migration operations to execute.
project:
The project state to use during the
migrations.
"""
project = project or migrations.state.ProjectState.from_apps(apps)
class Migration(migrations.Migration):
pass
Migration.operations = operations
executor = MigrationExecutor(schema_editor.connection)
executor.apply_migration(
project, Migration('eh', 'postgres_extra'))
def alter_db_table(field, filters: List[str]):
"""Creates a model with the specified field
and then renames the database table.
Arguments:
field:
The field to include into the
model.
filters:
List of strings to filter
SQL statements on.
"""
model = define_fake_model()
project = migrations.state.ProjectState.from_apps(apps)
with connection.schema_editor() as schema_editor:
execute_migration(schema_editor, [
migrations.CreateModel(
model.__name__,
fields=[
('title', field.clone())
]
)
], project)
with filtered_schema_editor(*filters) as (schema_editor, calls):
execute_migration(schema_editor, [
migrations.AlterModelTable(
model.__name__,
'NewTableName'
)
], project)
yield calls
def add_field(field, filters: List[str]):
"""Adds the specified field to a model.
Arguments:
field:
The field to add to a model.
filters:
List of strings to filter
SQL statements on.
"""
model = define_fake_model()
project = migrations.state.ProjectState.from_apps(apps)
with connection.schema_editor() as schema_editor:
execute_migration(schema_editor, [
migrations.CreateModel(
model.__name__,
fields=[]
)
], project)
with filtered_schema_editor(*filters) as (schema_editor, calls):
execute_migration(schema_editor, [
migrations.AddField(
model.__name__,
'title',
field
)
], project)
yield calls
def alter_field(old_field, new_field, filters: List[str]):
"""Alters a field from one state to the other.
Arguments:
old_field:
The field before altering it.
new_field:
The field after altering it.
filters:
List of strings to filter
SQL statements on.
"""
model = define_fake_model({'title': old_field})
project = migrations.state.ProjectState.from_apps(apps)
with connection.schema_editor() as schema_editor:
execute_migration(schema_editor, [
migrations.CreateModel(
model.__name__,
fields=[
('title', old_field.clone())
]
)
], project)
with filtered_schema_editor(*filters) as (schema_editor, calls):
execute_migration(schema_editor, [
migrations.AlterField(
model.__name__,
'title',
new_field
)
], project)
yield calls
def rename_field(field, filters: List[str]):
"""Renames a field from one name to the other.
Arguments:
field:
Field to be renamed.
filters:
List of strings to filter
SQL statements on.
"""
model = define_fake_model({'title': field})
project = migrations.state.ProjectState.from_apps(apps)
with connection.schema_editor() as schema_editor:
execute_migration(schema_editor, [
migrations.CreateModel(
model.__name__,
fields=[
('title', field.clone())
]
)
], project)
with filtered_schema_editor(*filters) as (schema_editor, calls):
execute_migration(schema_editor, [
migrations.RenameField(
model.__name__,
'title',
'newtitle'
)
], project)
yield calls
def test_run_sql(self):
project_state = self.set_up_test_model()
operation = migrations.RunSQL(
"""CREATE TABLE i_love_ponies (id int, special_thing int);
CREATE INDEX i_love_ponies_special_idx ON i_love_ponies (special_thing);
INSERT INTO i_love_ponies (id, special_thing) VALUES (1, 42);
INSERT INTO i_love_ponies (id, special_thing) VALUES (2, 51), (3, 60);
DELETE FROM i_love_ponies WHERE special_thing = 42;
UPDATE i_love_ponies SET special_thing = 42 WHERE id = 2;""",
"DROP TABLE i_love_ponies")
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
self.assertTableNotExists('i_love_ponies')
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
cursor = connection.cursor()
cursor.execute('SELECT * FROM a.i_love_ponies')
self.assertTableExists('i_love_ponies')
self.assertIndexExists('i_love_ponies', ['special_thing'])
@all_schemata
def objects_exist(cursor, **kwargs):
cursor = connection.cursor()
cursor.execute('SELECT * FROM i_love_ponies ORDER BY id')
result = cursor.fetchmany(4)
self.assertTrue(result, 'No objects found in {schema}'.format(**kwargs))
expected = [(2, 42), (3, 60)]
self.assertEqual(
sorted(expected),
sorted(result),
'Mismatch objects found in schema {schema}: expected {0}, saw {1}'
.format(expected, result, **kwargs)
)
with connection.cursor() as cursor:
objects_exist(cursor)
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
self.assertTableNotExists('i_love_ponies')
def test_run_python(self):
"""
Because this can run arbitrary python code, we can't know
which parts of it need to run against each schema, and which
parts run against the public schema.
We could hack into any generated SQL, and inspect it, looking
for table names, attempting to push data to the correct
schemata (including executing the SQL multiple times if
necessary).
Maybe we could fuck with the models generated by project_state.render(),
and make their generated SQL do what we need it to do. Although, it looks
like Pony.objects is a normal models.Manager class.
"""
project_state = self.set_up_test_model()
def forwards(models, schema_editor):
Pony = models.get_model('tests', 'Pony')
Pony.objects.create(pink=1, weight=3.55)
Pony.objects.create(weight=5)
def backwards(models, schema_editor):
Pony = models.get_model('tests', 'Pony')
Pony.objects.filter(pink=1, weight=3.55).delete()
Pony.objects.filter(weight=5).delete()
operation = migrations.RunPython(forwards, reverse_code=backwards)
new_state = project_state.clone()
operation.state_forwards('tests', new_state)
Pony = project_state.apps.get_model('tests', 'Pony')
@all_schemata
def pony_count(count, **kwargs):
found = Pony.objects.count()
self.assertEqual(
count,
found,
'Incorrect number of Ponies found in schema '
'{schema}: expected {0}, found {1}'.format(count, found, **kwargs)
)
pony_count(0)
with connection.schema_editor() as editor:
operation.database_forwards('tests', editor, project_state, new_state)
pony_count(2)
with connection.schema_editor() as editor:
operation.database_backwards('tests', editor, new_state, project_state)
pony_count(0)