test_signals.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:django-user-tasks 作者: edx 项目源码 文件源码
def _create_group(self, eager):
        """Create a celery group and verify some assertions about the corresponding status records"""
        result = group(sample_task.s(self.user.id, '1'),
                       sample_task.s(self.user.id, '2', user_task_name='Group: 1, 2')).delay()
        assert UserTaskStatus.objects.count() == 3
        group_status = UserTaskStatus.objects.get(task_class='celery.group')
        assert group_status.task_id == result.id
        assert group_status.parent is None
        assert group_status.is_container
        assert group_status.name == 'Group: 1, 2'
        assert group_status.total_steps == 2
        verify_state(group_status, eager)

        assert len(result.children) == 2
        for result in result.children:
            task_id = result.id
            status = UserTaskStatus.objects.get(task_id=task_id)
            assert status.parent_id == group_status.id
            assert not status.is_container
            assert status.name in ['SampleTask: 1', 'SampleTask: 2']
            assert status.total_steps == 1
            verify_state(status, eager)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号