def ping_once(self):
    """
    Send one ICMP echo request and wait for the matching reply.

    Opens a raw ICMP socket, sends a ping through ``self.send_ping`` and
    returns whatever ``self.receive_pong`` reports -- the delay (in seconds)
    or none on timeout.  The socket is closed on every exit path.

    Raises:
        socket.error: if the raw socket cannot be created.  For errno 1
            (operation not permitted) the message explains that root
            privileges are required; any other error is re-raised unchanged.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as e:
        if e.errno == 1:
            # Not superuser, so operation not permitted
            raise socket.error(
                "%s - ICMP messages can only be sent from root user processes"
                % e.strerror
            )
        # Any other failure: propagate it instead of continuing without a
        # socket.
        raise
    except Exception as e:
        print("Exception: %s" % (e))
        raise

    # Only the low 16 bits of the PID are used as the identifier.
    my_ID = os.getpid() & 0xFFFF
    try:
        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
    finally:
        sock.close()
    return delay
python类getprotobyname()的实例源码
3_2_ping_remote_host.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 37
收藏 0
点赞 0
评论 0
async def getprotobyname(name):
    """Look up a protocol number by name. (Rarely used.)

    Like :func:`socket.getprotobyname`, but async.

    The blocking standard-library lookup is handed to
    ``run_sync_in_worker_thread`` with ``cancellable=True`` and its result is
    returned once awaited.
    """
    # This must be a coroutine function: ``await`` is a syntax error inside
    # a plain ``def``.
    return await run_sync_in_worker_thread(
        _stdlib_socket.getprotobyname, name, cancellable=True
    )
# obsolete gethostbyname etc. intentionally omitted
# likewise for create_connection (use open_tcp_stream instead)
################################################################
# Socket "constructors"
################################################################
def _create_socket(self):
    """Resolve ``self.dest`` and open the raw ICMP socket.

    On success ``self.dest_ip`` holds the IPv4 address and ``self.sock`` the
    raw socket.  On failure ``self.ret_code`` is set to the matching
    ``EXIT_STATUS`` member and the method returns early without raising.
    """
    try:
        # A dotted-quad literal parses cleanly and is used as-is.
        socket.inet_pton(socket.AF_INET, self.dest)
    except socket.error:
        # Not an IPv4 literal: treat it as a host name.
        try:
            resolved = socket.gethostbyname(self.dest)
        except socket.gaierror:
            self.ret_code = EXIT_STATUS.ERROR_HOST_NOT_FOUND
            return
    else:
        resolved = self.dest
    self.dest_ip = resolved

    try:
        self.sock = socket.socket(
            socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp")
        )
    except socket.error as e:
        # errno 1 means the process lacks permission for raw sockets.
        self.ret_code = (
            EXIT_STATUS.ERROR_ROOT_REQUIRED
            if e.errno == 1
            else EXIT_STATUS.ERROR_CANT_OPEN_SOCKET
        )
        return
def ping_once(self):
    """
    Send one ICMP echo request and wait for the matching reply.

    Opens a raw ICMP socket, sends a ping through ``self.send_ping`` and
    returns whatever ``self.receive_pong`` reports -- the delay (in seconds)
    or none on timeout.  The socket is closed on every exit path.

    Raises:
        socket.error: if the raw socket cannot be created.  For errno 1
            (operation not permitted) the message explains that root
            privileges are required; any other error is re-raised unchanged.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as e:
        # ``except ... as`` is valid on both Python 2.6+ and Python 3.
        if e.errno == 1:
            # Not superuser, so operation not permitted
            raise socket.error(
                "%s - ICMP messages can only be sent from root user processes"
                % e.strerror
            )
        # Any other failure: propagate it instead of continuing without a
        # socket.
        raise
    except Exception as e:
        print("Exception: %s" % (e))
        raise

    # Only the low 16 bits of the PID are used as the identifier.
    my_ID = os.getpid() & 0xFFFF
    try:
        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
    finally:
        sock.close()
    return delay
