def resolve_dns(self, shost):
    """Return the IPv4 address for ``shost``, caching DNS lookups.

    A value that already looks like a dotted-quad IPv4 address is returned
    unchanged.  Anything else is treated as a host name: it is looked up in
    ``self.dnscache`` first and, on a cache miss, resolved with
    ``socket.gethostbyname`` and stored in the cache for later calls.

    Args:
        shost: Host name or dotted-quad IPv4 address.

    Returns:
        The resolved (or already numeric) address.  If resolution fails,
        ``compiler_bailout`` is called with an error message; should that
        call return, ``shost`` is handed back unchanged.
    """
    rdns = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$"
    if re.match(rdns, shost) is not None:
        # Already a literal IPv4 address: nothing to resolve.
        return shost

    if shost in self.dnscache:
        cached_ip = self.dnscache[shost]
        logging.debug("Host %s in DNSCACHE, returned %s", shost, cached_ip)
        return cached_ip

    logging.debug("Host %s not in DNSCACHE", shost)
    # Try a socket lookup.
    try:
        resolved_ip = socket.gethostbyname(shost)
    except socket.gaierror:
        compiler_bailout("Cannot resolve %s" % shost)
        return shost

    self.dnscache[shost] = resolved_ip
    logging.debug("Resolved %s to %s", shost, resolved_ip)
    return resolved_ip
python类gaierror()的实例源码
def select_ip_version(host, port):
    """Pick the socket address family to use when connecting to ``host``.

    Returns ``socket.AF_INET6`` when ``host`` contains a colon and the
    ``socket`` module exposes ``AF_INET6``; otherwise ``socket.AF_INET``.
    ``port`` is accepted but not used by the active code.
    """
    # A getaddrinfo()-based probe is kept below for reference.  It is
    # disabled because of problems with current IPv6 implementations and
    # various operating systems, and it probably was not supposed to work
    # anyway; no better approach was known to the original author.
    #
    #     try:
    #         info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
    #                                   socket.SOCK_STREAM, 0,
    #                                   socket.AI_PASSIVE)
    #         if info:
    #             return info[0][0]
    #     except socket.gaierror:
    #         pass
    has_colon = ':' in host
    use_ipv6 = has_colon and hasattr(socket, 'AF_INET6')
    return socket.AF_INET6 if use_ipv6 else socket.AF_INET
def select_ip_version(host, port):
    """Pick the socket address family to use when connecting to ``host``.

    Returns ``socket.AF_INET6`` when ``host`` contains a colon and the
    ``socket`` module exposes ``AF_INET6``; otherwise ``socket.AF_INET``.
    ``port`` is accepted but not used by the active code.
    """
    # A getaddrinfo()-based probe is kept below for reference.  It is
    # disabled because of problems with current IPv6 implementations and
    # various operating systems, and it probably was not supposed to work
    # anyway; no better approach was known to the original author.
    #
    #     try:
    #         info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
    #                                   socket.SOCK_STREAM, 0,
    #                                   socket.AI_PASSIVE)
    #         if info:
    #             return info[0][0]
    #     except socket.gaierror:
    #         pass
    has_colon = ':' in host
    use_ipv6 = has_colon and hasattr(socket, 'AF_INET6')
    return socket.AF_INET6 if use_ipv6 else socket.AF_INET
def start(self):
    """Write lookup output for stdin lines or for the configured hosts.

    Three modes, chosen from ``self._args``:

    * No hosts: every line from ``self.gen_std_line()`` is written back.
      When ``strings.REG_IP`` matches in the line, a tab and the formatted
      result for the first match are appended.  ``self._args.format`` is
      used when set, otherwise ``'%C %s %c'``.
    * Hosts and a format: only the first host is formatted and written.  A
      ``socket.gaierror`` is reported on stderr instead of propagating.
    * Hosts without a format: the lines from ``self._get_output(host)`` are
      written for every host, separated by a blank line when
      ``self._args.detail`` is set.
    """
    args = self._args

    if not args.hosts:
        # Read from stdin.  The format is the same for every line, so it is
        # chosen once instead of on each iteration.
        line_format = args.format if args.format else '%C %s %c'
        for line in self.gen_std_line():
            search = re.search(strings.REG_IP, line)
            if search:
                write(line + "\t" + self._format(search.group(0), line_format))
            else:
                write(line)
            write("\n")
        return

    if args.format:
        try:
            write(self._format(args.hosts[0], args.format))
        except socket.gaierror:
            sys.stderr.write("Can't resolve name: " + args.hosts[0])
        return

    # Detail mode separates the output lines with an empty line.
    separator = "\n\n" if args.detail else "\n"
    for host in args.hosts:
        write(separator.join(self._get_output(host)))
        write("\n")
