python类pack()的实例源码

nullfree.py 文件源码 项目:shellgen 作者: MarioVilas 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def encode_8(bytes, key, terminator):
        """
        Encode the bytecode with the given 8-bit XOR key.

        :type  bytes: str
        :param bytes: Bytecode to encode.

        :type  key: str
        :param key: 8-bit XOR key.

        :type  terminator: str
        :param terminator: 8-bit terminator.

        :rtype:  str
        :return: Encoded bytecode.
        """
        if not bytes.endswith(terminator):
            bytes += terminator
        fmt = "B" * len(bytes)
        unpack = struct.unpack
        pad = unpack("B", key) * len(bytes)
        bytes = unpack(fmt, bytes)
        bytes = [ bytes[i] ^ pad[i] for i in xrange(len(bytes)) ]
        return struct.pack(fmt, *bytes)
smbus.py 文件源码 项目:Adafruit_Python_PureIO 作者: adafruit 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def process_call(self, addr, cmd, val):
        """Perform a smbus process call by writing a word (2 byte) value to
        the specified register of the device, and then reading a word of response
        data (which is returned).
        """
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # Build ctypes values to marshall between ioctl and Python.
        data = create_string_buffer(struct.pack('=BH', cmd, val))
        result = c_uint16()
        # Build ioctl request.
        request = make_i2c_rdwr_data([
            (addr, 0, 3, cast(pointer(data), POINTER(c_uint8))),          # Write data.
            (addr, I2C_M_RD, 2, cast(pointer(result), POINTER(c_uint8)))  # Read word (2 bytes).
        ])
        # Make ioctl call and return result data.
        ioctl(self._device.fileno(), I2C_RDWR, request)
        # Note the python-smbus code appears to have a rather serious bug and
        # does not return the result value!  This is fixed below by returning it.
        return result.value
syscmd.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_iphostname():
    '''??linux?????????IP??'''
    def get_ip(ifname):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
        ipaddr = socket.inet_ntoa(fcntl.ioctl(
                            sock.fileno(), 
                            0x8915,  # SIOCGIFADDR 
                            struct.pack('256s', ifname[:15]) 
                            )[20:24]
                            )   
        sock.close()
        return ipaddr
    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip':ip}
jar_extract.py 文件源码 项目:pbtk 作者: marin-m 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def parse_default(field, ftype, fdefault):
    if not (ftype == 'bool' and fdefault == 'true'):
        try:
            fdefault = literal_eval(fdefault.rstrip('LDF'))
        except (ValueError, SyntaxError):
            fdefault = None

    if type(fdefault) is int:
        if ftype[0] != 'u' and ftype[:5] != 'fixed':
            if fdefault >> 63:
                fdefault = c_long(fdefault).value
            elif fdefault >> 31 and ftype[-2:] != '64':
                fdefault = c_int(fdefault).value
        else:
            fdefault &= (1 << int(ftype[-2:])) - 1

        if ftype == 'float' and abs(fdefault) >> 23:
            fdefault = unpack('=f', pack('=i', fdefault))[0]
        elif ftype == 'double' and abs(fdefault) >> 52:
            fdefault = unpack('=d', pack('=q', fdefault))[0]

    if fdefault:
        field.default_value = str(fdefault)
wave.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _write_header(self, initlength):
        assert not self._headerwritten
        self._file.write('RIFF')
        if not self._nframes:
            self._nframes = initlength / (self._nchannels * self._sampwidth)
        self._datalength = self._nframes * self._nchannels * self._sampwidth
        self._form_length_pos = self._file.tell()
        self._file.write(struct.pack('<l4s4slhhllhh4s',
            36 + self._datalength, 'WAVE', 'fmt ', 16,
            WAVE_FORMAT_PCM, self._nchannels, self._framerate,
            self._nchannels * self._framerate * self._sampwidth,
            self._nchannels * self._sampwidth,
            self._sampwidth * 8, 'data'))
        self._data_length_pos = self._file.tell()
        self._file.write(struct.pack('<l', self._datalength))
        self._headerwritten = True