def do_one(dest_addr, timeout):
    """
    Send one ICMP echo request to ``dest_addr`` and wait for the reply.

    Returns the value produced by ``receive_one_ping``: either the delay
    (in seconds) or none on timeout.  The raw socket is closed on every
    exit path.

    Raises:
        socket.error: if the raw socket cannot be created.  For errno 1
            (operation not permitted) the message notes that root
            privileges are required; other errors are re-raised unchanged.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as exc:
        # Read errno/strerror from the exception instead of unpacking
        # ``exc.args``, which fails when args is not an (errno, msg) pair.
        if exc.errno == 1:
            # Operation not permitted
            raise socket.error(
                "%s - Note that ICMP messages can only be sent from processes"
                " running as root." % exc.strerror
            )
        raise  # raise the original error

    # Only the low 16 bits of the PID are used as the identifier.
    my_ID = os.getpid() & 0xFFFF
    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        my_socket.close()
    return delay
def do_one(dest_addr, timeout):
    """
    Send one ICMP echo request to ``dest_addr`` and wait for the reply.

    Returns the value produced by ``receive_one_ping``: either the delay
    (in seconds) or none on timeout.  The raw socket is closed on every
    exit path.

    Raises:
        socket.error: if the raw socket cannot be created.  For errno 1
            (operation not permitted) the message notes that root
            privileges are required; other errors are re-raised unchanged.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as exc:
        # Read errno/strerror from the exception instead of unpacking
        # ``exc.args``, which fails when args is not an (errno, msg) pair.
        if exc.errno == 1:
            # Operation not permitted
            raise socket.error(
                "%s - Note that ICMP messages can only be sent from processes"
                " running as root." % exc.strerror
            )
        raise  # raise the original error

    # Only the low 16 bits of the PID are used as the identifier.
    my_ID = os.getpid() & 0xFFFF
    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        my_socket.close()
    return delay
def ping_once(self, dest_addr, timeout, sequence):
    """Send one ICMP echo request to ``dest_addr`` and wait for the reply.

    Opens a raw ICMP socket, sends the request with the given ``sequence``
    number and returns whatever the private receive helper yields for
    ``timeout``.  The socket is closed on every exit path.

    Raises:
        socket.error: if the raw socket cannot be created.  For errno 1
            the message notes that root is required; other errors are
            re-raised unchanged.
    """
    icmp = socket.getprotobyname('icmp')
    try:
        icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as exc:
        # ``except ... as`` is valid on both Python 2.6+ and Python 3.
        if exc.errno == 1:
            raise socket.error(
                '%s : Just root can send ICMP Message' % exc.strerror
            )
        raise

    # Only the low 16 bits of the PID are used as the identifier.
    packet_id = os.getpid() & 0xFFFF
    try:
        self.__send_icmp_request(icmp_socket, dest_addr, packet_id, sequence)
        delay = self.__receive_icmp_response(icmp_socket, packet_id, timeout)
    finally:
        icmp_socket.close()
    return delay
def ping_once(ip_addr, timeout):
    """
    Send one ICMP echo request to ``ip_addr`` and wait for the reply.

    Returns the value produced by ``receive_ping``: either the delay
    (in seconds) or none on timeout.  Errors from creating the raw socket
    propagate unchanged.  The socket is closed on every exit path.
    """
    # Translate an Internet protocol name to a constant suitable for
    # passing as the (optional) third argument to the socket() function.
    # This is usually only needed for sockets opened in "raw" mode.
    icmp = socket.getprotobyname('icmp')

    # socket.socket([family[, type[, proto]]])
    # Create a new socket using the given address family (default: AF_INET),
    # socket type (SOCK_STREAM) and protocol number (zero or may be omitted).
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)

    # Use the current process id, masked to its low 16 bits (0..65535),
    # as the identifier.
    my_ID = os.getpid() & 0xFFFF
    try:
        send_ping(my_socket, ip_addr, my_ID)
        delay = receive_ping(my_socket, my_ID, timeout)
    finally:
        my_socket.close()
    return delay
