tasks_test.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:odl-video-service 作者: mitodl 项目源码 文件源码
def test_video_task_chain(mocker):
    """
    Test that video task get_task_id method returns the correct id from the chain.
    """
    def ctx():
        """ Return a mock context object """
        return celery.app.task.Context({
            'lang': 'py',
            'task': 'cloudsync.tasks.stream_to_s3',
            'id': '1853b857-84d8-4af4-8b19-1c307c1e07d5',
            'chain': [{
                'task': 'cloudsync.tasks.transcode_from_s3',
                'args': [351],
                'kwargs': {},
                'options': {
                    'task_id': '1a859e5a-8f71-4e01-9349-5ebc6dc66631'
                }
            }]
        })
    mocker.patch('cloudsync.tasks.VideoTask.request', new_callable=PropertyMock, return_value=ctx())
    task = VideoTask()
    assert task.get_task_id() == task.request.chain[0]['options']['task_id']
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号