pjf_testcase_server.py 文件源码 项目:PyJFuzz 作者: mseclab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def send_testcase(json, ip, port):
        """
        Send a raw testcase
        """
        try:
            json = struct.pack("<I", len(json)) + json
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect((ip, int(port)))
                s.send(json)
                s.shutdown(socket.SHUT_RDWR)
                s.close()
                return True
            except socket.error:
                return False
        except socket.error as e:
            raise PJFSocketError(e.message if hasattr(e, "message") else str(e))
        except Exception as e:
            raise  PJFBaseException(e.message)
ipaddr.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def v4_int_to_packed(address):
    """The binary representation of this address.

    Args:
        address: An integer representation of an IPv4 IP address.

    Returns:
        The binary representation of this address.

    Raises:
        ValueError: If the integer is too large to be an IPv4 IP
          address.
    """
    if address > _BaseV4._ALL_ONES:
        raise ValueError('Address too large for IPv4')
    return Bytes(struct.pack('!I', address))
bmp.py 文件源码 项目:CTF 作者: calee0219 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _create_header(cls, width, height):
        """ Internal function used to create headers when changing them is necessary, e.g. when image dimensions change.
            This function creates both a BMP header and a V3 DIB header, with some values left as defaults (e.g. pixels/meter) """

        total_header_size = cls.bmp_header_len + 40 # V3 len = 40 bytes
        padding_size = width & 3 # Magic stuff
        bitmap_size = ((width * 3) + padding_size) * height
        file_size = total_header_size + bitmap_size

        # BMP header: Magic (2 bytes), file size, 2 ignored values, bitmap offset
        header = struct.pack('<2s I 2H I', "BM", file_size, 0, 0, total_header_size)

        # DIB V3 header: header size, px width, px height, num of color planes, bpp, compression method,
        # bitmap data size, horizontal resolution, vertical resolution, number of colors in palette, number of important colors used
        # Few of these matter, so there are a bunch of default/"magic" numbers here...
        header += struct.pack('I 2i H H I I 2i 2I', 40, width, height, 1, 24, 0, bitmap_size, 0x0B13, 0x0B13, 0, 0)

        return header
util.py 文件源码 项目:shellgen 作者: MarioVilas 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _load_bytecode_from_dump(input):

    # Read the data.
    data = input.read()

    # If it's an hexadecimal dump, decode and return it.
    if _re_is_hexa.match(data):
        hexstr  = ''.join(_re_get_hexa.findall(data))
        hexdump = [ int(hexstr[i:i+2], 16) for i in xrange(0, len(hexstr), 2) ]
        return struct.pack('B' * len(hexdump), *hexdump)

    # If it's base64 encoded data, decode and return it.
    if _re_is_b64.match(data):
        return data.decode('base64')

    # Assume it's a raw binary dump and return it unchanged.
    return data

#-----------------------------------------------------------------------------#
syscmd.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_iphostname():
    '''??linux?????????IP??'''
    def get_ip(ifname):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
        ipaddr = socket.inet_ntoa(fcntl.ioctl(
                            sock.fileno(), 
                            0x8915,  # SIOCGIFADDR 
                            struct.pack('256s', ifname[:15]) 
                            )[20:24]
                            )   
        sock.close()
        return ipaddr
    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip':ip}
OSC3.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def OSCTimeTag(time):
    """Convert a time in floating seconds to its
    OSC binary representation
    """
    if time > 0:
        fract, secs = math.modf(time)
        secs = secs - NTP_epoch
        binary = struct.pack('>LL', int(secs), int(fract * NTP_units_per_second))
    else:
        binary = struct.pack('>LL', 0, 1)

    return binary

######
#
# OSCMessage decoding functions
#
######
OSC2.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def OSCTimeTag(time):
    """Convert a time in floating seconds to its
    OSC binary representation
    """
    if time > 0:
        fract, secs = math.modf(time)
        secs = secs - NTP_epoch
        binary = struct.pack('>LL', long(secs), long(fract * NTP_units_per_second))
    else:
        binary = struct.pack('>LL', 0L, 1L)

    return binary

