def get_iphostname():
'''??linux?????????IP??'''
def get_ip(ifname):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ipaddr = socket.inet_ntoa(fcntl.ioctl(
sock.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', ifname[:15])
)[20:24]
)
sock.close()
return ipaddr
try:
ip = get_ip('eth0')
except IOError:
ip = get_ip('eno1')
hostname = socket.gethostname()
return {'hostname': hostname, 'ip':ip}
python类inet_ntoa()的实例源码
def get_iphostname():
'''??linux?????????IP??'''
def get_ip(ifname):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ipaddr = socket.inet_ntoa(fcntl.ioctl(
sock.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', ifname[:15])
)[20:24]
)
sock.close()
return ipaddr
try:
ip = get_ip('eth0')
except IOError:
ip = get_ip('eno1')
hostname = socket.gethostname()
return {'hostname': hostname, 'ip':ip}
def zeroconf_info():
"""zeroconf_info returns a list of tuples of the information about other
zeroconf services on the local network. It does this by creating a
zeroconf.ServiceBrowser and spending 0.25 seconds querying the network for
other services."""
ret_info = []
def on_change(zeroconf, service_type, name, state_change):
if state_change is zeroconfig.ServiceStateChange.Added:
info = zeroconf.get_service_info(service_type, name)
if info:
address = "{}".format(socket.inet_ntoa(info.address))
props = str(info.properties.items())
item = ServerInfo(str(info.server), address, info.port, props)
ret_info.append(item)
zc = zeroconfig.Zeroconf()
browser = zeroconfig.ServiceBrowser(
zc, "_defusedivision._tcp.local.", handlers=[on_change])
sleep(1)
concurrency.concurrent(lambda: zc.close())()
return ret_info
def parse_netconn(self, seq, netconn):
parts = netconn.split('|')
new_conn = {}
timestamp = convert_event_time(parts[0])
try:
new_conn['remote_ip'] = socket.inet_ntoa(struct.pack('>i', int(parts[1])))
except:
new_conn['remote_ip'] = '0.0.0.0'
new_conn['remote_port'] = int(parts[2])
new_conn['proto'] = protocols[int(parts[3])]
new_conn['domain'] = parts[4]
if parts[5] == 'true':
new_conn['direction'] = 'Outbound'
else:
new_conn['direction'] = 'Inbound'
return CbNetConnEvent(self.process_model, timestamp, seq, new_conn)
def parse_netconn(self, seq, netconn):
new_conn = {}
timestamp = convert_event_time(netconn.get("timestamp", None))
direction = netconn.get("direction", "true")
if direction == 'true':
new_conn['direction'] = 'Outbound'
else:
new_conn['direction'] = 'Inbound'
for ipfield in ('remote_ip', 'local_ip', 'proxy_ip'):
try:
new_conn[ipfield] = socket.inet_ntoa(struct.pack('>i', int(netconn.get(ipfield, 0))))
except:
new_conn[ipfield] = netconn.get(ipfield, '0.0.0.0')
for portfield in ('remote_port', 'local_port', 'proxy_port'):
new_conn[portfield] = int(netconn.get(portfield, 0))
new_conn['proto'] = protocols.get(int(netconn.get('proto', 0)), "Unknown")
new_conn['domain'] = netconn.get('domain', '')
return CbNetConnEvent(self.process_model, timestamp, seq, new_conn, version=2)
def handle(self):
client_ip = self.client_address[0]
addr = ''
server = ''
try:
sock = self.connection
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
odestdata = sock.getsockopt(socket.SOL_IP, 80, 16)
port, addr_ip = struct.unpack("!xxH4sxxxxxxxx", odestdata)
addr = socket.inet_ntoa(addr_ip)
server = reverse(addr)
print_log('%s connecting %s:%d %d %s' % (client_ip, addr, port, server[0], str(server[1])))
Proxy[server[0]].proxy(sock, server[1], (addr, port))
except socket.error, e:
logging.warn(addr + ':' + str(server) + ':' + str(e))
sock.close()
def map_ip(server, qn, replay):
global HOSTS4, IPMAP
iplist = []
for i in replay.rr:
if i.rtype == 28:
answer = str(i.rdata)
ipbias = 0
if mutex.acquire():
IPMAP.append((server[0], answer))
ipbias = len(IPMAP)
mutex.release()
addr_ip = struct.pack('!I', RESERVEDIP + ipbias)
addr = socket.inet_ntoa(addr_ip)
iplist.append(addr)
HOSTS4[qn] = iplist
return iplist
def getipaddr(self, ifname='eth0'):
import socket
import struct
ret = '127.0.0.1'
try:
ret = socket.gethostbyname(socket.getfqdn(socket.gethostname()))
except:
pass
if ret == '127.0.0.1':
try:
import fcntl
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ret = socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', ifname[:15]))[20:24])
except:
pass
return ret
def _read_socks5_address(self):
atyp = yield self.read_bytes(1)
if atyp == b"\x01":
data = yield self.read_bytes(4)
addr = socket.inet_ntoa(data)
elif atyp == b"\x03":
length = yield self.read_bytes(1)
addr = yield self.read_bytes(length)
elif atyp == b"\x04":
data = yield self.read_bytes(16)
addr = socket.inet_ntop(socket.AF_INET6, data)
else:
raise GeneralProxyError("SOCKS5 proxy server sent invalid data")
data = yield self.read_bytes(2)
port = struct.unpack(">H", data)[0]
raise gen.Return((addr, port))
def hexstr2ip(hex_str, byteorder="little"):
"""
little byte order ????????????
06e6a8c0 -> 192.168.230.6
Args:
hex_str:
Returns:
"""
if byteorder == "little":
fmt = "<L"
else:
fmt = ">L"
return socket.inet_ntoa(struct.pack(fmt, int(hex_str, base=16)))
def get_dnsserver_list():
if os.name == 'nt':
import ctypes
import ctypes.wintypes
DNS_CONFIG_DNS_SERVER_LIST = 6
buf = ctypes.create_string_buffer(2048)
ctypes.windll.dnsapi.DnsQueryConfig(DNS_CONFIG_DNS_SERVER_LIST, 0, None, None, ctypes.byref(buf), ctypes.byref(ctypes.wintypes.DWORD(len(buf))))
ipcount = struct.unpack('I', buf[0:4])[0]
iplist = [socket.inet_ntoa(buf[i:i+4]) for i in xrange(4, ipcount*4+4, 4)]
return iplist
elif os.path.isfile('/etc/resolv.conf'):
with open('/etc/resolv.conf', 'rb') as fp:
return re.findall(r'(?m)^nameserver\s+(\S+)', fp.read())
else:
logging.warning("get_dnsserver_list failed: unsupport platform '%s-%s'", sys.platform, os.name)
return []
def update(self, data):
"""Update info about network interface according to given dnet dictionary"""
self.name = data["name"]
self.description = data['description']
self.win_index = data['win_index']
# Other attributes are optional
if conf.use_winpcapy:
self._update_pcapdata()
try:
self.ip = socket.inet_ntoa(get_if_raw_addr(data['guid']))
except (KeyError, AttributeError, NameError):
pass
try:
self.mac = data['mac']
except KeyError:
pass
def bind_nic():
try:
import fcntl
def get_ip_address(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', ifname[:15])
)[20:24])
return get_ip_address(nic_name)
except ImportError as e:
print('Indicate nic feature need to be run under Unix based system.')
return '0.0.0.0'
except IOError as e:
print(nic_name + 'is unacceptable !')
return '0.0.0.0'
finally:
return '0.0.0.0'
def dotted_netmask(mask):
"""Converts mask from /xx format to xxx.xxx.xxx.xxx
Example: if mask is 24 function returns 255.255.255.0
:rtype: str
"""
bits = 0xffffffff ^ (1 << 32 - mask) - 1
return socket.inet_ntoa(struct.pack('>I', bits))
def report_detail(self, ioc, type, result):
events = self.cb.process_events(result["id"], result["segment_id"])
proc = events["process"]
if type == "domain" and proc.has_key("netconn_complete"):
for netconn in proc["netconn_complete"]:
ts, ip, port, proto, domain, dir = netconn.split("|")
if ioc in domain:
str_ip = socket.inet_ntoa(struct.pack("!i", int(ip)))
print "%s\t%s (%s:%s)" % (ts, domain, str_ip, port)
elif type == "ipaddr" and proc.has_key("netconn_complete"):
for netconn in proc["netconn_complete"]:
ts, ip, port, proto, domain, direction = netconn.split("|")
packed_ip = struct.unpack("!i", socket.inet_aton(ioc))[0]
#import code; code.interact(local=locals())
if packed_ip == int(ip):
str_ip = socket.inet_ntoa(struct.pack("!i", int(ip)))
print "%s\t%s (%s:%s)" % (ts, domain, str_ip, port)
elif type == "md5" and proc.has_key("modload_complete"):
for modload in proc["modload_complete"]:
ts, md5, path = modload.split("|")
if ioc in md5:
print "%s\t%s %s" % (ts, md5, path)
if result["process_md5"] == ioc:
print "%s\t%s %s" % (result["start"], result["process_md5"], result["path"])
def outputNetConn(self, proc, netconn):
"""
output a single netconn event from a process document
the caller is responsible for ensuring that the document
meets start time and subnet criteria
"""
# for convenience, use locals for some process metadata fields
hostname = proc.get("hostname", "<unknown>")
process_name = proc.get("process_name", "<unknown>")
user_name = proc.get("username", "<unknown>")
process_md5 = proc.get("process_md5", "<unknown>")
cmdline = proc.get("cmdline", "<unknown>")
path = proc.get("path", "<unknown>")
procstarttime = proc.get("start", "<unknown>")
proclastupdate = proc.get("last_update", "<unknown>")
# split the netconn into component parts
ts, ip, port, proto, domain, dir = netconn.split("|")
# get the dotted-quad string representation of the ip
str_ip = socket.inet_ntoa(struct.pack("!i", int(ip)))
# the underlying data model provides the protocol number
# convert this to human-readable strings (tcp or udp)
if "6" == proto:
proto = "tcp"
elif "17" == proto:
proto = "udp"
# the underlying data model provides a boolean indication as to
# if this is an inbound or outbound network connection
if "true" == dir:
dir = "out"
else:
dir = "in"
# print the record, using pipes as a delimiter
print "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|" % (procstarttime,proclastupdate,hostname, user_name, proto, str_ip, port, dir, domain, process_name, process_md5, path, cmdline)
def dotted_netmask(mask):
"""Converts mask from /xx format to xxx.xxx.xxx.xxx
Example: if mask is 24 function returns 255.255.255.0
:rtype: str
"""
bits = 0xffffffff ^ (1 << 32 - mask) - 1
return socket.inet_ntoa(struct.pack('>I', bits))
def dotted_netmask(mask):
"""Converts mask from /xx format to xxx.xxx.xxx.xxx
Example: if mask is 24 function returns 255.255.255.0
:rtype: str
"""
bits = 0xffffffff ^ (1 << 32 - mask) - 1
return socket.inet_ntoa(struct.pack('>I', bits))
def report_detail(self, ioc, type, result):
events = self.cb.process_events(result["id"], result["segment_id"])
proc = events["process"]
if type == "domain" and proc.has_key("netconn_complete"):
for netconn in proc["netconn_complete"]:
ts, ip, port, proto, domain, dir = netconn.split("|")
if ioc in domain:
str_ip = socket.inet_ntoa(struct.pack("!i", int(ip)))
print "%s\t%s (%s:%s)" % (ts, domain, str_ip, port)
elif type == "ipaddr" and proc.has_key("netconn_complete"):
for netconn in proc["netconn_complete"]:
ts, ip, port, proto, domain, direction = netconn.split("|")
packed_ip = struct.unpack("!i", socket.inet_aton(ioc))[0]
#import code; code.interact(local=locals())
if packed_ip == int(ip):
str_ip = socket.inet_ntoa(struct.pack("!i", int(ip)))
print "%s\t%s (%s:%s)" % (ts, domain, str_ip, port)
elif type == "md5" and proc.has_key("modload_complete"):
for modload in proc["modload_complete"]:
ts, md5, path = modload.split("|")
if ioc in md5:
print "%s\t%s %s" % (ts, md5, path)
if result["process_md5"] == ioc:
print "%s\t%s %s" % (result["start"], result["process_md5"], result["path"])
def outputNetConn(self, proc, netconn):
"""
output a single netconn event from a process document
the caller is responsible for ensuring that the document
meets start time and subnet criteria
"""
# for convenience, use locals for some process metadata fields
hostname = proc.get("hostname", "<unknown>")
process_name = proc.get("process_name", "<unknown>")
user_name = proc.get("username", "<unknown>")
process_md5 = proc.get("process_md5", "<unknown>")
cmdline = proc.get("cmdline", "<unknown>")
path = proc.get("path", "<unknown>")
procstarttime = proc.get("start", "<unknown>")
proclastupdate = proc.get("last_update", "<unknown>")
# split the netconn into component parts
ts, ip, port, proto, domain, dir = netconn.split("|")
# get the dotted-quad string representation of the ip
str_ip = socket.inet_ntoa(struct.pack("!i", int(ip)))
# the underlying data model provides the protocol number
# convert this to human-readable strings (tcp or udp)
if "6" == proto:
proto = "tcp"
elif "17" == proto:
proto = "udp"
# the underlying data model provides a boolean indication as to
# if this is an inbound or outbound network connection
if "true" == dir:
dir = "out"
else:
dir = "in"
# print the record, using pipes as a delimiter
print "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|" % (procstarttime,proclastupdate,hostname, user_name, proto, str_ip, port, dir, domain, process_name, process_md5, path, cmdline)
def interface_ip(self):
"""
Returns ascii representation of the ip address of the interface used to communicate with the Cb Response server.
If using NAT, this will be the "internal" IP address of the sensor.
"""
try:
ip_address = socket.inet_ntoa(struct.pack('>i', self._attribute('interface_ip', 0)))
except:
ip_address = self._attribute('interface_ip', 0)
return ip_address
def _read_SOCKS5_address(self, file):
atyp = self._readall(file, 1)
if atyp == b"\x01":
addr = socket.inet_ntoa(self._readall(file, 4))
elif atyp == b"\x03":
length = self._readall(file, 1)
addr = self._readall(file, ord(length))
elif atyp == b"\x04":
addr = socket.inet_ntop(socket.AF_INET6, self._readall(file, 16))
else:
raise GeneralProxyError("SOCKS5 proxy server sent invalid data")
port = struct.unpack(">H", self._readall(file, 2))[0]
return addr, port
def dotted_netmask(mask):
"""Converts mask from /xx format to xxx.xxx.xxx.xxx
Example: if mask is 24 function returns 255.255.255.0
:rtype: str
"""
bits = 0xffffffff ^ (1 << 32 - mask) - 1
return socket.inet_ntoa(struct.pack('>I', bits))
def inet_ntop(address_family, packed_ip):
if address_family == socket.AF_INET:
return socket.inet_ntoa(packed_ip)
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
ip_string = ctypes.create_string_buffer(128)
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))
if address_family == socket.AF_INET6:
if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
else:
raise socket.error('unknown address family')
if WSAAddressToStringA(
ctypes.byref(addr),
addr_size,
None,
ip_string,
ctypes.byref(ip_string_size)
) != 0:
raise socket.error(ctypes.FormatError())
return ip_string[:ip_string_size.value - 1]
def deserialize(byts, protocol_version):
if len(byts) == 16:
return util.inet_ntop(socket.AF_INET6, byts)
else:
# util.inet_pton could also handle, but this is faster
# since we've already determined the AF
return socket.inet_ntoa(byts)
def parse_CAddress(vds):
d = {}
d['nVersion'] = vds.read_int32()
d['nTime'] = vds.read_uint32()
d['nServices'] = vds.read_uint64()
d['pchReserved'] = vds.read_bytes(12)
d['ip'] = socket.inet_ntoa(vds.read_bytes(4))
d['port'] = socket.htons(vds.read_uint16())
return d
def parse_CAddress(vds):
d = {}
d['nVersion'] = vds.read_int32()
d['nTime'] = vds.read_uint32()
d['nServices'] = vds.read_uint64()
d['pchReserved'] = vds.read_bytes(12)
d['ip'] = socket.inet_ntoa(vds.read_bytes(4))
d['port'] = socket.htons(vds.read_uint16())
return d
def reserved_ip(server, qn):
global RESERVEDIP, HOSTS4, IPMAP
ipbias = 0
if mutex.acquire():
IPMAP.append((server[0], qn))
ipbias = len(IPMAP)
mutex.release()
addr_ip = struct.pack('!I', RESERVEDIP + ipbias)
addr = socket.inet_ntoa(addr_ip)
HOSTS4[qn] = [addr]
return addr
def dotted_netmask(mask):
"""
Converts mask from /xx format to xxx.xxx.xxx.xxx
Example: if mask is 24 function returns 255.255.255.0
"""
bits = 0xffffffff ^ (1 << 32 - mask) - 1
return socket.inet_ntoa(struct.pack('>I', bits))
def get_local_ip(self):
return socket.inet_ntoa(self.ip)