def make_sockets():
    """Makes and returns the raw IPv6 and IPv4 ICMP sockets.

    This needs to run as root before dropping privileges.

    Returns:
        A two-element list ``[socketv6, socketv4]``.

    Raises:
        Exception: whatever socket creation raised, re-raised after the
            failure is logged.  If the IPv4 socket fails, the IPv6 socket
            that was already opened is closed first.
    """
    try:
        socketv6 = socket.socket(socket.AF_INET6, socket.SOCK_RAW,
                                 socket.getprotobyname('ipv6-icmp'))
    except Exception:
        LOGGER.error("Could not create v6 socket")
        raise
    try:
        socketv4 = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                                 socket.getprotobyname('icmp'))
    except Exception:
        LOGGER.error("Could not create v4 socket")
        # Do not leak the v6 socket when only half of the pair was made.
        socketv6.close()
        raise
    return [socketv6, socketv4]
def do_one(dest_addr, timeout):
    """
    Send one ICMP echo request to ``dest_addr`` and wait for the reply.

    Returns the value produced by ``receive_one_ping``: either the delay
    (in seconds) or none on timeout.  The raw socket is closed on every
    exit path.

    Raises:
        socket.error: if the raw socket cannot be created.  For errno 1
            (operation not permitted) the message notes that root
            privileges are required; other errors are re-raised unchanged.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as exc:
        # Read errno/strerror from the exception instead of unpacking
        # ``exc.args``, which fails when args is not an (errno, msg) pair.
        if exc.errno == 1:
            # Operation not permitted
            raise socket.error(
                "%s - Note that ICMP messages can only be sent from processes"
                " running as root." % exc.strerror
            )
        raise  # raise the original error

    # Only the low 16 bits of the PID are used as the identifier.
    my_ID = os.getpid() & 0xFFFF
    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        my_socket.close()
    return delay
03_02_ping_remote_host.py 文件源码
项目:011_python_network_programming_cookbook_demo
作者: jerry-0824
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def ping_once(self):
    """
    Send one ICMP echo request and wait for the matching reply.

    Opens a raw ICMP socket, sends a ping through ``self.send_ping`` and
    returns whatever ``self.receive_pong`` reports -- the delay (in seconds)
    or none on timeout.  The socket is closed on every exit path.

    Raises:
        socket.error: if the raw socket cannot be created.  For errno 1
            (operation not permitted) the message explains that root
            privileges are required; any other error is re-raised unchanged.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as e:
        # ``except ... as`` is valid on both Python 2.6+ and Python 3.
        if e.errno == 1:
            # Not superuser, so operation not permitted
            raise socket.error(
                "%s - ICMP messages can only be sent from root user processes"
                % e.strerror
            )
        # Any other failure: propagate it instead of continuing without a
        # socket.
        raise
    except Exception as e:
        print("Exception: %s" % (e))
        raise

    # Only the low 16 bits of the PID are used as the identifier.
    my_ID = os.getpid() & 0xFFFF
    try:
        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
    finally:
        sock.close()
    return delay
