python类ntohl()的实例源码

communication.py 文件源码 项目:Software-Architecture-with-Python 作者: PacktPublishing 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def receive(channel):
    """Receive one length-prefixed pickled message from a channel.

    The message is expected to start with a native ``struct`` "L" header
    whose value, after ``socket.ntohl``, is the payload length in bytes,
    followed by the pickled payload itself.

    Args:
        channel: Socket-like object exposing ``recv(n)``.

    Returns:
        The first element of the unpickled payload, or ``''`` when the
        peer closed the connection before a complete message arrived.
    """

    def read_exact(count):
        # recv() may return fewer bytes than requested, so keep reading
        # and accumulate until we have them all (or the peer closes).
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = channel.recv(remaining)
            if not chunk:
                break  # empty read: connection closed
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    header = read_exact(struct.calcsize("L"))
    try:
        size = socket.ntohl(struct.unpack("L", header)[0])
    except struct.error:
        # Header missing or truncated (e.g. the peer disconnected).
        return ''

    payload = read_exact(size)
    if len(payload) < size:
        # Connection closed mid-message; treat it like a disconnect.
        return ''

    # SECURITY NOTE: pickle.loads executes arbitrary code from the payload;
    # only use this with trusted peers.
    return pickle.loads(payload)[0]
wait_for.py 文件源码 项目:tidb-ansible 作者: pingcap 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _convert_host_to_hex(host):
    """
    Render every address of ``host`` in the hex form used by /proc/net/tcp*.

    Each packed address is processed in 32-bit words: every word is
    converted from network to host byte order with ``socket.ntohl`` and
    written as eight upper-case hex digits (one word for IPv4, four for
    IPv6).

    Args:
        host: String with either hostname, IPv4, or IPv6 address

    Returns:
        List of ``(address family, hex string)`` tuples; empty when
        ``host`` is None
    """
    if host is None:
        return []

    converted = []
    for family, ip in _convert_host_to_ip(host):
        packed_hex = binascii.b2a_hex(socket.inet_pton(family, ip))
        words = []
        for offset in range(0, len(packed_hex), 8):
            word_nf = int(packed_hex[offset:offset + 8], base=16)
            words.append("%08X" % socket.ntohl(word_nf))
        converted.append((family, "".join(words)))
    return converted
wait_for.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def _convert_host_to_hex(host):
    """
    Convert the addresses of ``host`` to the hex format of /proc/net/tcp*.

    The packed address is split into 4-byte words; each word is passed
    through ``socket.ntohl`` (network to host byte order) and formatted
    as eight upper-case hex digits.

    Args:
        host: String with either hostname, IPv4, or IPv6 address

    Returns:
        List of tuples containing the address family and the converted
        hex string (an empty list if ``host`` is None)
    """
    results = []
    if host is None:
        return results

    for family, ip in _convert_host_to_ip(host):
        network_hex = binascii.b2a_hex(socket.inet_pton(family, ip))
        host_hex = "".join(
            "%08X" % socket.ntohl(int(network_hex[start:start + 8], base=16))
            for start in range(0, len(network_hex), 8)
        )
        results.append((family, host_hex))
    return results
mr_uv_cdn_ip_addr.py 文件源码 项目:nginx_log_parse 作者: daiguadaidai 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def reducer_top100(self, _, values):
        """Yield the 100 largest (count, ip) pairs, annotated with an address.

        For each of the top 100 ``values`` (ranked by ``int(count)``), the IP
        is converted to its 32-bit integer value and looked up in
        ``self.ip_addr_df``, matching the first row whose
        ``ip_start_num``..``ip_end_num`` range contains it.

        Args:
            _: Unused key.
            values: Iterable of ``(count, ip)`` pairs.

        Yields:
            ``(count, "<ip>    <addr>")`` when the lookup succeeds, otherwise
            ``(count, ip)`` unchanged (invalid IP or no matching range).
        """

        for cnt, ip in heapq.nlargest(100, values, key=lambda x: int(x[0])):
            try:
                # Dotted-quad IP -> unsigned 32-bit integer ('!' = network order).
                ip_num = struct.unpack("!I", socket.inet_aton(str(ip)))[0]
                # Rows whose [ip_start_num, ip_end_num] range contains the IP.
                addr_df = self.ip_addr_df[(self.ip_addr_df.ip_start_num <= ip_num) &
                                          (ip_num <= self.ip_addr_df.ip_end_num)]
                # First match wins; raises IndexError when nothing matched.
                addr = addr_df['addr'].iloc[0]
            except Exception:
                # Best effort: fall back to the bare IP. Catching Exception
                # (not a bare except) lets GeneratorExit propagate, and the
                # yields sit outside the try so closing the generator is safe.
                yield cnt, ip
            else:
                yield cnt, '{ip}    {addr}'.format(ip=ip, addr=addr)
