def setup_sockets(self):
    """Create one UDP socket per local address plus a shared multicast socket.

    Populates ``self.sockets`` (address -> bound socket), ``self.ip_addresses``,
    ``self.socks`` (the per-address sockets as a list) and ``self.multi_sock``.
    The shared socket joins the group ``self.address`` once for every local
    address and is finally bound to ``("", self.port)``.
    """
    self.sockets = {}
    addresses = get_interface_addresses(self.logger)
    self.ip_addresses = addresses

    # Shared socket that carries the multicast group memberships.
    shared = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    shared.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    shared.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self.ttl)

    for local_ip in addresses:
        iface_sock = socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        iface_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Membership request: packed group address followed by the packed
        # local address the membership applies to.
        membership = socket.inet_aton(self.address) + socket.inet_aton(local_ip)
        shared.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)
        self.logger.info("Regestering multicast for: %s: %s"%(self.address, local_ip))

        iface_sock.bind((local_ip, self.port))
        self.sockets[local_ip] = iface_sock

    shared.bind(("", self.port))
    self.socks = list(self.sockets.values())
    self.multi_sock = shared
# python类inet_aton()的实例源码 (example source code using Python's inet_aton())
def is_valid_cidr(string_network):
    """
    Check that a no_proxy entry has the shape ``<ipv4 address>/<mask>``.

    The entry must contain exactly one ``/``, the mask must be an integer
    from 1 to 32 inclusive, and the address part must be accepted by
    ``socket.inet_aton``.

    :rtype: bool
    """
    if string_network.count('/') != 1:
        return False

    # Exactly one slash, so the split always yields two parts.
    address, mask_text = string_network.split('/')

    try:
        mask = int(mask_text)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
def addressInNetwork(self, ip, cidr):
    """Report whether an integer-encoded IPv4 address lies inside a CIDR block.

    :param ip: the address as an integer or the decimal text of one.
        Zero and negative values are read as the two's-complement form of
        a 32-bit address. The empty string is accepted and never matches.
    :param cidr: network in ``a.b.c.d/bits`` notation.
    :return: True when the address falls inside the network. False
        otherwise, including when either argument cannot be parsed, the
        prefix length is outside 0..32, or ``ip`` does not fit in 32 bits.
    """
    # the ip can be the empty string ('') in cases where the connection
    # is made via a web proxy.  in these cases the sensor cannot report
    # the true remote IP as DNS resolution happens on the web proxy (and
    # not the endpoint)
    if '' == ip:
        return False

    try:
        parts = cidr.split('/')
        net = parts[0]
        bits = int(parts[1])
        value = int(ip)
        # Network byte order, so the prefix bits are the HIGH bits.
        netaddr = struct.unpack('!L', socket.inet_aton(net))[0]
    except (AttributeError, IndexError, TypeError, ValueError, socket.error):
        return False

    if not 0 <= bits <= 32 or value > 0xFFFFFFFF:
        return False

    # Fold signed values onto their unsigned 32-bit representation.
    ipaddr = value & 0xFFFFFFFF
    # Keep the top `bits` bits.  The previous low-bit mask over a
    # little-endian unpack was only right for prefixes that are a multiple
    # of 8 (a /12 compared the wrong nibble of the second octet).
    netmask = (0xFFFFFFFF << (32 - bits)) & 0xFFFFFFFF
    return ipaddr & netmask == netaddr & netmask
def is_valid_cidr(string_network):
    """
    Check that a no_proxy entry has the shape ``<ipv4 address>/<mask>``.

    The entry must contain exactly one ``/``, the mask must be an integer
    from 1 to 32 inclusive, and the address part must be accepted by
    ``socket.inet_aton``.

    :rtype: bool
    """
    if string_network.count('/') != 1:
        return False

    # Exactly one slash, so the split always yields two parts.
    address, mask_text = string_network.split('/')

    try:
        mask = int(mask_text)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
