def table_9_test_preparation(self, env, tg_port, sw_port, packet_num, ipgap, offset=0, arp_packet=None):
    """Prepare ports and packets for table 9 related tests.

    The source MAC and the ARP sender IP of ``arp_packet`` are shifted by
    ``offset`` (the packet is modified in place), the switch port named in
    ``sw_port`` is set to admin mode 'Up', and the ARP stream is transmitted
    from the traffic generator.

    Args:
        env: Environment object; ``env.tg[1]`` and ``env.switch[1].node`` are used.
        tg_port: Traffic generator port the stream is sent from.
        sw_port: Whitespace separated "<switch id> <port id>" string.
        packet_num: Number of packets in the stream.
        ipgap: Inter-packet gap handed to the stream; ``ipgap * packet_num``
            is also the time slept while the stream runs.
        offset: Value added to the source MAC, the sender IP and the
            increment counts.
        arp_packet: Packet definition; element 0 holds the 'Ether' layer and
            element 1 the 'ARP' layer.

    Returns:
        List of switch nodes whose ``id`` differs from the id in ``sw_port``.

    Raises:
        UIException: If no ARP packet is supplied or no switch node matches
            the id in ``sw_port``.
    """
    if not arp_packet:
        raise UIException("ARP packet not supplied")

    # Shift the L2 source address first, then the ARP sender address.
    ether_layer = arp_packet[0]['Ether']
    shifted_mac = EUI(EUI(ether_layer['src']).value + offset, dialect=mac_unix_expanded)
    ether_layer['src'] = str(shifted_mac)
    arp_layer = arp_packet[1]['ARP']
    arp_layer['psrc'] = str(IPAddress(arp_layer['psrc']) + offset)

    # Split "<switch id> <port id>" once and partition the nodes accordingly.
    port_fields = sw_port.split()
    sw_port_id = int(port_fields[1])
    switch_id = port_fields[0]
    matching_nodes = []
    other_nodes = []
    for node in env.switch[1].node.values():
        if node.id == switch_id:
            matching_nodes.append(node)
        else:
            other_nodes.append(node)
    if not matching_nodes:
        raise UIException("Not found switch id and port pair connection in configuration")

    # Bring the link up on both ends before any traffic is sent.
    traffic_gen = env.tg[1]
    traffic_gen.connect_port(tg_port)
    switch_ui = matching_nodes[0].ui
    switch_ui.modify_ports(ports=[sw_port_id], adminMode='Up')
    switch_ui.wait_for_port_value_to_change([sw_port_id], 'operationalStatus', "Up")

    # Build the stream, let it run for its nominal duration, then stop it.
    increment = (1, packet_num + offset)
    arp_stream = traffic_gen.set_stream(arp_packet, count=packet_num, inter=ipgap, iface=tg_port,
                                        arp_sa_increment=increment,
                                        arp_sip_increment=increment)
    traffic_gen.start_streams([arp_stream])
    time.sleep(ipgap * packet_num)
    traffic_gen.stop_streams([arp_stream])
    return other_nodes
