def test_resync_http_error(self, m_sleep):
self.driver._init_received.set()
with patch.object(self.driver, "get_etcd_connection") as m_get:
with patch("calico.etcddriver.driver.monotonic_time") as m_time:
m_time.side_effect = iter([
1, 10, RuntimeError()
])
m_get.side_effect = HTTPError()
self.assertRaises(RuntimeError, self.driver._resync_and_merge)
评论列表
文章目录