def addressInNetwork(self, ip, cidr):
    """Report whether an integer-encoded IPv4 address lies inside a CIDR block.

    :param ip: the address as an integer or the decimal text of one.
        Zero and negative values are read as the two's-complement form of
        a 32-bit address. The empty string is accepted and never matches.
    :param cidr: network in ``a.b.c.d/bits`` notation.
    :return: True when the address falls inside the network. False
        otherwise, including when either argument cannot be parsed, the
        prefix length is outside 0..32, or ``ip`` does not fit in 32 bits.
    """
    # the ip can be the empty string ('') in cases where the connection
    # is made via a web proxy.  in these cases the sensor cannot report
    # the true remote IP as DNS resolution happens on the web proxy (and
    # not the endpoint)
    if '' == ip:
        return False

    try:
        parts = cidr.split('/')
        net = parts[0]
        bits = int(parts[1])
        value = int(ip)
        # Network byte order, so the prefix bits are the HIGH bits.
        netaddr = struct.unpack('!L', socket.inet_aton(net))[0]
    except (AttributeError, IndexError, TypeError, ValueError, socket.error):
        return False

    if not 0 <= bits <= 32 or value > 0xFFFFFFFF:
        return False

    # Fold signed values onto their unsigned 32-bit representation.
    ipaddr = value & 0xFFFFFFFF
    # Keep the top `bits` bits.  The previous low-bit mask over a
    # little-endian unpack was only right for prefixes that are a multiple
    # of 8 (a /12 compared the wrong nibble of the second octet).
    netmask = (0xFFFFFFFF << (32 - bits)) & 0xFFFFFFFF
    return ipaddr & netmask == netaddr & netmask
def is_valid_cidr(string_network):
    """
    Check that a no_proxy entry has the shape ``<ipv4 address>/<mask>``.

    The entry must contain exactly one ``/``, the mask must be an integer
    from 1 to 32 inclusive, and the address part must be accepted by
    ``socket.inet_aton``.

    :rtype: bool
    """
    if string_network.count('/') != 1:
        return False

    # Exactly one slash, so the split always yields two parts.
    address, mask_text = string_network.split('/')

    try:
        mask = int(mask_text)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
def consume(self, doc, _):
    """
    Record every match of the compiled pattern in the document's entities.

    Nothing happens when the document has no text. A match whose last
    matched group is named ``ip_addr`` is recorded only if
    ``socket.inet_aton`` accepts its value; all other matches are always
    recorded.

    :param doc: Document object.
    :type doc: ``gransk.core.document.Document``
    """
    text = doc.text
    if not text:
        return

    sink = doc.entities
    for match in self.pattern.finditer(text):
        group_name = match.lastgroup
        value = match.group(group_name)

        if group_name == 'ip_addr':
            try:
                socket.inet_aton(value)
            except socket.error:
                # Looks like an address but does not parse as one: skip it.
                continue

        sink.add(match.start(group_name), group_name, value)
