def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when called while
    ``self.in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
# Example source code using the Python class TransactionManagementError()
def set_rollback(self, rollback):
    """
    Store *rollback* as the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when called while
    ``self.in_atomic_block`` is false; the flag is left untouched then.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Ensure no atomic block is active.

    Returns None silently when ``self.in_atomic_block`` is false and
    raises TransactionManagementError otherwise.
    """
    if not self.in_atomic_block:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Ensure the "needs rollback" flag is not set.

    Returns None silently when ``self.needs_rollback`` is false and
    raises TransactionManagementError otherwise.
    """
    if not self.needs_rollback:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def on_commit(self, func):
    """
    Register *func* to be run on commit, or run it right away.

    * Inside an 'atomic' block, ``func`` is appended to
      ``self.run_on_commit`` together with a snapshot (a set) of the
      current ``self.savepoint_ids``.
    * Outside an atomic block with autocommit on, ``func`` is called
      immediately.
    * Outside an atomic block with autocommit off,
      TransactionManagementError is raised.

    Raises TypeError if ``func`` is not callable.
    """
    # Reject bad callbacks at registration time; otherwise a non-callable
    # would be queued silently and only blow up when the commit hooks run,
    # far away from the code that registered it.
    if not callable(func):
        raise TypeError("on_commit()'s callback must be a callable.")
    if self.in_atomic_block:
        # Transaction in progress; save for execution on commit.
        self.run_on_commit.append((set(self.savepoint_ids), func))
    elif not self.get_autocommit():
        raise TransactionManagementError(
            'on_commit() cannot be used in manual transaction management')
    else:
        # No transaction in progress and in autocommit mode; execute
        # immediately.
        func()
def get_rollback(self):
    """
    Read the "needs rollback" flag -- for *advanced use* only.

    Only valid while ``self.in_atomic_block`` is true; otherwise a
    TransactionManagementError is raised.
    """
    inside_atomic = self.in_atomic_block
    if inside_atomic:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Write the "needs rollback" flag -- for *advanced use* only.

    Only valid while ``self.in_atomic_block`` is true; otherwise a
    TransactionManagementError is raised and nothing is written.
    """
    inside_atomic = self.in_atomic_block
    if inside_atomic:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Raise TransactionManagementError if ``self.in_atomic_block`` is true.

    Does nothing (returns None) otherwise.
    """
    inside_atomic = self.in_atomic_block
    if not inside_atomic:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Raise TransactionManagementError if ``self.needs_rollback`` is true.

    Does nothing (returns None) otherwise.
    """
    rollback_pending = self.needs_rollback
    if not rollback_pending:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def on_commit(self, func):
    """
    Schedule *func* for execution on commit, or execute it immediately.

    Behaviour depends on the connection state:

    * ``self.in_atomic_block`` true: ``(set(self.savepoint_ids), func)``
      is appended to ``self.run_on_commit``.
    * not in an atomic block and ``self.get_autocommit()`` true: ``func``
      is called right away.
    * not in an atomic block and autocommit off:
      TransactionManagementError is raised.

    Raises TypeError if ``func`` is not callable.
    """
    # Fail fast on a bad callback: without this check a non-callable is
    # queued silently and the error only surfaces when hooks are run.
    if not callable(func):
        raise TypeError("on_commit()'s callback must be a callable.")
    if self.in_atomic_block:
        # Transaction in progress; save for execution on commit.
        self.run_on_commit.append((set(self.savepoint_ids), func))
    elif not self.get_autocommit():
        raise TransactionManagementError(
            'on_commit() cannot be used in manual transaction management')
    else:
        # No transaction in progress and in autocommit mode; execute
        # immediately.
        func()
def test__crashes_on_enter_if_hooks_exist(self):
    """Entering ``post_commit_hooks`` with a hook already added must raise."""
    orphan = Deferred()
    post_commit_hooks.add(orphan)
    with ExpectedException(TransactionManagementError), post_commit_hooks:
        pass
    # The orphaned hook was cancelled; hooks suppress CancelledError, so
    # no failure surfaces here and the Deferred simply shows as fired.
    self.assertThat(orphan, IsFiredDeferred())
    # The hook list must be empty afterwards so the error is raised once only.
    self.assertThat(post_commit_hooks.hooks, HasLength(0))
def test__crashes_if_hooks_exist_before_entering_transaction(self):
    """A hook registered ahead of a transactional call makes that call raise."""

    def ignore_failure(failure):
        return None

    def do_nothing():
        return None

    post_commit(ignore_failure)
    transactional_noop = orm.transactional(do_nothing)
    self.assertRaises(TransactionManagementError, transactional_noop)
    # The hook list must be empty afterwards so the error is raised once only.
    self.assertThat(post_commit_hooks.hooks, HasLength(0))
