def test_is_timer_task_completed_multiple_pages(self, page1_response, page2_response, dt1, dt2,
dt3, mocker):
page1_response['events'] = [{'eventId':3,
'eventType':'TimerFired',
'eventTimestamp':dt3,
'timerFiredEventAttributes':{'timerId':'t_id'}},
{'eventId':2,
'eventType':'DecisionTaskCompleted',
'eventTimestamp':dt2}]
page2_response['events'] = [{'eventId':1,
'eventType':'TimerStarted',
'eventTimestamp':dt1,
'timerStartedEventAttributes':{'timerId':'t_id'}}]
page2_response.pop('nextPageToken', None)
swf_mock = SwfMock()
swf_mock.pages['page2'] = page2_response
mocker.patch('floto.api.Swf.client', new_callable=PropertyMock, return_value=swf_mock)
h = floto.History(domain='d', task_list='tl', response=page1_response)
assert h._has_next_event_page()
assert h.is_timer_task_completed('t_id')
assert not h._has_next_event_page()
评论列表
文章目录