# Example source code for the Python class mac_unix_expanded()
def process_flow_template(self, bridge, flow_template):
    """Add flows into the vswitch based on a flow template and the
    configuration of the multistream feature.

    When ``self._traffic`` enables pre-installed flows ('pre_installed_flows'
    equal to 'yes', case-insensitively), has a positive 'multistream' count
    and defines 'stream_type', one flow per stream is generated from the
    template and added with ``cache='on'``; the cache is flushed afterwards.
    Otherwise the template is added as a single flow.

    Each generated flow is built on a copy of ``flow_template`` so the
    caller's template is never modified.

    Args:
        bridge: Bridge passed through to ``self._vswitch.add_flow``.
        flow_template: Mapping with the base flow definition.
    """
    traffic = self._traffic
    multistream_enabled = (
        'pre_installed_flows' in traffic and
        traffic['pre_installed_flows'].lower() == 'yes' and
        'multistream' in traffic and traffic['multistream'] > 0 and
        'stream_type' in traffic)

    if not multistream_enabled:
        self._vswitch.add_flow(bridge, flow_template)
        return

    # Multistream feature is enabled and flows should be inserted into the
    # vswitch, so generate them from the template and multistream settings.
    stream_type = traffic['stream_type']
    stream_count = traffic['multistream']

    if stream_type == 'L2':
        # Iterate through destination MAC addresses.
        dst_mac_value = netaddr.EUI(traffic['l2']['dstmac']).value
        for i in range(stream_count):
            tmp_mac = netaddr.EUI(dst_mac_value + i)
            tmp_mac.dialect = netaddr.mac_unix_expanded
            flow = flow_template.copy()
            flow.update({'dl_dst': tmp_mac})
            # Optimize flow insertion by usage of cache.
            self._vswitch.add_flow(bridge, flow, cache='on')
    elif stream_type == 'L3':
        # Iterate through destination IP addresses.
        dst_ip_value = netaddr.IPAddress(traffic['l3']['dstip']).value
        for i in range(stream_count):
            tmp_ip = netaddr.IPAddress(dst_ip_value + i)
            flow = flow_template.copy()
            flow.update({'dl_type': '0x0800', 'nw_dst': tmp_ip})
            self._vswitch.add_flow(bridge, flow, cache='on')
    elif stream_type == 'L4':
        # Read the transport protocol from configuration and iterate
        # through its destination ports.
        proto = _PROTO_TCP if traffic['l3']['proto'].lower() == 'tcp' else _PROTO_UDP
        for i in range(stream_count):
            flow = flow_template.copy()
            flow.update({'dl_type': '0x0800', 'nw_proto': proto, 'tp_dst': i})
            self._vswitch.add_flow(bridge, flow, cache='on')
    else:
        self._logger.error('Stream type is set to uknown value %s', stream_type)

    # Insert cached flows into the vswitch (done for unknown types as well).
    self._vswitch.add_flow(bridge, [], cache='flush')
def __init__(self, api, router, port, mac, networks,
             peer=None, may_exist=False, **columns):
    """Initialise the command with normalised arguments.

    ``mac`` is parsed by ``netaddr.EUI`` and stored as a string in the
    ``mac_unix_expanded`` dialect; each entry of ``networks`` is parsed by
    ``netaddr.IPNetwork`` and stored as a string. Remaining arguments are
    stored unchanged, extra keyword arguments as ``self.columns``.
    """
    # Parse addresses first so malformed input fails before any state is set.
    parsed_mac = netaddr.EUI(mac, dialect=netaddr.mac_unix_expanded)
    parsed_networks = []
    for net in networks:
        parsed_networks.append(str(netaddr.IPNetwork(net)))

    self.mac = str(parsed_mac)
    self.networks = parsed_networks
    self.router = router
    self.port = port
    self.peer = peer
    self.may_exist = may_exist
    self.columns = columns
    super(LrpAddCommand, self).__init__(api)
def __init__(self, api, router, nat_type, external_ip, logical_ip,
             logical_port=None, external_mac=None, may_exist=False):
    """Validate and normalise the NAT arguments, then store them.

    ``external_ip`` is normalised through ``netaddr.IPAddress``. For
    ``const.NAT_DNAT`` ``logical_ip`` is normalised the same way; for other
    types it is parsed as ``netaddr.IPNetwork`` and stored as a bare address
    when the prefix length is 32, otherwise as the network. ``external_mac``,
    when given, is stored in the ``mac_unix_expanded`` dialect.

    Raises:
        TypeError: If ``nat_type`` is not in ``const.NAT_TYPES``, if only one
            of ``logical_port`` / ``external_mac`` is given, or if
            ``logical_port`` is given for a type other than ``const.NAT_BOTH``.
    """
    if nat_type not in const.NAT_TYPES:
        raise TypeError("nat_type not in %s" % str(const.NAT_TYPES))

    normalized_external_ip = str(netaddr.IPAddress(external_ip))

    if nat_type == const.NAT_DNAT:
        normalized_logical_ip = str(netaddr.IPAddress(logical_ip))
    else:
        logical_net = netaddr.IPNetwork(logical_ip)
        if logical_net.prefixlen == 32:
            normalized_logical_ip = str(logical_net.ip)
        else:
            normalized_logical_ip = str(logical_net)

    # logical_port and external_mac only make sense as a pair.
    port_given = logical_port is not None
    mac_given = external_mac is not None
    if port_given != mac_given:
        raise TypeError("logical_port and external_mac must be passed together")
    if logical_port and nat_type != const.NAT_BOTH:
        raise TypeError(
            "logical_port/external_mac only valid for %s" % (const.NAT_BOTH,))

    normalized_mac = external_mac
    if external_mac:
        normalized_mac = str(
            netaddr.EUI(external_mac, dialect=netaddr.mac_unix_expanded))

    super(LrNatAddCommand, self).__init__(api)
    self.router = router
    self.nat_type = nat_type
    self.external_ip = normalized_external_ip
    self.logical_ip = normalized_logical_ip
    # Falsy port/MAC values are stored as empty lists.
    self.logical_port = logical_port or []
    self.external_mac = normalized_mac or []
    self.may_exist = may_exist
