def test_wait_for_etcd_event_conn_failed(self, m_sleep):
    """Check wait_for_etcd_event() when reads fail with connection errors.

    The mocked client's ``read`` raises two ``EtcdConnectionFailed``
    errors in sequence -- the first with a ``ReadTimeoutError`` cause,
    the second with an ``ExpectedException`` cause -- and then returns a
    response.  The test asserts that the response is what the watcher
    returns and that the patched sleep was called exactly once, with 1.

    NOTE(review): presumably only one of the two failures triggers the
    sleep (a read timeout being retried immediately) -- confirm against
    the watcher implementation.
    """
    self.watcher.next_etcd_index = 1

    # Failure whose underlying cause is a read timeout.
    timeout_failure = etcd.EtcdConnectionFailed()
    timeout_failure.cause = ReadTimeoutError(Mock(), "", "")

    # Failure whose underlying cause is some other exception.
    generic_failure = etcd.EtcdConnectionFailed()
    generic_failure.cause = ExpectedException()

    # The response handed back once the failures are exhausted.
    good_response = Mock(modifiedIndex=123)

    # Exceptions in the side_effect sequence are raised; the final item
    # is returned as the read result.
    self.m_client.read.side_effect = iter(
        (timeout_failure, generic_failure, good_response)
    )

    result = self.watcher.wait_for_etcd_event()

    self.assertEqual(result, good_response)
    self.assertEqual(m_sleep.mock_calls, [call(1)])
# [web page residue] Comment list
# [web page residue] Table of contents