python类EUI的实例源码

test_ipam.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_allocate_v6_with_mac(self):
        """Allocate from a v6 subnet when a MAC address is supplied.

        With ``v6_allocation_attempts`` overridden to 1, checks that the
        address returned by ``_allocate_from_v6_subnet`` equals the value of
        the address handed to ``_stubs``, and that the IP is created rather
        than updated.
        """
        port_id = "945af340-ed34-4fec-8c87-853a2df492b4"
        subnet6 = dict(id=1, first_ip=0, last_ip=0,
                       cidr="feed::/104", ip_version=6,
                       next_auto_assign_ip=0,
                       ip_policy=None)
        subnet6 = models.Subnet(**subnet6)

        mac = models.MacAddress()
        mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")

        ip_address = netaddr.IPAddress("fe80::")

        old_override = cfg.CONF.QUARK.v6_allocation_attempts
        cfg.CONF.set_override('v6_allocation_attempts', 1, 'QUARK')
        try:
            with self._stubs(policies=[], ip_address=ip_address) as (
                    policy_find, ip_find, ip_create, ip_update):
                a = self.ipam._allocate_from_v6_subnet(
                    self.context, 0, subnet6, port_id, self.reuse_after,
                    mac_address=mac)
                self.assertEqual(ip_address.value, a["address"])

                # NCP-1548 - leaving this test to show the change from
                # sometimes creating the IP address to always creating the
                # IP address.
                self.assertEqual(0, ip_update.call_count)
                self.assertEqual(1, ip_create.call_count)
        finally:
            # Restore the previous value even when an assertion above fails,
            # so the override cannot leak into other tests.
            cfg.CONF.set_override('v6_allocation_attempts', old_override,
                                  'QUARK')
test_ipam.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_rfc2462_generates_valid_ip(self):
        """rfc2462_ip returns the expected integer for a known MAC and CIDR."""
        generated = quark.ipam.rfc2462_ip(netaddr.EUI("AA:BB:CC:DD:EE:FF"),
                                          "fe80::/120")
        expected = netaddr.IPAddress('fe80::a8bb:ccff:fedd:eeff').value
        self.assertEqual(generated, expected)
test_ipam.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_v6_generator(self):
        """generate_v6 yields the expected first two addresses.

        The first value matches the address asserted for ``rfc2462_ip`` with
        the same MAC and CIDR; the second is a fixed address expected for
        this MAC / port id / CIDR combination.
        """
        mac = netaddr.EUI("AA:BB:CC:DD:EE:FF")
        port_id = "945af340-ed34-4fec-8c87-853a2df492b4"
        cidr = "fe80::/120"
        gen = quark.ipam.generate_v6(mac, port_id, cidr)

        # The builtin next() works on Python 2.6+ and Python 3, unlike the
        # Python-2-only gen.next() method.
        ip = next(gen)
        self.assertEqual(ip,
                         netaddr.IPAddress('fe80::a8bb:ccff:fedd:eeff').value)
        ip = next(gen)
        self.assertEqual(ip,
                         netaddr.IPAddress('fe80::40c9:a95:d83a:2ffa').value)
test_security_groups_client.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_apply_rules_set_fails_gracefully(self):
        """A redis connection error in hset surfaces as RedisConnectionFailure.

        The redis client base class is patched so that ``master.hset`` raises
        ``redis.ConnectionError`` when ``apply_rules`` is called.
        """
        mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")
        failure = redis.ConnectionError
        patch_target = ("quark.cache.security_groups_client."
                        "redis_base.ClientBase")
        with mock.patch(patch_target) as client_base_cls:
            fake_redis = mock.MagicMock()
            client_base_cls.return_value = fake_redis

            sg = sg_client.SecurityGroupsClient()
            fake_redis.master.hset.side_effect = failure
            with self.assertRaises(q_exc.RedisConnectionFailure):
                sg.apply_rules(1, mac_address.value, [])
ipam.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def rfc2462_ip(mac, cidr):
    """Return the integer value of a v6 address derived from a MAC.

    The result is the integer value of ``netaddr.IPNetwork(cidr)`` plus the
    EUI-64 form of ``mac``, XORed with the module constant ``MAGIC_INT``.

    :param mac: anything accepted by ``netaddr.EUI``.
    :param cidr: anything accepted by ``netaddr.IPNetwork``.
    :returns: the generated address as an integer.
    """
    # NOTE(mdietz): see RFC2462
    int_val = netaddr.IPNetwork(cidr).value
    mac = netaddr.EUI(mac)
    # Pass the MAC as a logging argument so the message is only interpolated
    # when the record is actually emitted.
    LOG.info("Using RFC2462 method to generate a v6 with MAC %s", mac)
    int_val += mac.eui64().value
    # NOTE(review): MAGIC_INT is defined outside this block; presumably it
    # flips the universal/local bit of the interface identifier - confirm.
    int_val ^= MAGIC_INT
    return int_val