def _connect(self):
    """Create the UDP socket, store it on ``self._sock`` and return it.

    When ``self._broadcast`` is truthy, ``SO_BROADCAST`` is enabled on the
    socket.

    Raises:
        IOError: if a ``socket.gaierror`` occurs while setting up the
            socket; the message names ``self._host`` and is also logged.
    """
    try:
        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._sock = udp_socket
        if self._broadcast:
            # self._sock.bind((self._broadcast_interface, self._port))
            udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    except socket.gaierror:
        error = "Unable to connect to or resolve host: {}".format(self._host)
        log.error(error)
        raise IOError(error)
    return self._sock
# Push new data to strand
def select_ip_version(host, port):
    """Pick the socket address family to use when connecting to ``host``.

    Returns ``socket.AF_INET6`` when ``host`` contains a colon and the
    ``socket`` module exposes ``AF_INET6``; otherwise ``socket.AF_INET``.
    ``port`` is accepted but not used by the active code.
    """
    # A getaddrinfo()-based probe is kept below for reference.  It is
    # disabled because of problems with current IPv6 implementations and
    # various operating systems, and it probably was not supposed to work
    # anyway; no better approach was known to the original author.
    #
    #     try:
    #         info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
    #                                   socket.SOCK_STREAM, 0,
    #                                   socket.AI_PASSIVE)
    #         if info:
    #             return info[0][0]
    #     except socket.gaierror:
    #         pass
    has_colon = ':' in host
    use_ipv6 = has_colon and hasattr(socket, 'AF_INET6')
    return socket.AF_INET6 if use_ipv6 else socket.AF_INET
def request(self, method, url, body, headers):
    """Send the request through ``fetch`` and store the result on ``self.response``.

    The stored response is a ``ResponseDict`` built from the fetched headers
    with ``'status'`` and ``'reason'`` entries, a ``status`` attribute and a
    ``read()`` callable returning the fetched content.

    Raises:
        socket.gaierror: when ``fetch`` raises ``InvalidURLError``.
        httplib.HTTPException: when ``fetch`` raises ``DownloadError``,
            ``ResponseTooLargeError`` or ``SSLCertificateError``.
    """
    # fetch requires an absolute URI, so rebuild one from scheme/host/port.
    if self.port:
        netloc = '%s:%s' % (self.host, self.port)
    else:
        netloc = self.host
    absolute_uri = '%s://%s%s' % (self.scheme, netloc, url)

    try:
        response = fetch(
            absolute_uri,
            payload=body,
            method=method,
            headers=headers,
            allow_truncated=False,
            follow_redirects=False,
            deadline=self.timeout,
            validate_certificate=self.validate_certificate,
        )
        status_code = response.status_code
        self.response = ResponseDict(response.headers)
        self.response['status'] = str(status_code)
        self.response['reason'] = httplib.responses.get(response.status_code, 'Ok')
        self.response.status = response.status_code
        self.response.read = lambda: response.content
    # Translate fetch's errors into the exceptions callers expect.
    except InvalidURLError:
        raise socket.gaierror('')
    except (DownloadError, ResponseTooLargeError, SSLCertificateError):
        raise httplib.HTTPException()
def process(input_string):
    """
    Minimal chat bot: send the text as a query and print the spoken reply.

    The reply body is parsed as JSON and the value at
    ``["result"]["fulfillment"]["speech"]`` is echoed.  If the request fails
    with ``socket.gaierror`` a notice is printed and the process exits with
    status 1.

    :param input_string: text to send as the query
    """
    request.query = input_string
    try:
        raw_reply = request.getresponse().read()
    except socket.gaierror:
        # Without an internet connection there is no response to give.
        click.echo('Yoda cannot sense the internet right now!')
        sys.exit(1)

    answer = json.loads(raw_reply)["result"]["fulfillment"]["speech"]
    chalk.blue('Yoda speaks:')
    click.echo(answer)