xIPGeo-tools.py 文件源码 项目:ipDB 作者: DrizzleRisk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ip2int(src_path='ipDB_0401.txt', dst_path='ipDB_int_0401.txt'):
    """Rewrite an IP-range file with its first two columns as integers.

    Each non-blank line of ``src_path`` is split on single spaces; the
    first two cells are dotted-quad IPv4 addresses and are replaced by
    their unsigned 32-bit integer values. Remaining cells and blank lines
    are written through unchanged.

    Args:
        src_path: Input file path (defaults to the original hard-coded name).
        dst_path: Output file path (defaults to the original hard-coded name).

    Raises:
        OSError: If a file cannot be opened or an address is not valid IPv4.
        IndexError: If a non-blank line has fewer than two cells.
    """
    with open(src_path, 'r') as src:
        lines = src.read().split('\n')

    converted = []
    for line in lines:
        if not line.strip():
            # Blank line (typically the one after a trailing newline).
            converted.append(line)
            continue
        cells = line.split(' ')
        for idx in (0, 1):
            # '!I' reads the packed address as a network-order unsigned int.
            cells[idx] = str(struct.unpack("!I", socket.inet_aton(cells[idx]))[0])
        converted.append(' '.join(cells))

    # Open the output only after conversion succeeded, so a bad input line
    # does not leave a truncated output file behind.
    with open(dst_path, 'w') as dst:
        dst.write('\n'.join(converted))

# ????IP
client.py 文件源码 项目:fascinatedNight 作者: songshixuan 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def user_login(self):
        """Build the login message and pass it to ``self.send``.

        The message is the concatenation of:
          * a 16-bit login command (1), network byte order
          * a 32-bit peer id (632949210), network byte order
          * a 16-bit body length (4), network byte order
          * a 32-bit own id (632949211), native byte order
        """
        login_cmd = 1
        peer_id = 632949210
        my_id = 632949211
        # Presumably the byte length of the packed my_id that follows.
        msgbody_len = 4

        print ("user login...")
        # '!' packs in network byte order without padding, producing the same
        # bytes as byte-swapping each field and packing it natively.
        header = struct.pack('!hIh', login_cmd, peer_id, msgbody_len)
        # NOTE(review): my_id is packed in native byte order, unlike the
        # header fields - kept as-is to preserve the bytes on the wire;
        # confirm against the server protocol.
        self.send(header + struct.pack('I', my_id))
fields.py 文件源码 项目:scapy-vxlan 作者: p4lang 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
recipe-457669.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def receive(channel):
    """Receive one length-prefixed marshalled message from a channel.

    Reads a native ``struct`` "L" header, converts it with ``ntohl`` to get
    the payload length, then reads until that many bytes have arrived.

    Args:
        channel: Socket-like object exposing ``recv(n)``.

    Returns:
        The first element of the unmarshalled payload.

    Raises:
        struct.error: If the header is missing or truncated.
    """
    size = struct.calcsize("L")
    size = channel.recv(size)
    size = ntohl(struct.unpack("L", size)[0])
    # recv() may deliver the payload in pieces: accumulate the chunks
    # instead of overwriting the buffer, and only decode once complete.
    chunks = []
    received = 0
    while received < size:
        chunk = channel.recv(size - received)
        if not chunk:
            # Peer closed the connection; stop rather than spin forever and
            # let unmarshall() report the truncated payload.
            break
        chunks.append(chunk)
        received += len(chunk)
    return unmarshall(b"".join(chunks))[0]

# Echo server sample
networkmanayer.py 文件源码 项目:my-weather-indicator 作者: atareao 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def route_to_python(route, family):
        """Convert a raw route 4-tuple into a list of Python values.

        ``route`` unpacks as (addr, netmask, gateway, metric). The address
        and gateway go through ``fixups.addr_to_python`` for ``family``,
        the netmask is passed through, and the metric is converted with
        ``socket.ntohl``.
        """
        addr, netmask, gateway, metric = route
        py_addr = fixups.addr_to_python(addr, family)
        py_gateway = fixups.addr_to_python(gateway, family)
        host_metric = socket.ntohl(metric)
        return [py_addr, netmask, py_gateway, host_metric]
