def test_ensure_bridge_brings_up_interface(self):
    """Check the commands ensure_bridge() issues for an existing bridge.

    ``device_exists`` is patched to report the bridge as present and
    ``netifaces.ifaddresses`` to return a fixed MAC address; the test then
    verifies the exact ``_execute`` invocations, including the one that
    sets the interface ``up``.
    """
    # CONF.fake_network has to be disabled, otherwise netifaces is never
    # actually called.
    self.flags(fake_network=False)

    fake_mac = 'aa:bb:cc:00:11:22'
    link_info = {netifaces.AF_LINK: [{'addr': fake_mac}]}

    expected_device_exists = [mock.call('bridge')]
    expected_execute = [
        mock.call('brctl', 'addif', 'bridge', 'eth0',
                  run_as_root=True, check_exit_code=False),
        mock.call('ip', 'link', 'set', 'bridge', 'address', fake_mac,
                  run_as_root=True),
        mock.call('ip', 'link', 'set', 'eth0', 'up',
                  run_as_root=True, check_exit_code=False),
        mock.call('ip', 'route', 'show', 'dev', 'eth0'),
        mock.call('ip', 'addr', 'show', 'dev', 'eth0', 'scope',
                  'global'),
    ]

    with test.nested(
        mock.patch.object(linux_net, 'device_exists', return_value=True),
        mock.patch.object(linux_net, '_execute', return_value=('', '')),
        mock.patch.object(netifaces, 'ifaddresses',
                          return_value=link_info),
    ) as (device_exists, _execute, ifaddresses):
        bridge_driver = linux_net.LinuxBridgeInterfaceDriver()
        bridge_driver.ensure_bridge('bridge', 'eth0')

        device_exists.assert_has_calls(expected_device_exists)
        _execute.assert_has_calls(expected_execute)
        ifaddresses.assert_called_once_with('eth0')
python类AF_LINK的实例源码
test_linux_net.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def get_host_ip4(iface_prefix=None, exclude_prefix=None):
    """Return the non-loopback IPv4 addresses of matching local interfaces.

    Args:
        iface_prefix: a prefix string, or an iterable of prefix strings; an
            interface is considered only if its name starts with one of
            them. ``None`` (the default) matches every interface.
        exclude_prefix: a prefix string, or an iterable of prefix strings;
            interfaces whose name starts with one of them are skipped even
            if they match ``iface_prefix``. ``None`` excludes nothing.

    Returns:
        A list of IPv4 address strings, in the order netifaces reports
        them. Only interfaces that expose both an AF_INET and an AF_LINK
        entry contribute, and addresses for which ``is_ip4_loopback``
        returns true are left out.
    """
    # types.StringTypes only exists on Python 2; fall back to ``str`` so the
    # single-string convenience form keeps working on other interpreters.
    string_types = getattr(types, 'StringTypes', (str,))

    if iface_prefix is None:
        iface_prefix = ['']
    elif isinstance(iface_prefix, string_types):
        # isinstance (rather than an exact type() match) also recognises
        # str subclasses instead of iterating them character by character.
        iface_prefix = [iface_prefix]
    if isinstance(exclude_prefix, string_types):
        exclude_prefix = [exclude_prefix]

    # str.startswith accepts a tuple of candidates; an empty tuple never
    # matches, which is exactly "exclude nothing" / "include nothing".
    include = tuple(iface_prefix)
    exclude = tuple(exclude_prefix) if exclude_prefix is not None else ()

    ips = []
    for ifacename in netifaces.interfaces():
        if not ifacename.startswith(include) or ifacename.startswith(exclude):
            continue
        addrs = netifaces.ifaddresses(ifacename)
        if netifaces.AF_INET not in addrs or netifaces.AF_LINK not in addrs:
            continue
        for addr in addrs[netifaces.AF_INET]:
            ip = addr['addr']
            if not is_ip4_loopback(ip):
                ips.append(ip)
    return ips
