def __init__(self):
    """Clear the table description and set up the reST table writer.

    The four data attributes start out as ``None``. A
    ``ptw.RstSimpleTableWriter`` is created and the two lookup tables on
    its ``_dp_extractor`` are replaced with mappings from type codes /
    constant values to the reST literal text assigned below.
    """
    self.typeclass = self.strict_level = None
    self.header_list = self.value_list = None

    writer = ptw.RstSimpleTableWriter()
    self.__table_writer = writer

    # Keyed by the typecode of each "special" type.
    writer._dp_extractor.type_value_mapping = {
        NullString(None).typecode: '``""``',
        NoneType(None).typecode: "``None``",
        Infinity(None).typecode: '``Decimal("inf")``',
        Nan(None).typecode: '``Decimal("nan")``',
    }

    # Keyed by the constant value itself.
    writer._dp_extractor.const_value_mapping = {
        True: "``True``",
        False: "``False``",
        '``"127.0.0.1"``': '``ip_address("127.0.0.1")``',
    }
# Example source code for Python's ip_address()
def make_hosts(region, cell):
    """Return the example host names for one region/cell pair.

    Hosts for cabinet ``C-1`` (domain ``example1.com``) come first,
    followed by the hosts for cabinet ``C-2`` (domain ``example2.com``).
    """
    # The number of hosts needs to match the ip_address values available.
    hosts_per_cabinet = 2
    cabinets = (
        (".".join((region, cell, "C-1")), "host%s.%s.example1.com"),
        (".".join((region, cell, "C-2")), "host%s.%s.example2.com"),
    )
    return [
        template % (index, cabinet)
        for cabinet, template in cabinets
        for index in range(hosts_per_cabinet)
    ]
def create_netdevice(self, name, device_type):
    """POST a new network device and return the decoded JSON reply.

    The request goes to ``<self.url>/network-devices`` with a fixed model,
    OS version and IP address; the cloud, region and cell ids are read
    from this object.

    Raises ``Exception`` (carrying the response text) when the reply
    status is anything other than 201.
    """
    endpoint = self.url + "/network-devices"
    device = {
        "name": name,
        "model_name": "model-x",
        "os_version": "version-1",
        "device_type": device_type,
        "ip_address": "10.10.1.1",
        "active": True,
        "cloud_id": self.cloud.get("id"),
        "region_id": self.region.get("id"),
        "cell_id": self.cell.get("id"),
    }
    response = requests.post(
        endpoint,
        headers=self.headers,
        data=json.dumps(device),
        verify=False,
    )
    if response.status_code == 201:
        return response.json()
    raise Exception(response.text)
def get_random_load_nonentry():
    '''
    Return a random item that probably isn't in match_func_result['load'].

    The match type is read from sys.argv[1]. For 'ipasn' the result is a
    random IPv4 address object; for anything else it is a shuffled copy of
    a random load entry.
    '''
    if sys.argv[1] != 'ipasn':
        shuffled = list(get_random_load_entry())
        random.shuffle(shuffled)
        return "".join(shuffled)

    # IPv6 would work here too, but the list type does not matter: a random
    # IPv4 address might not be in an IPv4 list, and it will never be in an
    # IPv6 list.
    return ipaddress.ip_address(random.randint(0, 2**32 - 1))
# try to make sure that other processes don't warp the results too much
def validate_ip_network_address_prefix(address, prefix, strict=True):
    '''
    Parse address/prefix into an IPv4Network or IPv6Network.

    address is first passed through validate_ip_address(). The network is
    built with the given strict flag. Returns None when the address is
    rejected, its version is neither 4 nor 6, or the network constructor
    raises ValueError.
    '''
    parsed = validate_ip_address(address)
    if parsed is None:
        return None

    network_types = {4: ipaddress.IPv4Network, 6: ipaddress.IPv6Network}
    network_type = network_types.get(parsed.version)
    if network_type is None:
        return None

    try:
        return network_type((parsed, prefix), strict=strict)
    except ValueError:
        return None