fields.py 文件源码 项目:CyberScan 作者: medbenali 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
fields.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
libnetfilter_queue.py 文件源码 项目:packet-queue 作者: google 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def nfq_callback(qh, unused_nfmsg, nfad, unused_data):
  """Wrap a queued packet in a ``Packet`` and dispatch it to its callback.

  Reads the packet header from ``nfad`` to get the packet id (converted
  with ``socket.ntohl``), copies the payload bytes out, and calls the
  callback registered for ``qh`` in ``py_callbacks``. Always returns 0.
  """
  header = nfq.nfq_get_msg_packet_hdr(nfad).contents
  host_packet_id = socket.ntohl(header.packet_id)

  # nfq_get_payload stores the payload address in the pointer passed by
  # reference and returns the payload length.
  data_ptr = ctypes.c_void_p()
  data_len = nfq.nfq_get_payload(nfad, ctypes.byref(data_ptr))
  raw_bytes = ctypes.string_at(data_ptr, data_len)

  wrapped = Packet(host_packet_id, data_len, raw_bytes, qh)
  handler = py_callbacks[qh]
  handler(wrapped)
  return 0

# Maps queue handles to user-specified callbacks.
fields.py 文件源码 项目:CVE-2016-6366 作者: RiskSense-Ops 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
2_3_chat_server_with_select.py 文件源码 项目:Python-Network-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def receive(channel):
    """Receive one length-prefixed pickled message from a channel.

    The message is expected to start with a native ``struct`` "L" header
    whose value, after ``socket.ntohl``, is the payload length in bytes,
    followed by the pickled payload itself.

    Args:
        channel: Socket-like object exposing ``recv(n)``.

    Returns:
        The first element of the unpickled payload, or ``''`` when the
        peer closed the connection before a complete message arrived.
    """

    def read_exact(count):
        # recv() may return fewer bytes than requested, so keep reading
        # and accumulate until we have them all (or the peer closes).
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = channel.recv(remaining)
            if not chunk:
                break  # empty read: connection closed
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    header = read_exact(struct.calcsize("L"))
    try:
        size = socket.ntohl(struct.unpack("L", header)[0])
    except struct.error:
        # Header missing or truncated (e.g. the peer disconnected).
        return ''
    payload = read_exact(size)
    if len(payload) < size:
        # Connection closed mid-message; treat it like a disconnect.
        return ''
    # SECURITY NOTE: pickle.loads executes arbitrary code from the payload;
    # only use this with trusted peers.
    return pickle.loads(payload)[0]
1_5_integer_conversion.py 文件源码 项目:Python-Network-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def convert_integer():
    """Print 1234 after 32-bit and 16-bit host/network byte-order conversion."""
    data = 1234
    long_report = "Original: %s => Long  host byte order: %s, Network byte order: %s"
    short_report = "Original: %s => Short  host byte order: %s, Network byte order: %s"

    # 32-bit conversions
    host_long = socket.ntohl(data)
    net_long = socket.htonl(data)
    print (long_report % (data, host_long, net_long))

    # 16-bit conversions
    host_short = socket.ntohs(data)
    net_short = socket.htons(data)
    print (short_report % (data, host_short, net_short))
fields.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
WEGOBizMsgCrypt.py 文件源码 项目:wego 作者: wegostudio 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def decrypt(self, text):
        """Decrypt a base64-encoded ciphertext and extract its XML payload.
        @param text: base64-encoded ciphertext
        @return: ``(0, xml_content)`` on success, otherwise
                 ``(error_code, None)`` where the code is
                 Crypt_DecryptAES_Error, Crypt_IllegalBuffer or
                 Crypt_ValidateAppid_Error
        """
        try:
            # The first 16 bytes of the key are passed as the third
            # argument to AES.new (presumably the IV - confirm).
            cipher = AES.new(self.key, self.mode, self.key[:16])
            # Base64-decode first, then AES-decrypt.
            decrypted = cipher.decrypt(base64.b64decode(text))
        except Exception:
            return Crypt_DecryptAES_Error, None

        try:
            # The last byte holds the padding length; indexing yields a
            # 1-char str on Python 2 and an int on Python 3.
            last_byte = decrypted[-1]
            pad = ord(last_byte) if PY2 else last_byte
            # Drop the 16-byte prefix and the trailing padding.
            body = decrypted[16:-pad]
            # A 4-byte length field precedes the XML.
            (raw_len,) = struct.unpack("I", body[: 4])
            xml_len = socket.ntohl(raw_len)
            xml_content = body[4: xml_len + 4]
            # Everything after the XML is the sender's app id.
            from_appid = body[xml_len + 4:].decode("utf8")
        except Exception:
            return Crypt_IllegalBuffer, None

        if from_appid != self.appid:
            return Crypt_ValidateAppid_Error, None
        return 0, xml_content