######
#
# OSCMessage decoding functions
#
######
osdk.py 文件源码 项目:ekko 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def build_header(self):
        timestamp = utctimestamp()
        padding = str.encode('\0\0' * 14)

        data = pack(
            '<i2IQH14s',
            timestamp,
            self.metadata['incremental'],
            self.metadata['segment_size'],
            self.metadata['sectors'],
            len(self.metadata['bases']),
            padding
        )

        checksum = crc32(data)

        for i in self.metadata['bases']:
            data += i
            checksum = crc32(i, checksum)

        return data, checksum
osdk.py 文件源码 项目:ekko 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def write_body(self, f):
        checksum = 0

        for segment, meta in self.segments.items():
            data = pack(
                '<2IH2B20s',
                segment,
                meta['incremental'],
                meta['base'],
                meta['encryption'],
                meta['compression'],
                meta['sha1_hash']
            )

            f.write(data)
            checksum = crc32(data, checksum)

        """ Backfill the body_checksum """
        f.seek(24, 0)
        f.write(pack('<I', checksum))
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def close(self):
        """Close the _Stream object. No operation should be
           done on it afterwards.
        """
        if self.closed:
            return

        self.closed = True
        try:
            if self.mode == "w" and self.comptype != "tar":
                self.buf += self.cmp.flush()

            if self.mode == "w" and self.buf:
                self.fileobj.write(self.buf)
                self.buf = b""
                if self.comptype == "gz":
                    self.fileobj.write(struct.pack("<L", self.crc))
                    self.fileobj.write(struct.pack("<L", self.pos & 0xffffFFFF))
        finally:
            if not self._extfileobj:
                self.fileobj.close()
base.py 文件源码 项目:estreamer 作者: spohara79 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __pack__(self):
        fmt = self.endian
        value_list = []
        for field in self._field_names_:
            fmt_ = self._field_format_[field]
            val = getattr(self, field)
            if isinstance(fmt_, StructArray):
                value_list.extend([
                    getattr(struct_, field)
                    for struct_ in fmt_.structure_list
                    for field in struct_._field_names_
                ])
            elif isinstance(fmt_, basestring) and (fmt_.startswith('BBB') or fmt_.startswith('bbb')):
                value_list.extend([(val >> i & 0xFF) for i in [x for x in range(0, len(fmt_) * 8, 8)]])
            else:    
                try:
                    value_list.append(val.encode('ascii', 'ignore'))
                except AttributeError:
                    value_list.append(val)
            fmt += str(len(val)) + 's' if fmt_ == 'variable' else fmt_.get_struct() if isinstance(fmt_, StructArray) else fmt_
        try:
            return struct.pack(fmt, *value_list)
        except struct.error as exc:
            raise_from(PackError("Unable to pack structure"), exc)
test_13_SDDS.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_ttag_values_packet(self):
        pkt = sdds_pkt.sdds_packet()

        ttag_=0
        ttage_=0
        sddstime=Time()
        sddstime.set(ttag_,ttage_)
        res=struct.pack("!QI", ttag_, ttage_)
        pkt.set_time( ttag_, ttage_)
        self.assertEqual( pkt.header.ttag.tstamp.asString(), res )
        self.assertEqual( pkt.get_SDDSTime(), sddstime )
        pkt.set_SDDSTime( sddstime )
        self.assertEqual( pkt.get_SDDSTime(), sddstime )

        ttag_= 4294967296
        ttage_= 8388608
        sddstime=Time()
        sddstime.set(ttag_,ttage_)
        res=struct.pack("!QI", ttag_, ttage_)
        res=struct.pack("!QI", ttag_, ttage_)
        pkt.set_time( ttag_, ttage_)
        self.assertEqual( pkt.header.ttag.tstamp.asString(), res )
        self.assertEqual( pkt.get_SDDSTime(), sddstime )
        pkt.set_SDDSTime( sddstime )
        self.assertEqual( pkt.get_SDDSTime(), sddstime )
