def test_allocate_v6_with_mac(self):
    # Allocates from a v6 subnet for a port that has a MAC address, with
    # 'v6_allocation_attempts' overridden to 1, and checks that the stubbed
    # address is returned via a create call rather than an update call.
    port_id = "945af340-ed34-4fec-8c87-853a2df492b4"
    subnet6 = models.Subnet(id=1, first_ip=0, last_ip=0,
                            cidr="feed::/104", ip_version=6,
                            next_auto_assign_ip=0,
                            ip_policy=None)
    mac = models.MacAddress()
    mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")

    old_override = cfg.CONF.QUARK.v6_allocation_attempts
    cfg.CONF.set_override('v6_allocation_attempts', 1, 'QUARK')
    try:
        ip_address = netaddr.IPAddress("fe80::")
        with self._stubs(policies=[], ip_address=ip_address) as (
                policy_find, ip_find, ip_create, ip_update):
            a = self.ipam._allocate_from_v6_subnet(self.context, 0, subnet6,
                                                   port_id, self.reuse_after,
                                                   mac_address=mac)
            self.assertEqual(ip_address.value, a["address"])
            # NCP-1548 - leaving this test to show the change from sometimes
            # creating the IP address to always creating the IP address.
            self.assertEqual(0, ip_update.call_count)
            self.assertEqual(1, ip_create.call_count)
    finally:
        # Restore the previous setting even when an assertion above fails,
        # so the override cannot leak into other tests.
        cfg.CONF.set_override('v6_allocation_attempts', old_override,
                              'QUARK')
# Example source code using the Python class EUI
def test_rfc2462_generates_valid_ip(self):
    # rfc2462_ip() returns an integer; compare it with the integer value of
    # the address expected for this MAC within the given CIDR.
    generated = quark.ipam.rfc2462_ip(netaddr.EUI("AA:BB:CC:DD:EE:FF"),
                                      "fe80::/120")
    expected = netaddr.IPAddress('fe80::a8bb:ccff:fedd:eeff').value
    self.assertEqual(generated, expected)
def test_v6_generator(self):
    # Checks the first two integer addresses produced by generate_v6() for
    # a fixed MAC, port id and CIDR.
    mac = netaddr.EUI("AA:BB:CC:DD:EE:FF")
    port_id = "945af340-ed34-4fec-8c87-853a2df492b4"
    cidr = "fe80::/120"
    gen = quark.ipam.generate_v6(mac, port_id, cidr)

    # The builtin next() works on both Python 2.6+ and Python 3, whereas
    # the .next() method only exists on Python 2 iterators.
    ip = next(gen)
    self.assertEqual(ip,
                     netaddr.IPAddress('fe80::a8bb:ccff:fedd:eeff').value)
    ip = next(gen)
    self.assertEqual(ip,
                     netaddr.IPAddress('fe80::40c9:a95:d83a:2ffa').value)
def test_apply_rules_set_fails_gracefully(self):
    # A redis ConnectionError raised by master.hset() must surface from
    # apply_rules() as q_exc.RedisConnectionFailure.
    target = ("quark.cache.security_groups_client."
              "redis_base.ClientBase")
    mac = netaddr.EUI("AA:BB:CC:DD:EE:FF")
    failure = redis.ConnectionError
    with mock.patch(target) as client_base:
        fake_redis = mock.MagicMock()
        client_base.return_value = fake_redis
        client = sg_client.SecurityGroupsClient()
        # Installed only after the client has been constructed.
        fake_redis.master.hset.side_effect = failure
        with self.assertRaises(q_exc.RedisConnectionFailure):
            client.apply_rules(1, mac.value, [])
def rfc2462_ip(mac, cidr):
    """Derive an integer IPv6 address from a MAC address and a CIDR.

    The EUI-64 value of ``mac`` is added to the integer value of ``cidr``
    and the sum is XOR-ed with the module-level ``MAGIC_INT``.

    :param mac: MAC address in any form accepted by ``netaddr.EUI``.
    :param cidr: network in any form accepted by ``netaddr.IPNetwork``.
    :returns: the resulting address as an integer.
    """
    # NOTE(mdietz): see RFC2462
    int_val = netaddr.IPNetwork(cidr).value
    mac = netaddr.EUI(mac)
    # Pass the MAC as a logging argument so the message is only formatted
    # when the record is actually emitted.
    LOG.info("Using RFC2462 method to generate a v6 with MAC %s", mac)
    int_val += mac.eui64().value
    # NOTE(review): MAGIC_INT is defined outside this block; presumably it
    # flips the universal/local bit of the interface id - confirm.
    int_val ^= MAGIC_INT
    return int_val