ipam.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def deallocate_mac_address(self, context, address, **kwargs):
        """Release a MAC address, deleting or flagging its record.

        Looks the address up with an elevated context. The record is deleted
        when it has no MAC address range or its range is marked
        ``do_not_use``; otherwise it is updated as deallocated with the
        current UTC time.

        :raises q_exc.MacAddressNotFound: when no record matches ``address``.
        """
        elevated = context.elevated()
        record = db_api.mac_address_find(elevated, address=address,
                                         scope=db_api.ONE)
        if not record:
            raise q_exc.MacAddressNotFound(
                mac_address_id=address,
                readable_mac=netaddr.EUI(address))

        mac_range = record["mac_address_range"]
        reusable = mac_range is not None and not mac_range["do_not_use"]
        if reusable:
            db_api.mac_address_update(elevated, record, deallocated=True,
                                      deallocated_at=timeutils.utcnow())
        else:
            db_api.mac_address_delete(elevated, record)
redis_base.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def vif_key(self, device_id, mac_address):
        """Build the ``<device_id>.<mac>`` key for a VIF.

        The MAC is rendered through ``netaddr.EUI`` and then translated with
        ``MAC_TRANS_TABLE`` while ``:`` and ``-`` are removed.
        """
        # Lower-cases the MAC and strips its separators.
        # NOTE(review): the lower-casing relies on MAC_TRANS_TABLE, which is
        # defined outside this block - confirm.
        normalized = str(netaddr.EUI(mac_address)).translate(
            MAC_TRANS_TABLE, ":-")
        return "{0}.{1}".format(device_id, normalized)
snarf.py 文件源码 项目:kSnarf 作者: ICSec 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def macGrab(self, packet):
        """Print the organisation registered for the OUI of ``packet.addr2``.

        This function serves as an example, it is not ready for
        implementation.
        """
        try:
            parsed_mac = netaddr.EUI(packet.addr2)
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print(parsed_mac.oui.registration().org)
        except netaddr.core.NotRegisteredError:
            # NOTE(review): `fields` is not defined in this function; it has
            # to exist in an enclosing scope or this raises NameError.
            fields.append('UNKNOWN')


    ### Move to handler.py
vswitch_controller_p2p.py 文件源码 项目:vswitchperf 作者: opnfv 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def process_flow_template(self, bridge, flow_template):
        """Method adds flows into the vswitch based on given flow template
           and configuration of multistream feature.

        When pre-installed flows and multistream are both enabled and a
        stream type is configured, one flow per stream is cached and the
        cache is flushed at the end; otherwise the template is added as is.
        Note that ``flow_template`` is updated in place for each stream.
        """
        traffic = self._traffic
        multistream_flows = (
            'pre_installed_flows' in traffic and
            traffic['pre_installed_flows'].lower() == 'yes' and
            'multistream' in traffic and traffic['multistream'] > 0 and
            'stream_type' in traffic)

        if not multistream_flows:
            self._vswitch.add_flow(bridge, flow_template)
            return

        # multistream feature is enabled and flows should be inserted into
        # OVS, so generate flows based on template and multistream setup
        stream_type = traffic['stream_type']
        if stream_type == 'L2':
            # walk through consecutive destination MAC addresses
            first_mac = netaddr.EUI(traffic['l2']['dstmac']).value
            for offset in range(traffic['multistream']):
                stream_mac = netaddr.EUI(first_mac + offset)
                stream_mac.dialect = netaddr.mac_unix_expanded
                flow_template.update({'dl_dst': stream_mac})
                # flows are cached to speed up insertion
                self._vswitch.add_flow(bridge, flow_template, cache='on')
        elif stream_type == 'L3':
            # walk through consecutive destination IP addresses
            first_ip = netaddr.IPAddress(traffic['l3']['dstip']).value
            for offset in range(traffic['multistream']):
                stream_ip = netaddr.IPAddress(first_ip + offset)
                flow_template.update({'dl_type': '0x0800',
                                      'nw_dst': stream_ip})
                # flows are cached to speed up insertion
                self._vswitch.add_flow(bridge, flow_template, cache='on')
        elif stream_type == 'L4':
            # transport protocol comes from the configuration; the
            # destination port is the stream index
            if traffic['l3']['proto'].lower() == 'tcp':
                proto = _PROTO_TCP
            else:
                proto = _PROTO_UDP
            for port in range(traffic['multistream']):
                flow_template.update({'dl_type': '0x0800',
                                      'nw_proto': proto,
                                      'tp_dst': port})
                # flows are cached to speed up insertion
                self._vswitch.add_flow(bridge, flow_template, cache='on')
        else:
            self._logger.error('Stream type is set to uknown value %s',
                               traffic['stream_type'])

        # push the cached flows into the OVS
        self._vswitch.add_flow(bridge, [], cache='flush')
