def execute_job(self, job, *args, **kwargs):
    """Run ``job`` via the parent worker after releasing database resources.

    Logs when the job starts and when it has finished, and disposes of the
    database session and engine before delegating to the parent
    implementation.

    :param job: the job to run; its ``origin`` and ``id`` are used for
        the log messages.
    :param args: positional arguments forwarded unchanged to the parent
        ``execute_job``.
    :param kwargs: keyword arguments forwarded unchanged to the parent
        ``execute_job``.
    :returns: whatever the parent ``execute_job`` returns.
    """
    # Database connections and the engine must not be shared with the
    # child process: the child could close them while the main process is
    # still using them. See https://github.com/ckan/ckan/issues/3365
    #
    # Removing the session rolls back any non-committed changes. Both
    # `Session` and `engine` automatically re-initialize themselves the
    # next time they are used.
    log.debug(u'Disposing database engine before fork')
    meta.Session.remove()
    meta.engine.dispose()

    queue_name = remove_queue_name_prefix(job.origin)
    start_message = u'Worker {} starts job {} from queue "{}"'.format(
        self.key, job.id, queue_name)
    log.info(start_message)

    # HACK: the engines are disposed of directly here. The alternative
    # that was left disabled would instead call `before_fork()` on every
    # `plugins.IForkObserver` implementation.
    _dispose_engines()

    # The parent implementation performs the actual fork.
    outcome = super(Worker, self).execute_job(job, *args, **kwargs)

    finish_message = u'Worker {} has finished job {} from queue "{}"'.format(
        self.key, job.id, queue_name)
    log.info(finish_message)
    return outcome
评论列表
文章目录