def setUp(self):
'''
To check and install dependencies for the test
'''
smm = SoftwareManager()
pkgs = ["net-tools"]
detected_distro = distro.detect()
if detected_distro.name == "Ubuntu":
pkgs.extend(["openssh-client", "iputils-ping"])
elif detected_distro.name == "SuSE":
pkgs.extend(["openssh", "iputils"])
else:
pkgs.extend(["openssh-clients", "iputils"])
for pkg in pkgs:
if not smm.check_installed(pkg) and not smm.install(pkg):
self.cancel("%s package is need to test" % pkg)
interfaces = netifaces.interfaces()
self.iface = self.params.get("interface")
if self.iface not in interfaces:
self.cancel("%s interface is not available" % self.iface)
self.peer = self.params.get("peer_ip", default="")
if self.peer == "":
self.cancel("peer ip should specify in input")
self.user = self.params.get("user_name", default="root")
python类interfaces()的实例源码
def setUp(self):
"""
Set up.
"""
self.iface = self.params.get("interface", default="")
self.count = self.params.get("count", default="500")
self.peer_ip = self.params.get("peer_ip", default="")
self.drop = self.params.get("drop_accepted", default="10")
# Check if interface exists in the system
interfaces = netifaces.interfaces()
if self.iface not in interfaces:
self.cancel("%s interface is not available" % self.iface)
if not self.peer_ip:
self.cancel("peer ip should specify in input")
# Install needed packages
smm = SoftwareManager()
if not smm.check_installed("tcpdump") and not smm.install("tcpdump"):
self.cancel("Can not install tcpdump")
def get_iface_from_addr(addr):
"""Work out on which interface the provided address is configured."""
for iface in netifaces.interfaces():
addresses = netifaces.ifaddresses(iface)
for inet_type in addresses:
for _addr in addresses[inet_type]:
_addr = _addr['addr']
# link local
ll_key = re.compile("(.+)%.*")
raw = re.match(ll_key, _addr)
if raw:
_addr = raw.group(1)
if _addr == addr:
log("Address '%s' is configured on iface '%s'" %
(addr, iface))
return iface
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
raise Exception(msg)
def get_iface_from_addr(addr):
"""Work out on which interface the provided address is configured."""
for iface in netifaces.interfaces():
addresses = netifaces.ifaddresses(iface)
for inet_type in addresses:
for _addr in addresses[inet_type]:
_addr = _addr['addr']
# link local
ll_key = re.compile("(.+)%.*")
raw = re.match(ll_key, _addr)
if raw:
_addr = raw.group(1)
if _addr == addr:
log("Address '%s' is configured on iface '%s'" %
(addr, iface))
return iface
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
raise Exception(msg)
def get_iface_from_addr(addr):
"""Work out on which interface the provided address is configured."""
for iface in netifaces.interfaces():
addresses = netifaces.ifaddresses(iface)
for inet_type in addresses:
for _addr in addresses[inet_type]:
_addr = _addr['addr']
# link local
ll_key = re.compile("(.+)%.*")
raw = re.match(ll_key, _addr)
if raw:
_addr = raw.group(1)
if _addr == addr:
log("Address '%s' is configured on iface '%s'" %
(addr, iface))
return iface
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
raise Exception(msg)
def get_iface_from_addr(addr):
"""Work out on which interface the provided address is configured."""
for iface in netifaces.interfaces():
addresses = netifaces.ifaddresses(iface)
for inet_type in addresses:
for _addr in addresses[inet_type]:
_addr = _addr['addr']
# link local
ll_key = re.compile("(.+)%.*")
raw = re.match(ll_key, _addr)
if raw:
_addr = raw.group(1)
if _addr == addr:
log("Address '%s' is configured on iface '%s'" %
(addr, iface))
return iface
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
raise Exception(msg)
def get_interfaces(ignore_local=True):
"""
Retrieves the address of all external interfaces (lo 127.0.0.1 ignored)
:return: List of tuples (interface, addr)
"""
interfaces = netifaces.interfaces()
if_ext = []
for i in interfaces:
if i == 'lo' and ignore_local:
continue
iface = netifaces.ifaddresses(i).get(netifaces.AF_INET)
if iface:
for j in iface:
if_ext.append((i, j['addr'], j['netmask']))
return if_ext
def get_iface_from_addr(addr):
"""Work out on which interface the provided address is configured."""
for iface in netifaces.interfaces():
addresses = netifaces.ifaddresses(iface)
for inet_type in addresses:
for _addr in addresses[inet_type]:
_addr = _addr['addr']
# link local
ll_key = re.compile("(.+)%.*")
raw = re.match(ll_key, _addr)
if raw:
_addr = raw.group(1)
if _addr == addr:
log("Address '%s' is configured on iface '%s'" %
(addr, iface))
return iface
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
raise Exception(msg)
def get_iface_from_addr(addr):
"""Work out on which interface the provided address is configured."""
for iface in netifaces.interfaces():
addresses = netifaces.ifaddresses(iface)
for inet_type in addresses:
for _addr in addresses[inet_type]:
_addr = _addr['addr']
# link local
ll_key = re.compile("(.+)%.*")
raw = re.match(ll_key, _addr)
if raw:
_addr = raw.group(1)
if _addr == addr:
log("Address '%s' is configured on iface '%s'" %
(addr, iface))
return iface
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
raise Exception(msg)
def get_broadcast_addr_list():
import netifaces
interfaces = [netifaces.ifaddresses(interface)
for interface in netifaces.interfaces()
]
bcast = [af_inet_info['broadcast']
if 'broadcast' in af_inet_info
else af_inet_info['peer']
for interface in interfaces
if netifaces.AF_INET in interface
for af_inet_info in interface[netifaces.AF_INET]
]
print('Broadcast address list:', bcast)
return ' '.join(bcast)
def get_netifaces_addresses():
'''Get a list of addresses + broadcast using netifaces
Yields (address, broadcast_address)
'''
if netifaces is None:
raise RuntimeError('netifaces unavailable')
for iface in netifaces.interfaces():
interface = netifaces.ifaddresses(iface)
if netifaces.AF_INET in interface:
for af_inet_info in interface[netifaces.AF_INET]:
addr = af_inet_info.get('addr', None)
peer = af_inet_info.get('peer', None)
broadcast = af_inet_info.get('broadcast', None)
if addr is not None and broadcast is not None:
yield (addr, broadcast)
elif peer is not None and broadcast is not None:
yield (peer, broadcast)
elif addr is not None:
yield (addr, addr)
elif peer is not None:
yield (peer, peer)
def ip(self, interface_name):
"""
interface: str, Interface Name
return: List
[{'broadcast': '10.100.0.255', 'addr': '10.100.0.164', 'netmask': '255.255.255.0'}]
"""
interface_check = self.interfaces
if not interface_name in interface_check:
raise WrongInterfaceName("Wrong Interface Name %s" % interface_name)
addrs = netifaces.ifaddresses(interface_name)
try:
return addrs[netifaces.AF_INET]
except KeyError:
raise NotValidInterfaceName("Not valid interface name or may be
virtual interface name %s" % interface_name)
def is_up(self, interface_name):
''' Return True if the interface is up, False otherwise. '''
interface_check = self.interfaces
if not interface_name in interface_check:
raise WrongInterfaceName("Wrong Interface Name %s" % interface_name)
ifname = interface_name.encode(encoding='UTF-8')
# Get existing device flags
ifreq = struct.pack('16sh', ifname, 0)
flags = struct.unpack('16sh', fcntl.ioctl(self.sock, SIOCGIFFLAGS, ifreq))[1]
# Set new flags
if flags & IFF_UP:
return True
else:
return False
def netmask(self, interface_name, netmask):
"""
Checking interface first, if interface name found in Get().interfaces()
validating Ipv4. After that applied ip address to interace
interface_name = Applied Interface
netmask = New netmask ip address
"""
interface_check = Get().interfaces
valid_ipv4 = validators.ipv4(netmask)
if not interface_name in interface_check:
raise WrongInterfaceName("Wrong Interface Name %s" % interface_name)
elif not valid_ipv4 is True:
raise NotValidIPv4Address("Not Valid IPv4 Address %s" % netmask)
else:
prefix_len = self.get_net_size(netmask.split('.'))
ifname = interface_name.encode(encoding='UTF-8')
netmask = ctypes.c_uint32(~((2 ** (32 - prefix_len)) - 1)).value
nmbytes = socket.htonl(netmask)
ifreq = struct.pack('16sH2sI8s', ifname, AF_INET, b'\x00'*2, nmbytes, b'\x00'*8)
fcntl.ioctl(self.sock, SIOCSIFNETMASK, ifreq)
def teardown_tunnels(self, tunnels, work_dir, config_file):
"""Function destroys the created tunnels
Args:
tunnels : list of existing tunnels
work_dir : working directory path where the configuration files are located
config_file : name of the honeypot configuration file
"""
logging.debug('Destroying tunnel interfaces.')
config_file_location = os.path.join(work_dir, config_file)
parser = ConfigParser()
parser.read(config_file)
try:
tunnel_id = int(parser.getint("tunnel", "startid"))
except (NoSectionError, NoOptionError):
logger.error('Error: Incomplete honeyd.cfg configuration.')
sys.exit(1)
for i in range(0, len(tunnels)):
tunnel_id += 1
name = 'tun' + str(tunnel_id)
subprocess.Popen(['ip', 'link', 'set', name, 'down'])
subprocess.Popen(['ip', 'tunnel', 'del', name])
for mode in ['ipip', 'ip_gre']:
subprocess.Popen(['modprobe', '-r', mode])
def unhandled_exception(greenlet, expected_args):
"""Function cleans up after a greenlet dies unexpectedly
Args:
greenlet : object that died
expected_args : list of arguments required for clean up
"""
tunnels, config, arp_daemon, web_server = expected_args
logger.error('Error: Stopping honeypot: %s is dead: %s', greenlet, greenlet.exception)
logger.info('Closing tunnel interfaces: %s', tunnels)
Builder().teardown_tunnels(tunnels, package_directory, config)
if arp_daemon:
logging.info('Terminating arpd daemon.')
arp_daemon.kill()
if web_server:
logging.info('Terminating web server.')
web_server.kill()
sys.exit(1)
def build_commands(self):
# get additional options from the config file
mode = self.config.get_collector_custom_data()["interfaces"]["mode"]
ifaces = self.config.get_collector_custom_data()["interfaces"]["interfaces"]
if mode == "inclusive":
self.interfaces = ifaces
else:
self.interfaces = [iface for iface in netifaces.interfaces() if iface not in ifaces]
# build commands
for iface in self.interfaces:
out_file_name = definitions.TIMESTAMP_PLACEHOLDER + "_" + iface
self.output_filenames.append(out_file_name)
out_file_path = os.path.join(self.output_dir, out_file_name + ".pcap")
cmd = "dumpcap " \
+ "-i " + str(iface) + " " \
+ "-w " + str(out_file_path)
self.commands.append(cmd)
def in6_getifaddr():
"""
Returns a list of 3-tuples of the form (addr, scope, iface) where
'addr' is the address of scope 'scope' associated to the interface
'ifcace'.
This is the list of all addresses of all interfaces available on
the system.
"""
ret = []
interfaces = get_if_list()
for i in interfaces:
addrs = netifaces.ifaddresses(i)
if netifaces.AF_INET6 not in addrs:
continue
for a in addrs[netifaces.AF_INET6]:
addr = a['addr'].split('%')[0]
scope = scapy.utils6.in6_getscope(addr)
ret.append((addr, scope, i))
return ret
def get_iface_from_addr(addr):
"""Work out on which interface the provided address is configured."""
for iface in netifaces.interfaces():
addresses = netifaces.ifaddresses(iface)
for inet_type in addresses:
for _addr in addresses[inet_type]:
_addr = _addr['addr']
# link local
ll_key = re.compile("(.+)%.*")
raw = re.match(ll_key, _addr)
if raw:
_addr = raw.group(1)
if _addr == addr:
log("Address '%s' is configured on iface '%s'" %
(addr, iface))
return iface
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
raise Exception(msg)