test_k8s_client.py 文件源码

python
阅读 31 收藏 0 点赞 0 评论 0

项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码
def test_watch(self, m_get):
        path = '/test'
        data = [{'obj': 'obj%s' % i} for i in range(3)]
        lines = [jsonutils.dumps(i) for i in data]

        m_resp = mock.MagicMock()
        m_resp.ok = True
        m_resp.iter_lines.return_value = lines
        m_get.return_value = m_resp

        cycles = 3
        self.assertEqual(
            data * cycles,
            list(itertools.islice(self.client.watch(path),
                                  len(data) * cycles)))

        self.assertEqual(cycles, m_get.call_count)
        self.assertEqual(cycles, m_resp.close.call_count)
        m_get.assert_called_with(self.base_url + path, headers={}, stream=True,
                                 params={'watch': 'true'}, cert=(None, None),
                                 verify=False)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号