test_13_SDDS.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_ttag_info(self):
        msptr_=0
        msdelta_=0
        ttag_=0
        ttage_=0
        res=struct.pack("!HHQI", msptr_, msdelta_, ttag_, ttage_)
        ttag_val = ttag_info()
        self.assertEqual( ttag_val.asString(), res )

        # test big endian format for number
        ttag_= 4294967296
        ttage_= 8388608
        msptr_=256
        msdelta_=256
        res=struct.pack("!HHQI", msptr_, msdelta_, ttag_, ttage_)
        ttag_val.info.msptr.msptr=msptr_
        ttag_val.info.msptr.msdelta=msdelta_
        ttag_val.tstamp.ttag=ttag_
        ttag_val.tstamp.ttage=ttage_
        self.assertEqual( ttag_val.asString(), res )
bluefile.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def pack_ext_header(hdr, structured=0):
    """
    Packs the value of the given BLUE file hdr dictionary's
    'ext_header' key into the BLUE file extended header format and
    updates the value of 'ext_size'.  The value of 'ext_header' can be
    a list of (key, value) tuples or a dict.  Before writing this out
    to disk at the end of a BLUE file it must be padded out to a
    multiple of 512 bytes.  If the keywords given are already a string
    it is presumed they are already packed and the 'ext_size' field is
    updated but the string itself is left alone.

    If <structured> is true, any embedded Python dictionaries, lists
    or tuples will pack their structure into the keywords with
    them. See pack_keywords() for more info.
    """
    packed = pack_keywords(hdr['ext_header'], _rep_tran[hdr['head_rep']],
                           structured=structured)
    hdr['ext_header'] = packed
    hdr['ext_size'] = len(packed)
cyphermain.py 文件源码 项目:Cypher 作者: NullArray 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def encrypt_file(key, in_filename, out_filename=None, chunksize=64*1024):

    if not out_filename:
        out_filename = in_filename + '.crypt'

    iv = ''.join(chr(random.randint(0, 0xFF)) for i in range(16))
    encryptor = AES.new(key, AES.MODE_CBC, iv)
    filesize = os.path.getsize(in_filename)

    with open(in_filename, 'rb') as infile:
        with open(out_filename, 'wb') as outfile:
            outfile.write(struct.pack('<Q', filesize))
            outfile.write(iv)

            while True:
                chunk = infile.read(chunksize)
                if len(chunk) == 0:
                    break
                elif len(chunk) % 16 != 0:
                    chunk += ' ' * (16 - len(chunk) % 16)

                outfile.write(encryptor.encrypt(chunk))
socket_client.py 文件源码 项目:Picamera 作者: loadstarCN 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def Send_File_Client():
    sendSock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sendSock.connect(ADDR)

    fhead=struct.pack('IdI',1,float(time.time()),os.stat(filename).st_size)
    print(fhead)
    sendSock.send(fhead)
    fp = open(filename,'rb')

    while 1:
        filedata = fp.read(BUFSIZE)
        if not filedata: 
            break
        sendSock.send(filedata)

    '''
    print u"?????????????...\n"

    fp.close()
    sendSock.close()
    print u"?????...\n" 

    '''
types.py 文件源码 项目:rain 作者: scizzorz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def to_rain(cls, val):
    if val is None:
      return cls.new(typi.null, 0, 0, cls.null)
    elif val is False:
      return cls.new(typi.bool, 0, 0, cls.null)
    elif val is True:
      return cls.new(typi.bool, 0, 1, cls.null)
    elif isinstance(val, int):
      return cls.new(typi.int, 0, val, cls.null)
    elif isinstance(val, float):
      raw = struct.pack('d', val)
      intrep = struct.unpack('Q', raw)[0]
      return cls.new(typi.float, 0, intrep, cls.null)
    elif isinstance(val, str):
      str_p = ct.create_string_buffer(val.encode('utf-8'))
      cls._saves_.append(str_p)
      return cls.new(typi.str, len(val), ct.cast(str_p, ct.c_void_p).value, cls.null)

    raise Exception("Can't convert value {!r} to Rain".format(val))