def select_ip_version(host, port):
    """Pick the socket address family to use when connecting to ``host``.

    Returns ``socket.AF_INET6`` when ``host`` contains a colon and the
    ``socket`` module exposes ``AF_INET6``; otherwise ``socket.AF_INET``.
    ``port`` is accepted but not used by the active code.
    """
    # A getaddrinfo()-based probe is kept below for reference.  It is
    # disabled because of problems with current IPv6 implementations and
    # various operating systems, and it probably was not supposed to work
    # anyway; no better approach was known to the original author.
    #
    #     try:
    #         info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
    #                                   socket.SOCK_STREAM, 0,
    #                                   socket.AI_PASSIVE)
    #         if info:
    #             return info[0][0]
    #     except socket.gaierror:
    #         pass
    has_colon = ':' in host
    use_ipv6 = has_colon and hasattr(socket, 'AF_INET6')
    return socket.AF_INET6 if use_ipv6 else socket.AF_INET
def get_host_ip(hostIP=None):
    """Work out the address this host should advertise.

    Args:
        hostIP: Selection mode or an explicit value.
            ``None`` / ``'auto'`` behave like ``'ip'``.
            ``'dns'`` returns ``socket.getfqdn()``.
            ``'ip'`` resolves the local FQDN (falling back to the plain host
            name) to an IP address.
            Any other value is returned unchanged.

    Returns:
        The chosen host name or IP address string.

    Raises:
        socket.gaierror: in ``'ip'`` mode, when the fallback lookup of
            ``socket.gethostname()`` fails as well.
    """
    if hostIP is None or hostIP == 'auto':
        hostIP = 'ip'

    if hostIP == 'dns':
        return socket.getfqdn()
    if hostIP != 'ip':
        # Caller supplied an explicit value; pass it through.
        return hostIP

    try:
        hostIP = socket.gethostbyname(socket.getfqdn())
    except socket.gaierror:
        logging.warning('gethostbyname(socket.getfqdn()) failed... trying on hostname()')
        hostIP = socket.gethostbyname(socket.gethostname())

    if hostIP.startswith("127."):
        # The name resolved to loopback: ask the OS which local address a
        # UDP socket aimed at another network would use instead.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # doesn't have to be reachable
            s.connect(('10.255.255.255', 1))
            hostIP = s.getsockname()[0]
        finally:
            # Always release the probe socket.
            s.close()
    return hostIP
3_2_ping_remote_host.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def ping(self):
    """
    Run the ping process

    Calls ``self.ping_once()`` up to ``self.count`` times and prints the
    outcome of each attempt: the delay in milliseconds, or a timeout notice
    when ``ping_once`` returns ``None``.  A ``socket.gaierror`` is reported
    and stops the loop.
    """
    for _ in range(self.count):
        print("Ping to %s..." % self.target_host)
        try:
            delay = self.ping_once()
        except socket.gaierror as e:
            # Exception objects cannot be indexed on Python 3; the message
            # is the second element of ``args`` when an errno is present.
            reason = e.args[1] if len(e.args) > 1 else e
            print("Ping failed. (socket error: '%s')" % reason)
            break
        if delay is None:
            print("Ping failed. (timeout within %ssec.)" % self.timeout)
        else:
            # ping_once reports seconds; display milliseconds.
            print("Get pong in %0.4fms" % (delay * 1000))
def request(self, method, url, body, headers):
    """Send the request through ``fetch`` and store the result on ``self.response``.

    The stored response is a ``ResponseDict`` built from the fetched headers
    with ``'status'`` and ``'reason'`` entries, a ``status`` attribute and a
    ``read()`` callable returning the fetched content.

    Raises:
        socket.gaierror: when ``fetch`` raises ``InvalidURLError``.
        httplib.HTTPException: when ``fetch`` raises ``DownloadError``,
            ``ResponseTooLargeError`` or ``SSLCertificateError``.
    """
    # fetch requires an absolute URI, so rebuild one from scheme/host/port.
    if self.port:
        netloc = '%s:%s' % (self.host, self.port)
    else:
        netloc = self.host
    absolute_uri = '%s://%s%s' % (self.scheme, netloc, url)

    try:
        response = fetch(
            absolute_uri,
            payload=body,
            method=method,
            headers=headers,
            allow_truncated=False,
            follow_redirects=False,
            deadline=self.timeout,
            validate_certificate=self.validate_certificate,
        )
        status_code = response.status_code
        self.response = ResponseDict(response.headers)
        self.response['status'] = str(status_code)
        self.response['reason'] = httplib.responses.get(response.status_code, 'Ok')
        self.response.status = response.status_code
        self.response.read = lambda: response.content
    # Translate fetch's errors into the exceptions callers expect.
    except InvalidURLError:
        raise socket.gaierror('')
    except (DownloadError, ResponseTooLargeError, SSLCertificateError):
        raise httplib.HTTPException()