def deallocate_mac_address(self, context, address, **kwargs):
    """Release a MAC address by deleting or soft-deallocating its record.

    The record is looked up with an elevated context. It is deleted when it
    has no MAC address range or when its range is flagged ``do_not_use``;
    otherwise it is updated with ``deallocated=True`` and a
    ``deallocated_at`` timestamp from ``timeutils.utcnow()``.

    :param context: request context; ``context.elevated()`` is used for
        every database call.
    :param address: MAC address to look up.
    :param kwargs: accepted but not used by this method.
    :raises q_exc.MacAddressNotFound: if no matching record is found.
    """
    elevated = context.elevated()
    record = db_api.mac_address_find(elevated, address=address,
                                     scope=db_api.ONE)
    if not record:
        raise q_exc.MacAddressNotFound(
            mac_address_id=address,
            readable_mac=netaddr.EUI(address))

    mac_range = record["mac_address_range"]
    keep_record = mac_range is not None and not mac_range["do_not_use"]
    if keep_record:
        db_api.mac_address_update(elevated, record, deallocated=True,
                                  deallocated_at=timeutils.utcnow())
    else:
        db_api.mac_address_delete(elevated, record)
def vif_key(self, device_id, mac_address):
    """Build the key "<device_id>.<mac>" for a VIF.

    :param device_id: identifier placed before the dot.
    :param mac_address: MAC address in any form accepted by ``netaddr.EUI``.
    :returns: the formatted key string.
    """
    # The two-argument translate() is the Python 2 ``str`` API: it maps
    # characters through MAC_TRANS_TABLE and deletes ':' and '-'.
    # NOTE(review): MAC_TRANS_TABLE is defined outside this block; the
    # original comment says the result is lower-cased - confirm.
    normalized = str(netaddr.EUI(mac_address)).translate(MAC_TRANS_TABLE,
                                                         ":-")
    return "{0}.{1}".format(device_id, normalized)
def macGrab(self, packet):
    """Defines the OUI for a given MAC

    This function serves as an example, it is not ready for implementation

    Prints the organisation registered for the OUI of ``packet.addr2``.
    """
    try:
        parsed_mac = netaddr.EUI(packet.addr2)
        # Parenthesised single-argument form prints the same text whether
        # ``print`` is the Python 2 statement or the Python 3 function.
        print(parsed_mac.oui.registration().org)
    except netaddr.core.NotRegisteredError:
        # NOTE(review): ``fields`` is not defined in this function; it must
        # exist at module level for this line to work - confirm.
        fields.append('UNKNOWN')
### Move to handler.py
def process_flow_template(self, bridge, flow_template):
    """Add flows to the vswitch from a template, honouring multistream.

    When pre-installed flows and multistream are both configured in
    ``self._traffic`` (and a stream type is set), one flow per stream is
    queued through the vswitch flow cache and the cache is then flushed.
    Otherwise the template is added once, unchanged.

    :param bridge: bridge passed through to ``self._vswitch.add_flow``.
    :param flow_template: flow description; it is updated in place for
        every generated stream.
    """
    traffic = self._traffic
    multistream_flows = (
        'pre_installed_flows' in traffic and
        traffic['pre_installed_flows'].lower() == 'yes' and
        'multistream' in traffic and traffic['multistream'] > 0 and
        'stream_type' in traffic)
    if not multistream_flows:
        self._vswitch.add_flow(bridge, flow_template)
        return

    # Multistream is enabled and flows should be inserted into OVS, so
    # generate them from the template; cache='on' queues each flow.
    stream_type = traffic['stream_type']
    if stream_type == 'L2':
        # One flow per consecutive destination MAC address.
        first_mac = netaddr.EUI(traffic['l2']['dstmac']).value
        for offset in range(traffic['multistream']):
            stream_mac = netaddr.EUI(first_mac + offset)
            stream_mac.dialect = netaddr.mac_unix_expanded
            flow_template.update({'dl_dst': stream_mac})
            self._vswitch.add_flow(bridge, flow_template, cache='on')
    elif stream_type == 'L3':
        # One flow per consecutive destination IP address.
        first_ip = netaddr.IPAddress(traffic['l3']['dstip']).value
        for offset in range(traffic['multistream']):
            stream_ip = netaddr.IPAddress(first_ip + offset)
            flow_template.update({'dl_type': '0x0800',
                                  'nw_dst': stream_ip})
            self._vswitch.add_flow(bridge, flow_template, cache='on')
    elif stream_type == 'L4':
        # Transport protocol comes from configuration; one flow per
        # destination port.
        if traffic['l3']['proto'].lower() == 'tcp':
            proto = _PROTO_TCP
        else:
            proto = _PROTO_UDP
        for port in range(traffic['multistream']):
            flow_template.update({'dl_type': '0x0800',
                                  'nw_proto': proto,
                                  'tp_dst': port})
            self._vswitch.add_flow(bridge, flow_template, cache='on')
    else:
        self._logger.error('Stream type is set to uknown value %s',
                           stream_type)

    # Insert the cached flows into the OVS.
    self._vswitch.add_flow(bridge, [], cache='flush')