def private(f):
    """Decorator: only run the handler for approved source addresses.

    The client address is the last entry of ``request.access_route``. The
    wrapped handler runs when that address is loopback or lies inside one
    of the networks returned by ``get_networks()``; otherwise a 403
    response is returned.
    """
    @functools.wraps(f)
    def wrapper(self, request):
        if not request.access_route:
            # Probably a werkzeug problem; fail gracefully rather than crash.
            return Response('no client ip provided', status='403')

        client = request.access_route[-1]
        if isinstance(client, bytes):
            client = client.decode('utf8')
        addr = ip_address(client)

        allowed = addr.is_loopback or any(
            addr in approved for approved in get_networks())
        if not allowed:
            body = PRIVATE_BODY_RESPONSE_TEMPLATE.format(
                client,
                force_unicode(request.remote_addr),
                request.headers.get('x-forwarded-for'))
            return Response(body, status='403')
        return f(self, request)
    return wrapper
def set_unset(self, interface_id, attribute, address=None):
    """
    Turn ``attribute`` on for one interface and off for all the others.

    Intended for interface options that may only be set on a single
    engine interface. Sub-interfaces that are ``PhysicalVlanInterface``
    instances, or whose current value of ``attribute`` is ``None``, are
    left untouched.

    When ``address`` is given, only the sub-interface of ``interface_id``
    whose network contains that address is set to True; the rest are set
    to False.
    """
    for interface in self:
        for sub in interface.sub_interfaces():
            # Skip VLAN-only interfaces (no addresses).
            if isinstance(sub, PhysicalVlanInterface):
                continue
            if getattr(sub, attribute) is None:
                continue

            on_target_nic = sub.nicid == str(interface_id)
            if not on_target_nic:
                enabled = False  # unset everywhere else
            elif address is None:
                enabled = True
            else:
                # Find the IP on the node interface.
                enabled = (
                    ipaddress.ip_address(bytes_to_unicode(address))
                    in ipaddress.ip_network(sub.network_value)
                )
            setattr(sub, attribute, enabled)
# Source file: zeroconf_listener.py
# Project: SupercomputerInABriefcase
# Author: SupercomputerInABriefcase
# Project source
# File source
# Reads: 28
# Favorites: 0
# Likes: 0
# Comments: 0
def add_service(self, zeroconf, type, name):
    r"""Handle a newly discovered service.

    Looks up the service info, prints it, converts ``info.address`` to an
    IP address and appends its string form to the nodes file.

    Example of the printed output:
        Service SupercomputerInABriefcase on tom-xps._sciabc._tcp.local. added,
        service info:
        ServiceInfo(
            type='_sciabc._tcp.local.',
            name='SupercomputerInABriefcase on tom-xps._sciabc._tcp.local.',
            address=b'\n\xb7\xcd\xb6',
            port=34343,
            weight=0,
            priority=0,
            server='tom-xps.local.',
            properties={}
        )
    """
    service = zeroconf.get_service_info(type, name)
    print("Service %s added, service info: %s" % (name, service))
    node_address = ipaddress.ip_address(service.address)
    append_ip_to_nodes_file(str(node_address))
def validate_ip(ctx, param, value):
    """Return ``value`` unchanged when it parses as an IP address.

    ``ctx`` and ``param`` are not used (the signature looks like a click
    option callback -- confirm against the caller).

    Raises ``click.BadParameter`` when ``ipaddress.ip_address`` rejects
    the value.
    """
    try:
        ipaddress.ip_address(value)
    except ValueError as ex:
        raise click.BadParameter("Invalid IP: %s" % ex)
    else:
        return value
def add_ban(self, ip, reason, duration, name=None):
    """
    Ban an ip (or network) with a reason and a duration in minutes.

    Every current connection whose address falls inside the banned
    network is kicked silently, and the name of the last such connection
    replaces ``name``. A falsy ``duration`` makes the ban permanent. The
    ban is stored under ``ip`` and the ban list is saved.
    """
    banned_net = ip_network(text_type(ip), strict=False)

    # Iterate over a snapshot of the connections.
    for conn in tuple(self.connections.values()):
        if ip_address(conn.address[0]) not in banned_net:
            continue
        name = conn.name
        conn.kick(silent=True)

    expires = reactor.seconds() + duration * 60 if duration else None
    self.bans[ip] = (name or '(unknown)', reason, expires)
    self.save_bans()