ping_class.py 文件源码
项目:Automation-Framework-for-devices
作者: tok-gogogo
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def do_one(dest_addr, timeout):
    """
    Send one ICMP echo request to ``dest_addr`` and wait for the reply.

    Returns the value produced by ``receive_one_ping``: either the delay
    (in seconds) or none on timeout.  The raw socket is closed on every
    exit path.

    Raises:
        socket.error: if the raw socket cannot be created.  For errno 1
            (operation not permitted) the message notes that root
            privileges are required; other errors are re-raised unchanged.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as exc:
        # ``except ... as`` is valid on both Python 2.6+ and Python 3.
        if exc.errno == 1:
            # Operation not permitted
            raise socket.error(
                "%s - Note that ICMP messages can only be sent from processes"
                " running as root." % exc.strerror
            )
        raise  # raise the original error

    # Only the low 16 bits of the PID are used as the identifier.
    my_ID = os.getpid() & 0xFFFF
    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        my_socket.close()
    return delay
def __icmpSocket(self):
    """Create and return a new raw IPv4 socket for the ICMP protocol."""
    icmp_proto = socket.getprotobyname("icmp")
    return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
    """Build an instance from the textual form read off tokenizer ``tok``.

    Reads an address, then a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), then service tokens up to end of line/file.
    Each service is a port number or a name resolved with
    ``socket.getservbyname`` (names are only accepted for TCP and UDP).
    Every service sets one bit in a ``bytearray`` bitmap, most significant
    bit first within each byte.  ``origin`` and ``relativize`` are accepted
    but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol that is
            neither TCP nor UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (
        int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)
    )
    bitmap = bytearray()
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)
        byte_index, bit_offset = divmod(serv, 8)
        # Grow the bitmap with zero bytes until byte_index is addressable.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(bytearray(shortfall))
        bitmap[byte_index] |= 0x80 >> bit_offset
    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """Build an instance from the textual form read off tokenizer ``tok``.

    Reads an address, then a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), then service tokens up to end of line/file.
    Each service is a port number or a name resolved with
    ``socket.getservbyname`` (names are only accepted for TCP and UDP).
    Every service sets one bit, most significant bit first, in a bitmap
    kept as a list of one-character strings.  ``origin`` and ``relativize``
    are accepted but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol that is
            neither TCP nor UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (
        int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)
    )
    bitmap = []
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)
        byte_index, bit_offset = divmod(serv, 8)
        # Grow the bitmap with NUL characters until byte_index is addressable.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(['\x00'] * shortfall)
        bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def ping(host, timeout=2, ping_id=None, udp=False):
    """Ping remote host.

    :param str host: Host name/address.
    :param float timeout: timeout.
    :param int ping_id: 16 bit integer to identify packet.  Defaults to the
        current process id; only the low 16 bits are used.
    :param bool udp: open the socket as ``SOCK_DGRAM`` instead of
        ``SOCK_RAW``.
    :return: time between sending the request and the receive timestamp of
        the matching reply, or ``None`` if no matching reply arrived before
        the deadline.
    """
    dest_addr = socket.gethostbyname(host)
    icmp = socket.getprotobyname('icmp')
    socket_type = socket.SOCK_DGRAM if udp else socket.SOCK_RAW
    raw_socket = socket.socket(socket.AF_INET, socket_type, icmp)
    try:
        ping_id = os.getpid() if ping_id is None else ping_id
        ping_id &= 0xffff
        seq_code = random.randint(1, 65535)
        latency = None
        start_ts = time.time()
        end_ts = start_ts + timeout
        send_ping(raw_socket, dest_addr, ping_id, seq_code)
        # Keep reading until our own reply (same id and sequence) shows up.
        # NOTE(review): each receive_reply call is handed the full timeout,
        # not the time remaining until end_ts -- confirm this is intended.
        while time.time() < end_ts:
            r_ping_id, r_seq_code, r_recv_ts = receive_reply(raw_socket, timeout)
            if ping_id == r_ping_id and r_seq_code == seq_code:
                latency = r_recv_ts - start_ts
                break
    finally:
        # Release the socket on every path.
        raw_socket.close()
    return latency
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """Build an instance from the textual form read off tokenizer ``tok``.

    Reads an address, then a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), then service tokens up to end of line/file.
    Each service is a port number or a name resolved with
    ``socket.getservbyname`` (names are only accepted for TCP and UDP).
    Every service sets one bit, most significant bit first, in a bitmap
    kept as a list of one-character strings.  ``origin`` and ``relativize``
    are accepted but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol that is
            neither TCP nor UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (
        int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)
    )
    bitmap = []
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)
        byte_index, bit_offset = divmod(serv, 8)
        # Grow the bitmap with NUL characters until byte_index is addressable.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(['\x00'] * shortfall)
        bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """Build an instance from the textual form read off tokenizer ``tok``.

    Reads an address, then a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), then service tokens up to end of line/file.
    Each service is a port number or a name resolved with
    ``socket.getservbyname`` (names are only accepted for TCP and UDP).
    Every service sets one bit, most significant bit first, in a bitmap
    kept as a list of one-character strings.  ``origin`` and ``relativize``
    are accepted but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol that is
            neither TCP nor UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (
        int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)
    )
    bitmap = []
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)
        byte_index, bit_offset = divmod(serv, 8)
        # Grow the bitmap with NUL characters until byte_index is addressable.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(['\x00'] * shortfall)
        bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def ping(addr):
    """Send one echo request (ping) to ``addr`` and print the reply.

    Resolves ``addr``, sends the header built by ``make_ping_request`` over
    a raw ICMP socket with a 1 second timeout, then prints the request and
    response as hex, the elapsed time in milliseconds, and the decoded IPv4
    and ICMP echo headers.  Returns ``None`` in every case; an unknown host
    or a socket error is printed and ends the call early.
    """
    try:
        dest = socket.gethostbyname(addr)
    except socket.gaierror:
        print('unknown host: {}'.format(addr))
        return
    print('PING {} ({})'.format(addr, dest))

    request_header = make_ping_request()
    # prepare payload: round-trip the header through an in-memory buffer to
    # obtain its raw bytes
    request_buffer = BytesIO()
    request_buffer.write(request_header)
    request_buffer.seek(0)
    full_payload = request_buffer.read()
    request_hex = full_payload.hex() if PY3 else full_payload.encode('hex')
    print('request: {}'.format(request_hex))

    response = ''
    s = None
    try:
        s = socket.socket(
            socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname('icmp')
        )
        s.settimeout(1)
        start_time = time.time()
        s.sendto(full_payload, (dest, 0))
        response, _ = s.recvfrom(256)
        end_time = time.time()
    except Exception as ex:
        print(ex)
        return
    finally:
        if s is not None:
            s.close()

    if not response:
        return

    response_hex = response.hex() if PY3 else response.encode('hex')
    print('response: {}'.format(response_hex))
    elapsed_ms = (end_time - start_time) * 1000
    print('time = {:.2f} ms'.format(elapsed_ms))

    reply_stream = BytesIO(response)
    # read IPv4 header
    ipv4_header = IPv4Header()
    reply_stream.readinto(ipv4_header)
    # read ICMP header
    icmp_echo_header = ICMPEchoHeader()
    reply_stream.readinto(icmp_echo_header)

    rule = '====================================================='
    sections = (
        ('IPv4 header', ipv4_header),
        ('ICMP Echo header', icmp_echo_header),
    )
    for title, header in sections:
        print(rule)
        print(title)
        print(rule)
        print(header)
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
    """Build an instance from the textual form read off tokenizer ``tok``.

    Reads an address, then a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), then service tokens up to end of line/file.
    Each service is a port number or a name resolved with
    ``socket.getservbyname`` (names are only accepted for TCP and UDP).
    Every service sets one bit in a ``bytearray`` bitmap, most significant
    bit first within each byte.  ``origin`` and ``relativize`` are accepted
    but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol that is
            neither TCP nor UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (
        int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)
    )
    bitmap = bytearray()
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)
        byte_index, bit_offset = divmod(serv, 8)
        # Grow the bitmap with zero bytes until byte_index is addressable.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(bytearray(shortfall))
        bitmap[byte_index] |= 0x80 >> bit_offset
    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def parse_args(self):
    """Parse the command line and normalize the resulting options.

    On top of ``OptionParser.parse_args`` this:

    * stores the protocol number for ``options.protocol`` in
      ``options.proto_num``, exiting with a message if the name is unknown;
    * passes ``bind_address``, ``listen_address`` and ``connect_address``
      through ``addrparse``;
    * converts ``starttime`` and ``stoptime`` to floats.  A numeric value is
      used directly; otherwise it must be ``HH:MM:SS``, which is applied to
      today's date and encoded as the number ``YYYYmmddHHMMSS``.  An unset
      (``None``) time is left untouched.

    Returns:
        The ``(options, args)`` pair.
    """
    (options, args) = OptionParser.parse_args(self)
    try:
        options.proto_num = socket.getprotobyname(options.protocol)
    except socket.error:
        exit("Unsupported protocol " + options.protocol)
    for f in ('bind_address', 'listen_address', 'connect_address'):
        setattr(options, f, addrparse(getattr(options, f, None)))
    for f in ('starttime', 'stoptime'):
        val = getattr(options, f, None)
        if val is None:
            # Option not supplied: float(None) would raise an uncaught
            # TypeError, so there is nothing to convert.
            continue
        try:
            setattr(options, f, float(val))
            continue
        except ValueError:
            pass
        try:
            ts = datetime.datetime.now()
            t = datetime.datetime.strptime(val, '%H:%M:%S')
            ts = ts.replace(hour=t.hour, minute=t.minute, second=t.second)
            # NOTE(review): this yields a digit-concatenated number, not an
            # epoch timestamp -- confirm consumers expect that.
            setattr(options, f, float(ts.strftime("%Y%m%d%H%M%S")))
        except ValueError:
            exit("Invalid time %s" % val)
    return options, args