def run():
    """Send a ``HEAD /index.html`` request to the target and print its headers.

    The target and the socket timeout are read from the module-level
    ``variables`` mapping (``variables['target'][0]`` and
    ``variables['timeout'][0]``).

    Returns:
        The list of ``(name, value)`` header pairs on success.  On failure
        the error is printed with ``printError`` and a ``ModuleError`` is
        returned for an invalid timeout, an unresolvable name or a timeout;
        an invalid URL returns the plain string ``"invalid url"``.
    """
    try:
        try:
            socket.setdefaulttimeout(float(variables['timeout'][0]))
        except ValueError:
            printError('invalid timeout')
            return ModuleError("invalid timeout")

        conn = http.client.HTTPConnection(variables['target'][0])
        try:
            conn.request("HEAD", "/index.html")
            results = conn.getresponse().getheaders()
        finally:
            # Release the socket whether or not the request succeeded.
            conn.close()

        print('')
        for name, value in results:
            print(colors.yellow + name, value + colors.end)
        print('')
        return results
    except http.client.InvalidURL:
        printError('invalid url')
        # NOTE(review): unlike the other failure paths this returns a plain
        # string, not a ModuleError; kept as-is because callers may rely on it.
        return ("invalid url")
    except socket.gaierror:
        printError('name or service not known')
        return ModuleError("name or service not known")
    except socket.timeout:
        printError('timeout')
        return ModuleError("timeout")
