def test_get_id_activity_task_event_multi_page(self, page1_response, page2_response,
dt1, dt2, dt3, mocker):
activity_task_failed_event = {'eventId':3,
'eventType':'ActivityTaskFailed',
'eventTimestamp':dt3,
'activityTaskFailedEventAttributes':{'scheduledEventId':1}}
decision_task_completed = {'eventId':2,
'eventType':'DecisionTaskCompleted',
'eventTimestamp':dt2}
activity_task_scheduled_event = {'eventId':1,
'eventType':'ActivityTaskScheduled',
'eventTimestamp':dt1,
'activityTaskScheduledEventAttributes':{'activityId':'a_id'}}
page1_response['events'] = [activity_task_failed_event, decision_task_completed]
page2_response['events'] = [activity_task_scheduled_event]
swf_mock = SwfMock()
swf_mock.pages['page2'] = page2_response
mocker.patch('floto.api.Swf.client', new_callable=PropertyMock, return_value=swf_mock)
h = floto.History(domain='d', task_list='tl', response=page1_response)
assert h.get_id_activity_task_event(activity_task_failed_event) == 'a_id'
评论列表
文章目录