test_ecs_operator.py source file

python
Project: incubator-airflow-old  Author: apache
def test_execute_without_failures(self, check_mock, wait_mock):

        client_mock = self.aws_hook_mock.return_value.get_client_type.return_value
        client_mock.run_task.return_value = RESPONSE_WITHOUT_FAILURES

        self.ecs.execute(None)

        self.aws_hook_mock.return_value.get_client_type.assert_called_once_with('ecs', region_name='eu-west-1')
        client_mock.run_task.assert_called_once_with(
            cluster='c',
            overrides={},
            startedBy=mock.ANY,  # Can be 'airflow' or 'Airflow'
            taskDefinition='t'
        )

        wait_mock.assert_called_once_with()
        check_mock.assert_called_once_with()
        self.assertEqual(self.ecs.arn, 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55')
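The excerpt above relies on fixtures that are not shown: the `mock.patch` decorators that supply `check_mock` and `wait_mock`, the `setUp` that builds `self.ecs` and `self.aws_hook_mock`, and the `RESPONSE_WITHOUT_FAILURES` constant. The following is a minimal, self-contained sketch of the same mocking pattern, not the project's actual code. `TinyOperator`, `FakeHook`, the `hook_cls` attribute, and the shortened ARN are hypothetical stand-ins invented for illustration. Only `unittest` and `unittest.mock` from the standard library are used.

```python
import unittest
from unittest import mock


class FakeHook:
    """Hypothetical stand-in for an AWS hook; the test replaces it with a mock."""

    def get_client_type(self, client_type, region_name=None):
        raise RuntimeError("real hook must not be reached in a unit test")


class TinyOperator:
    """Hypothetical, heavily simplified stand-in for the operator under test."""

    hook_cls = FakeHook  # class attribute so the test can patch it easily

    def __init__(self, cluster, task_definition, region_name):
        self.cluster = cluster
        self.task_definition = task_definition
        self.region_name = region_name
        self.arn = None

    def execute(self, context):
        client = self.hook_cls().get_client_type('ecs', region_name=self.region_name)
        response = client.run_task(
            cluster=self.cluster,
            overrides={},
            startedBy='airflow',
            taskDefinition=self.task_definition,
        )
        self.arn = response['tasks'][0]['taskArn']
        self._wait_for_task_ended()
        self._check_success_task()

    def _wait_for_task_ended(self):
        raise RuntimeError("patched out in the test")

    def _check_success_task(self):
        raise RuntimeError("patched out in the test")


class TinyOperatorTest(unittest.TestCase):
    # Decorators apply bottom-up, so the lowest patch is the first mock argument.
    @mock.patch.object(TinyOperator, 'hook_cls')
    @mock.patch.object(TinyOperator, '_wait_for_task_ended')
    @mock.patch.object(TinyOperator, '_check_success_task')
    def test_execute_without_failures(self, check_mock, wait_mock, hook_mock):
        # hook_cls() -> return_value; .get_client_type(...) -> its return_value
        client_mock = hook_mock.return_value.get_client_type.return_value
        client_mock.run_task.return_value = {
            'tasks': [{'taskArn': 'arn:example'}],  # made-up response shape
            'failures': [],
        }

        op = TinyOperator(cluster='c', task_definition='t', region_name='eu-west-1')
        op.execute(None)

        hook_mock.return_value.get_client_type.assert_called_once_with(
            'ecs', region_name='eu-west-1')
        client_mock.run_task.assert_called_once_with(
            cluster='c',
            overrides={},
            startedBy=mock.ANY,  # matches any value passed for this keyword
            taskDefinition='t',
        )
        wait_mock.assert_called_once_with()
        check_mock.assert_called_once_with()
        self.assertEqual(op.arn, 'arn:example')
```

Saved to a file, the sketch can be run with `python -m unittest <file>`. The argument order `(check_mock, wait_mock)` in the original method suggests the same bottom-up decorator stacking, though the decorators themselves are not part of the excerpt.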