ping.py 文件源码

python
阅读 36 收藏 0 点赞 0 评论 0

项目:legos.nettools 作者: Legobot 项目源码 文件源码
def one_ping(self, ip, port, identifier, sequence, ttl, timeout):
        """Send one ICMP echo request and record the outcome in ``self.result``.

        On a matching echo reply, ``self.result['on']`` is set to True, the
        round-trip time in microseconds is appended to
        ``self.result['times']`` and the ``(ip_header, reply)`` pair is
        appended to ``self.result['responses']``.  If no matching echo reply
        arrives within ``timeout`` seconds, ``self.result['packet_loss']`` is
        incremented instead.

        Args:
            ip: Destination address the request is sent to.
            port: Port used both for binding the receiving socket and in the
                destination address tuple.
            identifier: Identifier placed in the echo request and matched
                against incoming messages.
            sequence: Sequence number placed in the echo request and matched
                against incoming messages.
            ttl: IP time-to-live set on the sending socket.
            timeout: Total number of seconds to wait for the reply.

        Raises:
            OSError: If a raw socket cannot be created, bound or written to
                (raw sockets typically require elevated privileges).
        """
        # Context managers guarantee both raw sockets are closed on every
        # exit path, including exceptions raised while parsing a packet.
        with socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) as ins, \
                socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) as outs:
            # Bind and set timeout for the IN socket.
            ins.bind(("", port))
            ins.settimeout(timeout)
            # Set TTL for the OUT socket.
            outs.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)

            # Create the packet and send it.
            packet = messages.EchoRequest(identifier=identifier, sequence=sequence)
            outs.sendto(packet.pack(), (ip, port))

            # Wait for the answer and time it.
            start = datetime.datetime.now()
            deadline = time.monotonic() + timeout
            replied = False
            try:
                while not replied:
                    # Only wait for whatever is left of the overall budget so
                    # unrelated ICMP traffic cannot stretch the total wait.
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        break
                    ins.settimeout(remaining)

                    data = ins.recvfrom(1024)[0]
                    # NOTE(review): assumes a 20-byte IP header (no IP
                    # options) in front of the ICMP message -- confirm.
                    ip_header = ip_m.Header(data[:20])
                    outp = messages.types[data[20]]()
                    outp.unpack(data[20:])

                    kind = type(outp)
                    belongs_to_us = (
                        (
                            # Error message quoting our request; the quoted
                            # message may be missing when a router does not
                            # follow the specification.
                            kind in messages.error_messages and
                            outp.original_message is not None and
                            identifier == outp.original_message.identifier and
                            sequence == outp.original_message.sequence
                        )
                        or
                        (
                            # Normal response to our request.
                            kind in messages.reply_messages and
                            identifier == outp.identifier and
                            sequence == outp.sequence
                        )
                    )
                    # Only an echo reply counts as success; any other
                    # matching message is ignored and we keep waiting.
                    if belongs_to_us and kind == messages.EchoReply:
                        delta = datetime.datetime.now() - start
                        self.result['on'] = True
                        self.result['times'].append(
                            delta.seconds * 1000000 + delta.microseconds
                        )
                        self.result['responses'].append((ip_header, outp))
                        replied = True
            except socket.timeout:
                # Handled below together with the deadline-expired case.
                pass

            # Count the loss whether recvfrom timed out or the deadline
            # expired while only unrelated packets were being received.
            if not replied:
                self.result['packet_loss'] += 1
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号