ipaddr.py 文件源码 项目:f5-aws-vpn 作者: f5devcentral 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def slaac(value, query = ''):
    ''' Get the SLAAC address within given network

    value: an IPv6 address or network, as understood by ipaddr().
    query: the hardware (MAC) address to combine with the network.

    Returns the address produced by EUI.ipv6() for the network address of
    value, or False when value is not an IPv6 address/network, query is
    empty, or query is not a hardware address.
    '''
    try:
        vtype = ipaddr(value, 'type')
        if vtype == 'address':
            v = ipaddr(value, 'cidr')
        elif vtype == 'network':
            v = ipaddr(value, 'subnet')
        else:
            # Neither an address nor a network: nothing to derive from.
            return False

        # Read the version from the parsed network rather than from the
        # ipaddr() result, so this works whether that result is a string
        # or an object. NOTE(review): ipaddr() is defined elsewhere in
        # this file - confirm what it returns for 'cidr' / 'subnet'.
        network = netaddr.IPNetwork(v)
    except Exception:
        return False

    if network.version != 6:
        return False

    if not query:
        return False

    try:
        mac = hwaddr(query, alias = 'slaac')

        eui = netaddr.EUI(mac)
    except Exception:
        return False

    return eui.ipv6(network.network)


# ---- HWaddr / MAC address filters ----
ipaddr.py 文件源码 项目:f5-aws-vpn 作者: f5devcentral 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def hwaddr(value, query = '', alias = 'hwaddr'):
    ''' Check if string is a HW/MAC address and filter it

    value: the candidate hardware address.
    query: name of the output filter ('' returns through the empty query).
    alias: filter name used as a prefix in error messages.

    Returns the result of the selected query function, or False when value
    is not a hardware address and query is '' or 'bool'.
    Raises errors.AnsibleFilterError when value is not a hardware address
    (for any other query) or when query is not a known filter type.
    '''

    # Extra positional arguments passed to the query function, per query.
    query_func_extra_args = {
            '': (value,),
            }
    query_func_map = {
            '': _empty_hwaddr_query,
            'bare': _bare_query,
            'bool': _bool_hwaddr_query,
            'cisco': _cisco_query,
            'eui48': _win_query,
            'linux': _linux_query,
            'pgsql': _postgresql_query,
            'postgresql': _postgresql_query,
            'psql': _postgresql_query,
            'unix': _unix_query,
            'win': _win_query,
            }

    try:
        v = netaddr.EUI(value)
    except Exception:
        if query and query != 'bool':
            raise errors.AnsibleFilterError(alias + ': not a hardware address: %s' % value)
        # A plain or boolean check on an invalid address is simply false;
        # falling through would hit an unbound `v`.
        return False

    # Resolve the query before calling it, so a KeyError raised inside a
    # query function is not misreported as an unknown filter type.
    try:
        query_func = query_func_map[query]
    except KeyError:
        raise errors.AnsibleFilterError(alias + ': unknown filter type: %s' % query)

    return query_func(v, *query_func_extra_args.get(query, ()))
aircracker.py 文件源码 项目:EvilTwinFramework 作者: Esser420 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, id=0, ssid=None, client_mac=None, location=None):
        """Store the given fields and resolve the client's vendor.

        ``client_org`` is the organisation registered for the OUI of
        ``client_mac``, or None when it cannot be determined.
        """
        self.id = id
        self.ssid = ssid
        self.client_mac = client_mac
        self.location = location
        self.client_org = None
        try:
            # OUI - Organizational Unique Identifier
            self.client_org = EUI(self.client_mac).oui.registration().org
        except Exception:
            # Best effort: an unparsable MAC or unregistered OUI leaves
            # client_org as None. KeyboardInterrupt and SystemExit are no
            # longer swallowed.
            pass
