def get_ipv6_addr_by_EUI64(cidr, mac):
"""Generate a IPv6 addr by EUI-64 with CIDR and MAC
:param str cidr: a IPv6 CIDR
:param str mac: a MAC address
:return: an IPv6 Address
:rtype: netaddr.IPAddress
"""
# Check if the prefix is IPv4 address
is_ipv4 = netaddr.valid_ipv4(cidr)
if is_ipv4:
msg = "Unable to generate IP address by EUI64 for IPv4 prefix"
raise TypeError(msg)
try:
eui64 = int(netaddr.EUI(mac).eui64())
prefix = netaddr.IPNetwork(cidr)
return netaddr.IPAddress(prefix.first + eui64 ^ (1 << 57))
except (ValueError, netaddr.AddrFormatError):
raise TypeError('Bad prefix or mac format for generating IPv6 '
'address by EUI-64: %(prefix)s, %(mac)s:'
% {'prefix': cidr, 'mac': mac})
except TypeError:
raise TypeError('Bad prefix type for generate IPv6 address by '
'EUI-64: %s' % cidr)
python类EUI的实例源码
def genmac(value, prefix='', length=12):
'''
deterministically generates a "random" MAC with a configurable prefix
'''
# from: http://serverfault.com/questions/40712/what-range-of-mac-addresses-can-i-safely-use-for-my-virtual-machines
if prefix == '' :
mac_prefix = "0ac04d" # random "cord"-esque
# deterministically generate a value
h = hashlib.new('sha1')
h.update(value)
# build/trim MAC
mac_string = (mac_prefix + h.hexdigest())[0:length]
return netaddr.EUI(mac_string)
def sendWoL(mac, broadcast=findBroadcast()):
"""Given string mac and broadcast: Turn on computer using WoL
This was taken from http://code.activestate.com/recipes/358449-wake-on-lan/. Thanks, Fadly!
"""
# Cleans and removes delimiters from MAC
try:
mac = format_mac(EUI(mac), mac_bare)
except AddrFormatError:
raise ValueError('Incorrect MAC address format')
# Pad the synchronization stream.
data = ''.join(['FFFFFFFFFFFF', mac * 20])
send_data = b''
# Split up the hex values and pack.
for i in range(0, len(data), 2):
send_data = b''.join([send_data, pack('B', int(data[i: i + 2], 16))])
# Broadcast it to the LAN.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(send_data, (broadcast, 7))
def _to_mac_range(val):
cidr_parts = val.split("/")
prefix = cidr_parts[0]
# FIXME(anyone): replace is slow, but this doesn't really
# get called ever. Fix maybe?
prefix = prefix.replace(':', '')
prefix = prefix.replace('-', '')
prefix_length = len(prefix)
if prefix_length < 6 or prefix_length > 12:
raise q_exc.InvalidMacAddressRange(cidr=val)
diff = 12 - len(prefix)
if len(cidr_parts) > 1:
mask = int(cidr_parts[1])
else:
mask = 48 - diff * 4
mask_size = 1 << (48 - mask)
prefix = "%s%s" % (prefix, "0" * diff)
try:
cidr = "%s/%s" % (str(netaddr.EUI(prefix)).replace("-", ":"), mask)
except netaddr.AddrFormatError:
raise q_exc.InvalidMacAddressRange(cidr=val)
prefix_int = int(prefix, base=16)
return cidr, prefix_int, prefix_int + mask_size
def test_allocate_v6_with_mac_fails_policy_raises(self):
cidr = netaddr.IPNetwork("fe80::dead:beef/64")
allocation_pool = [{"start": cidr[-4], "end": cidr[-2]}]
subnet = dict(allocation_pools=allocation_pool,
cidr="fe80::dead:beef/64", ip_version=6,
next_auto_assign_ip=0, tenant_id="fake")
subnet = {"subnet": subnet}
network = dict(name="public", tenant_id="fake", network_plugin="BASE")
network = {"network": network}
ip_policy = {"exclude": ["fe80::dead:beef/64"]}
ip_policy = {"ip_policy": ip_policy}
mac = models.MacAddress()
mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")
old_override = cfg.CONF.QUARK.v6_allocation_attempts
cfg.CONF.set_override('v6_allocation_attempts', 1, 'QUARK')
with self._stubs(network, subnet, ip_policy) as (net, sub, ipp):
with self.assertRaises(n_exc.IpAddressGenerationFailure):
self.ipam.allocate_ip_address(self.context, [], net["id"], 0,
0, subnets=[sub["id"]])
cfg.CONF.set_override('v6_allocation_attempts', old_override, 'QUARK')
def test_update_port_with_security_groups(self, redis_cli):
mock_client = mock.MagicMock()
redis_cli.return_value = mock_client
port_id = str(uuid.uuid4())
device_id = str(uuid.uuid4())
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
security_groups = [str(uuid.uuid4())]
payload = {}
mock_client.serialize_groups.return_value = payload
self.driver.update_port(
context=self.context, network_id="public_network", port_id=port_id,
device_id=device_id, mac_address=mac_address,
security_groups=security_groups)
mock_client.serialize_groups.assert_called_once_with(security_groups)
mock_client.apply_rules.assert_called_once_with(
device_id, mac_address, payload)
def test_delete_port_redis_is_dead(self, sg_cli):
device_id = str(uuid.uuid4())
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
mock_client = mock.MagicMock()
sg_cli.return_value = mock_client
mock_client.delete_vif.side_effect = Exception
try:
self.driver.delete_port(context=self.context, port_id=2,
mac_address=mac_address,
device_id=device_id)
mock_client.delete_vif.assert_called_once_with(
device_id, mac_address)
except Exception:
# This test fails without the exception handling in
# _delete_port_security_groups
self.fail("This shouldn't have raised")
def test_apply_rules(self, strict_redis, uuid4):
client = sg_client.SecurityGroupsClient()
device_id = "device"
uuid4.return_value = "uuid"
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")
client.apply_rules(device_id, mac_address.value, [])
self.assertTrue(client._client.master.hset.called)
redis_key = client.vif_key(device_id, mac_address.value)
rule_dict = {"rules": []}
client._client.master.hset.assert_any_call(
redis_key, sg_client.SECURITY_GROUP_HASH_ATTR,
json.dumps(rule_dict))
client._client.master.hset.assert_any_call(
redis_key, sg_client.SECURITY_GROUP_ACK, False)
def __init__(self, id=0, name=None, mac_address=None, ip_address=None,
connected_ssid=None, inactivity_time=None,
rx_packets=None, tx_packets=None,
rx_bitrate=None, tx_bitrate=None,
signal=None):
self.id = id
self.name = name
self.mac_address = mac_address
self.ip_address = ip_address
self.connected_ssid = connected_ssid
self.inactivity_time = inactivity_time
self.rx_packets = rx_packets
self.tx_packets = tx_packets
self.tx_bitrate = tx_bitrate
self.rx_bitrate = rx_bitrate
self.signal = signal
self.vendor = None
try:
self.vendor = EUI(mac_address).oui.registration().org # OUI - Organizational Unique Identifier
except Exception:
pass
account_identifier.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def to_global(prefix, mac, project_id):
project_hash = netaddr.IPAddress(
int(hashlib.sha1(project_id).hexdigest()[:8], 16) << 32)
static_num = netaddr.IPAddress(0xff << 24)
try:
mac_suffix = netaddr.EUI(mac).words[3:]
int_addr = int(''.join(['%02x' % i for i in mac_suffix]), 16)
mac_addr = netaddr.IPAddress(int_addr)
maskIP = netaddr.IPNetwork(prefix).ip
return (project_hash ^ static_num ^ mac_addr | maskIP).format()
except netaddr.AddrFormatError:
raise TypeError(_('Bad mac for to_global_ipv6: %s') % mac)
except TypeError:
raise TypeError(_('Bad prefix for to_global_ipv6: %s') % prefix)
except NameError:
raise TypeError(_('Bad project_id for to_global_ipv6: %s') %
project_id)
def test_serialization_with_eui_in_different_dialect(self, model):
field = fields.MACAddressField()
model.mac = netaddr.EUI('78-F8-82-B2-E5-5A', dialect=netaddr.mac_eui48)
serialized = field.serialize('mac', model)
assert serialized == '78:f8:82:b2:e5:5a'
def test_deserialization(self):
field = fields.MACAddressField()
value = '78:f8:82:b2:e5:5a'
deserialized = field.deserialize(value)
assert deserialized == '78:f8:82:b2:e5:5a'
assert repr(deserialized) == 'EUI(\'78:f8:82:b2:e5:5a\')'
assert deserialized.dialect == netaddr.mac_unix_expanded
def _to_python(self, value):
eui = netaddr.EUI(value)
eui.dialect = self.default_dialect
return eui
def mac(raw):
"""
Converts a raw string to a standardised MAC Address EUI Format.
:param raw: the raw string containing the value of the MAC Address
:return: a string with the MAC Address in EUI format
Example:
.. code-block:: python
>>> mac('0123.4567.89ab')
u'01:23:45:67:89:AB'
Some vendors like Cisco return MAC addresses like a9:c5:2e:7b:6: which is not entirely valid
(with respect to EUI48 or EUI64 standards). Therefore we need to stuff with trailing zeros
Example
>>> mac('a9:c5:2e:7b:6:')
u'A9:C5:2E:7B:60:00'
If Cisco or other obscure vendors use their own standards, will throw an error and we can fix
later, however, still works with weird formats like:
>>> mac('123.4567.89ab')
u'01:23:45:67:89:AB'
>>> mac('23.4567.89ab')
u'00:23:45:67:89:AB'
"""
if raw.endswith(':'):
flat_raw = raw.replace(':', '')
raw = '{flat_raw}{zeros_stuffed}'.format(
flat_raw=flat_raw,
zeros_stuffed='0'*(12-len(flat_raw))
)
return py23_compat.text_type(EUI(raw, dialect=_MACFormat))
def get_ipv6_addr_by_EUI64(prefix, mac):
"""Calculate IPv6 address using EUI-64 specification.
This method calculates the IPv6 address using the EUI-64
addressing scheme as explained in rfc2373.
:param prefix: IPv6 prefix.
:param mac: IEEE 802 48-bit MAC address.
:returns: IPv6 address on success.
:raises ValueError, TypeError: For any invalid input.
.. versionadded:: 1.4
"""
# Check if the prefix is an IPv4 address
if is_valid_ipv4(prefix):
msg = _("Unable to generate IP address by EUI64 for IPv4 prefix")
raise ValueError(msg)
try:
eui64 = int(netaddr.EUI(mac).eui64())
prefix = netaddr.IPNetwork(prefix)
return netaddr.IPAddress(prefix.first + eui64 ^ (1 << 57))
except (ValueError, netaddr.AddrFormatError):
raise ValueError(_('Bad prefix or mac format for generating IPv6 '
'address by EUI-64: %(prefix)s, %(mac)s:')
% {'prefix': prefix, 'mac': mac})
except TypeError:
raise TypeError(_('Bad prefix type for generating IPv6 address by '
'EUI-64: %s') % prefix)
def slaac(value, query = ''):
''' Get the SLAAC address within given network '''
try:
vtype = ipaddr(value, 'type')
if vtype == 'address':
v = ipaddr(value, 'cidr')
elif vtype == 'network':
v = ipaddr(value, 'subnet')
if ipaddr(value, 'version') != 6:
return False
value = netaddr.IPNetwork(v)
except:
return False
if not query:
return False
try:
mac = hwaddr(query, alias = 'slaac')
eui = netaddr.EUI(mac)
except:
return False
return eui.ipv6(value.network)
# ---- HWaddr / MAC address filters ----
def hwaddr(value, query = '', alias = 'hwaddr'):
''' Check if string is a HW/MAC address and filter it '''
query_func_extra_args = {
'': ('value',),
}
query_func_map = {
'': _empty_hwaddr_query,
'bare': _bare_query,
'bool': _bool_hwaddr_query,
'int': _int_hwaddr_query,
'cisco': _cisco_query,
'eui48': _win_query,
'linux': _linux_query,
'pgsql': _postgresql_query,
'postgresql': _postgresql_query,
'psql': _postgresql_query,
'unix': _unix_query,
'win': _win_query,
}
try:
v = netaddr.EUI(value)
except:
if query and query != 'bool':
raise errors.AnsibleFilterError(alias + ': not a hardware address: %s' % value)
extras = []
for arg in query_func_extra_args.get(query, tuple()):
extras.append(locals()[arg])
try:
return query_func_map[query](v, *extras)
except KeyError:
raise errors.AnsibleFilterError(alias + ': unknown filter type: %s' % query)
return False
def table_9_test_preparation(self, env, tg_port, sw_port, packet_num, ipgap, offset=0, arp_packet=None):
"""Prepare ports, packets for table 9 related tests.
"""
# Configure ARP packet and stream
if arp_packet:
srcmac = arp_packet[0]['Ether']['src']
arp_packet[0]['Ether']['src'] = str(EUI(EUI(srcmac).value + offset, dialect=mac_unix_expanded))
srcip = arp_packet[1]['ARP']['psrc']
arp_packet[1]['ARP']['psrc'] = str(IPAddress(srcip) + offset)
else:
raise UIException("ARP packet not supplied")
# set admin status of host and switch to Up, wait till operational status is up
sw_port_id = int(sw_port.split()[1])
sw_instances = [switch for switch in env.switch[1].node.values() if switch.id != sw_port.split()[0]]
sw_instance = [switch for switch in env.switch[1].node.values() if switch.id == sw_port.split()[0]]
if not sw_instance:
raise UIException("Not found switch id and port pair connection in configuration")
env.tg[1].connect_port(tg_port)
sw_instance[0].ui.modify_ports(ports=[sw_port_id], adminMode='Up')
sw_instance[0].ui.wait_for_port_value_to_change([sw_port_id], 'operationalStatus', "Up")
# Prepare and send stream
arp_stream = env.tg[1].set_stream(arp_packet, count=packet_num, inter=ipgap, iface=tg_port,
arp_sa_increment=(1, packet_num + offset),
arp_sip_increment=(1, packet_num + offset))
env.tg[1].start_streams([arp_stream])
time.sleep(ipgap * packet_num)
env.tg[1].stop_streams([arp_stream])
return sw_instances
def _port_dict(port, fields=None):
res = {"id": port.get("id"),
"name": port.get("name"),
"network_id": port["network_id"],
"tenant_id": port.get("tenant_id"),
"mac_address": port.get("mac_address"),
"admin_state_up": port.get("admin_state_up"),
"status": "ACTIVE",
"security_groups": [group.get("id", None) for group in
port.get("security_groups", None)],
"device_id": port.get("device_id"),
"device_owner": port.get("device_owner")}
if "mac_address" in res and res["mac_address"]:
mac = str(netaddr.EUI(res["mac_address"])).replace('-', ':')
res["mac_address"] = mac
# NOTE(mdietz): more pythonic key in dict check fails here. Leave as get
if port.get("bridge"):
res["bridge"] = port["bridge"]
# NOTE(ClifHouck): This causes another trip to the DB since tags are
# are not eager loaded. According to mdietz this be a small impact on
# performance, but if the tag system gets used more on ports, we may
# want to eager load the tags.
try:
t = PORT_TAG_REGISTRY.get_all(port)
res.update(t)
except Exception as e:
# NOTE(morgabra) We really don't want to break port-listing if
# this goes sideways here, so we pass.
msg = ("Unknown error loading tags for port %s: %s"
% (port["id"], e))
LOG.exception(msg)
return res
def delete_port(context, id):
"""Delete a port.
: param context: neutron api request context
: param id: UUID representing the port to delete.
"""
LOG.info("delete_port %s for tenant %s" % (id, context.tenant_id))
port = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port:
raise n_exc.PortNotFound(port_id=id)
if 'device_id' in port: # false is weird, but ignore that
LOG.info("delete_port %s for tenant %s has device %s" %
(id, context.tenant_id, port['device_id']))
backend_key = port["backend_key"]
mac_address = netaddr.EUI(port["mac_address"]).value
ipam_driver = _get_ipam_driver(port["network"], port=port)
ipam_driver.deallocate_mac_address(context, mac_address)
ipam_driver.deallocate_ips_by_port(
context, port, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)
net_driver = _get_net_driver(port["network"], port=port)
base_net_driver = _get_net_driver(port["network"])
net_driver.delete_port(context, backend_key, device_id=port["device_id"],
mac_address=port["mac_address"],
base_net_driver=base_net_driver)
with context.session.begin():
db_api.port_delete(context, port)
def test_to_mac_range_cidr_format(self):
cidr, first, last = mac_address_ranges._to_mac_range("AA:BB:CC/24")
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
def test_to_mac_range_just_prefix(self):
cidr, first, last = mac_address_ranges._to_mac_range("AA:BB:CC")
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
def test_to_mac_range_unix_cidr_format(self):
cidr, first, last = mac_address_ranges._to_mac_range("AA-BB-CC/24")
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
def test_to_mac_range_unix_cidr_format_normal_length(self):
cidr, first, last = mac_address_ranges._to_mac_range("aabbcc000000/29")
first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
self.assertEqual(cidr, "AA:BB:CC:00:00:00/29")
self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
self.assertEqual(last_mac, "aa:bb:cc:8:0:0")
def _build_expected_unicorn_request_body(self, floating_ip_address, ports,
actual_body=None):
if actual_body:
# Since the port order is non-deterministic, we need to ensure
# that the order is correct
actual_port_ids = [endpoint['port']['uuid'] for endpoint in
actual_body['floating_ip']['endpoints']]
reordered_ports = []
for port_id in actual_port_ids:
for port in ports:
if port['id'] == port_id:
reordered_ports.append(port)
ports = reordered_ports
endpoints = []
for port in ports:
fixed_ips = []
for fixed_ip in port['fixed_ips']:
fixed_ips.append({
'ip_address': fixed_ip['ip_address'],
'version': self.user_subnet['ip_version'],
'subnet_id': self.user_subnet['id'],
'cidr': self.user_subnet['cidr'],
'address_type': 'fixed'
})
port_mac = int(netaddr.EUI(port['mac_address'].replace(':', '-')))
endpoints.append({
'port': {
'uuid': port['id'],
'name': port['name'],
'network_uuid': port['network_id'],
'mac_address': port_mac,
'device_id': port['device_id'],
'device_owner': port['device_owner'],
'fixed_ip': fixed_ips
},
'private_ip': port['fixed_ips'][0]['ip_address']
})
body = {'public_ip': floating_ip_address,
'endpoints': endpoints}
return {'floating_ip': body}
def test_mac_address_ranges(self):
mr1_mac = netaddr.EUI("AA:AA:AA:00:00:00")
mr1 = {"cidr": "AA:AA:AA/24", "do_not_use": False,
"first_address": mr1_mac.value,
"last_address": netaddr.EUI("AA:AA:AA:FF:FF:FF").value,
"next_auto_assign_mac": mr1_mac.value}
with self._fixtures([mr1]):
with self.context.session.begin():
ranges = db_api.mac_address_range_find_allocation_counts(
self.context)
self.assertTrue(ranges[0]["cidr"], mr1["cidr"])
def test_mac_address_ranges_do_not_use_returns_nothing(self):
mr1_mac = netaddr.EUI("AA:AA:AA:00:00:00")
mr1 = {"cidr": "AA:AA:AA/24", "do_not_use": True,
"first_address": mr1_mac.value,
"last_address": netaddr.EUI("AA:AA:AA:FF:FF:FF").value,
"next_auto_assign_mac": mr1_mac.value}
with self._fixtures([mr1]):
with self.context.session.begin():
ranges = db_api.mac_address_range_find_allocation_counts(
self.context)
self.assertTrue(ranges is None)
def test_update_port_with_security_groups_removal(self, redis_cli):
mock_client = mock.MagicMock()
redis_cli.return_value = mock_client
port_id = str(uuid.uuid4())
device_id = str(uuid.uuid4())
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
security_groups = []
self.driver.update_port(
context=self.context, network_id="public_network", port_id=port_id,
device_id=device_id, mac_address=mac_address,
security_groups=security_groups)
self.assertEqual(mock_client.serialize_groups.call_count, 0)
mock_client.delete_vif_rules.assert_called_once_with(
device_id, mac_address)
def test_delete_port(self, sg_cli):
device_id = str(uuid.uuid4())
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
mock_client = mock.MagicMock()
sg_cli.return_value = mock_client
self.driver.delete_port(context=self.context, port_id=2,
mac_address=mac_address, device_id=device_id)
mock_client.delete_vif.assert_called_once_with(
device_id, mac_address)
def test_allocate_v6_with_mac_generates_exceeds_limit_raises(self):
subnet6 = dict(cidr="feed::/104",
first_ip=self.v6_fip.value,
id=1,
ip_version=6,
ip_policy=None,
last_ip=self.v6_lip.value,
next_auto_assign_ip=self.v6_fip.value)
address = models.IPAddress()
address["address"] = self.v46_val
address["version"] = 4
address["subnet"] = models.Subnet(cidr="::ffff:0:0/96")
address["allocated_at"] = timeutils.utcnow()
mac = models.MacAddress()
mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")
old_override = cfg.CONF.QUARK.v6_allocation_attempts
cfg.CONF.set_override('v6_allocation_attempts', 0, 'QUARK')
with self._stubs(subnets=[[(subnet6, 0)]],
addresses=[address, None, None]):
with self.assertRaises(n_exc.IpAddressGenerationFailure):
addr = []
self.ipam.allocate_ip_address(self.context, addr, 0, 0, 0,
mac_address=mac)
cfg.CONF.set_override('v6_allocation_attempts', old_override, 'QUARK')