def inet_pton(address_family, ip_string):
    """Convert a textual IP address to its packed binary form.

    ``AF_INET`` is handled directly by ``socket.inet_aton``. Any other
    family is passed to ``WSAStringToAddressA``; for ``AF_INET6`` the 16
    bytes found at ``addr.ipv6_addr`` are returned.

    :raises socket.error: when ``WSAStringToAddressA`` returns non-zero,
        or when the family is neither ``AF_INET`` nor ``AF_INET6``.
    """
    if address_family == socket.AF_INET:
        return socket.inet_aton(ip_string)

    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    status = WSAStringToAddressA(
        ip_string,
        address_family,
        None,
        ctypes.byref(addr),
        ctypes.byref(addr_size),
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    if address_family != socket.AF_INET6:
        raise socket.error('unknown address family')
    return ctypes.string_at(addr.ipv6_addr, 16)
def is_valid_cidr(string_network):
    """Tell whether a no_proxy entry looks like ``<ipv4 address>/<mask 1-32>``"""
    if string_network.count('/') != 1:
        return False

    # With exactly one slash, partition gives the same halves as split.
    network, _, suffix = string_network.partition('/')

    try:
        prefix_len = int(suffix)
    except ValueError:
        return False
    if prefix_len < 1 or prefix_len > 32:
        return False

    try:
        socket.inet_aton(network)
    except socket.error:
        return False
    return True
def __init__(self, server=DEFAULT_SERVER, port=DEFAULT_PORT, local_ip=None, debug=False, ip_forward=False):
    """Store the connection settings and choose the IP address to use.

    :param server: server address, stored as ``self.server``; also used as
        the IP when ``ip_forward`` is true
    :param port: stored as ``self.port``
    :param local_ip: explicit IP address; when ``None`` and ``ip_forward``
        is false, one is looked up through ``IpFinder``
    :param debug: stored as ``self.debug``
    :param ip_forward: when true, ``self.ip`` is taken from ``server``
    :raises AssertionError: when no IP address could be determined
    """
    # NOTE(review): the nesting of the final assert / inet_aton lines was
    # reconstructed from a copy whose indentation was lost; both are assumed
    # to run at function level -- confirm against the upstream file.
    self.server = server
    self.port = port
    self.uid = b'test'
    self.ip = None
    self.debug = debug
    if ip_forward:
        self.ip = self.server
    else:
        if local_ip is not None:
            self.ip = local_ip
        else:
            # only available for dormitory subnet
            import IpFinder
            # First address starting with 10.21., else one starting with 10.20.
            self.ip = IpFinder.get_ip_startswith('10.21.') or IpFinder.get_ip_startswith('10.20.')
    assert ip_forward or self.ip is not None, 'Can not find a correct local ip address. \
Please specify the IP address thought command-line argument using --ip'
    # From here on self.ip holds the packed 4-byte form, not text.
    self.ip = socket.inet_aton(self.ip)
def is_valid_cidr(string_network):
    """
    Check that a no_proxy entry has the shape ``<ipv4 address>/<mask>``.

    The entry must contain exactly one ``/``, the mask must be an integer
    from 1 to 32 inclusive, and the address part must be accepted by
    ``socket.inet_aton``.

    :rtype: bool
    """
    if string_network.count('/') != 1:
        return False

    # Exactly one slash, so the split always yields two parts.
    address, mask_text = string_network.split('/')

    try:
        mask = int(mask_text)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
def is_valid_cidr(string_network):
    """Tell whether a no_proxy entry looks like ``<ipv4 address>/<mask 1-32>``"""
    if string_network.count('/') != 1:
        return False

    # With exactly one slash, partition gives the same halves as split.
    network, _, suffix = string_network.partition('/')

    try:
        prefix_len = int(suffix)
    except ValueError:
        return False
    if prefix_len < 1 or prefix_len > 32:
        return False

    try:
        socket.inet_aton(network)
    except socket.error:
        return False
    return True
def to_raw(self, host, port):
    #pylint:disable=arguments-differ
    '''
    Fill the ``self.hex_code`` template with the target port and address.

    :param host: string representing the ip address or domain name to connect back to
    :param port: port to connect to on the remote host (0-65535)
    :return: the bytes obtained by hex-decoding the filled-in template
    :raises ValueError: if ``port`` is outside 0-65535
    '''
    import binascii

    l.debug("Connecting back to %s:%d", host, port)

    # Validate before resolving so a bad port never costs a DNS lookup.
    # 65535 is a valid port; the previous ``>= 65535`` check rejected it.
    if port < 0 or port > 65535:
        raise ValueError("invalid port specified")

    target_ip = socket.gethostbyname(host)

    # binascii replaces the Python-2-only 'hex' codec on str/bytes.
    raw_ip = binascii.hexlify(socket.inet_aton(target_ip)).decode('ascii')
    raw_port = binascii.hexlify(struct.pack("!H", port)).decode('ascii')

    # assumes self.hex_code is a text template with two %s slots
    # (port first, then address) -- TODO confirm
    return binascii.unhexlify(self.hex_code % (raw_port, raw_ip))
def to_raw(self, host, port):
    #pylint:disable=arguments-differ
    '''
    Fill the ``self.hex_code`` template with the target address and port.

    :param host: string representing the ip address or domain name to connect back to
    :param port: port to connect to on the remote host (0-65535)
    :return: the bytes obtained by hex-decoding the filled-in template
    :raises ValueError: if ``port`` is outside 0-65535
    '''
    import binascii

    l.debug("Connecting back to %s:%d", host, port)

    # Validate before resolving so a bad port never costs a DNS lookup.
    # 65535 is a valid port; the previous ``>= 65535`` check rejected it.
    if port < 0 or port > 65535:
        raise ValueError("invalid port specified")

    target_ip = socket.gethostbyname(host)

    # binascii replaces the Python-2-only 'hex' codec on str/bytes.
    raw_ip = binascii.hexlify(socket.inet_aton(target_ip)).decode('ascii')
    raw_port = binascii.hexlify(struct.pack("!H", port)).decode('ascii')

    # assumes self.hex_code is a text template with two %s slots
    # (address first, then port) -- TODO confirm
    return binascii.unhexlify(self.hex_code % (raw_ip, raw_port))
def is_valid_cidr(string_network):
    """Tell whether a no_proxy entry looks like ``<ipv4 address>/<mask 1-32>``"""
    if string_network.count('/') != 1:
        return False

    # With exactly one slash, partition gives the same halves as split.
    network, _, suffix = string_network.partition('/')

    try:
        prefix_len = int(suffix)
    except ValueError:
        return False
    if prefix_len < 1 or prefix_len > 32:
        return False

    try:
        socket.inet_aton(network)
    except socket.error:
        return False
    return True
def is_valid_cidr(string_network):
    """Tell whether a no_proxy entry looks like ``<ipv4 address>/<mask 1-32>``"""
    if string_network.count('/') != 1:
        return False

    # With exactly one slash, partition gives the same halves as split.
    network, _, suffix = string_network.partition('/')

    try:
        prefix_len = int(suffix)
    except ValueError:
        return False
    if prefix_len < 1 or prefix_len > 32:
        return False

    try:
        socket.inet_aton(network)
    except socket.error:
        return False
    return True
def is_valid_cidr(string_network):
    """Tell whether a no_proxy entry looks like ``<ipv4 address>/<mask 1-32>``"""
    if string_network.count('/') != 1:
        return False

    # With exactly one slash, partition gives the same halves as split.
    network, _, suffix = string_network.partition('/')

    try:
        prefix_len = int(suffix)
    except ValueError:
        return False
    if prefix_len < 1 or prefix_len > 32:
        return False

    try:
        socket.inet_aton(network)
    except socket.error:
        return False
    return True
def is_valid_cidr(string_network):
    """Tell whether a no_proxy entry looks like ``<ipv4 address>/<mask 1-32>``"""
    if string_network.count('/') != 1:
        return False

    # With exactly one slash, partition gives the same halves as split.
    network, _, suffix = string_network.partition('/')

    try:
        prefix_len = int(suffix)
    except ValueError:
        return False
    if prefix_len < 1 or prefix_len > 32:
        return False

    try:
        socket.inet_aton(network)
    except socket.error:
        return False
    return True
def is_valid_cidr(string_network):
    """
    Check that a no_proxy entry has the shape ``<ipv4 address>/<mask>``.

    The entry must contain exactly one ``/``, the mask must be an integer
    from 1 to 32 inclusive, and the address part must be accepted by
    ``socket.inet_aton``.

    :rtype: bool
    """
    if string_network.count('/') != 1:
        return False

    # Exactly one slash, so the split always yields two parts.
    address, mask_text = string_network.split('/')

    try:
        mask = int(mask_text)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
def is_valid_cidr(string_network):
    """
    Check that a no_proxy entry has the shape ``<ipv4 address>/<mask>``.

    The entry must contain exactly one ``/``, the mask must be an integer
    from 1 to 32 inclusive, and the address part must be accepted by
    ``socket.inet_aton``.

    :rtype: bool
    """
    if string_network.count('/') != 1:
        return False

    # Exactly one slash, so the split always yields two parts.
    address, mask_text = string_network.split('/')

    try:
        mask = int(mask_text)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return True
def is_valid_cidr(string_network):
    """Tell whether a no_proxy entry looks like ``<ipv4 address>/<mask 1-32>``"""
    if string_network.count('/') != 1:
        return False

    # With exactly one slash, partition gives the same halves as split.
    network, _, suffix = string_network.partition('/')

    try:
        prefix_len = int(suffix)
    except ValueError:
        return False
    if prefix_len < 1 or prefix_len > 32:
        return False

    try:
        socket.inet_aton(network)
    except socket.error:
        return False
    return True
def ipv4(address):
    """Validate an IPv4 option value and return it without its URL scheme.

    Every ``http://`` and ``https://`` occurrence is removed first. The rest
    is checked with ``socket.inet_pton``; where that function is missing,
    ``socket.inet_aton`` plus a three-dot check is used instead.

    :raises OptionValidationError: if the value is not a valid IPv4 address
    """
    stripped = address.replace("http://", "")
    address = stripped.replace("https://", "")

    try:
        socket.inet_pton(socket.AF_INET, address)
    except AttributeError:
        # No inet_pton available: inet_aton also accepts shortened forms,
        # so additionally insist on a full dotted quad.
        try:
            socket.inet_aton(address)
        except socket.error:
            raise OptionValidationError("Option have to be valid IP address.")
        if address.count('.') != 3:
            raise OptionValidationError("Option have to be valid IP address.")
    except socket.error:
        raise OptionValidationError("Option have to be valid IP address.")

    return address
def is_ipv4(value):
    """Tell whether a value is a valid IPv4 address.

    :param value: The value to check.
    :return: True if the value is a valid IPv4.
    """
    try:
        socket.inet_pton(socket.AF_INET, value)
        return True
    except AttributeError:
        # inet_pton is missing here; use the inet_aton check below.
        pass
    except socket.error:
        # Rejected by inet_pton: not a valid address.
        return False

    try:
        socket.inet_aton(value)
    except socket.error:
        return False
    # inet_aton also accepts shortened forms, so demand a full dotted quad.
    return value.count('.') == 3
def is_valid_ipv4_address(address):
    """Return True when ``address`` is a valid dotted-quad IPv4 address.

    ``socket.inet_pton`` decides where it exists; otherwise the check falls
    back to ``socket.inet_aton`` combined with a three-dot requirement.
    """
    try:
        socket.inet_pton(socket.AF_INET, address)
        return True
    except AttributeError:
        # inet_pton is missing here; use the inet_aton check below.
        pass
    except socket.error:
        # Rejected by inet_pton: not a valid address.
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    # inet_aton also accepts shortened forms, so demand a full dotted quad.
    return address.count('.') == 3
# ??????? ??? ??????????? ?????????? ???????????? ? OID. ??????????????? ???????? '%s' ?? ????????? ? ??????? ????? '0' (?????? ????????)
# ????? ??????? ???????? ??? {?} ?? ??????? ? ?? ??????? ? ???????
# ??????, ??? ?????????? ?????????????? ??????? ? ??? ??? ??????? ??? ????, ?? ???? ??????? ????? ?????????? :)
def is_valid_cidr(string_network):
    """Tell whether a no_proxy entry looks like ``<ipv4 address>/<mask 1-32>``"""
    if string_network.count('/') != 1:
        return False

    # With exactly one slash, partition gives the same halves as split.
    network, _, suffix = string_network.partition('/')

    try:
        prefix_len = int(suffix)
    except ValueError:
        return False
    if prefix_len < 1 or prefix_len > 32:
        return False

    try:
        socket.inet_aton(network)
    except socket.error:
        return False
    return True
def addUTMPEntry(self, loggedIn=1):
    """Write a login or logout record for this session to the utmp and wtmp files.

    Does nothing when the module-level ``utmp`` name is falsy.

    :param loggedIn: truthy to record a ``USER_PROCESS`` entry (with user,
        host and address filled in), falsy to record a ``DEAD_PROCESS`` entry.
    """
    if not utmp:
        return

    ipAddress = self.avatar.conn.transport.transport.getPeer().host
    # '=L' is always 4 bytes in native byte order.  Plain 'L' uses the
    # native size, which is 8 bytes on LP64 platforms and makes unpacking
    # the 4-byte inet_aton() result raise struct.error.
    (packedIp,) = struct.unpack('=L', socket.inet_aton(ipAddress))

    # Drop the first five characters of the pty name
    # (presumably a "/dev/" prefix -- confirm against ptyTuple's producer).
    ttyName = self.ptyTuple[2][5:]

    # Split the current time into whole seconds and microseconds.
    t = time.time()
    t1 = int(t)
    t2 = int((t - t1) * 1e6)

    entry = utmp.UtmpEntry()
    entry.ut_type = utmp.USER_PROCESS if loggedIn else utmp.DEAD_PROCESS
    entry.ut_pid = self.pty.pid
    entry.ut_line = ttyName
    entry.ut_id = ttyName[-4:]
    entry.ut_tv = (t1, t2)
    if loggedIn:
        entry.ut_user = self.avatar.username
        entry.ut_host = socket.gethostbyaddr(ipAddress)[0]
        entry.ut_addr_v6 = (packedIp, 0, 0, 0)

    # The same entry is written to both record files.
    a = utmp.UtmpRecord(utmp.UTMP_FILE)
    a.pututline(entry)
    a.endutent()
    b = utmp.UtmpRecord(utmp.WTMP_FILE)
    b.pututline(entry)
    b.endutent()
def test_simple(self):
    # Happy path: send a request, check the reply, then relay data both ways.
    # Request = bytes 4 and 1, big-endian short 34, packed 1.2.3.4, then
    # 'fooBAR' and a NUL (presumably a SOCKSv4 CONNECT with a user id --
    # confirm against the protocol under test).
    self.sock.dataReceived(
        struct.pack('!BBH', 4, 1, 34)
        + socket.inet_aton('1.2.3.4')
        + 'fooBAR'
        + '\0')
    # Capture what was written back, then empty the transport buffer so the
    # later assertions only see new output.
    sent = self.sock.transport.value()
    self.sock.transport.clear()
    # Expected reply: bytes 0 and 90, the same port and address echoed back.
    self.assertEqual(sent,
        struct.pack('!BBH', 0, 90, 34)
        + socket.inet_aton('1.2.3.4'))
    # The connection must stay open and an outgoing side must exist.
    self.assert_(not self.sock.transport.stringTCPTransport_closing)
    self.assert_(self.sock.driver_outgoing is not None)
    # pass some data through
    self.sock.dataReceived('hello, world')
    self.assertEqual(self.sock.driver_outgoing.transport.value(),
        'hello, world')
    # the other way around
    self.sock.driver_outgoing.dataReceived('hi there')
    self.assertEqual(self.sock.transport.value(), 'hi there')
    self.sock.connectionLost('fake reason')
def test_eof_remote(self):
    # Same request as the simple case, but the outgoing (remote) side is
    # the one that closes the connection.
    self.sock.dataReceived(
        struct.pack('!BBH', 4, 1, 34)
        + socket.inet_aton('1.2.3.4')
        + 'fooBAR'
        + '\0')
    # The reply is read and discarded; it is not asserted on in this test.
    sent = self.sock.transport.value()
    self.sock.transport.clear()
    # pass some data through
    self.sock.dataReceived('hello, world')
    self.assertEqual(self.sock.driver_outgoing.transport.value(),
        'hello, world')
    # now close it from the server side
    self.sock.driver_outgoing.transport.loseConnection()
    self.sock.driver_outgoing.connectionLost('fake reason')
def test_eof_local(self):
    # Same request as the simple case, but the incoming (client) side is
    # the one that closes the connection.
    self.sock.dataReceived(
        struct.pack('!BBH', 4, 1, 34)
        + socket.inet_aton('1.2.3.4')
        + 'fooBAR'
        + '\0')
    # The reply is read and discarded; it is not asserted on in this test.
    sent = self.sock.transport.value()
    self.sock.transport.clear()
    # pass some data through
    self.sock.dataReceived('hello, world')
    self.assertEqual(self.sock.driver_outgoing.transport.value(),
        'hello, world')
    # now close it from the client side
    self.sock.connectionLost('fake reason')
def test_bad_source(self):
    # Request with second byte 2 instead of 1 (presumably a SOCKSv4 BIND --
    # confirm), naming 1.2.3.4 as the address.
    self.sock.dataReceived(
        struct.pack('!BBH', 4, 2, 34)
        + socket.inet_aton('1.2.3.4')
        + 'fooBAR'
        + '\0')
    # Read and discard the first reply; only the second one is checked.
    sent = self.sock.transport.value()
    self.sock.transport.clear()
    # connect from WRONG address
    incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
    # A connection from a different address must be refused (no protocol).
    self.assertIdentical(incoming, None)
    # Now we should have the second reply packet and it should
    # be a failure. The connection should be closing.
    sent = self.sock.transport.value()
    self.sock.transport.clear()
    # Expected failure reply: bytes 0 and 91, port 0, address 0.0.0.0.
    self.assertEqual(sent,
        struct.pack('!BBH', 0, 91, 0)
        + socket.inet_aton('0.0.0.0'))
    self.assert_(self.sock.transport.stringTCPTransport_closing)