def slaac(value, query=''):
    ''' Get the SLAAC address within given network

    :param value: address or network, classified with ``ipaddr(value, 'type')``.
    :param query: hardware (MAC) address used to derive the address.
    :returns: ``eui.ipv6(value.network)`` on success; False when ``value``
        is not a usable IPv6 address/network, when ``query`` is empty, or
        when ``query`` is not a valid hardware address.
    '''
    try:
        vtype = ipaddr(value, 'type')
        if vtype == 'address':
            v = ipaddr(value, 'cidr')
        elif vtype == 'network':
            v = ipaddr(value, 'subnet')
        else:
            # Neither an address nor a network: nothing to work with.
            return False

        # NOTE(review): relies on ipaddr() returning an object with a
        # ``version`` attribute; if it does not, the AttributeError is
        # swallowed below and False is returned - confirm.
        if v.version != 6:
            return False

        value = netaddr.IPNetwork(v)
    except Exception:
        # Best effort: any failure means "no SLAAC address", but
        # KeyboardInterrupt/SystemExit are allowed to propagate.
        return False

    if not query:
        return False

    try:
        mac = hwaddr(query, alias='slaac')
        eui = netaddr.EUI(mac)
    except Exception:
        return False

    return eui.ipv6(value.network)
# ---- HWaddr / MAC address filters ----
def hwaddr(value, query='', alias='hwaddr'):
    ''' Check if string is a HW/MAC address and filter it

    :param value: candidate hardware address, parsed with ``netaddr.EUI``.
    :param query: name of the query/format to apply (a key of the map below).
    :param alias: filter name used as the prefix of error messages.
    :returns: the result of the selected query function, or False when
        ``value`` is not a hardware address and ``query`` is '' or 'bool'.
    :raises errors.AnsibleFilterError: when ``value`` is not a hardware
        address and any other query is given, or when ``query`` is unknown.
    '''
    # Names of extra arguments (beyond the parsed address) per query.
    query_func_extra_args = {
        '': ('value',),
    }
    query_func_map = {
        '': _empty_hwaddr_query,
        'bare': _bare_query,
        'bool': _bool_hwaddr_query,
        'cisco': _cisco_query,
        'eui48': _win_query,
        'linux': _linux_query,
        'pgsql': _postgresql_query,
        'postgresql': _postgresql_query,
        'psql': _postgresql_query,
        'unix': _unix_query,
        'win': _win_query,
    }

    try:
        v = netaddr.EUI(value)
    except Exception:
        if query and query != 'bool':
            raise errors.AnsibleFilterError(alias + ': not a hardware address: %s' % value)
        # There is no parsed address to hand to a query function; falling
        # through would fail on the unbound ``v``.
        return False

    # Resolve the handler before calling it so that a KeyError raised
    # inside the handler is not misreported as an unknown filter type.
    try:
        query_func = query_func_map[query]
    except KeyError:
        raise errors.AnsibleFilterError(alias + ': unknown filter type: %s' % query)

    available_args = {'value': value}
    extras = [available_args[arg]
              for arg in query_func_extra_args.get(query, ())]
    return query_func(v, *extras)
def __init__(self, id=0, ssid=None, client_mac=None, location=None):
    """Store the record fields and look up the client's vendor.

    :param id: record identifier.
    :param ssid: network name.
    :param client_mac: client MAC address, also used for the vendor lookup.
    :param location: location associated with the record.

    ``client_org`` is set to the organisation registered for the MAC's
    OUI, or left as None when the lookup fails.
    """
    self.id = id
    self.ssid = ssid
    self.client_mac = client_mac
    self.location = location
    self.client_org = None
    try:
        # OUI - Organizational Unique Identifier
        self.client_org = EUI(self.client_mac).oui.registration().org
    except Exception:
        # Best effort (e.g. OUI not registered): keep client_org as None.
        # Unlike a bare except, KeyboardInterrupt/SystemExit still propagate.
        pass