def arp_packets_sending(self, env, tg_port, sw_port, packet_num, ipgap, arp_packet=None, offset=0, retry=False):
    """Send an ARP stream and count table 9 entries on the other switches.

    The stream is sent through ``table_9_test_preparation``. Each returned
    switch is then queried with the 'get_rules table 9' CLI command and the
    number of lines matching ``table : 9`` is recorded.

    With ``retry`` set and at least one switch reporting a count different
    from ``packet_num``, up to two further attempts are made: for each switch
    the MAC addresses missing from its output are re-sent one packet at a
    time, after which the counts are checked again.

    Args:
        env: Environment object; ``env.tg[1]`` and ``env.switch[1].node`` are used.
        tg_port: Traffic generator port used for sending.
        sw_port: Whitespace separated "<switch id> <port id>" string.
        packet_num: Number of packets / expected number of entries.
        ipgap: Inter-packet gap for the initial stream.
        arp_packet: Packet definition forwarded to the preparation step.
        offset: Offset forwarded to the preparation step.
        retry: Whether missing entries should be re-sent.

    Returns:
        Dict mapping each switch object to its counted table 9 entries.
    """
    cmd = "match -f 5555 -p 30001 get_rules table 9"

    def count_entries(node):
        # Number of "table : 9" lines in the CLI output of this switch.
        cli_output = node.ui.cli_send_command(cmd).stdout
        return len(re.findall(r'^table\s+:\s+9', cli_output, re.MULTILINE))

    sw_t9 = {}
    sw_instances = self.table_9_test_preparation(env, tg_port, sw_port, ipgap=ipgap,
                                                 packet_num=packet_num, arp_packet=arp_packet,
                                                 offset=offset)
    # Wait for table 9 to be populated.
    time.sleep(5)
    assert sw_instances, "No end point ports found connected to the host"
    for node in sw_instances:
        sw_t9[node] = count_entries(node)

    if not retry or all(seen == packet_num for seen in sw_t9.values()):
        return sw_t9

    # Not all entries were learned in time: try adding them again. Work on a
    # copy so the caller's packet is not altered by the per-address resends.
    arp_packet = deepcopy(arp_packet)
    srcmac = arp_packet[0]['Ether']['src']
    srcip = arp_packet[1]['ARP']['psrc']
    failed = True
    for _attempt in range(2):
        self.class_logger.info("Adding not learned MAC addresses to the table 9")
        sw_instances = [node for node in env.switch[1].node.values() if node.id != sw_port.split()[0]]
        for node in sw_instances:
            cli_output = node.ui.cli_send_command(cmd).stdout
            learned = set(re.findall(r'ethernet.dst_mac\s=\s((?:[\w]{2}:){5}[\w]{2})', cli_output, re.MULTILINE))
            to_add = []
            for ix in range(packet_num):
                expected_mac = str(EUI(EUI(srcmac).value + ix, dialect=mac_unix_expanded))
                if expected_mac not in learned:
                    to_add.append(expected_mac)
            self.class_logger.info("{} entries are missing for switch {}".format(len(to_add), node.name))
            for mac in to_add:
                # Apply the same distance from the base address to MAC and IP.
                diff = EUI(mac).value - EUI(srcmac).value
                arp_packet[0]['Ether']['src'] = str(EUI(EUI(srcmac).value + diff, dialect=mac_unix_expanded))
                arp_packet[1]['ARP']['psrc'] = str(IPAddress(IPAddress(srcip).value + diff))
                # Prepare and send a single packet for this address.
                arp_stream = env.tg[1].set_stream(arp_packet, count=1, iface=tg_port)
                env.tg[1].send_stream(arp_stream)

        self.class_logger.info("Verify table#9 entries")
        for node in sw_instances:
            failed = count_entries(node) != packet_num
            if failed:
                # Pause before the next attempt; remaining switches are skipped.
                time.sleep(3)
                break
        if not failed:
            break

    for node in sw_instances:
        sw_t9[node] = count_entries(node)
    return sw_t9