bootloader_serial.py 文件源码 项目:tockloader 作者: helena-project 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _change_baud_rate (self, baud_rate):
        '''
        If the bootloader on the board supports it and if it succeeds, try to
        increase the baud rate to make everything faster.
        '''
        pkt = struct.pack('<BI', 0x01, baud_rate)
        success, ret = self._issue_command(self.COMMAND_CHANGE_BAUD_RATE, pkt, True, 0, self.RESPONSE_OK, show_errors=False)

        if success:
            # The bootloader is new enough to support this.
            # Increase the baud rate
            self.sp.baudrate = baud_rate
            # Now confirm that everything is working.
            pkt = struct.pack('<BI', 0x02, baud_rate)
            success, ret = self._issue_command(self.COMMAND_CHANGE_BAUD_RATE, pkt, False, 0, self.RESPONSE_OK, show_errors=False)

            if not success:
                # Something went wrong. Go back to old baud rate
                self.sp.baudrate = 115200
bootloader_serial.py 文件源码 项目:tockloader 作者: helena-project 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def read_range (self, address, length):
        # Can only read up to 4095 bytes at a time.
        MAX_READ = 4095
        read = bytes()
        this_length = 0
        remaining = length
        while remaining > 0:
            if remaining > MAX_READ:
                this_length = MAX_READ
                remaining -= MAX_READ
            else:
                this_length = remaining
                remaining = 0

            message = struct.pack('<IH', address, this_length)
            success, flash = self._issue_command(self.COMMAND_READ_RANGE, message, True, this_length, self.RESPONSE_READ_RANGE)

            if not success:
                raise TockLoaderException('Error: Could not read flash')
            else:
                read += flash

            address += this_length

        return read
bootloader_serial.py 文件源码 项目:tockloader 作者: helena-project 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_crc_internal_flash (self, address, length):
        '''
        Get the bootloader to compute a CRC.
        '''
        message = struct.pack('<II', address, length)
        success, crc = self._issue_command(self.COMMAND_CRC_INTERNAL_FLASH, message, True, 4, self.RESPONSE_CRC_INTERNAL_FLASH)

        # There is a bug in a version of the bootloader where the CRC returns 6
        # bytes and not just 4. Need to read just in case to grab those extra
        # bytes.
        self.sp.read(2)

        if not success:
            if crc[1] == self.RESPONSE_BADADDR:
                raise TockLoaderException('Error: RESPONSE_BADADDR: Invalid address for CRC (address: 0x{:X})'.format(address))
            elif crc[1] == self.RESPONSE_BADARGS:
                raise TockLoaderException('Error: RESPONSE_BADARGS: Invalid length for CRC check')
            else:
                raise TockLoaderException('Error: 0x{:X}'.format(crc[1]))

        return crc
pickle.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def save_int(self, obj, pack=struct.pack):
        if self.bin:
            # If the int is small enough to fit in a signed 4-byte 2's-comp
            # format, we can store it more efficiently than the general
            # case.
            # First one- and two-byte unsigned ints:
            if obj >= 0:
                if obj <= 0xff:
                    self.write(BININT1 + chr(obj))
                    return
                if obj <= 0xffff:
                    self.write("%c%c%c" % (BININT2, obj&0xff, obj>>8))
                    return
            # Next check for 4-byte signed ints:
            high_bits = obj >> 31  # note that Python shift sign-extends
            if high_bits == 0 or high_bits == -1:
                # All high bits are copies of bit 2**31, so the value
                # fits in a 4-byte signed int.
                self.write(BININT + pack("<i", obj))
                return
        # Text pickle, or int too big to fit in signed 4-byte format.
        self.write(INT + repr(obj) + '\n')
