test_databricks_hook.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:incubator-airflow-old 作者: apache 项目源码 文件源码
def test_get_run_state(self, mock_requests):
        mock_requests.codes.ok = 200
        mock_requests.get.return_value.json.return_value = GET_RUN_RESPONSE
        status_code_mock = mock.PropertyMock(return_value=200)
        type(mock_requests.get.return_value).status_code = status_code_mock

        run_state = self.hook.get_run_state(RUN_ID)

        self.assertEquals(run_state, RunState(
            LIFE_CYCLE_STATE,
            RESULT_STATE,
            STATE_MESSAGE))
        mock_requests.get.assert_called_once_with(
            get_run_endpoint(HOST),
            json={'run_id': RUN_ID},
            auth=(LOGIN, PASSWORD),
            headers=USER_AGENT_HEADER,
            timeout=self.hook.timeout_seconds)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号