def sort_ip(ip, version="4"):
    """Return the value to sort a listing row by, based on its IP address.

    * 10-column rows: the address is the part after the first ``|`` in
      column 7 (``version == "4"``) or column 8 (any other version).
    * 6-column rows: the address is column 5.
    * Anything else is returned unchanged.

    Rows without a parsable address yield ``"Z"``.
    """
    # Sentinel chosen so that rows showing "-" end up last.
    no_address = "Z"
    columns = len(ip)

    # NOTE(review): the original comment said "Length 10 is list -l, 5 is
    # list", but the short form is matched at 6 columns -- confirm.
    if columns == 10:
        try:
            field = ip[7] if version == "4" else ip[8]
            return str(ipaddress.ip_address(field.rsplit("|")[1]))
        except (ValueError, IndexError):
            return no_address

    if columns == 6:
        try:
            return str(ipaddress.ip_address(ip[5]))
        except ValueError:
            return no_address

    return ip
def test_getnextpdu(self):
    """GETNEXT below 1.3.6.1.2.1.4.21.1.7 returns an IP_ADDRESS value.

    The data of the first returned value must stringify to the decoded
    packed form of 10.0.0.1.
    """
    header = PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0)
    subtree = ObjectIdentifier(10, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 21, 1, 7))
    request = GetNextPDU(header=header, oids=(subtree,))

    # The results are not inspected; the calls only have to succeed.
    request.encode()
    response = request.make_response(self.lut)
    print(response)
    len(response.values)

    first = response.values[0]
    self.assertEqual(first.type_, ValueType.IP_ADDRESS)
    actual = str(first.data)
    expected = ipaddress.ip_address("10.0.0.1").packed.decode()
    self.assertEqual(actual, expected)
def test_getnextpdu_exactmatch(self):
    """GETNEXT with a fully specified OID answers with that same OID.

    The first returned value must be an IP_ADDRESS named like the
    requested OID, with data that stringifies to the decoded packed form
    of 10.0.0.1.
    """
    oid = ObjectIdentifier(14, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 21, 1, 7, 0, 0, 0, 0))
    header = PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0)
    request = GetNextPDU(header=header, oids=[oid])

    # The results are not inspected; the calls only have to succeed.
    request.encode()
    response = request.make_response(self.lut)
    print(response)
    len(response.values)

    first = response.values[0]
    self.assertEqual(first.type_, ValueType.IP_ADDRESS)
    print("test_getnextpdu_exactmatch: ", str(oid))
    self.assertEqual(str(first.name), str(oid))
    actual = str(first.data)
    expected = ipaddress.ip_address("10.0.0.1").packed.decode()
    self.assertEqual(actual, expected)
def test_getnextpdu(self):
    """GETNEXT below 1.3.6.1.2.1.4.24.4.1.1.0 returns an IP_ADDRESS value.

    The data of the first returned value must stringify to the decoded
    packed form of 0.0.0.0.
    """
    header = PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0)
    subtree = ObjectIdentifier(21, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0))
    request = GetNextPDU(header=header, oids=(subtree,))

    # The results are not inspected; the calls only have to succeed.
    request.encode()
    response = request.make_response(self.lut)
    print(response)
    len(response.values)

    first = response.values[0]
    self.assertEqual(first.type_, ValueType.IP_ADDRESS)
    actual = str(first.data)
    expected = ipaddress.ip_address("0.0.0.0").packed.decode()
    self.assertEqual(actual, expected)