pickle.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def save_string(self, obj, pack=struct.pack):
            unicode = obj.isunicode()

            if self.bin:
                if unicode:
                    obj = obj.encode("utf-8")
                l = len(obj)
                if l < 256 and not unicode:
                    self.write(SHORT_BINSTRING + chr(l) + obj)
                else:
                    s = pack("<i", l)
                    if unicode:
                        self.write(BINUNICODE + s + obj)
                    else:
                        self.write(BINSTRING + s + obj)
            else:
                if unicode:
                    obj = obj.replace("\\", "\\u005c")
                    obj = obj.replace("\n", "\\u000a")
                    obj = obj.encode('raw-unicode-escape')
                    self.write(UNICODE + obj + '\n')
                else:
                    self.write(STRING + repr(obj) + '\n')
            self.memoize(obj)
models.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parse_netconn(self, seq, netconn):
        parts = netconn.split('|')
        new_conn = {}
        timestamp = convert_event_time(parts[0])
        try:
            new_conn['remote_ip'] = socket.inet_ntoa(struct.pack('>i', int(parts[1])))
        except:
            new_conn['remote_ip'] = '0.0.0.0'
        new_conn['remote_port'] = int(parts[2])
        new_conn['proto'] = protocols[int(parts[3])]
        new_conn['domain'] = parts[4]
        if parts[5] == 'true':
            new_conn['direction'] = 'Outbound'
        else:
            new_conn['direction'] = 'Inbound'

        return CbNetConnEvent(self.process_model, timestamp, seq, new_conn)
models.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_netconn(self, seq, netconn):
        new_conn = {}
        timestamp = convert_event_time(netconn.get("timestamp", None))
        direction = netconn.get("direction", "true")

        if direction == 'true':
            new_conn['direction'] = 'Outbound'
        else:
            new_conn['direction'] = 'Inbound'

        for ipfield in ('remote_ip', 'local_ip', 'proxy_ip'):
            try:
                new_conn[ipfield] = socket.inet_ntoa(struct.pack('>i', int(netconn.get(ipfield, 0))))
            except:
                new_conn[ipfield] = netconn.get(ipfield, '0.0.0.0')

        for portfield in ('remote_port', 'local_port', 'proxy_port'):
            new_conn[portfield] = int(netconn.get(portfield, 0))

        new_conn['proto'] = protocols.get(int(netconn.get('proto', 0)), "Unknown")
        new_conn['domain'] = netconn.get('domain', '')

        return CbNetConnEvent(self.process_model, timestamp, seq, new_conn, version=2)
core.py 文件源码 项目:barpyrus 作者: t-wissmann 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def spawn(self, lines, additional_args = [ '-p', ''], width = None):
        (mouse_x, mouse_y) = get_mouse_location()
        if not width:
            width = 100 # some default width
        width = max(width, 101) # width has to be 100 at least (rofi restriction)
        # first, compute the top left corner of the menu
        menu_x = min(max(mouse_x - width/2, self.x), self.x + self.panel_width - width)
        menu_y = self.y
        # then, specify these coordinates relative to the mouse cursor
        menu_x -= mouse_x
        menu_y -= mouse_y
        # compile rofi arguments
        cmd = ['rofi', '-dmenu', '-sep' , '\\0' ]
        cmd += ['-monitor', '-3' ] # position relative to mouse cursor
        cmd += ['-layout', '1' ] # specify top left corner of the menu
        cmd += ['-width', str(width) ]
        cmd += ['-xoffset', str(menu_x), '-yoffset', str(menu_y) ]
        cmd += self.rofi_args
        cmd += additional_args
        rofi = subprocess.Popen(cmd,stdout=subprocess.PIPE,stdin=subprocess.PIPE)
        for i in lines:
            rofi.stdin.write(i.encode('utf-8'))
            rofi.stdin.write(struct.pack('B', 0))
        rofi.stdin.close()
        rofi.wait()


问题


面经


文章

微信
公众号

扫码关注公众号