def _expand_vm_settings(self, key, vm_number):
    """
    Expand VM option with given key for given number of VMs

    The value returned by ``self.getValue(key)`` is either a string (scalar)
    or an indexable sequence whose first item acts as the master value.

    * If the master value contains '#', ``self.__dict__[key]`` is rebuilt
      with one entry per VM index: '#VMINDEX' is replaced by the index and
      every macro matched by ``_PARSE_PATTERN`` is evaluated ('#EVAL' via
      ``eval``, '#MAC' and '#IP' by adding ``vmindex * step`` to the given
      address, where an empty or zero step counts as 1). A scalar option is
      then reduced to the entry for VM index 0.
    * Otherwise a non-scalar option is padded in place with the master value
      up to ``vm_number`` entries, and a scalar option is left untouched.

    :param key: name of the option to expand
    :param vm_number: number of VMs the option should cover
    :raises RuntimeError: if ``_PARSE_PATTERN`` matches an unknown macro
    """
    # NOTE: local names are kept stable on purpose - expressions handed to
    # eval() below are evaluated with access to this function's locals.
    tmp_value = self.getValue(key)
    if isinstance(tmp_value, str):
        scalar = True
        master_value = tmp_value
        tmp_value = [tmp_value]
    else:
        scalar = False
        master_value = tmp_value[0]

    master_value_str = str(master_value)
    if master_value_str.find('#') >= 0:
        self.__dict__[key] = []
        for vmindex in range(vm_number):
            value = master_value_str.replace('#VMINDEX', str(vmindex))
            # Macros are located once, before any of them is substituted.
            for macro, args, param, _, step in re.findall(_PARSE_PATTERN, value):
                multi = int(step) if step and int(step) else 1
                if macro == '#EVAL':
                    # NOTE(review): eval() executes configuration text;
                    # confirm the configuration is always trusted.
                    # pylint: disable=eval-used
                    tmp_result = str(eval(param))
                elif macro == '#MAC':
                    mac_value = netaddr.EUI(param).value
                    mac = netaddr.EUI(mac_value + vmindex * multi)
                    mac.dialect = netaddr.mac_unix_expanded
                    tmp_result = str(mac)
                elif macro == '#IP':
                    ip_value = netaddr.IPAddress(param).value
                    tmp_result = str(netaddr.IPAddress(ip_value + vmindex * multi))
                else:
                    raise RuntimeError('Unknown configuration macro {} in {}'.format(macro, key))

                value = value.replace("{}{}".format(macro, args), tmp_result)

            # retype value to original type if needed
            if not isinstance(master_value, str):
                value = ast.literal_eval(value)
            self.__dict__[key].append(value)

        if scalar:
            # A scalar option keeps only the value expanded for VM index 0.
            self.__dict__[key] = self.__dict__[key][0]
    elif not scalar:
        # No macros: pad the stored list with the master value.
        for vmindex in range(len(tmp_value), vm_number):
            self.__dict__[key].append(master_value)
    # A scalar without macros needs no expansion. Appending to it or taking
    # its element [0] would corrupt the stored string, so it is left as is.

    _LOGGER.debug("Expanding option: %s = %s", key, self.__dict__[key])
