def test_consumer_stop_during_commit(self):
# setup a client which will return a message block in response to fetch
# and just fail on the commit
mockclient = Mock()
mockclient.send_offset_commit_request.return_value = Deferred()
mockclient.send_fetch_request.return_value = Deferred()
the_group = 'U2'
the_topic = 'test_consumer_stop_during_commit'
the_part = 11
the_offset = 0
# Create a consumer and muck with the state a bit...
consumer = Consumer(mockclient, the_topic, the_part, Mock(), the_group,
auto_commit_every_ms=0)
mockback = Mock()
start_d = consumer.start(the_offset)
start_d.addCallback(mockback)
consumer._last_processed_offset = the_offset # Fake processed msgs
# Start a commit, don't fire the deferred, assert there's no result
commit_d = consumer.commit()
self.assertNoResult(commit_d)
self.assertEqual(consumer._commit_ds[0], commit_d)
# Stop the consumer, assert the start_d fired, and commit_d errbacks
consumer.stop()
mockback.assert_called_once_with('Stopped')
self.failureResultOf(commit_d, CancelledError)
评论列表
文章目录