def test_pond_blocks_state_change_completed(self, modify_mock):
now = timezone.now()
mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=now - timedelta(days=2),
end=now - timedelta(days=1))
molecules1 = basic_mixer.cycle(3).blend(PondMolecule, completed=True, failed=False, request_num=self.requests[0].id,
tracking_num=self.ur.id, event=[])
molecules2 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[1].id,
tracking_num=self.ur.id, event=[])
molecules3 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[2].id,
tracking_num=self.ur.id, event=[])
pond_blocks = basic_mixer.cycle(3).blend(PondBlock, molecules=(m for m in [molecules1, molecules2, molecules3]),
start=now - timedelta(minutes=30), end=now - timedelta(minutes=20))
pond_blocks = [pb._to_dict() for pb in pond_blocks]
responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/',
body=json.dumps(pond_blocks, cls=DjangoJSONEncoder), status=200, content_type='application/json')
response = self.client.get(reverse('api:isDirty'))
response_json = response.json()
self.assertTrue(response_json['isDirty'])
request_states = ['COMPLETED', 'WINDOW_EXPIRED', 'WINDOW_EXPIRED']
for i, req in enumerate(self.requests):
req.refresh_from_db()
self.assertEqual(req.state, request_states[i])
self.ur.refresh_from_db()
self.assertEqual(self.ur.state, 'COMPLETED')
评论列表
文章目录