def __icmpSocket(self):
    """Create and return a new raw IPv4 socket for the ICMP protocol."""
    icmp_proto = socket.getprotobyname("icmp")
    return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
def __icmpSocket(self):
    """Create and return a new raw IPv4 socket for the ICMP protocol."""
    icmp_proto = socket.getprotobyname("icmp")
    return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
    """Build an instance from the textual form read off tokenizer ``tok``.

    Reads an address, then a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), then service tokens up to end of line/file.
    Each service is a port number or a name resolved with
    ``socket.getservbyname`` (names are only accepted for TCP and UDP).
    Every service sets one bit in a ``bytearray`` bitmap, most significant
    bit first within each byte.  ``origin`` and ``relativize`` are accepted
    but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol that is
            neither TCP nor UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (
        int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)
    )
    bitmap = bytearray()
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)
        byte_index, bit_offset = divmod(serv, 8)
        # Grow the bitmap with zero bytes until byte_index is addressable.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(bytearray(shortfall))
        bitmap[byte_index] |= 0x80 >> bit_offset
    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """Build an instance from the textual form read off tokenizer ``tok``.

    Reads an address, then a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), then service tokens up to end of line/file.
    Each service is a port number or a name resolved with
    ``socket.getservbyname`` (names are only accepted for TCP and UDP).
    Every service sets one bit, most significant bit first, in a bitmap
    kept as a list of one-character strings.  ``origin`` and ``relativize``
    are accepted but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol that is
            neither TCP nor UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (
        int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)
    )
    bitmap = []
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)
        byte_index, bit_offset = divmod(serv, 8)
        # Grow the bitmap with NUL characters until byte_index is addressable.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(['\x00'] * shortfall)
        bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