def test_getnextpdu_exactmatch(self):
    """GETNEXT with a fully specified OID answers with that same OID.

    The first returned value must be an IP_ADDRESS named like the
    requested OID, with data that stringifies to the decoded packed form
    of 0.0.0.0.
    """
    oid = ObjectIdentifier(24, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 17))
    header = PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0)
    request = GetNextPDU(header=header, oids=[oid])

    # The results are not inspected; the calls only have to succeed.
    request.encode()
    response = request.make_response(self.lut)
    print(response)
    len(response.values)

    first = response.values[0]
    self.assertEqual(first.type_, ValueType.IP_ADDRESS)
    print("test_getnextpdu_exactmatch: ", str(oid))
    self.assertEqual(str(first.name), str(oid))
    actual = str(first.data)
    expected = ipaddress.ip_address("0.0.0.0").packed.decode()
    self.assertEqual(actual, expected)
def reinit_data(self):
    """
    Rebuild ``self.loips`` from the loopback entries in APPL_DB.

    Keys matching ``INTF_TABLE:lo:*`` are read, the prefix is stripped and
    the remainder is parsed as an IP address. Only IPv4 addresses are
    kept, keyed by their textual form.
    """
    self.loips = {}

    self.db_conn.connect(mibs.APPL_DB)
    loopback_keys = self.db_conn.keys(mibs.APPL_DB, "INTF_TABLE:lo:*")

    # A falsy result (no keys) leaves the mapping empty.
    for raw_key in loopback_keys or ():
        address_text = raw_key.decode()[len("INTF_TABLE:lo:"):]
        address = ipaddress.ip_address(address_text)
        # Collect only IPv4 interfaces.
        if isinstance(address, ipaddress.IPv4Address):
            self.loips[address_text] = address
def perform_create(self, serializer, remote_ip):
    """Create the user, deciding first whether a captcha is required.

    A captcha is required when ``remote_ip`` lies outside the IPv6 subnet
    named by ``DESECSTACK_IPV6_SUBNET`` and either the number of recent
    registrations from that IP, or the number of recent registrations
    whose email ends with the same host part, has reached its configured
    limit.

    Users that need a captcha get the account-lock email; other non-dyn
    users get a token email. The ``user_registered`` signal is always
    sent.
    """
    client = ipaddress.ip_address(remote_ip)
    stack_subnet = ipaddress.IPv6Network(os.environ['DESECSTACK_IPV6_SUBNET'])

    captcha = False
    if client not in stack_subnet:
        ip_window_start = timezone.now()-timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS)
        recent_from_ip = User.objects.filter(
            created__gte=ip_window_start,
            registration_remote_ip=remote_ip
        ).count()
        captcha = recent_from_ip >= settings.ABUSE_BY_REMOTE_IP_LIMIT

        # The second query only runs when the per-IP limit was not reached.
        if not captcha:
            host_window_start = timezone.now() - timedelta(hours=settings.ABUSE_BY_EMAIL_HOSTNAME_PERIOD_HRS)
            recent_from_host = User.objects.filter(
                created__gte=host_window_start,
                email__endswith=serializer.validated_data['email'].split('@')[-1]
            ).count()
            captcha = recent_from_host >= settings.ABUSE_BY_EMAIL_HOSTNAME_LIMIT

    user = serializer.save(registration_remote_ip=remote_ip, captcha_required=captcha)

    if user.captcha_required:
        send_account_lock_email(self.request, user)
    elif not user.dyn:
        send_token_email({'token': user.get_token()}, user)

    signals.user_registered.send(sender=self.__class__, user=user, request=self.request)
async def get_location_from_geolite2(pool, ip_addr):
    """Look up the country ISO code for an IP address in the GeoLite2 tables.

    This must be a coroutine: the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.

    :param pool: object whose ``acquire()`` returns an async context
        manager yielding a connection with an awaitable ``fetchrow``
        (presumably an asyncpg pool -- confirm against the caller).
    :param ip_addr: anything accepted by ``ipaddress.ip_address``.
    :return: the ``country_iso_code`` of the matching row, or ``None``
        when no row matches, a ``ValueError`` is raised anywhere in the
        lookup (including for an unparsable address), or the GeoLite2
        tables are missing.
    """
    try:
        ip_addr = ipaddress.ip_address(ip_addr)
        # fix issues with asyncpg not handling ip address netmask defaults
        # correctly
        ip_addr = "{}/{}".format(ip_addr, 32 if ip_addr.version == 4 else 128)
        async with pool.acquire() as con:
            row = await con.fetchrow(
                "SELECT cs.country_iso_code FROM geolite2_ip_addresses ips "
                "JOIN geolite2_countries cs ON ips.geoname_id = cs.geoname_id "
                "WHERE ips.network >> $1", ip_addr)
        return row['country_iso_code'] if row else None
    except ValueError:
        return None
    except asyncpg.exceptions.UndefinedTableError:
        log.warning("Missing GeoLite2 database tables")
        return None
