def discover(data: ConnectionData) -> None:
assert isinstance(data, ConnectionData)
ip_net, iface = data
try:
ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip_net), iface=iface, timeout=2, verbose=False)
for s, r in ans:
line = r.sprintf("%Ether.src% %ARP.psrc%")
try:
hostname = socket.gethostbyaddr(r.psrc)
line += ' ' + hostname[0]
except socket.herror:
pass
print(line)
except PermissionError:
print('Cannot execute necessary code, did you run as root?')
sys.exit(1)
except:
raise
python类herror()的实例源码
discovery.py 文件源码
项目:SupercomputerInABriefcase
作者: SupercomputerInABriefcase
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def get_ips_from_url(url):
"""
Retrieve IPs from url
:param str url: The url to resolve
:rtype: list
:return: the list of resolved IP address for given url
"""
try:
parsed = urlparse(url)
if parsed.hostname:
socket.setdefaulttimeout(5)
ips = socket.gethostbyname_ex(parsed.hostname)[2]
return ips
except (ValueError, socket.error, socket.gaierror, socket.herror, socket.timeout):
pass
def run(self):
value = getword()
try:
print "-"*12
print "User:",user[:-1],"Password:",value
pop = poplib.POP3(ipaddr[0])
pop.user(user[:-1])
pop.pass_(value)
print "\t\nLogin successful:",value, user
print pop.stat()
pop.quit()
work.join()
sys.exit(2)
except(poplib.error_proto, socket.gaierror, socket.error, socket.herror), msg:
#print "An error occurred:", msg
pass
def sendchk(listindex, host, user, password): # seperated function for checking
try:
smtp = smtplib.SMTP(host)
smtp.login(user, password)
code = smtp.ehlo()[0]
if not (200 <= code <= 299):
code = smtp.helo()[0]
if not (200 <= code <= 299):
raise SMTPHeloError(code, resp)
smtp.sendmail(fromaddr, toaddr, message)
print "\n\t[!] Email Sent Successfully:",host, user, password
print "\t[!] Message Sent Successfully\n"
LSstring = host+":"+user+":"+password+"\n"
nList.append(LSstring) # special list for AMS file ID's
LFile = open(output, "a")
LFile.write(LSstring) # save working host/usr/pass to file
LFile.close()
AMSout = open("AMSlist.txt", "a")
AMSout.write("[Server"+str(nList.index(LSstring))+"]\nName="+str(host)+"\nPort=25\nUserID=User\nBccSize=50\nUserName="+str(user)+"\nPassword="+str(password)+"\nAuthType=0\n\n")
smtp.quit()
except(socket.gaierror, socket.error, socket.herror, smtplib.SMTPException), msg:
print "[-] Login Failed:", host, user, password
pass
def get_name_from_ip(self, addr): # pylint: disable=no-self-use
"""Returns a reverse dns name if available.
:param addr: IP Address
:type addr: ~.common.Addr
:returns: name or empty string if name cannot be determined
:rtype: str
"""
# If it isn't a private IP, do a reverse DNS lookup
if not common.private_ips_regex.match(addr.get_addr()):
try:
socket.inet_aton(addr.get_addr())
return socket.gethostbyaddr(addr.get_addr())[0]
except (socket.error, socket.herror, socket.timeout):
pass
return ""
def run(self):
#do pings
for x in range(0, self.repeat):
self.one_ping(self.ip, self.port, self.identifier, self.sequence, self.ttl, self.timeout)
self.sequence += 1
if x != self.repeat -1:
time.sleep(self.sleep)
#count packet loss
self.result['packet_loss'] /= self.repeat
#try to get hostname
try:
self.result['hostname'] = socket.gethostbyaddr(self.ip)[0]
except socket.herror:
self.result['hostname'] = None
#calculate averate time
if len(self.result['times']) != 0:
self.result['avg_time'] = sum(self.result['times']) / len(self.result['times'])
#and calculate mdev
mean = sum([float(x) for x in self.result['times']]) / len(self.result['times'])
self.result['mdev'] = sum([abs(x - mean) for x in self.result['times']]) / len(self.result['times'])
return self.result
def scan_and_print_neighbors(net, interface, timeout=1):
global ips_o
print_fmt("\n\033[94m[ARP]\033[0m %s sur %s" % (net, interface))
try:
ans, unans = scapy.layers.l2.arping(net, iface=interface, timeout=timeout, verbose=False)
for s, r in ans.res:
line = r.sprintf("%Ether.src% %ARP.psrc%")
ips_o.append(line.split(' ')[2])
line = mac_address_id(line)
try:
hostname = socket.gethostbyaddr(r.psrc)
line += " " + hostname[0]
except socket.herror:
pass
except KeyboardInterrupt:
print '\033[91m[-]\033[0m L\'utilisateur a choisi l\'interruption du process.'
break
logger.info("\033[96m[ONLINE]\033[0m " + line)
except socket.error as e:
if e.errno == errno.EPERM:
logger.error("\033[91m[-]\033[0m %s. Vous n'etes pas root?", e.strerror)
else:
raise
def whois(url, command=False):
# clean domain to expose netloc
ip_match = re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", url)
if ip_match:
domain = url
try:
result = socket.gethostbyaddr(url)
except socket.herror as e:
pass
else:
domain = result[0]
else:
domain = extract_domain(url)
if command:
# try native whois command
r = subprocess.Popen(['whois', domain], stdout=subprocess.PIPE)
text = r.stdout.read()
else:
# try builtin client
nic_client = NICClient()
text = nic_client.whois_lookup(None, domain, 0)
return WhoisEntry.load(domain, text)
def whois(url, command=False):
# clean domain to expose netloc
ip_match = re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", url)
if ip_match:
domain = url
try:
result = socket.gethostbyaddr(url)
except socket.herror as e:
pass
else:
domain = result[0]
else:
domain = extract_domain(url)
if command:
# try native whois command
r = subprocess.Popen(['whois', domain], stdout=subprocess.PIPE)
text = r.stdout.read()
else:
# try builtin client
nic_client = NICClient()
text = nic_client.whois_lookup(None, domain, 0)
return WhoisEntry.load(domain, text)
def get_name_from_ip(self, addr): # pylint: disable=no-self-use
"""Returns a reverse dns name if available.
:param addr: IP Address
:type addr: ~.common.Addr
:returns: name or empty string if name cannot be determined
:rtype: str
"""
# If it isn't a private IP, do a reverse DNS lookup
if not common.private_ips_regex.match(addr.get_addr()):
try:
socket.inet_aton(addr.get_addr())
return socket.gethostbyaddr(addr.get_addr())[0]
except (socket.error, socket.herror, socket.timeout):
pass
return ""
def traceIP(target):
try:
base = GeoIP('GeoLiteCity.dat')
data = base.record_by_addr(target)
dnsName = socket.gethostbyaddr(target)[0]
formatedData = '''IP: {}
City: {}
State/Province: {}
Country: {}
Continent: {}
Zip/Postal code: {}
Timezone: {}
Latitude: {}
Longitude: {}
DNS name: {}'''.format(target, data['city'], data['region_code'], data['country_name'], data['continent'], data['postal_code'], data['time_zone'], str(data['latitude']), str(data['longitude']), dnsName)
print formatedData
# compares target to database and print results to console
askSave = raw_input('Save data? Y/n: ').lower()
if askSave == 'y':
ipFileName = raw_input('Filename: ')
with open(ipFileName, 'w') as fileName:
fileName.write(formatedData)
print 'Output saved as {}'.format(ipFileName)
else:
pass
# asks user if they want to save the output
pause()
main()
except socket.herror:
pass
def _test_vnc(host, port, timeout=3):
"""
Test VNC connection.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
sock.connect((host, port))
if sock.recv(1024).startswith('RFB'):
return True
except (socket.error, socket.timeout, socket.herror, socket.gaierror) as err:
logger.warning('Error "%s" when testing VNC on "%s:%s"', err, host, port)
finally:
sock.close()
return False
def TCP_connect(ip, port_number, delay, hosts):
host_name = 'Unknown'
TCPsock = socket.socket()
TCPsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
TCPsock.settimeout(delay)
try:
host_name = socket.gethostbyaddr(ip)[0]
# print(host_name)
except socket.herror:
pass
try:
TCPsock.connect((ip, port_number))
if 'SSH' in str(TCPsock.recv(256)):
hosts.append((host_name, ip))
# print(host_name,ip)
except (OSError, ConnectionRefusedError):
pass
def is_aws(ipaddr):
if Validation.check_internet_connection():
if ipaddr in Validation.__aws_cache:
return True
elif ipaddr in Validation.__no_aws_cache:
return False
else: # if ipaddr not in chache
try:
result = socket.gethostbyaddr(ipaddr)
for line in result:
if 'compute.amazonaws.com' in line:
Validation.__aws_cache.append(ipaddr)
return True
except socket.herror:
Validation.__no_aws_cache.append(ipaddr)
return False
def get_local_hostname(self):
"""
Returns the local hostname under which the webinterface can be reached
:return: fully qualified hostname
:rtype: str
"""
import socket
try:
return socket.gethostbyaddr(self.get_local_ip_address())[0] # can fail with default /etc/hosts
except socket.herror:
try:
return socket.gethostbyaddr("127.0.1.1")[0] # in debian based systems hostname is assigned to "127.0.1.1" by default
except socket.herror:
try:
return socket.gethostbyaddr("127.0.0.1")[0] # 'localhost' in most cases
except socket.herror:
return "localhost" # should not happen
def scan_network(self):
logger.info('scanning Network')
network = self._config_reader.get_config_section("Networking")['network']
netmask = self._config_reader.get_config_section("Networking")['netmask']
my_net = ipaddress.ip_network(network+'/'+netmask)
host_list = dict()
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(2.0)
for ip in my_net:
try:
# print(ip)
host = self._generate_host(ip)
if host is not None:
host_list[ip] = host
except socket.herror as ex:
pass
return host_list
def fill_host_list(self, ip_list):
host_list = dict()
# Use UDP.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(2.0)
for ip in ip_list:
try:
ip = ipaddress.ip_address(ip)
host = self._generate_host(ip)
if host is not None:
host_list[ip] = host
except socket.herror as ex:
pass
return host_list
def _generate_host(self, ip):
host = None
# Use UDP. socket.SOCK_DGRAM
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2.0)
try:
host_name, alias, ipaddr = socket.gethostbyaddr(str(ip))
host = Node.Node(ip, host_name)
except socket.herror as ex:
pass
return host
# Properties
def lookup_ip_address(self, ip):
"""
Perform a reverse DNS lookup for ip.
Uses self.cache to speed up results when the same ip is
lookup up several times during one session.
:param ip: IP address to look up
"""
if ip is None:
return None
if ip not in self.cache:
try:
self.cache[ip] = gethostbyaddr(ip)[0]
except (herror, gaierror):
# if lookup fails, return the input address
self.cache[ip] = ip
return self.cache[ip]
def hostname(ip):
"""
Performs a DNS reverse lookup for an IP address and caches the result in
a global variable, which is really, really stupid.
:param ip: And IP address string.
:returns: A hostname string or a False value if the lookup failed.
"""
addr = unicode(ip)
if addr in _cached_hostname:
return _cached_hostname[addr]
try:
dns = gethostbyaddr(addr)
except herror:
return False
_cached_hostname[addr] = dns[0]
return dns[0]
def listen():
port = input('port? ')
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
try:
while True:
sock.bind(('',port))
sock.listen(1)
conn,addr = sock.accept()
while True:
sys.stdout.write(conn.recv(4096))
time.sleep(0.005)
except (socket.herror,socket.gaierror,socket.timeout) as e:
print 'could not connect: ',e
except KeyboardInterrupt:
print 'disconnecting'
sock.close()
def telnetbf():
host = raw_input('host or ip to connect to> ')
port = input('port to connect to> ')
pwlist = raw_input('password list> ').strip()
prompt = raw_input('the password prompt (the cue to send the password, usually "Password: ")> ')
incorrect = raw_input('first thing the server says if password is incorrect (cue to disconnect and try another)> ')
def try_pass(pw):
try:
tel = telnetlib.Telnet()
tel.open(host,port)
tel.read_until(prompt,5)
tel.write(pw)
tel.write('\n')
ret = tel.read_until(incorrect,3)
tel.close()
if incorrect in ret:
return False
return True
except (socket.error, socket.herror, socket.gaierror, socket.timeout, telnetlib.EOFError):
return False
zc = zippycrack(try_pass,pwlist,num_threads=4,cont=False)
zc.run()
def ip2name(addr):
if not ip2name.resolve:
return addr
try:
if addr in ip2name.cache:
return ip2name.cache[addr]
# FIXME: Workaround Python bug
# Need double try/except to catch the bug
try:
name = gethostbyaddr(addr)[0]
except KeyboardInterrupt:
raise
except (socket_host_error, ValueError):
name = addr
except (socket_host_error, KeyboardInterrupt, ValueError):
ip2name.resolve = False
name = addr
ip2name.cache[addr] = name
return name
def ip2name(addr):
if not ip2name.resolve:
return addr
try:
if addr in ip2name.cache:
return ip2name.cache[addr]
# FIXME: Workaround Python bug
# Need double try/except to catch the bug
try:
name = gethostbyaddr(addr)[0]
except KeyboardInterrupt:
raise
except (socket_host_error, ValueError):
name = addr
except (socket_host_error, KeyboardInterrupt, ValueError):
ip2name.resolve = False
name = addr
ip2name.cache[addr] = name
return name
def whois(url, command=False):
# clean domain to expose netloc
ip_match = re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", url)
if ip_match:
domain = url
try:
result = socket.gethostbyaddr(url)
except socket.herror as e:
pass
else:
domain = result[0]
else:
domain = extract_domain(url)
if command:
# try native whois command
r = subprocess.Popen(['whois', domain], stdout=subprocess.PIPE)
text = r.stdout.read()
else:
# try builtin client
nic_client = NICClient()
text = nic_client.whois_lookup(None, domain, 0)
return WhoisEntry.load(domain, text)
def host_cache_lookup(ip):
try:
return host_cache[ip]
except KeyError:
pass
try:
hname = socket.gethostbyaddr(ip)[0]
host_cache[ip] = hname
host_cache_add()
return host_cache[ip]
except socket.herror:
pass
host_cache[ip] = ip
return host_cache[ip]
# Countries we're not allowed to report on (iran, north korea)
def normalize_host(ip):
def resolve_host(ip):
logger = logging.getLogger('resolve_host')
try:
hostname = gethostbyaddr(ip)[0]
except herror as e:
# socket.herror: [Errno 1] Unknown host
logger.error("Unable to resolve %s: %s", ip, e)
return ip
hostname = hostname.split('.')[0]
logger.info("%s is known as %s", ip, hostname)
if not hostname.startswith('ap-'):
return hostname
else:
# ap-s200 -> ap-s*
return re.sub(r'\d+$', '*', hostname)
if ip not in hosts_cache:
hosts_cache[ip] = resolve_host(ip)
return hosts_cache[ip]
def whois(url, command=False):
# clean domain to expose netloc
ip_match = re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", url)
if ip_match:
domain = url
try:
result = socket.gethostbyaddr(url)
except socket.herror as e:
pass
else:
domain = result[0]
else:
domain = extract_domain(url)
if command:
# try native whois command
r = subprocess.Popen(['whois', domain], stdout=subprocess.PIPE)
text = r.stdout.read()
else:
# try builtin client
nic_client = NICClient()
text = nic_client.whois_lookup(None, domain, 0)
return WhoisEntry.load(domain, text)
def state(self):
"""
:return: The current state of the ip address.
"""
connect = socket.socket()
try:
stat = connect.connect_ex((self.address, 80))
if stat == 0:
return "HOST IS UP"
else:
return "HOST IS DOWN"
except socket.gaierror as gaer:
return gaer.args[1]
except socket.herror as her:
return her.args[1]
except socket.error as err:
return err.args[1]
finally:
connect.shutdown(socket.SHUT_RDWR)
connect.close()
def run(self):
value, user = getword()
try:
print "-"*12
print "User:",user,"Password:",value
smtp = smtplib.SMTP(sys.argv[1])
smtp.login(user, value)
print "\t\nLogin successful:",user, value
smtp.quit()
work.join()
sys.exit(2)
except(socket.gaierror, socket.error, socket.herror, smtplib.SMTPException), msg:
#print "An error occurred:", msg
pass