def test_exec(self, gcs_hook, dataflow_mock):
    """Check that execute() creates the DataFlowHook and hands the
    expected arguments to start_python_dataflow.

    Both mock parameters are presumably injected by patch decorators that
    are not visible in this chunk -- confirm against the decorators.
    """
    options_wanted = {
        'project': 'test',
        'staging_location': 'gs://test/staging',
        'output': 'gs://test/output',
    }

    self.dataflow.execute(None)

    # The patched Dataflow mock must have been called during execute().
    self.assertTrue(dataflow_mock.called)

    # The Python file is requested exactly once through
    # google_cloud_to_local on the GCS mock's return value.
    download = gcs_hook.return_value.google_cloud_to_local
    download.assert_called_once_with(PY_FILE)

    # The third positional argument is not pinned; mock.ANY matches any
    # value there.
    launch = dataflow_mock.return_value.start_python_dataflow
    launch.assert_called_once_with(
        TASK_ID, options_wanted, mock.ANY, PY_OPTIONS)

    # After execute(), py_file is expected to start with '/tmp/dataflow'.
    self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))
test_dataflow_operator.py 文件源码
python
阅读 21
收藏 0
点赞 0
评论 0
评论列表
文章目录