def chose_blacklist(self, ip):
"""
Given an IP address, figure out the ipset we have to use.
If the address is an IPv4, we have to use *rig_blacklist4*.
If the address is an IPv6, we have to use *rig_blacklist6*.
Raises ipaddress.AddressValueError if the address is neither
an IPv4 nor an IPv6.
"""
blacklist = 'rig_blacklist{0}'
try:
address = ipaddress.ip_address(ip)
except ipaddress.AddressValueError:
raise
else:
if address.version is 6:
# We don't ban private IPv6:
if address.is_private:
msg = "We don't ban private addresses ({0} given)." \
.format(address)
raise ipaddress.AddressValueError(msg)
else:
# Do we have an embedded IPv4 ?
if address.ipv4_mapped is not None:
address = address.ipv4_mapped
elif address.sixtofour is not None:
address = address.sixtofour
blacklist = blacklist.format(address.version)
return (address, blacklist)
python类AddressValueError()的实例源码
def get_client_ip(request):
ip = get_real_ip(request)
if ip:
try:
interface = ipaddress.IPv6Interface('%s/%i' % (ip, settings.IPV6_PRIVACY_MASK))
except ipaddress.AddressValueError:
interface = ipaddress.IPv4Interface('%s/%i' % (ip, settings.IPV4_PRIVACY_MASK))
return str(interface.network.network_address)
else:
return None
def assertAddressError(self, details, *args):
"""Ensure a clean AddressValueError"""
return self.assertCleanError(ipaddress.AddressValueError,
details, *args)
def testEmbeddedIpv4(self):
ipv4_string = '192.168.0.1'
ipv4 = ipaddress.IPv4Interface(ipv4_string)
v4compat_ipv6 = ipaddress.IPv6Interface('::%s' % ipv4_string)
self.assertEqual(int(v4compat_ipv6.ip), int(ipv4.ip))
v4mapped_ipv6 = ipaddress.IPv6Interface('::ffff:%s' % ipv4_string)
self.assertNotEqual(v4mapped_ipv6.ip, ipv4.ip)
self.assertRaises(ipaddress.AddressValueError, ipaddress.IPv6Interface,
'2001:1.1.1.1:1.1.1.1')
# Issue 67: IPv6 with embedded IPv4 address not recognized.
def __call__(self, value):
try:
import ipaddress
except ImportError:
from gluon.contrib import ipaddr as ipaddress
try:
ip = ipaddress.IPv6Address(value.decode('utf-8'))
ok = True
except ipaddress.AddressValueError:
return (value, translate(self.error_message))
if self.subnets:
# iterate through self.subnets to see if value is a member
ok = False
if isinstance(self.subnets, str):
self.subnets = [self.subnets]
for network in self.subnets:
try:
ipnet = ipaddress.IPv6Network(network.decode('utf-8'))
except (ipaddress.NetmaskValueError, ipaddress.AddressValueError):
return (value, translate('invalid subnet provided'))
if ip in ipnet:
ok = True
if self.is_routeable:
self.is_private = False
self.is_link_local = False
self.is_reserved = False
self.is_multicast = False
if not (self.is_private is None or self.is_private ==
ip.is_private):
ok = False
if not (self.is_link_local is None or self.is_link_local ==
ip.is_link_local):
ok = False
if not (self.is_reserved is None or self.is_reserved ==
ip.is_reserved):
ok = False
if not (self.is_multicast is None or self.is_multicast ==
ip.is_multicast):
ok = False
if not (self.is_6to4 is None or self.is_6to4 ==
ip.is_6to4):
ok = False
if not (self.is_teredo is None or self.is_teredo ==
ip.is_teredo):
ok = False
if ok:
return (value, None)
return (value, translate(self.error_message))
def __call__(self, value):
try:
import ipaddress
except ImportError:
from contrib import ipaddr as ipaddress
try:
ip = ipaddress.IPv6Address(value)
ok = True
except ipaddress.AddressValueError:
return (value, translate(self.error_message))
if self.subnets:
# iterate through self.subnets to see if value is a member
ok = False
if isinstance(self.subnets, str):
self.subnets = [self.subnets]
for network in self.subnets:
try:
ipnet = ipaddress.IPv6Network(network)
except (ipaddress.NetmaskValueError, ipaddress.AddressValueError):
return (value, translate('invalid subnet provided'))
if ip in ipnet:
ok = True
if self.is_routeable:
self.is_private = False
self.is_link_local = False
self.is_reserved = False
self.is_multicast = False
if not (self.is_private is None or self.is_private ==
ip.is_private):
ok = False
if not (self.is_link_local is None or self.is_link_local ==
ip.is_link_local):
ok = False
if not (self.is_reserved is None or self.is_reserved ==
ip.is_reserved):
ok = False
if not (self.is_multicast is None or self.is_multicast ==
ip.is_multicast):
ok = False
if not (self.is_6to4 is None or self.is_6to4 ==
ip.is_6to4):
ok = False
if not (self.is_teredo is None or self.is_teredo ==
ip.is_teredo):
ok = False
if ok:
return (value, None)
return (value, translate(self.error_message))
def __call__(self, value):
try:
import ipaddress
except ImportError:
from gluon.contrib import ipaddr as ipaddress
try:
ip = ipaddress.IPv6Address(value.decode('utf-8'))
ok = True
except ipaddress.AddressValueError:
return (value, translate(self.error_message))
if self.subnets:
# iterate through self.subnets to see if value is a member
ok = False
if isinstance(self.subnets, str):
self.subnets = [self.subnets]
for network in self.subnets:
try:
ipnet = ipaddress.IPv6Network(network.decode('utf-8'))
except (ipaddress.NetmaskValueError, ipaddress.AddressValueError):
return (value, translate('invalid subnet provided'))
if ip in ipnet:
ok = True
if self.is_routeable:
self.is_private = False
self.is_link_local = False
self.is_reserved = False
self.is_multicast = False
if not (self.is_private is None or self.is_private ==
ip.is_private):
ok = False
if not (self.is_link_local is None or self.is_link_local ==
ip.is_link_local):
ok = False
if not (self.is_reserved is None or self.is_reserved ==
ip.is_reserved):
ok = False
if not (self.is_multicast is None or self.is_multicast ==
ip.is_multicast):
ok = False
if not (self.is_6to4 is None or self.is_6to4 ==
ip.is_6to4):
ok = False
if not (self.is_teredo is None or self.is_teredo ==
ip.is_teredo):
ok = False
if ok:
return (value, None)
return (value, translate(self.error_message))
def post(self):
"""Method to create a gateway"""
host = self.args['host']
name = self.args['name']
eui = self.args['eui']
enabled = self.args['enabled']
power = self.args['power']
message = {}
# Check for required args
required = {'host', 'name', 'eui', 'enabled', 'power'}
for r in required:
if self.args[r] is None:
message[r] = "Missing the {} parameter.".format(r)
if message:
abort(400, message=message)
# Ensure we have a valid address
try:
ipaddress.ip_address(host)
except (ipaddress.AddressValueError, ValueError):
message = {'error': "Invalid IP address {} ".format(host)}
abort(400, message=message)
# Ensure we have a valid EUI
if not isinstance(eui, (int, long)):
message = {'error': "Invalid gateway EUI {} ".format(eui)}
abort(400, message=message)
# Check this gateway does not currently exist
exists = yield Gateway.exists(where=['host = ?', host])
if exists:
message = {'error': "Gateway address {} ".format(host) + \
"currently exists."}
abort(400, message=message)
# Check the EUI does not currently exist
exists = yield Gateway.exists(where=['eui = ?', eui])
if exists:
message = {'error': "Gateway EUI {} ".format(eui) + \
"currently exists."}
abort(400, message=message)
# Create and validate
gateway = Gateway(host=host, eui=eui, name=name, enabled=enabled, power=power)
(valid, message) = gateway.valid()
if not valid:
abort(400, message=message)
try:
g = yield gateway.save()
if g is None:
abort(500, message={'error': "Error saving the gateway."})
# Add the new gateway to the server.
self.server.lora.addGateway(g)
location = self.restapi.api.prefix + '/gateway/' + str(host)
returnValue(({}, 201, {'Location': location}))
except TimeoutError:
# Exception returns 500 to client
log.error("REST API timeout for gateway POST request")
def __call__(self, value):
try:
import ipaddress
except ImportError:
from gluon.contrib import ipaddr as ipaddress
try:
ip = ipaddress.IPv6Address(value.decode('utf-8'))
ok = True
except ipaddress.AddressValueError:
return (value, translate(self.error_message))
if self.subnets:
# iterate through self.subnets to see if value is a member
ok = False
if isinstance(self.subnets, str):
self.subnets = [self.subnets]
for network in self.subnets:
try:
ipnet = ipaddress.IPv6Network(network.decode('utf-8'))
except (ipaddress.NetmaskValueError, ipaddress.AddressValueError):
return (value, translate('invalid subnet provided'))
if ip in ipnet:
ok = True
if self.is_routeable:
self.is_private = False
self.is_link_local = False
self.is_reserved = False
self.is_multicast = False
if not (self.is_private is None or self.is_private ==
ip.is_private):
ok = False
if not (self.is_link_local is None or self.is_link_local ==
ip.is_link_local):
ok = False
if not (self.is_reserved is None or self.is_reserved ==
ip.is_reserved):
ok = False
if not (self.is_multicast is None or self.is_multicast ==
ip.is_multicast):
ok = False
if not (self.is_6to4 is None or self.is_6to4 ==
ip.is_6to4):
ok = False
if not (self.is_teredo is None or self.is_teredo ==
ip.is_teredo):
ok = False
if ok:
return (value, None)
return (value, translate(self.error_message))
def __call__(self, value):
try:
import ipaddress
except ImportError:
from gluon.contrib import ipaddr as ipaddress
try:
ip = ipaddress.IPv6Address(value.decode('utf-8'))
ok = True
except ipaddress.AddressValueError:
return (value, translate(self.error_message))
if self.subnets:
# iterate through self.subnets to see if value is a member
ok = False
if isinstance(self.subnets, str):
self.subnets = [self.subnets]
for network in self.subnets:
try:
ipnet = ipaddress.IPv6Network(network.decode('utf-8'))
except (ipaddress.NetmaskValueError, ipaddress.AddressValueError):
return (value, translate('invalid subnet provided'))
if ip in ipnet:
ok = True
if self.is_routeable:
self.is_private = False
self.is_link_local = False
self.is_reserved = False
self.is_multicast = False
if not (self.is_private is None or self.is_private ==
ip.is_private):
ok = False
if not (self.is_link_local is None or self.is_link_local ==
ip.is_link_local):
ok = False
if not (self.is_reserved is None or self.is_reserved ==
ip.is_reserved):
ok = False
if not (self.is_multicast is None or self.is_multicast ==
ip.is_multicast):
ok = False
if not (self.is_6to4 is None or self.is_6to4 ==
ip.is_6to4):
ok = False
if not (self.is_teredo is None or self.is_teredo ==
ip.is_teredo):
ok = False
if ok:
return (value, None)
return (value, translate(self.error_message))
def __call__(self, value):
try:
import ipaddress
except ImportError:
from gluon.contrib import ipaddr as ipaddress
try:
ip = ipaddress.IPv6Address(value)
ok = True
except ipaddress.AddressValueError:
return (value, translate(self.error_message))
if self.subnets:
# iterate through self.subnets to see if value is a member
ok = False
if isinstance(self.subnets, str):
self.subnets = [self.subnets]
for network in self.subnets:
try:
ipnet = ipaddress.IPv6Network(network)
except (ipaddress.NetmaskValueError, ipaddress.AddressValueError):
return (value, translate('invalid subnet provided'))
if ip in ipnet:
ok = True
if self.is_routeable:
self.is_private = False
self.is_link_local = False
self.is_reserved = False
self.is_multicast = False
if not (self.is_private is None or self.is_private ==
ip.is_private):
ok = False
if not (self.is_link_local is None or self.is_link_local ==
ip.is_link_local):
ok = False
if not (self.is_reserved is None or self.is_reserved ==
ip.is_reserved):
ok = False
if not (self.is_multicast is None or self.is_multicast ==
ip.is_multicast):
ok = False
if not (self.is_6to4 is None or self.is_6to4 ==
ip.is_6to4):
ok = False
if not (self.is_teredo is None or self.is_teredo ==
ip.is_teredo):
ok = False
if ok:
return (value, None)
return (value, translate(self.error_message))