def __init__(self, interface, network, default, elements, loggers, tunnels):
    """Initialize the dispatcher and run its packet capture loop.

    Note that this constructor does not return until the capture loop is
    interrupted with KeyboardInterrupt.

    Args:
        interface : name of the network interface to listen on
        network : networkx graph representation of the network
        default : default template
        elements : 3-tuple of (devices, routes, externals)
        loggers : 2-tuple of (hpfeeds, dblogger) logger module instances
        tunnels : tunnel configuration
    """
    self.interface = interface
    # MAC address of the listening interface, as reported by netifaces.
    self.mac = netifaces.ifaddresses(self.interface)[netifaces.AF_LINK][0]['addr']
    self.network = network
    try:
        post('http://localhost:8080/network', json=dumps(json_graph.node_link_data(self.network)))
    except Exception:
        # Best effort: a failure to publish the graph is logged and startup
        # continues. Catching Exception (not a bare except) lets
        # KeyboardInterrupt / SystemExit propagate.
        logger.exception('Exception: Cannot connect to local server.')
    self.default = default
    self.devices, self.routes, self.externals = elements
    self.hpfeeds, self.dblogger = loggers
    self.tunnels = tunnels
    self.packet_queue = dict()
    self.entry_points = list()
    self.unreach_list = list()
    # pcapy.open_live arguments: device, snapshot length, promiscuous flag,
    # read timeout in milliseconds.
    self.pcapy_object = pcapy.open_live(self.interface, 65535, 1, 10)
    self.decoder = ImpactDecoder.EthDecoder()
    self.ip_decoder = ImpactDecoder.IPDecoder()
    self.ip_icmp_decoder = ImpactDecoder.IPDecoderForICMP()

    # Collect our own MAC plus every non-empty device MAC.
    self.mac_set = {self.mac}
    for d in self.devices:
        if len(d.mac):
            self.mac_set.add(d.mac)

    # Remember the routes flagged as entry points and gather all of their
    # unreachable lists into one.
    for r in self.routes:
        if r.entry:
            self.entry_points.append(r)
            self.unreach_list.extend(r.unreach_list)

    logger.info('Started dispatcher listening on interface %s', self.interface)

    # Capture loop: hand every captured packet to self.callback until the
    # user interrupts.
    while True:
        try:
            (hdr, pkt) = self.pcapy_object.next()
            self.callback(hdr, pkt)
        except KeyboardInterrupt:
            return
def getNetworkInfo(self):
    """Get network information as a dict

    The per-interface data is wrapped under the ``"list"`` key. An
    interface appears only if at least one of its ``ip``, ``ipv6`` or
    ``mac`` entries could be read; entries that are not available are
    simply omitted.

    Returned dict example::

        {
            "list": {
                "eth0": {
                    "ip": {
                        "address": "192.168.0.25",
                    },
                    "ipv6": [{
                        "address": "2001:db8:3c4d::1a2f:1a2b",
                    }],
                    "mac": "aa:bb:cc:dd:ee:ff",
                },
                "wlan0": {
                    "ipv6": [{
                        "address": "2001:db8:3c4d::1a2f:1a2b",
                    }],
                    "mac": "aa:bb:cc:dd:ee:ff"
                }
            }
        }

    If enumerating the interfaces fails, the error is reported through
    ``exception(...)`` and whatever was collected so far is returned.
    """
    network_info = {}
    try:
        for interface in netifaces.interfaces():
            addresses = netifaces.ifaddresses(interface)
            interface_info = {}

            # Each lookup is best-effort: a missing address family, an
            # empty address list or a missing 'addr' key just means the
            # corresponding entry is left out.
            try:
                ipv4 = addresses[netifaces.AF_INET][0]['addr']
                interface_info['ip'] = {'address': ipv4}
            except (KeyError, IndexError):
                pass

            try:
                # Drop the '%<scope>' suffix some platforms append to
                # link-local IPv6 addresses.
                interface_info['ipv6'] = [
                    {'address': entry['addr'].split('%')[0]}
                    for entry in addresses[netifaces.AF_INET6]
                ]
            except KeyError:
                pass

            try:
                interface_info['mac'] = addresses[netifaces.AF_LINK][0]['addr']
            except (KeyError, IndexError):
                pass

            if interface_info:
                network_info[interface] = interface_info
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not swallowed.
        exception('Error getting network info')

    info = {}
    info['list'] = network_info
    return info