def __init__(self, host='', port=0, local_hostname=None,
             timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    """Initialize a new instance.

    If ``host`` is given, ``self.connect(host, port)`` is called right away
    and an ``SMTPConnectError`` is raised unless the reply code is 220.

    If ``local_hostname`` is given it is used as the FQDN of the local host.
    Otherwise the name is taken from ``socket.getfqdn()``; when that result
    contains no dot, a bracketed IP address literal is used instead.
    """
    self.timeout = timeout
    self.esmtp_features = {}

    if host:
        code, msg = self.connect(host, port)
        if code != 220:
            raise SMTPConnectError(code, msg)

    if local_hostname is not None:
        self.local_hostname = local_hostname
        return

    # RFC 2821 says we should use the fqdn in the EHLO/HELO verb, and
    # if that can't be calculated, that we should use a domain literal
    # instead (essentially an encoded IP address like [A.B.C.D]).
    fqdn = socket.getfqdn()
    if '.' in fqdn:
        self.local_hostname = fqdn
        return

    # No fully qualified name is available, so build a domain literal and
    # fall back to loopback when the host name cannot be resolved.
    try:
        addr = socket.gethostbyname(socket.gethostname())
    except socket.gaierror:
        addr = '127.0.0.1'
    self.local_hostname = '[%s]' % addr
def _safe_gethostbyname(host):
    """Resolve ``host`` with ``socket.gethostbyname``.

    Returns the address string, or ``None`` when the lookup raises
    ``socket.gaierror``.
    """
    try:
        address = socket.gethostbyname(host)
    except socket.gaierror:
        address = None
    return address
def get_names(self):
    """Return the tuple of local addresses, computing it on first use.

    The result is cached on ``FileHandler.names``.  It combines the address
    lists reported by ``socket.gethostbyname_ex`` for ``'localhost'`` and
    for the machine's host name; if that lookup raises ``socket.gaierror``,
    only the address of ``'localhost'`` is stored.
    """
    if FileHandler.names is not None:
        return FileHandler.names

    try:
        localhost_addrs = socket.gethostbyname_ex('localhost')[2]
        hostname_addrs = socket.gethostbyname_ex(socket.gethostname())[2]
        FileHandler.names = tuple(localhost_addrs + hostname_addrs)
    except socket.gaierror:
        FileHandler.names = (socket.gethostbyname('localhost'),)
    return FileHandler.names
# not entirely sure what the rules are here
def start(cdxjFilePath=INDEX_FILE, proxy=None):
    """Configure the replay application and run it.

    Stores ``proxy`` on ``app``, makes sure a replay host/port configuration
    exists (writing ``IPWBREPLAY_IP`` / ``IPWBREPLAY_PORT`` when none is
    found) and, when ``ipwbConfig.isDaemonAlive()`` reports true, registers
    ``cdxjFilePath`` as the index to replay.  ``ipwbConfig.firstRun()`` is
    called first when the path equals ``INDEX_FILE``.

    The app is then run on ``0.0.0.0:IPWBREPLAY_PORT``.  On ``gaierror`` it
    is run again with default settings; on ``socketerror`` a message is
    printed and the process exits.
    """
    replay_config = ipwbConfig.getIPWBReplayConfig()
    app.proxy = proxy
    if not replay_config:
        # Nothing stored yet: persist the defaults, then read them back.
        ipwbConfig.setIPWBReplayConfig(IPWBREPLAY_IP, IPWBREPLAY_PORT)
        replay_config = ipwbConfig.getIPWBReplayConfig()

    if not ipwbConfig.isDaemonAlive():
        print('Sample data not pulled from IPFS.')
        print('Check that the IPFS daemon is running.')
    else:
        if cdxjFilePath == INDEX_FILE:
            ipwbConfig.firstRun()
        ipwbConfig.setIPWBReplayIndexPath(cdxjFilePath)
        app.cdxjFilePath = cdxjFilePath

    try:
        banner = 'IPWB replay started on http://{0}:{1}'.format(
            IPWBREPLAY_IP, IPWBREPLAY_PORT
        )
        print(banner)
        app.run(host='0.0.0.0', port=IPWBREPLAY_PORT)
    except gaierror:
        print('Detected no active Internet connection.')
        print('Overriding to use default IP and port configuration.')
        app.run()
    except socketerror:
        in_use = 'Address {0}:{1} already in use!'.format(
            IPWBREPLAY_IP, IPWBREPLAY_PORT)
        print(in_use)
        sys.exit()
def try_address(fqdn):
    """
    Report whether the fqdn can be looked up.

    Args:
        fqdn (str): fully qualified domain name

    Returns:
        bool: ``True`` when ``socket.gethostbyname_ex`` succeeds, ``False``
        when it raises ``socket.gaierror`` or ``UnicodeEncodeError``.
    """
    import socket

    try:
        socket.gethostbyname_ex(fqdn)
        return True
    except (socket.gaierror, UnicodeEncodeError):
        return False
def __dns_resolve_host(host, ip_version, timeout):
    """
    Resolve a host using the system's facilities

    The lookup runs in a daemon thread so that it can be abandoned after
    ``timeout`` seconds.  ``ip_version`` 4 selects ``AF_INET``; any other
    value selects ``AF_INET6``.

    Returns the first resolved address as a string, or ``None`` when the
    lookup fails, yields nothing, or does not finish in time.
    """
    if ip_version == 4:
        family = socket.AF_INET
    else:
        family = socket.AF_INET6

    def proc(host, family, queue):
        try:
            answers = socket.getaddrinfo(host, 0, family)
        except socket.gaierror:
            # TODO: Would be nice if we could isolate just the not
            # found error.
            answers = []
        except socket.timeout:
            # Don't care, we just want the queue to be empty if
            # there's an error.
            return
        queue.put(answers)

    result_queue = Queue.Queue()
    worker = threading.Thread(target=proc, args=(host, family, result_queue))
    worker.daemon = True
    worker.start()

    # NOTE: Don't make any attempt to kill the thread, as it will get
    # Python all confused if it holds the GIL.
    try:
        results = result_queue.get(True, timeout)
    except Queue.Empty:
        return None
    if not results:
        return None

    _family, _socktype, _proto, _canonname, sockaddr = results[0]
    return str(sockaddr[0])