def test_scandata_parsing(self):
    '''Tests parsing of the basic surface scan'''
    scan = ndr.NmapScan()
    scan.parse_nmap_xml(load_nmap_xml_data(TEST_SURFACE_SCAN_DATA))

    # The surface scan fixture contains five hosts
    self.assertEqual(5, len(scan))

    # Find a host by MAC address; the first match is the one checked
    host = scan.find_by_mac('84:39:BE:64:3F:E5')[0]
    self.assertNotEqual(None, host)

    # assertTrue(a, b) treats b as the failure message and never compares
    # the two, so the address has to be checked with assertEqual.
    self.assertEqual(host.addr, ipaddress.ip_address("192.168.2.100"))
    self.assertEqual(host.reason, ndr.NmapReasons.ARP_RESPONSE)

    # Confirm that we're not finding phantom hosts
    self.assertEqual(scan.find_by_mac('NOT_A_REAL_MAC'), None)
def test_ipv6_linklocal_parsing(self):
    '''Tests the results of the IPv6 link-local scan'''
    scan = ndr.NmapScan()
    scan.parse_nmap_xml(load_nmap_xml_data(TEST_V6_LINK_LOCAL_SCAN_DATA))

    # Got three hosts, find a host by MAC address
    self.assertEqual(3, len(scan))
    host_list = scan.find_by_mac('84:39:BE:64:3F:E5')

    # Should only find one IP address
    self.assertEqual(len(host_list), 1)
    host = host_list[0]

    # assertTrue(a, b) treats b as the failure message and never compares
    # the two, so the reason has to be checked with assertEqual.
    self.assertEqual(host.reason, ndr.NmapReasons.ND_RESPONSE)
    self.assertEqual(host.addr, ipaddress.ip_address("fe80::8639:beff:fe64:3fe5"))
def from_dict(self, config_dict):
    '''Load the per-machine scan settings from a dictionary.

    Only version 1 dictionaries are accepted; anything else raises
    ValueError. Both lookup tables are reset and then refilled from the
    optional 'machine_ips' and 'machine_macs' mappings.
    '''
    if config_dict.get('version', None) != 1:
        raise ValueError("Unknown NDR NMAP config file version!")

    # Start from empty lookup tables.
    self.mac_address_config = {}
    self.ip_address_config = {}

    ip_settings = config_dict.get('machine_ips', dict())
    mac_settings = config_dict.get('machine_macs', dict())

    # IP entries are keyed by the parsed address object.
    for address_text, raw_mode in ip_settings.items():
        mode = NmapScanMode(raw_mode)
        self.ip_address_config[ipaddress.ip_address(address_text)] = mode

    # MAC entries are keyed in upper case to be consistent with NmapHosts.
    for mac_text, raw_mode in mac_settings.items():
        mode = NmapScanMode(raw_mode)
        self.mac_address_config[mac_text.upper()] = mode
def build_nmap_commandline(self, base_flags, address, interface=None):
    '''Prefix base_flags with the options a given target address needs.

    * IPv6 targets get "-6".
    * Link-local targets additionally get "-e <interface>" in front.
    '''
    target = ipaddress.ip_address(address)

    flags = base_flags
    if target.version == 6:
        flags = "-6 " + flags

    if not target.is_link_local:
        return flags
    # Link-local addresses need the interface to be named explicitly.
    return "-e " + interface + " " + flags
