python类c_buffer()的实例源码

dpapi.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def dpapi_encrypt_data(self, input_bytes, entropy = extra_entropy):
    '''
    Encrypts data and returns byte string

    :param input_bytes: The data to be encrypted
    :type input_bytes: String or Bytes
    :param entropy: Extra entropy to add to the encryption process (optional)
    :type entropy: String or Bytes
    '''
    if not isinstance(input_bytes, bytes) or not isinstance(entropy, bytes):
        self.fatal('The inputs to dpapi must be bytes')
    buffer_in      = c_buffer(input_bytes, len(input_bytes))
    buffer_entropy = c_buffer(entropy, len(entropy))
    blob_in        = DATA_BLOB(len(input_bytes), buffer_in)
    blob_entropy   = DATA_BLOB(len(entropy), buffer_entropy)
    blob_out       = DATA_BLOB()

    if CryptProtectData(byref(blob_in), 'python_data', byref(blob_entropy), 
        None, None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
        return get_data(blob_out)
    else:
        self.fatal('Failed to decrypt data')
dpapi.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def dpapi_decrypt_data(self, encrypted_bytes, entropy = extra_entropy):
    '''
    Decrypts data and returns byte string

    :param encrypted_bytes: The encrypted data
    :type encrypted_bytes: Bytes
    :param entropy: Extra entropy to add to the encryption process (optional)
    :type entropy: String or Bytes
    '''
    if not isinstance(encrypted_bytes, bytes) or not isinstance(entropy, bytes):
        self.fatal('The inputs to dpapi must be bytes')
    buffer_in      = c_buffer(encrypted_bytes, len(encrypted_bytes))
    buffer_entropy = c_buffer(entropy, len(entropy))
    blob_in        = DATA_BLOB(len(encrypted_bytes), buffer_in)
    blob_entropy   = DATA_BLOB(len(entropy), buffer_entropy)
    blob_out       = DATA_BLOB()
    if CryptUnprotectData(byref(blob_in), None, byref(blob_entropy), None,
        None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
        return get_data(blob_out)
    else:
        self.fatal('Failed to decrypt data')
dpapi.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def dpapi_encrypt_data(self, input_bytes, entropy = extra_entropy):
    '''
    Encrypts data and returns byte string

    :param input_bytes: The data to be encrypted
    :type input_bytes: String or Bytes
    :param entropy: Extra entropy to add to the encryption process (optional)
    :type entropy: String or Bytes
    '''
    if not isinstance(input_bytes, bytes) or not isinstance(entropy, bytes):
        self.fatal('The inputs to dpapi must be bytes')
    buffer_in      = c_buffer(input_bytes, len(input_bytes))
    buffer_entropy = c_buffer(entropy, len(entropy))
    blob_in        = DATA_BLOB(len(input_bytes), buffer_in)
    blob_entropy   = DATA_BLOB(len(entropy), buffer_entropy)
    blob_out       = DATA_BLOB()

    if CryptProtectData(byref(blob_in), 'python_data', byref(blob_entropy), 
        None, None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
        return get_data(blob_out)
    else:
        self.fatal('Failed to decrypt data')
dpapi.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def dpapi_encrypt_data(self, input_bytes, entropy = extra_entropy):
    '''
    Encrypts data and returns byte string

    :param input_bytes: The data to be encrypted
    :type input_bytes: String or Bytes
    :param entropy: Extra entropy to add to the encryption process (optional)
    :type entropy: String or Bytes
    '''
    if not isinstance(input_bytes, bytes) or not isinstance(entropy, bytes):
        self.fatal('The inputs to dpapi must be bytes')
    buffer_in      = c_buffer(input_bytes, len(input_bytes))
    buffer_entropy = c_buffer(entropy, len(entropy))
    blob_in        = DATA_BLOB(len(input_bytes), buffer_in)
    blob_entropy   = DATA_BLOB(len(entropy), buffer_entropy)
    blob_out       = DATA_BLOB()

    if CryptProtectData(byref(blob_in), 'python_data', byref(blob_entropy), 
        None, None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
        return get_data(blob_out)
    else:
        self.fatal('Failed to decrypt data')
dpapi.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def dpapi_decrypt_data(self, encrypted_bytes, entropy = extra_entropy):
    '''
    Decrypts data and returns byte string

    :param encrypted_bytes: The encrypted data
    :type encrypted_bytes: Bytes
    :param entropy: Extra entropy to add to the encryption process (optional)
    :type entropy: String or Bytes
    '''
    if not isinstance(encrypted_bytes, bytes) or not isinstance(entropy, bytes):
        self.fatal('The inputs to dpapi must be bytes')
    buffer_in      = c_buffer(encrypted_bytes, len(encrypted_bytes))
    buffer_entropy = c_buffer(entropy, len(entropy))
    blob_in        = DATA_BLOB(len(encrypted_bytes), buffer_in)
    blob_entropy   = DATA_BLOB(len(entropy), buffer_entropy)
    blob_out       = DATA_BLOB()
    if CryptUnprotectData(byref(blob_in), None, byref(blob_entropy), None,
        None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
        return get_data(blob_out)
    else:
        self.fatal('Failed to decrypt data')
alpc.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, msg_or_size=0x1000, attributes=None):
        # Init the PORT_MESSAGE
        if isinstance(msg_or_size, (long, int)):
            self.port_message_buffer_size = msg_or_size
            self.port_message_raw_buffer = ctypes.c_buffer(msg_or_size)
            self.port_message = AlpcMessagePort.from_buffer(self.port_message_raw_buffer)
            self.port_message.set_datalen(0)
        elif isinstance(msg_or_size, AlpcMessagePort):
            self.port_message = msg_or_size
            self.port_message_raw_buffer = self.port_message.raw_buffer
            self.port_message_buffer_size = len(self.port_message_raw_buffer)
        else:
            raise NotImplementedError("Uneexpected type for <msg_or_size>: {0}".format(msg_or_size))

        # Init the MessageAttributes
        if attributes is None:
            # self.attributes = MessageAttribute.with_all_attributes()
            self.attributes = MessageAttribute.with_all_attributes() ## Testing
        else:
            self.attributes = attributes

    # PORT_MESSAGE wrappers
handle.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def enumerate_handles():
    size_needed = ULONG()
    size = 0x1000
    buffer = ctypes.c_buffer(size)

    try:
        winproxy.NtQuerySystemInformation(16, buffer, size, ReturnLength=ctypes.byref(size_needed))
    except WindowsError as e:
        pass

    size = size_needed.value + 0x1000
    buffer = ctypes.c_buffer(size)
    winproxy.NtQuerySystemInformation(16, buffer, size, ReturnLength=ctypes.byref(size_needed))
    x = SYSTEM_HANDLE_INFORMATION.from_buffer(buffer)
    class _GENERATED_SYSTEM_HANDLE_INFORMATION(ctypes.Structure):
        _fields_ = [
            ("HandleCount", ULONG),
            ("Handles", Handle * x.HandleCount),
        ]
    return list(_GENERATED_SYSTEM_HANDLE_INFORMATION.from_buffer_copy(buffer[:size_needed.value]).Handles)
process.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_mapped_filename(self, addr):
        """The filename mapped at address ``addr`` or ``None``

        :rtype: :class:`str` or ``None``
        """
        buffer_size = 0x1000
        buffer = ctypes.c_buffer(buffer_size)

        if  windows.current_process.bitness == 32 and self.bitness == 64:
             target_size = ctypes.c_buffer(buffer_size)
             try:
                windows.syswow64.NtQueryVirtualMemory_32_to_64(self.handle, addr, MemorySectionName, buffer, buffer_size, target_size)
             except NtStatusException as e:
                if e.code not in  [STATUS_FILE_INVALID, STATUS_INVALID_ADDRESS, STATUS_TRANSACTION_NOT_ACTIVE]:
                    raise
                return None
             remote_winstring = rctypes.transform_type_to_remote64bits(WinUnicodeString)
             mapped_filename = remote_winstring(ctypes.addressof(buffer), windows.current_process)
             return mapped_filename.str

        try:
                size = winproxy.GetMappedFileNameA(self.handle, addr, buffer, buffer_size)
        except winproxy.Kernel32Error as e:
            return None
        return buffer[:size]
kernobj.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def query_link(linkpath):
    utf16_len = len(linkpath) * 2
    obj_attr = OBJECT_ATTRIBUTES()
    obj_attr.Length = ctypes.sizeof(obj_attr)
    obj_attr.RootDirectory = 0
    obj_attr.ObjectName = pointer(LSA_UNICODE_STRING(utf16_len, utf16_len, linkpath))
    obj_attr.Attributes = OBJ_CASE_INSENSITIVE
    obj_attr.SecurityDescriptor = 0
    obj_attr.SecurityQualityOfService = 0

    res = HANDLE()
    x = winproxy.NtOpenSymbolicLinkObject(res, DIRECTORY_QUERY | READ_CONTROL , obj_attr)
    v = LSA_UNICODE_STRING(0x1000, 0x1000, ctypes.cast(ctypes.c_buffer(0x1000), ctypes.c_wchar_p))
    s = ULONG()
    winproxy.NtQuerySymbolicLinkObject(res, v, s)
    return v.Buffer
ftd2xx.py 文件源码 项目:crescendo 作者: AriaFallah 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def listDevices(flags=0):
    """Return a list of serial numbers(default), descriptions or
    locations (Windows only) of the connected FTDI devices depending on value
    of flags"""
    n = _ft.DWORD()
    call_ft(_ft.FT_ListDevices, c.byref(n), None, _ft.DWORD(LIST_NUMBER_ONLY))
    devcount = n.value
    if devcount:
        # since ctypes has no pointer arithmetic.
        bd = [c.c_buffer(MAX_DESCRIPTION_SIZE) for i in range(devcount)] +\
            [None]
        # array of pointers to those strings, initially all NULL
        ba = (c.c_char_p *(devcount + 1))()
        for i in range(devcount):
            ba[i] = c.cast(bd[i], c.c_char_p)
        call_ft(_ft.FT_ListDevices, ba, c.byref(n), _ft.DWORD(LIST_ALL|flags))
        return [res for res in ba[:devcount]]
    else:
        return None
ftd2xx.py 文件源码 项目:crescendo 作者: AriaFallah 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def eeRead(self):
        """Get the program information from the EEPROM"""
##        if self.devInfo['type'] == 4:
##            version = 1
##        elif self.devInfo['type'] == 5:
##            version = 2
##        else:
##            version = 0
        progdata = _ft.ft_program_data(
                      Signature1=0, Signature2=0xffffffff,
                      Version=2,
                      Manufacturer = c.cast(c.c_buffer(256), c.c_char_p),
                      ManufacturerId = c.cast(c.c_buffer(256), c.c_char_p),
                      Description = c.cast(c.c_buffer(256), c.c_char_p),
                      SerialNumber = c.cast(c.c_buffer(256), c.c_char_p))

        call_ft(_ft.FT_EE_Read, self.handle, c.byref(progdata))
        return progdata
inotify_simple.py 文件源码 项目:inotify_simple 作者: chrisjbillington 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse_events(data):
    """Parse data read from an inotify file descriptor into list of
    :attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name). This
    function can be used if you have decided to call ``os.read()`` on the
    inotify file descriptor yourself, instead of calling
    :func:`~inotify_simple.INotify.read`.

    Args:
        data (bytes): A bytestring as read from an inotify file descriptor
    Returns:
        list: list of :attr:`~inotify_simple.Event` namedtuples"""
    events = []
    offset = 0
    buffer_size = len(data)
    while offset < buffer_size:
        wd, mask, cookie, namesize = struct.unpack_from(_EVENT_STRUCT_FORMAT, data, offset)
        offset += _EVENT_STRUCT_SIZE
        name = _fsdecode(ctypes.c_buffer(data[offset:offset + namesize], namesize).value)
        offset += namesize
        events.append(Event(wd, mask, cookie, name))
    return events
inotify_simple.py 文件源码 项目:i3configger 作者: obestwalter 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def parse_events(data):
    """Parse data read from an inotify file descriptor into list of
    :attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name). This
    function can be used if you have decided to call ``os.read()`` on the
    inotify file descriptor yourself, instead of calling
    :func:`~inotify_simple.INotify.read`.

    Args:
        data (bytes): A bytestring as read from an inotify file descriptor
    Returns:
        list: list of :attr:`~inotify_simple.Event` namedtuples"""
    events = []
    offset = 0
    buffer_size = len(data)
    while offset < buffer_size:
        wd, mask, cookie, namesize = struct.unpack_from(_EVENT_STRUCT_FORMAT, data, offset)
        offset += _EVENT_STRUCT_SIZE
        name = _fsdecode(ctypes.c_buffer(data[offset:offset + namesize], namesize).value)
        offset += namesize
        events.append(Event(wd, mask, cookie, name))
    return events
dpapi.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_data(blob_out):
    cbData = int(blob_out.cbData)
    pbData = blob_out.pbData
    buffer = c_buffer(cbData)
    memcpy(buffer, pbData, cbData)
    LocalFree(pbData);
    return buffer.raw
dpapi.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_data(blob_out):
    cbData = int(blob_out.cbData)
    pbData = blob_out.pbData
    buffer = c_buffer(cbData)
    memcpy(buffer, pbData, cbData)
    LocalFree(pbData);
    return buffer.raw
dpapi.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_data(blob_out):
    cbData = int(blob_out.cbData)
    pbData = blob_out.pbData
    buffer = c_buffer(cbData)
    memcpy(buffer, pbData, cbData)
    LocalFree(pbData);
    return buffer.raw
alpc.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def from_buffer_size(cls, buffer_size):
        buffer = ctypes.c_buffer(buffer_size)
        return cls.from_buffer(buffer)
alpc.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def with_attributes(cls, attributes):
        """Create a new :class:`MessageAttribute` with ``attributes`` allocated

            :returns: :class:`MessageAttribute`
        """
        size = cls._get_required_buffer_size(attributes)
        buffer = ctypes.c_buffer(size)
        self = cls.from_buffer(buffer)
        self.raw_buffer = buffer
        res = gdef.DWORD()
        winproxy.AlpcInitializeMessageAttribute(attributes, self, len(self.raw_buffer), res)
        return self
system.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def computer_name(self):
        """The name of the computer

        :type: :class:`str`
        """
        size = DWORD(0x1000)
        buf = ctypes.c_buffer(size.value)
        winproxy.GetComputerNameA(buf, ctypes.byref(size))
        return buf[:size.value]
system.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def windir(self):
        buffer = ctypes.c_buffer(0x100)
        reslen = winproxy.GetWindowsDirectoryA(buffer)
        return buffer[:reslen]
handle.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _get_object_name(self):
        lh = self.local_handle
        size_needed = DWORD()
        yyy = ctypes.c_buffer(0x1000)
        winproxy.NtQueryObject(lh, ObjectNameInformation, ctypes.byref(yyy), ctypes.sizeof(yyy), ctypes.byref(size_needed))
        return WinUnicodeString.from_buffer_copy(yyy[:size_needed.value]).str
handle.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_object_type(self):
        lh = self.local_handle
        xxx = EPUBLIC_OBJECT_TYPE_INFORMATION()
        size_needed = DWORD()
        try:
            winproxy.NtQueryObject(lh, ObjectTypeInformation, ctypes.byref(xxx), ctypes.sizeof(xxx), ctypes.byref(size_needed))
        except WindowsError as e:
            if e.code != STATUS_INFO_LENGTH_MISMATCH:
                # print("ERROR WITH {0:x}".format(lh))
                raise
            size = size_needed.value
            buffer = ctypes.c_buffer(size)
            winproxy.NtQueryObject(lh, ObjectTypeInformation, buffer, size, ctypes.byref(size_needed))
            xxx = EPUBLIC_OBJECT_TYPE_INFORMATION.from_buffer_copy(buffer)
        return xxx.TypeName.str
process.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def name(self):
        """Name of the process

        :type: :class:`str`
        """
        buffer = ctypes.c_buffer(0x1024)
        rsize = winproxy.GetProcessImageFileNameA(self.handle, buffer)
        # GetProcessImageFileNameA returns the fullpath
        return buffer[:rsize].decode().split("\\")[-1]
process.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def token_user(self):
        buffer_size = self.get_required_information_size(TokenUser)
        buffer = ctypes.c_buffer(buffer_size)
        self.get_informations(TokenUser, buffer)
        return ctypes.cast(buffer, POINTER(TOKEN_USER))[0]
process.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _user_and_computer_name(self):
        tok_usr = self.token_user
        sid = tok_usr.User.Sid
        usernamesize = DWORD(0x1000)
        computernamesize = DWORD(0x1000)
        username = ctypes.c_buffer(usernamesize.value)
        computername = ctypes.c_buffer(computernamesize.value)
        peUse = SID_NAME_USE()
        winproxy.LookupAccountSidA(None, sid, username, usernamesize, computername, computernamesize, peUse)
        return username[:usernamesize.value], computername[:computernamesize.value]
wingui.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def name(self):
        size = 0x1024
        buffer = ctypes.c_buffer(size)

        res = windows.winproxy.GetWindowTextA(self.handle, buffer, size)
        return buffer[:res]
wingui.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def at_point(cls, point):
        handle = winproxy.WindowFromPoint(point)
        return cls(handle)


    # I don't understand the interest:
    # Either return "" or C:\Python27\python.exe
    #def module(self):
    #   size = 0x1024
    #   buffer = ctypes.c_buffer(size)
    #   res = windows.winproxy.GetWindowModuleFileNameA(self.handle, buffer, size)
    #   return buffer[:res]
volume.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_logical_drive_names():
    size = 0x100
    buffer = ctypes.c_buffer(size)
    rsize = winproxy.GetLogicalDriveStringsA(0x1000, buffer)
    return buffer[:rsize].rstrip("\x00").split("\x00")
volume.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def query_dos_device(name):
    size = 0x1000
    buffer = ctypes.c_buffer(size)
    rsize = winproxy.QueryDosDeviceA(name, buffer, size)
    return buffer[:rsize].rstrip("\x00").split("\x00")
kernobj.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def entries(self):
        """Todo: better name ?"""
        path = self.fullname
        utf16_len = len(path) * 2
        obj_attr = OBJECT_ATTRIBUTES()
        obj_attr.Length = ctypes.sizeof(obj_attr)
        obj_attr.RootDirectory = None
        obj_attr.ObjectName = pointer(LSA_UNICODE_STRING(utf16_len, utf16_len, path))
        obj_attr.Attributes = OBJ_CASE_INSENSITIVE
        obj_attr.SecurityDescriptor = 0
        obj_attr.SecurityQualityOfService = 0

        res = HANDLE()
        x = winproxy.NtOpenDirectoryObject(res, DIRECTORY_QUERY | READ_CONTROL , obj_attr)
        size = 0x1000
        buf = ctypes.c_buffer(size)
        rres = ULONG()
        ctx = ULONG()
        while True:
            try:
                winproxy.NtQueryDirectoryObject(res, buf, size, False, False, ctx, rres)
                break
            except windows.generated_def.ntstatus.NtStatusException as e:
                if e.code == STATUS_NO_MORE_ENTRIES:
                    return {}
                if e.code == STATUS_MORE_ENTRIES:
                    size *= 2
                    buf = ctypes.c_buffer(size)
                    continue
                raise

        t = OBJECT_DIRECTORY_INFORMATION.from_buffer(buf)
        t = POBJECT_DIRECTORY_INFORMATION(t)
        res = {}
        for v in t:
            if v.Name.Buffer is None:
                break
            x = KernelObject(path, v.Name.Buffer, v.TypeName.Buffer)
            res[x.name] = x
        return res


问题


面经


文章

微信
公众号

扫码关注公众号