def _parse_mac_address_table_output(self, output, ignore_port=None):
    """MAC Address Table parsing.

    Parses the output of 'sh mac address-table' command.

    :param output: the output of the command

                  Mac Address Table
        -------------------------------------------

        Vlan    Mac Address       Type        Ports
        ----    -----------       --------    -----
         All    0100.0ccc.cccc    STATIC      CPU
        <snip>
         All    ffff.ffff.ffff    STATIC      CPU
         601    000b.7866.5240    DYNAMIC     Gi1/0/48
         601    0013.20fe.56b4    DYNAMIC     Gi1/0/35
        <snip>
        Total Mac Addresses for this criterion: 59

    :param ignore_port: ignore this port
    :return: list of dicts with keys 'mac' (an ``EUI``) and 'interface'
        (shorthand port name), sorted by 'interface'
    """
    # Normalise the port to ignore once instead of once per table row.
    if ignore_port:
        ignore_port = self._shorthand_port_notation(ignore_port)

    # Lower-cased prefixes that identify the ports we care about.
    port_prefixes = tuple(
        port.lower() for port in self.PORT_NOTATION.values())

    lookup = []
    for raw_line in output.splitlines():
        line = raw_line.strip()

        # Table entries will always have a '.' for the MAC address.
        # If there isn't one it's not a row we care about.
        if '.' not in line:
            continue

        values = line.split()

        # A table row has at least Vlan, Mac Address, Type and Ports columns;
        # anything shorter merely contains a '.' and is not a table row.
        if len(values) < 4:
            continue
        port = values[3]

        # Ignore ports that do not start with a known port notation.
        if not port.lower().startswith(port_prefixes):
            continue

        # If the ignore_port is specified and is the port in question,
        # ignore it.
        if ignore_port and port == ignore_port:
            continue

        lookup.append(
            {
                'mac': EUI(values[1], dialect=mac_unix_expanded),
                'interface': self._shorthand_port_notation(port)
            }
        )
    return sorted(lookup, key=lambda k: k['interface'])
def _parse_ipdt_output(self, output):
    """IPDT output parsing.

    Parses the output of the `show ip device track` command.

    :param output: the output of the command

        IP Device Tracking = Enabled
        IP Device Tracking Probe Count = 3
        IP Device Tracking Probe Interval = 30
        IP Device Tracking Probe Delay Interval = 0
        -------------------------------------------------------------------
          IP Address     MAC Address   Vlan  Interface                STATE
        -------------------------------------------------------------------
        192.168.1.12    6cec.eb68.c86f  601  GigabitEthernet1/0/14    ACTIVE
        192.168.1.15    6cec.eb67.836c  601  GigabitEthernet1/0/12    ACTIVE
        <snip>
        Total number interfaces enabled: 47
        Enabled interfaces:
          Gi1/0/1, Gi1/0/2, Gi1/0/3, Gi1/0/4, Gi1/0/5, Gi1/0/6, Gi1/0/7,
          <snip>

    :return: list of dicts with keys 'ip' (string), 'mac' (an ``EUI``) and
        'interface' (shorthand port name), sorted by 'interface'
    """
    lookup = []
    for raw_line in output.splitlines():
        line = raw_line.strip()

        # Table entries will always have a '.' for the MAC address.
        # If there isn't one it's not a row we care about.
        if '.' not in line:
            continue

        values = line.split()

        # A table row has IP, MAC, Vlan, Interface and STATE columns;
        # anything shorter merely contains a '.' and is not a table row.
        if len(values) < 5:
            continue

        # Ignore any 'INACTIVE' entries, meaning that there hasn't been
        # traffic from that MAC address; it has likely been unplugged.
        if values[4] == 'INACTIVE':
            continue

        lookup.append(
            {
                'ip': values[0],
                'mac': EUI(values[1], dialect=mac_unix_expanded),
                'interface': self._shorthand_port_notation(values[3])
            }
        )
    return sorted(lookup, key=lambda k: k['interface'])