def from_dict(self, msg_dict):
    '''Load this entry's fields from its dictionary form.

    The source and destination addresses are parsed into ip_address
    objects, and 'ethlen' and the ports are converted to int. A port
    whose value is None leaves the corresponding attribute untouched.
    A missing or None 'tcpseq' results in self.tcpseq being None.
    '''
    self.timestamp = msg_dict['timestamp']
    self.proto = ndr.PortProtocols(msg_dict['proto'])

    self.src = ipaddress.ip_address(msg_dict['src'])
    if msg_dict['srcport'] is not None:
        self.srcport = int(msg_dict['srcport'])

    self.dst = ipaddress.ip_address(msg_dict['dst'])
    if msg_dict['dstport'] is not None:
        self.dstport = int(msg_dict['dstport'])

    self.ethsrc = msg_dict['ethsrc']
    self.ethdst = msg_dict['ethdst']
    self.ethlen = int(msg_dict['ethlen'])
    self.tcpflags = msg_dict['tcpflags']

    # The original tested `'tcpseq' is not None`, which compares a string
    # literal and is always true, so the None fallback was unreachable.
    # .get() gives the value when present and None otherwise.
    self.tcpseq = msg_dict.get('tcpseq')
def valid_mappable_ipv4(address):
    """Return True when ``address`` is a mappable IPv4 address.

    The input is converted with ``unicode(..., errors='ignore')`` when
    that call accepts it and used as-is otherwise. Values listed in
    ``NON_MAPPABLE``, values that do not parse as an IP address and
    non-IPv4 addresses all yield False.
    """
    try:
        candidate = unicode(address, errors='ignore')
    except TypeError:
        candidate = address

    if candidate in NON_MAPPABLE:
        return False

    try:
        parsed = ipaddress.ip_address(candidate)
    except ValueError:
        log.debug('invalid IPv4 address', input=address)
        return False

    is_v4 = parsed is not None and parsed.version == 4
    return True if is_v4 else False
def setUp(self):
    """Create a node, an IP pool on it, and a pod holding one pool address.

    The pool is 192.168.1.0/30 with 192.168.1.1 auto-blocked; the pod is
    given 192.168.1.2. Kubernetes API stubs for reading and updating the
    node info are installed last.
    """
    network = u'192.168.1.0/30'
    self.node = self.fixtures.node()

    pool_spec = {
        'network': network,
        'autoblock': '192.168.1.1',
        'node': self.node.id,
    }
    IpAddrPool().create(pool_spec)
    self.ippool = IPPool.query.get(network)

    self.pod = self.fixtures.pod(owner_id=self.user.id)
    pod_address = int(ipaddress.ip_address(u'192.168.1.2'))
    self.pod_ip = PodIP(
        pod_id=self.pod.id,
        network=self.ippool.network,
        ip_address=pod_address,
    )

    session = self.db.session
    session.add(self.pod_ip)
    session.add(self.node)
    session.commit()

    self.stubs = K8SAPIStubs()
    hostname = self.node.hostname
    self.stubs.node_info_in_k8s_api(hostname)
    self.stubs.node_info_update_in_k8s_api(self.node.hostname)
def setUp(self):
    """Create a pod with a public IP plus the matching pool and PodIP rows.

    Two pod configurations are prepared: ``with_ip_conf`` (current
    ``public_ip`` / ``isPublic`` keys) and ``without_ip_conf`` (the same
    data under the ``*_before_freed`` keys). The pod fixture is created
    from ``with_ip_conf``.
    """
    from kubedock.pods.models import PodIP, IPPool

    self.ip = ipaddress.ip_address(u'192.168.43.4')

    # Both configs describe two containers with three ports each.
    self.with_ip_conf = {
        'public_ip': unicode(self.ip),
        'containers': [
            {'ports': [{'isPublic': True}, {}, {'isPublic': False}]}
            for _ in range(2)
        ],
    }
    self.without_ip_conf = {
        'public_ip_before_freed': unicode(self.ip),
        'containers': [
            {'ports': [{'isPublic_before_freed': True},
                       {'isPublic_before_freed': None},
                       {'isPublic_before_freed': False}]}
            for _ in range(2)
        ],
    }

    self.pod = self.fixtures.pod(config=json.dumps(self.with_ip_conf))
    self.ippool = IPPool(network='192.168.43.0/29').save()
    pod_ip = PodIP(pod_id=self.pod.id, network=self.ippool.network,
                   ip_address=int(self.ip))
    self.podip = pod_ip.save()
