def test_stdout_chunk(self, get_stdout, get_info_for_rj):
    """Verify the request that RunningJob.stdout_chunk() sends for a chunk.

    ``get_stdout`` and ``get_info_for_rj`` are presumably mocks injected by
    patch decorators that are not visible in this excerpt -- confirm
    against the enclosing test class.

    The job info stub reports a ``stdout_size`` of 10.  Requesting a
    10-byte chunk at offset 0 is expected to produce exactly one call to
    ``get_stdout``, made with the job id and the header
    ``Range: bytes=0-10``.
    """
    job_id = '1234-stdout-chunk'

    # Canned responses for the two injected collaborators.
    get_stdout.return_value = "line1\nline2\nline3\nline4\nline5\nline6"
    get_info_for_rj.return_value = {'stdout_size': 10}

    job = pygenie.jobs.RunningJob(job_id, info={'status': 'SUCCEEDED'})
    job.stdout_chunk(10, offset=0)

    # Only the recorded call matters here; the returned chunk is ignored.
    expected_calls = [call(job_id, headers={'Range': 'bytes=0-10'})]
    assert_equals(expected_calls, get_stdout.call_args_list)
评论列表
文章目录