network.py 文件源码 项目:heartbreaker 作者: lokori 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def handle_read(self):
            """Read one SCTP message and forward it to ``Endpoint.handle_read``.

            Receives up to 65536 bytes via ``self.sctp_recv`` and passes the
            message on together with the sender address, the flags and the
            notification's ``ppid`` (converted with ``socket.ntohl``).
            """
            fromaddr, flags, msg, notif = self.sctp_recv(65536)
            host_ppid = socket.ntohl(notif.ppid)
            params = {
                'fromaddr': fromaddr,
                'flags': flags,
                'ppid': host_ppid,
            }
            Endpoint.handle_read(self, msg, params)
test_socket.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def testNtoH(self):
        """Check that htons/htonl/ntohs/ntohl are their own inverse.

        Only the lower 16 or 32 bits are compared. Also checks that an
        all-ones value swaps to itself and that a value wider than 32 bits
        raises OverflowError.
        """
        width_by_func = {socket.htonl: 32, socket.ntohl: 32,
                         socket.htons: 16, socket.ntohs: 16}
        samples = (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210)
        for convert, bits in width_by_func.items():
            low_bits = (1 << bits) - 1
            for sample in samples:
                round_trip = convert(convert(sample & low_bits))
                self.assertEqual(sample & low_bits, round_trip & low_bits)

            # All bits set is unchanged by a byte swap.
            all_ones_swapped = convert(low_bits)
            self.assertEqual(all_ones_swapped & low_bits, low_bits)
            self.assertRaises(OverflowError, convert, 1 << 34)
test_socket.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testNtoHErrors(self):
        """Check the converters accept small positives and reject negatives."""
        converters = (socket.ntohl, socket.ntohs, socket.htonl, socket.htons)
        accepted = [ 1, 2, 3, 1, 2, 3 ]
        rejected = [ -1, -2, -3, -1, -2, -3 ]
        # Valid inputs must simply not raise.
        for value in accepted:
            for convert in converters:
                convert(value)
        # Negative inputs must raise OverflowError or ValueError.
        for value in rejected:
            for convert in converters:
                self.assertRaises((OverflowError, ValueError), convert, value)
fields.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
fields.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
test_socket.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
test_socket.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
test_socket.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
test_socket.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 96 收藏 0 点赞 0 评论 0
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
fields.py 文件源码 项目:scapy-bpf 作者: guedou 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
fields.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
fields.py 文件源码 项目:scapy-radio 作者: BastilleResearch 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def reverse(self, val):
        """Byte-swap ``val`` according to ``self.size``.

        Sizes 16 and 32 go through ``socket.ntohs`` / ``socket.ntohl``;
        any other size returns ``val`` untouched.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
mr_uv_real_ip_addr.py 文件源码 项目:nginx_log_parse 作者: daiguadaidai 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def reducer_top100(self, _, values):
        """Yield the 100 largest (count, ip) pairs, annotated with an address.

        For each of the top 100 ``values`` (ranked by ``int(count)``), the IP
        is converted to its 32-bit integer value and looked up in
        ``self.ip_addr_df``, matching the first row whose
        ``ip_start_num``..``ip_end_num`` range contains it.

        Args:
            _: Unused key.
            values: Iterable of ``(count, ip)`` pairs.

        Yields:
            ``(count, "<ip>    <addr>")`` when the lookup succeeds, otherwise
            ``(count, ip)`` unchanged (invalid IP or no matching range).
        """
        for cnt, ip in heapq.nlargest(100, values, key=lambda x: int(x[0])):
            try:
                # Dotted-quad IP -> unsigned 32-bit integer ('!' = network order).
                ip_num = struct.unpack("!I", socket.inet_aton(str(ip)))[0]
                # Rows whose [ip_start_num, ip_end_num] range contains the IP.
                addr_df = self.ip_addr_df[(self.ip_addr_df.ip_start_num <= ip_num) &
                                          (ip_num <= self.ip_addr_df.ip_end_num)]
                # First match wins; raises IndexError when nothing matched.
                addr = addr_df['addr'].iloc[0]
            except Exception:
                # Best effort: fall back to the bare IP. Catching Exception
                # (not a bare except) lets GeneratorExit propagate, and the
                # yields sit outside the try so closing the generator is safe.
                yield cnt, ip
            else:
                yield cnt, '{ip}    {addr}'.format(ip=ip, addr=addr)


问题


面经


文章

微信
公众号

扫码关注公众号