def test_patch_task(self):
"""
When celery.Task is patched
we patch the __init__, apply, apply_async, and run methods
"""
# Assert base class methods are patched
self.assertIsInstance(celery.Task.__init__, wrapt.BoundFunctionWrapper)
self.assertIsInstance(celery.Task.apply, wrapt.BoundFunctionWrapper)
self.assertIsInstance(celery.Task.apply_async, wrapt.BoundFunctionWrapper)
self.assertIsInstance(celery.Task.run, wrapt.BoundFunctionWrapper)
# Create an instance of a Task
task = celery.Task()
# Assert instance methods are patched
self.assertIsInstance(task.__init__, wrapt.BoundFunctionWrapper)
self.assertIsInstance(task.apply, wrapt.BoundFunctionWrapper)
self.assertIsInstance(task.apply_async, wrapt.BoundFunctionWrapper)
self.assertIsInstance(task.run, wrapt.BoundFunctionWrapper)
评论列表
文章目录