def on_action_save(self, sender, instance, created, raw, **kwargs):
if created and not raw:
transaction.on_commit(lambda: send_notifications(instance))
python类on_commit()的实例源码
def provision_run(self, spark_job, first_run=False):
"""
Actually run the given Spark job.
If this is the first run we'll update the "last_run_at" value
to the start date of the spark_job so Celery beat knows what's
going on.
"""
spark_job.run()
if first_run:
def update_last_run_at():
schedule_entry = spark_job.schedule.get()
if schedule_entry is None:
schedule_entry = spark_job.schedule.add()
schedule_entry.reschedule(last_run_at=spark_job.start_date)
transaction.on_commit(update_last_run_at)
def save(self, *args, **kwargs):
# whether the job is being created for the first time
first_save = self.pk is None
# resetting expired_date in case a user resets the end_date
if self.expired_date and self.end_date and self.end_date > timezone.now():
self.expired_date = None
super().save(*args, **kwargs)
# Remove the cached latest run to this objects will requery it.
try:
delattr(self, 'latest_run')
except AttributeError: # pragma: no cover
pass # It didn't have a `latest_run` and that's ok.
# first remove if it exists
self.schedule.delete()
# and then add it, but only if the end date is in the future
if self.has_future_end_date(timezone.now()):
self.schedule.add()
if first_save:
transaction.on_commit(self.first_run)
def _send_notify(self, message_type, updated_fields=None, data=None, always_send=True):
group_name = self._get_group_name(message_type=message_type)
message_data = {
'group': group_name,
'type': self._get_message_type(message_type),
'object': data or self._get_push_data(updated_fields=updated_fields, message_type=message_type),
}
if updated_fields and 'updated_fields' not in message_data['object']:
message_data['object']['updated_fields'] = updated_fields
if message_type == 'update' and updated_fields and always_send is False:
if not set(updated_fields) & set(message_data['object'].keys()):
return
payload = {
'text': json.dumps(build_message(
generate_id=True,
handler='subscription-update',
**message_data
), cls=self.encoder)
}
on_transaction_commit(
lambda: Group(group_name).send(payload)
)
def save(self, **kwargs):
new = self.pk is None
if not new and (self.was_processed or not self.processed):
raise TypeError
super().save(**kwargs)
with suppress(FileExistsError):
os.mkdir(os.path.dirname(self._changed_geometries_filename()))
from c3nav.mapdata.utils.cache.changes import changed_geometries
pickle.dump(changed_geometries, open(self._changed_geometries_filename(), 'wb'))
if new:
transaction.on_commit(
lambda: cache.set('mapdata:last_update', self.to_tuple, 300)
)
if settings.HAS_CELERY:
transaction.on_commit(
lambda: process_map_updates.delay()
)
def update_history(instance, **kwargs):
if instance.body != instance.db_body \
or instance.layout != instance.db_layout \
or instance.subject != instance.db_subject:
instance.version += 1
if instance.version != instance.db_version:
version = instance.version
@transaction.on_commit
def save():
instance.history.create(
version=version,
body=instance.body,
subject=instance.subject,
layout=instance.layout,
)
def model_post_save(sender, instance, created=False, **kwargs):
"""
Signal emitted after any model is saved via Django ORM.
:param sender: Model class that was saved
:param instance: The actual instance that was saved
:param created: True if a new row was created
"""
def notify():
table = sender._meta.db_table
if created:
observer_client.notify_table_insert(table)
else:
observer_client.notify_table_update(table)
transaction.on_commit(notify)
def model_m2m_changed(sender, instance, action, **kwargs):
"""
Signal emitted after any M2M relation changes via Django ORM.
:param sender: M2M intermediate model
:param instance: The actual instance that was saved
:param action: M2M action
"""
def notify():
table = sender._meta.db_table
if action == 'post_add':
observer_client.notify_table_insert(table)
elif action in ('post_remove', 'post_clear'):
observer_client.notify_table_remove(table)
transaction.on_commit(notify)
def profile_following_change(sender, instance, action, pk_set, **kwargs):
"""Deliver notification on new followers."""
logger.debug("profile_following_change - sender %s, instance %s, action %s, pk_set %s, kwargs: %s",
sender, instance, action, pk_set, kwargs)
if action in ["post_add", "post_remove"]:
logger.debug("profile_following_change - Got %s from %s for %s", action, sender, instance)
logger.debug("profile_following_change - pk_set %s", pk_set)
transaction.on_commit(lambda: on_commit_profile_following_change(action, pk_set, instance))
def profile_post_save(instance, **kwargs):
if instance.is_local:
transaction.on_commit(lambda: federate_profile(instance))
def content_post_save(instance, **kwargs):
fetch_preview(instance)
render_content(instance)
if kwargs.get("created"):
notify_listeners(instance)
if instance.content_type == ContentType.REPLY:
transaction.on_commit(lambda: django_rq.enqueue(send_reply_notifications, instance.id))
elif instance.content_type == ContentType.SHARE and instance.share_of.local:
transaction.on_commit(lambda: django_rq.enqueue(send_share_notification, instance.id))
transaction.on_commit(lambda: update_streams_with_content(instance))
if instance.local:
transaction.on_commit(lambda: federate_content(instance))
def on_commit(self, fun, *args, **kwargs):
if args or kwargs:
fun = partial(fun, *args, **kwargs)
if on_commit is not None:
try:
return on_commit(fun)
except TransactionManagementError:
pass # not in transaction management, execute now.
return fun()
def sync_user_profile(sender, instance, created, **kwargs): # pylint: disable=unused-argument
"""
Signal handler create/update a DiscussionUser every time a profile is created/updated
"""
if not settings.FEATURES.get('OPEN_DISCUSSIONS_USER_SYNC', False):
return
transaction.on_commit(lambda: tasks.sync_discussion_user.delay(instance.user_id))
def handle_create_programenrollment(sender, instance, created, **kwargs): # pylint: disable=unused-argument
"""
When a ProgramEnrollment model is created/updated, update index.
"""
transaction.on_commit(lambda: index_program_enrolled_users.delay([instance.id]))
def handle_delete_programenrollment(sender, instance, **kwargs): # pylint: disable=unused-argument
"""
When a ProgramEnrollment model is deleted, update index.
"""
enrollment_id = instance.id # this is modified in-place on delete, so store it on a local
transaction.on_commit(lambda: remove_program_enrolled_user.delay(enrollment_id))
def handle_create_coursecertificate(sender, instance, created, **kwargs): # pylint: disable=unused-argument
"""
When a MicromastersCourseCertificate model is created
"""
if created:
user = instance.final_grade.user
program = instance.final_grade.course_run.course.program
transaction.on_commit(lambda: generate_program_certificate(user, program))
def handle_update_profile(sender, instance, **kwargs):
"""Update index when Profile model is updated."""
transaction.on_commit(lambda: index_users.delay([instance.user.id], check_if_changed=True))
def handle_update_employment(sender, instance, **kwargs):
"""Update index when Employment model is updated."""
transaction.on_commit(lambda: index_users.delay([instance.profile.user.id], check_if_changed=True))
def handle_delete_education(sender, instance, **kwargs):
"""Update index when Education model instance is deleted."""
transaction.on_commit(lambda: index_users.delay([instance.profile.user.id]))
def handle_delete_employment(sender, instance, **kwargs):
"""Update index when Employment model instance is deleted."""
transaction.on_commit(lambda: index_users.delay([instance.profile.user.id]))
def handle_update_percolate(sender, instance, **kwargs):
"""When a new query is created or a query is updated, update Elasticsearch too"""
transaction.on_commit(lambda: index_percolate_queries.delay([instance.id]))
def handle_delete_percolate(sender, instance, **kwargs):
"""When a query is deleted, make sure we also delete it on Elasticsearch"""
transaction.on_commit(lambda: delete_percolate_query.delay(instance.id))
def handle_remove_role(sender, instance, **kwargs):
"""Update index when Role model instance is deleted."""
transaction.on_commit(lambda: index_users.delay([instance.user.id]))
def save(self, *args, **kwargs):
created = not self.pk
super(Key, self).save(*args, **kwargs)
if created:
from .tasks import update_or_create_key
transaction.on_commit(lambda: update_or_create_key.delay(self.uid))
def render_submission_form(submission_form_id=None):
logger = render_submission_form.get_logger()
# XXX: Look to wait for submission form to appear. The celery task is
# triggered on submit before the request transaction is committed, so we
# have to wait. We should start using transaction.on_commit() as soon as
# we've updated to Django 1.9.
for i in range(60):
with transaction.atomic():
try:
sf = SubmissionForm.unfiltered.get(id=submission_form_id)
except SubmissionForm.DoesNotExist:
pass
else:
sf.render_pdf_document()
break
time.sleep(1)
else:
logger.error("SubmissionForm(id=%d) didn't appear", submission_form_id)
return
def save(self, *args, **kwargs):
created = False
if not self.pk:
created = True
# If no API key exists, just produce one.
# Typically: at instance creation
with transaction.atomic():
if not self.secret:
self.regen_secret()
super().save(*args, **kwargs)
if created and not self.password:
transaction.on_commit(self.send_invitation_email)
def post(self, request, *args, **kwargs):
data = request.POST
from_name, from_email = parseaddr(data['from'])
msg = IncomingMessage.objects.create(
body_html=data.get('html', ''),
body_text=data.get('text', ''),
from_email=from_email,
from_name=from_name,
original_post_data=dict(data),
subject=data.get('subject', '<No subject>'),
to_email=json.loads(data['envelope'])['to'][0],
)
for name, info in json.loads(data.get('attachment-info', '{}')).items():
attachment = Attachment(
content_id=info.get('content-id', ''),
content_type=info.get('type', ''),
file=request.FILES[name],
msg=msg,
)
if attachment.content_type:
attachment.file.content_type = attachment.content_type
attachment.save()
transaction.on_commit(partial(process_incoming_message.delay, msg.id))
return HttpResponse()
def destination_post_save(instance, **kwargs):
if hasattr(transaction, 'on_commit'):
transaction.on_commit(clear_destination_cache)
else:
clear_destination_cache()
# noinspection PyUnusedLocal
def destination_post_delete(instance, **kwargs):
if hasattr(transaction, 'on_commit'):
transaction.on_commit(clear_destination_cache)
else:
clear_destination_cache()
# noinspection PyUnusedLocal
def router_post_save(instance, **kwargs):
if hasattr(transaction, 'on_commit'):
transaction.on_commit(clear_router_cache)
else:
clear_router_cache()
# noinspection PyUnusedLocal