def __init__(self, id=0, ssid=None, bssid=None, date=None, location=None):
    """Store the record fields and look up the access point's vendor.

    :param id: record identifier.
    :param ssid: network name.
    :param bssid: access point MAC address, also used for the vendor lookup.
    :param date: date associated with the record.
    :param location: location associated with the record.

    ``ap_org`` is set to the organisation registered for the BSSID's OUI,
    or left as None when the lookup fails.
    """
    self.id = id
    self.ssid = ssid
    self.bssid = bssid
    self.date = date
    self.location = location
    self.ap_org = None
    try:
        # OUI - Organizational Unique Identifier
        self.ap_org = EUI(self.bssid).oui.registration().org
    except Exception:
        # Best effort: keep ap_org as None when the lookup fails. Unlike a
        # bare except, KeyboardInterrupt/SystemExit still propagate.
        pass
def __init__(self, id=0, ssid=None, client_mac=None, date=None, location=None):
    """Store the record fields and look up the client's vendor.

    :param id: record identifier.
    :param ssid: network name.
    :param client_mac: client MAC address, also used for the vendor lookup.
    :param date: date associated with the record.
    :param location: location associated with the record.

    ``client_org`` is set to the organisation registered for the MAC's
    OUI, or left as None when the lookup fails.
    """
    self.id = id
    self.ssid = ssid
    self.client_mac = client_mac
    self.date = date
    self.location = location
    self.client_org = None
    try:
        # OUI - Organizational Unique Identifier
        self.client_org = EUI(self.client_mac).oui.registration().org
    except Exception:
        # Best effort: keep client_org as None when the lookup fails. Unlike
        # a bare except, KeyboardInterrupt/SystemExit still propagate.
        pass
def get_vendor(mac):
    """Return the organisation registered for a MAC address's OUI.

    :param mac: MAC address in any form accepted by ``EUI``.
    :returns: the registered organisation, or None when ``mac`` is the
        empty string or the lookup fails (malformed address, OUI not
        registered).
    """
    if mac == "":
        return None
    try:
        # EUI - Extended Unique Identifier, OUI - Organizational Unique
        # Identifier. Parsing happens inside the try so that a malformed
        # address yields None instead of raising.
        return EUI(mac).oui.registration().org
    except Exception:
        return None
def source(self):
    """
    Source MAC address, formatted as a string from the stored ``_src_mac``.
    """
    address = netaddr.EUI(self._src_mac)
    return str(address)
def source(self, value):
    # Parse anything netaddr.EUI accepts and keep only its ``.value`` in
    # ``_src_mac``.
    parsed = netaddr.EUI(value)
    self._src_mac = parsed.value
def destination(self):
    """
    Destination MAC address, formatted as a string from the stored
    ``_dst_mac``.
    """
    address = netaddr.EUI(self._dst_mac)
    return str(address)
def destination(self, value):
    # Parse anything netaddr.EUI accepts and keep only its ``.value`` in
    # ``_dst_mac``.
    parsed = netaddr.EUI(value)
    self._dst_mac = parsed.value
def test_traffic_variable(self):
    # Enables the TX stream at index 2, sends traffic for one second while
    # saving a capture to 'capture.pcap', then checks the Ethernet length
    # and the source/destination MAC of every captured packet.
    tx, rx = self.layer.tx, self.layer.rx
    tx.streams[2].is_enabled = True
    tx.streams[2].save()
    utils.send_and_receive(tx, rx, duration=1, save_as='capture.pcap')
    if utils.is_pypy():
        # The capture is not inspected when running under PyPy.
        return

    packets = pyshark.FileCapture('capture.pcap')
    for index, packet in enumerate(packets):
        expected_len = (index + 1) * 10
        expected_src = index * 0x100
        expected_dst = 0xffffffffffff - index * 0x1000000
        self.assertEqual(expected_len, int(packet.eth.len))
        self.assertEqual(expected_src, netaddr.EUI(packet.eth.src).value)
        self.assertEqual(expected_dst, netaddr.EUI(packet.eth.dst).value)