test_endpoint.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:felix 作者: axbaretto 项目源码 文件源码
def test_interface_goes_down_removes_routes(self):
        combined_id = WloadEndpointId("host_id", "orchestrator_id",
                                      "workload_id", "endpoint_id")
        ip_type = futils.IPV4
        local_ep = self.create_endpoint(combined_id, ip_type)

        ips = ["1.2.3.4"]
        iface = "tapabcdef"
        data = {
            'state': "active",
            'endpoint': "endpoint_id",
            'mac': stub_utils.get_mac(),
            'name': iface,
            'ipv4_nets': ips,
            'profile_ids': ["prof1"]
        }

        # We can only get on_interface_update calls after the first
        # on_endpoint_update, so trigger that.
        with nested(
                mock.patch('calico.felix.devices.set_routes'),
                mock.patch('calico.felix.devices.configure_interface_ipv4'),
                mock.patch('calico.felix.devices.interface_up'),
                mock.patch('calico.felix.devices.interface_exists'),
        ) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
                m_iface_up.return_value = True
                m_iface_exists.return_value = True
                local_ep.on_endpoint_update(data, async=True)
                self.step_actor(local_ep)
                self.assertTrue(m_conf.called)  # Check we did the "UP" path.

        # Now pretend send an interface update.
        with mock.patch('calico.felix.devices.set_routes') as m_set_routes:
            local_ep.on_interface_update(False, async=True)
            self.step_actor(local_ep)
            m_set_routes.assert_called_once_with(ip_type,
                                                 set(),
                                                 iface,
                                                 None)
            self.assertTrue(local_ep._device_in_sync)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号