aircracker.py 文件源码 项目:EvilTwinFramework 作者: Esser420 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, id=0, ssid=None, bssid=None, date=None, location=None):
        """Store the given fields and resolve the access point's vendor.

        ``ap_org`` is the organisation registered for the OUI of ``bssid``,
        or None when it cannot be determined.
        """
        self.id = id
        self.ssid = ssid
        self.bssid = bssid
        self.date = date
        self.location = location
        self.ap_org = None
        try:
            # OUI - Organizational Unique Identifier
            self.ap_org = EUI(self.bssid).oui.registration().org
        except Exception:
            # Best effort: an unparsable BSSID or unregistered OUI leaves
            # ap_org as None. KeyboardInterrupt and SystemExit are no longer
            # swallowed.
            pass
aircracker.py 文件源码 项目:EvilTwinFramework 作者: Esser420 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, id=0, ssid=None, client_mac=None, date=None, location=None):
        """Store the given fields and resolve the client's vendor.

        ``client_org`` is the organisation registered for the OUI of
        ``client_mac``, or None when it cannot be determined.
        """
        self.id = id
        self.ssid = ssid
        self.client_mac = client_mac
        self.date = date
        self.location = location
        self.client_org = None
        try:
            # OUI - Organizational Unique Identifier
            self.client_org = EUI(self.client_mac).oui.registration().org
        except Exception:
            # Best effort: an unparsable MAC or unregistered OUI leaves
            # client_org as None. KeyboardInterrupt and SystemExit are no
            # longer swallowed.
            pass
packet.py 文件源码 项目:EvilTwinFramework 作者: Esser420 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_vendor(mac):
        """Return the organisation registered for the OUI of ``mac``.

        Returns None when ``mac`` is the empty string or when the
        registration lookup fails. A non-empty ``mac`` that ``EUI`` cannot
        parse still raises, because the parse happens outside the ``try``.
        """
        if mac == "":
            return None

        maco = EUI(mac)                         # EUI - Extended Unique Identifier
        try:
            return maco.oui.registration().org  # OUI - Organizational Unique Identifier
        except Exception:
            # Lookup failed (e.g. OUI not registered); KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            return None
overrides.py 文件源码 项目:simple-ostinato 作者: little-dude 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def source(self):
        """
        Source MAC address, rendered as a string via ``netaddr.EUI``.
        """
        src_eui = netaddr.EUI(self._src_mac)
        return str(src_eui)
overrides.py 文件源码 项目:simple-ostinato 作者: little-dude 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def source(self, value):
        """Store the source MAC as the integer value ``netaddr.EUI`` yields."""
        parsed = netaddr.EUI(value)
        self._src_mac = parsed.value
overrides.py 文件源码 项目:simple-ostinato 作者: little-dude 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def destination(self):
        """
        Destination MAC address, rendered as a string via ``netaddr.EUI``.
        """
        dst_eui = netaddr.EUI(self._dst_mac)
        return str(dst_eui)
overrides.py 文件源码 项目:simple-ostinato 作者: little-dude 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def destination(self, value):
        """Store the destination MAC as the integer value ``netaddr.EUI`` yields."""
        parsed = netaddr.EUI(value)
        self._dst_mac = parsed.value
test_protocols.py 文件源码 项目:simple-ostinato 作者: little-dude 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_traffic_variable(self):
        """Check per-packet length and MAC values of the captured stream.

        Enables stream 2 on the tx port, sends for one second while
        capturing to ``capture.pcap``, then checks for the packet at index
        ``i`` that the ethernet length is ``(i + 1) * 10``, the source MAC is
        ``i * 0x100`` and the destination MAC is
        ``0xffffffffffff - i * 0x1000000``. The capture checks are skipped
        when ``utils.is_pypy()`` is true.
        """
        tx = self.layer.tx
        rx = self.layer.rx
        tx.streams[2].is_enabled = True
        tx.streams[2].save()
        utils.send_and_receive(tx, rx, duration=1, save_as='capture.pcap')
        if utils.is_pypy():
            return

        packets = pyshark.FileCapture('capture.pcap')
        for index, packet in enumerate(packets):
            expected_len = (index + 1) * 10
            self.assertEqual(expected_len, int(packet.eth.len))

            src_value = netaddr.EUI(packet.eth.src).value
            self.assertEqual(index * 0x100, src_value)

            dst_value = netaddr.EUI(packet.eth.dst).value
            self.assertEqual(0xffffffffffff - index * 0x1000000, dst_value)


问题


面经


文章

微信
公众号

扫码关注公众号