def test_interface_initially_up(self):
combined_id = WloadEndpointId("host_id", "orchestrator_id",
"workload_id", "endpoint_id")
ip_type = futils.IPV4
local_ep = self.create_endpoint(combined_id, ip_type)
ips = ["1.2.3.4"]
iface = "tapabcdef"
data = {
'state': "active",
'endpoint': "endpoint_id",
'mac': stub_utils.get_mac(),
'name': iface,
'ipv4_nets': ips,
'profile_ids': ["prof1"]
}
# We can only get on_interface_update calls after the first
# on_endpoint_update, so trigger that.
with nested(
mock.patch('calico.felix.devices.set_routes'),
mock.patch('calico.felix.devices.configure_interface_ipv4'),
mock.patch('calico.felix.devices.interface_up'),
mock.patch('calico.felix.devices.interface_exists'),
) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
m_iface_up.return_value = True
m_iface_exists.return_value = True
local_ep.on_endpoint_update(data, async=True)
self.step_actor(local_ep)
self.assertEqual(local_ep._mac, data['mac'])
self.assertTrue(m_conf.called)
self.assertTrue(local_ep._device_in_sync)
# Interface is up so we should get the routes striaght away.
m_set_routes.assert_called_once_with(ip_type,
set(ips),
iface,
data['mac'],
reset_arp=True)
评论列表
文章目录