def test__crashes_if_not_already_within_transaction(self):
    """Entering ``savepoint()`` here must raise TransactionManagementError."""
    with ExpectedException(TransactionManagementError), savepoint():
        pass
def test__explodes_when_no_transaction_is_active(self):
    """``validate_in_transaction(connection)`` must raise here."""
    expected_error = TransactionManagementError
    self.assertRaises(expected_error, validate_in_transaction, connection)
def __enter__(self):
    """Refuse to enter while hooks are already registered.

    When ``self.hooks`` is non-empty, the hooks are described, ``reset()``
    is called, and TransactionManagementError is raised with that
    description. Otherwise nothing happens and None is returned.
    """
    if len(self.hooks) == 0:
        return None
    # Build the textual description first: it is needed for the error
    # message and must be captured before reset() is called.
    hook_lines = gen_description_of_hooks(self.hooks)
    description = "\n".join(hook_lines)
    # Orphaned post-commit hooks are treated as a hard failure rather than
    # a warning so that they get fixed. They might only show up in tests,
    # where the test framework manages transactions instead of this
    # decorator.
    self.reset()
    message = "Orphaned post-commit hooks found:\n" + description
    raise TransactionManagementError(message)
def get_rollback(self):
    """
    Fetch the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError unless ``self.in_atomic_block``
    is true.
    """
    if self.in_atomic_block:
        flag = self.needs_rollback
        return flag
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Assign the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError unless ``self.in_atomic_block``
    is true; in that case the flag is not modified.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return None
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Check that no atomic block is active.

    Raises TransactionManagementError when ``self.in_atomic_block`` is
    true; returns None otherwise.
    """
    if not self.in_atomic_block:
        return None
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Check that the "needs rollback" flag is not set.

    Raises TransactionManagementError when ``self.needs_rollback`` is
    true; returns None otherwise.
    """
    if not self.needs_rollback:
        return None
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def after_feature(context, feature):
    """Tear down after a feature.

    Runs the test case's tearDown, reassigns the ``old_*`` values stored on
    ``context`` back onto the classes they belong to, and tears down the
    test databases and test environment through ``context.runner``.
    ``feature`` is not used.
    """
    context.test_case.tearDown()

    from chroma_core.services.job_scheduler.job_scheduler_client import JobSchedulerClient
    from chroma_core.models import Command

    # Reassign the values saved on the context.
    JobSchedulerClient.create_host_ssh = context.old_create_host_ssh
    JobSchedulerClient.create_targets = context.old_create_targets
    JobSchedulerClient.create_filesystem = context.old_create_filesystem
    JobSchedulerClient.command_run_jobs = context.old_run_jobs
    Command.set_state = context.old_set_state

    # A failing step throws an exception and the transaction is rolled
    # back. The post-teardown then raises a TransactionManagementError,
    # which is deliberately ignored.
    from django.db.transaction import TransactionManagementError
    try:
        context.test_case._post_teardown()
    except TransactionManagementError:
        pass

    context.runner.teardown_databases(context.old_db_config)
    context.runner.teardown_test_environment()

    # As of Django 1.4, teardown_databases() no longer restores the
    # original db name in the connection's settings dict (see
    # https://code.djangoproject.com/ticket/10868 for the reasoning, which
    # is understandable). Nothing done from here on can affect the
    # production DB, so this hack restores the pre-1.4 behavior:
    for db_connection, original_name, _destroy in context.old_db_config[0]:
        db_connection.settings_dict['NAME'] = original_name

    from chroma_cli.api import ApiHandle
    ApiHandle.ApiClient = context.old_api_client

    from chroma_api.authentication import CsrfAuthentication
    CsrfAuthentication.is_authenticated = context.old_is_authenticated

    from django.contrib.contenttypes.models import ContentType
    ContentType.objects.clear_cache()

    # JobSchedulerClient is already bound by the import at the top of this
    # function, so it is not imported a second time here.
    JobSchedulerClient.test_host_contact = context.old_test_host_contact
##--def before_scenario(context, scenario):
#-- # Set up the scenario test environment
#-- context.runner.setup_test_environment()
#-- # We must set up and tear down the entire database between
#-- # scenarios. We can't just use db transactions, as Django's
#-- # TestClient does, if we're doing full-stack tests with Mechanize,
#-- # because Django closes the db connection after finishing the HTTP
#-- # response.
#-- context.old_db_config = context.runner.setup_databases()
#--def after_scenario(context, scenario):
#-- # Tear down the scenario test environment.
#-- context.runner.teardown_databases(context.old_db_config)
#-- context.runner.teardown_test_environment()
#-- # Bob's your uncle.