def free_hosts_and_busy(self, as_int=None, page=None):
    """List every host of this network with its pod and state.

    Returns one ``(ip, pod, state)`` tuple per host on the requested
    page. ``ip`` is the integer form when ``as_int`` is truthy and the
    string form otherwise; ``state`` is ``'busy'`` when a pod holds the
    address, else ``'blocked'`` when it is in the blocked set, else
    ``'free'``.
    """
    pod_ips = PodIP.filter_by(network=self.network)
    pod_by_ip = {int(entry): entry.get_pod() for entry in pod_ips}
    blocked = self.get_blocked_set(as_int=True)

    rows = []
    for host in self.hosts(as_int=True, page=page):
        pod = pod_by_ip.get(host)
        if pod:
            state = 'busy'
        elif host in blocked:
            state = 'blocked'
        else:
            state = 'free'
        shown = host if as_int else str(ipaddress.ip_address(host))
        rows.append((shown, pod, state))
    return rows
def get_free_host(cls, as_int=None, node=None, ip=None):
    """Return a free host address, preferring ``ip`` when it is available.

    :param as_int: return the address as an integer instead of a string
    :param node: if set, only consider networks of this node
    :param ip: if set, try this address first; fall back to any other
        available address when it cannot be used
    :return: the address, as str or int depending on ``as_int``
    :raises NoFreeIPs: when no network has a free host
    """
    if ip:
        preferred_net = cls.get_network_by_ip(ip)
        if preferred_net and preferred_net.is_ip_available(ip, node):
            if as_int:
                return int(ipaddress.ip_address(ip))
            return ip

    candidates = (
        cls.all() if node is None
        else cls.filter(cls.node.has(hostname=node))
    )
    for net in candidates:
        found = net.get_first_free_host(as_int=as_int)
        if found is not None:
            return found

    raise NoFreeIPs()
def init():
    """Set up the ipsets from the flannel config and prune stale etcd pods.

    Reads ``/run/flannel/subnet.env``, (re)creates the three kuberdock
    ipsets, then deletes every etcd pod entry whose address is outside the
    flannel network or inside this host's flannel subnet. Finishes with
    ``update_ipset()``.
    """
    config = read_config_ini('/run/flannel/subnet.env')
    flannel_network = config['flannel_network']
    flannel_subnet = config['flannel_subnet']

    _update_ipset('kuberdock_flannel', [flannel_network], set_type='hash:net')
    _update_ipset('kuberdock_nodes', [])
    _update_ipset(
        'kuberdock_cluster',
        ['kuberdock_flannel', 'kuberdock_nodes'],
        set_type='list:set',
    )

    network = ipaddress.ip_network(unicode(flannel_network))
    subnet = ipaddress.ip_network(unicode(flannel_subnet), strict=False)

    etcd = ETCD()
    for user in etcd.users():
        for pod in etcd.pods(user):
            address = ipaddress.ip_address(pod)
            stale = address not in network or address in subnet
            if stale:
                etcd.delete_user(user, pod)

    update_ipset()
def get_local_bricks(volume: str) -> Result:
    """
    Return all bricks of the volume that are served from this host.

    volume: Name of the volume to get local bricks for

    Returns Err with the value of get_local_ip() when the local IP lookup
    failed, otherwise Ok with the list of bricks whose peer hostname,
    parsed as an IP address, equals the local IP.
    A GlusterCmdOutputParseError is passed on to the caller unchanged.
    """
    try:
        vol_info = volume_info(volume)
        local_ip = get_local_ip()
        if local_ip.is_err():
            return Err(local_ip.value)

        local_bricks = [
            brick
            for vol in vol_info
            for brick in vol.bricks
            if ip_address(brick.peer.hostname) == local_ip.value
        ]
        return Ok(local_bricks)
    except GlusterCmdOutputParseError:
        raise