test_signals.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:django-user-tasks 作者: edx 项目源码 文件源码
def _create_chain(self, eager):
        """
        Launch a four-task celery chain and check the status records it produces.

        The chain consists of three ``sample_task`` signatures followed by one
        ``normal_task`` signature. After it is dispatched, this helper asserts
        that exactly four ``UserTaskStatus`` rows exist: one container row for
        the chain itself plus three child rows attached to it.

        Arguments:
            eager: Forwarded unchanged to ``verify_state`` for every status
                checked here (presumably indicates whether celery runs tasks
                eagerly -- confirm against ``verify_state``).
        """
        user_id = self.user.id
        signatures = [
            sample_task.si(user_id, '1'),
            # This signature carries the name that the chain's status is expected to have.
            sample_task.si(user_id, '2', user_task_name='Chain: 1, 2, 3'),
            sample_task.si(user_id, '3'),
            normal_task.si('Argument'),
        ]
        chain(*signatures).delay()

        # One record for the chain container plus one for each of its three children.
        assert UserTaskStatus.objects.count() == 4

        container = UserTaskStatus.objects.get(task_class='celery.chain')
        assert container.task_id
        assert container.parent is None
        assert container.is_container
        assert container.name == 'Chain: 1, 2, 3'
        assert container.total_steps == 3
        verify_state(container, eager)

        child_statuses = UserTaskStatus.objects.filter(parent=container)
        assert len(child_statuses) == 3
        expected_names = ['SampleTask: 1', 'SampleTask: 2', 'SampleTask: 3']
        for child in child_statuses:
            assert not child.is_container
            assert child.name in expected_names
            assert child.total_steps == 1
            verify_state(child, eager)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号