WKS.py 文件源码
项目:Infrax-as-Code-1000-webservers-in-40-minutes
作者: ezeeetm
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
    """Build an instance from the textual form read off tokenizer ``tok``.

    Reads an address, then a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), then service tokens up to end of line/file.
    Each service is a port number or a name resolved with
    ``socket.getservbyname`` (names are only accepted for TCP and UDP).
    Every service sets one bit in a ``bytearray`` bitmap, most significant
    bit first within each byte.  ``origin`` and ``relativize`` are accepted
    but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol that is
            neither TCP nor UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (
        int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)
    )
    bitmap = bytearray()
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)
        byte_index, bit_offset = divmod(serv, 8)
        # Grow the bitmap with zero bytes until byte_index is addressable.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(bytearray(shortfall))
        bitmap[byte_index] |= 0x80 >> bit_offset
    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def __icmpSocket(self):
    """Create and return a new raw IPv4 socket for the ICMP protocol."""
    icmp_proto = socket.getprotobyname("icmp")
    return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
def __icmpSocket(self):
    """Create and return a new raw IPv4 socket for the ICMP protocol."""
    icmp_proto = socket.getprotobyname("icmp")
    return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
def __icmpSocket(self):
    """Create and return a new raw IPv4 socket for the ICMP protocol."""
    icmp_proto = socket.getprotobyname("icmp")
    return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """Build an instance from the textual form read off tokenizer ``tok``.

    Reads an address, then a protocol (a number, or a name resolved with
    ``socket.getprotobyname``), then service tokens up to end of line/file.
    Each service is a port number or a name resolved with
    ``socket.getservbyname`` (names are only accepted for TCP and UDP).
    Every service sets one bit, most significant bit first, in a bitmap
    kept as a list of one-character strings.  ``origin`` and ``relativize``
    are accepted but not used here.

    Raises:
        NotImplementedError: a service name is given for a protocol that is
            neither TCP nor UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = (
        int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)
    )
    bitmap = []
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol not in (_proto_udp, _proto_tcp):
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)
        byte_index, bit_offset = divmod(serv, 8)
        # Grow the bitmap with NUL characters until byte_index is addressable.
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            bitmap.extend(['\x00'] * shortfall)
        bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)