def dpapi_encrypt_data(self, input_bytes, entropy = extra_entropy):
'''
Encrypts data and returns byte string
:param input_bytes: The data to be encrypted
:type input_bytes: String or Bytes
:param entropy: Extra entropy to add to the encryption process (optional)
:type entropy: String or Bytes
'''
if not isinstance(input_bytes, bytes) or not isinstance(entropy, bytes):
self.fatal('The inputs to dpapi must be bytes')
buffer_in = c_buffer(input_bytes, len(input_bytes))
buffer_entropy = c_buffer(entropy, len(entropy))
blob_in = DATA_BLOB(len(input_bytes), buffer_in)
blob_entropy = DATA_BLOB(len(entropy), buffer_entropy)
blob_out = DATA_BLOB()
if CryptProtectData(byref(blob_in), 'python_data', byref(blob_entropy),
None, None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
return get_data(blob_out)
else:
self.fatal('Failed to decrypt data')
python类c_buffer()的实例源码
def dpapi_decrypt_data(self, encrypted_bytes, entropy = extra_entropy):
'''
Decrypts data and returns byte string
:param encrypted_bytes: The encrypted data
:type encrypted_bytes: Bytes
:param entropy: Extra entropy to add to the encryption process (optional)
:type entropy: String or Bytes
'''
if not isinstance(encrypted_bytes, bytes) or not isinstance(entropy, bytes):
self.fatal('The inputs to dpapi must be bytes')
buffer_in = c_buffer(encrypted_bytes, len(encrypted_bytes))
buffer_entropy = c_buffer(entropy, len(entropy))
blob_in = DATA_BLOB(len(encrypted_bytes), buffer_in)
blob_entropy = DATA_BLOB(len(entropy), buffer_entropy)
blob_out = DATA_BLOB()
if CryptUnprotectData(byref(blob_in), None, byref(blob_entropy), None,
None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
return get_data(blob_out)
else:
self.fatal('Failed to decrypt data')
def dpapi_encrypt_data(self, input_bytes, entropy = extra_entropy):
'''
Encrypts data and returns byte string
:param input_bytes: The data to be encrypted
:type input_bytes: String or Bytes
:param entropy: Extra entropy to add to the encryption process (optional)
:type entropy: String or Bytes
'''
if not isinstance(input_bytes, bytes) or not isinstance(entropy, bytes):
self.fatal('The inputs to dpapi must be bytes')
buffer_in = c_buffer(input_bytes, len(input_bytes))
buffer_entropy = c_buffer(entropy, len(entropy))
blob_in = DATA_BLOB(len(input_bytes), buffer_in)
blob_entropy = DATA_BLOB(len(entropy), buffer_entropy)
blob_out = DATA_BLOB()
if CryptProtectData(byref(blob_in), 'python_data', byref(blob_entropy),
None, None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
return get_data(blob_out)
else:
self.fatal('Failed to decrypt data')
def dpapi_encrypt_data(self, input_bytes, entropy = extra_entropy):
'''
Encrypts data and returns byte string
:param input_bytes: The data to be encrypted
:type input_bytes: String or Bytes
:param entropy: Extra entropy to add to the encryption process (optional)
:type entropy: String or Bytes
'''
if not isinstance(input_bytes, bytes) or not isinstance(entropy, bytes):
self.fatal('The inputs to dpapi must be bytes')
buffer_in = c_buffer(input_bytes, len(input_bytes))
buffer_entropy = c_buffer(entropy, len(entropy))
blob_in = DATA_BLOB(len(input_bytes), buffer_in)
blob_entropy = DATA_BLOB(len(entropy), buffer_entropy)
blob_out = DATA_BLOB()
if CryptProtectData(byref(blob_in), 'python_data', byref(blob_entropy),
None, None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
return get_data(blob_out)
else:
self.fatal('Failed to decrypt data')
def dpapi_decrypt_data(self, encrypted_bytes, entropy = extra_entropy):
'''
Decrypts data and returns byte string
:param encrypted_bytes: The encrypted data
:type encrypted_bytes: Bytes
:param entropy: Extra entropy to add to the encryption process (optional)
:type entropy: String or Bytes
'''
if not isinstance(encrypted_bytes, bytes) or not isinstance(entropy, bytes):
self.fatal('The inputs to dpapi must be bytes')
buffer_in = c_buffer(encrypted_bytes, len(encrypted_bytes))
buffer_entropy = c_buffer(entropy, len(entropy))
blob_in = DATA_BLOB(len(encrypted_bytes), buffer_in)
blob_entropy = DATA_BLOB(len(entropy), buffer_entropy)
blob_out = DATA_BLOB()
if CryptUnprotectData(byref(blob_in), None, byref(blob_entropy), None,
None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)):
return get_data(blob_out)
else:
self.fatal('Failed to decrypt data')
def __init__(self, msg_or_size=0x1000, attributes=None):
# Init the PORT_MESSAGE
if isinstance(msg_or_size, (long, int)):
self.port_message_buffer_size = msg_or_size
self.port_message_raw_buffer = ctypes.c_buffer(msg_or_size)
self.port_message = AlpcMessagePort.from_buffer(self.port_message_raw_buffer)
self.port_message.set_datalen(0)
elif isinstance(msg_or_size, AlpcMessagePort):
self.port_message = msg_or_size
self.port_message_raw_buffer = self.port_message.raw_buffer
self.port_message_buffer_size = len(self.port_message_raw_buffer)
else:
raise NotImplementedError("Uneexpected type for <msg_or_size>: {0}".format(msg_or_size))
# Init the MessageAttributes
if attributes is None:
# self.attributes = MessageAttribute.with_all_attributes()
self.attributes = MessageAttribute.with_all_attributes() ## Testing
else:
self.attributes = attributes
# PORT_MESSAGE wrappers
def enumerate_handles():
size_needed = ULONG()
size = 0x1000
buffer = ctypes.c_buffer(size)
try:
winproxy.NtQuerySystemInformation(16, buffer, size, ReturnLength=ctypes.byref(size_needed))
except WindowsError as e:
pass
size = size_needed.value + 0x1000
buffer = ctypes.c_buffer(size)
winproxy.NtQuerySystemInformation(16, buffer, size, ReturnLength=ctypes.byref(size_needed))
x = SYSTEM_HANDLE_INFORMATION.from_buffer(buffer)
class _GENERATED_SYSTEM_HANDLE_INFORMATION(ctypes.Structure):
_fields_ = [
("HandleCount", ULONG),
("Handles", Handle * x.HandleCount),
]
return list(_GENERATED_SYSTEM_HANDLE_INFORMATION.from_buffer_copy(buffer[:size_needed.value]).Handles)
def get_mapped_filename(self, addr):
"""The filename mapped at address ``addr`` or ``None``
:rtype: :class:`str` or ``None``
"""
buffer_size = 0x1000
buffer = ctypes.c_buffer(buffer_size)
if windows.current_process.bitness == 32 and self.bitness == 64:
target_size = ctypes.c_buffer(buffer_size)
try:
windows.syswow64.NtQueryVirtualMemory_32_to_64(self.handle, addr, MemorySectionName, buffer, buffer_size, target_size)
except NtStatusException as e:
if e.code not in [STATUS_FILE_INVALID, STATUS_INVALID_ADDRESS, STATUS_TRANSACTION_NOT_ACTIVE]:
raise
return None
remote_winstring = rctypes.transform_type_to_remote64bits(WinUnicodeString)
mapped_filename = remote_winstring(ctypes.addressof(buffer), windows.current_process)
return mapped_filename.str
try:
size = winproxy.GetMappedFileNameA(self.handle, addr, buffer, buffer_size)
except winproxy.Kernel32Error as e:
return None
return buffer[:size]
def query_link(linkpath):
utf16_len = len(linkpath) * 2
obj_attr = OBJECT_ATTRIBUTES()
obj_attr.Length = ctypes.sizeof(obj_attr)
obj_attr.RootDirectory = 0
obj_attr.ObjectName = pointer(LSA_UNICODE_STRING(utf16_len, utf16_len, linkpath))
obj_attr.Attributes = OBJ_CASE_INSENSITIVE
obj_attr.SecurityDescriptor = 0
obj_attr.SecurityQualityOfService = 0
res = HANDLE()
x = winproxy.NtOpenSymbolicLinkObject(res, DIRECTORY_QUERY | READ_CONTROL , obj_attr)
v = LSA_UNICODE_STRING(0x1000, 0x1000, ctypes.cast(ctypes.c_buffer(0x1000), ctypes.c_wchar_p))
s = ULONG()
winproxy.NtQuerySymbolicLinkObject(res, v, s)
return v.Buffer
def listDevices(flags=0):
"""Return a list of serial numbers(default), descriptions or
locations (Windows only) of the connected FTDI devices depending on value
of flags"""
n = _ft.DWORD()
call_ft(_ft.FT_ListDevices, c.byref(n), None, _ft.DWORD(LIST_NUMBER_ONLY))
devcount = n.value
if devcount:
# since ctypes has no pointer arithmetic.
bd = [c.c_buffer(MAX_DESCRIPTION_SIZE) for i in range(devcount)] +\
[None]
# array of pointers to those strings, initially all NULL
ba = (c.c_char_p *(devcount + 1))()
for i in range(devcount):
ba[i] = c.cast(bd[i], c.c_char_p)
call_ft(_ft.FT_ListDevices, ba, c.byref(n), _ft.DWORD(LIST_ALL|flags))
return [res for res in ba[:devcount]]
else:
return None
def eeRead(self):
"""Get the program information from the EEPROM"""
## if self.devInfo['type'] == 4:
## version = 1
## elif self.devInfo['type'] == 5:
## version = 2
## else:
## version = 0
progdata = _ft.ft_program_data(
Signature1=0, Signature2=0xffffffff,
Version=2,
Manufacturer = c.cast(c.c_buffer(256), c.c_char_p),
ManufacturerId = c.cast(c.c_buffer(256), c.c_char_p),
Description = c.cast(c.c_buffer(256), c.c_char_p),
SerialNumber = c.cast(c.c_buffer(256), c.c_char_p))
call_ft(_ft.FT_EE_Read, self.handle, c.byref(progdata))
return progdata
def parse_events(data):
"""Parse data read from an inotify file descriptor into list of
:attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name). This
function can be used if you have decided to call ``os.read()`` on the
inotify file descriptor yourself, instead of calling
:func:`~inotify_simple.INotify.read`.
Args:
data (bytes): A bytestring as read from an inotify file descriptor
Returns:
list: list of :attr:`~inotify_simple.Event` namedtuples"""
events = []
offset = 0
buffer_size = len(data)
while offset < buffer_size:
wd, mask, cookie, namesize = struct.unpack_from(_EVENT_STRUCT_FORMAT, data, offset)
offset += _EVENT_STRUCT_SIZE
name = _fsdecode(ctypes.c_buffer(data[offset:offset + namesize], namesize).value)
offset += namesize
events.append(Event(wd, mask, cookie, name))
return events
def parse_events(data):
"""Parse data read from an inotify file descriptor into list of
:attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name). This
function can be used if you have decided to call ``os.read()`` on the
inotify file descriptor yourself, instead of calling
:func:`~inotify_simple.INotify.read`.
Args:
data (bytes): A bytestring as read from an inotify file descriptor
Returns:
list: list of :attr:`~inotify_simple.Event` namedtuples"""
events = []
offset = 0
buffer_size = len(data)
while offset < buffer_size:
wd, mask, cookie, namesize = struct.unpack_from(_EVENT_STRUCT_FORMAT, data, offset)
offset += _EVENT_STRUCT_SIZE
name = _fsdecode(ctypes.c_buffer(data[offset:offset + namesize], namesize).value)
offset += namesize
events.append(Event(wd, mask, cookie, name))
return events
def get_data(blob_out):
cbData = int(blob_out.cbData)
pbData = blob_out.pbData
buffer = c_buffer(cbData)
memcpy(buffer, pbData, cbData)
LocalFree(pbData);
return buffer.raw
def get_data(blob_out):
cbData = int(blob_out.cbData)
pbData = blob_out.pbData
buffer = c_buffer(cbData)
memcpy(buffer, pbData, cbData)
LocalFree(pbData);
return buffer.raw
def get_data(blob_out):
cbData = int(blob_out.cbData)
pbData = blob_out.pbData
buffer = c_buffer(cbData)
memcpy(buffer, pbData, cbData)
LocalFree(pbData);
return buffer.raw
def from_buffer_size(cls, buffer_size):
buffer = ctypes.c_buffer(buffer_size)
return cls.from_buffer(buffer)
def with_attributes(cls, attributes):
"""Create a new :class:`MessageAttribute` with ``attributes`` allocated
:returns: :class:`MessageAttribute`
"""
size = cls._get_required_buffer_size(attributes)
buffer = ctypes.c_buffer(size)
self = cls.from_buffer(buffer)
self.raw_buffer = buffer
res = gdef.DWORD()
winproxy.AlpcInitializeMessageAttribute(attributes, self, len(self.raw_buffer), res)
return self
def computer_name(self):
"""The name of the computer
:type: :class:`str`
"""
size = DWORD(0x1000)
buf = ctypes.c_buffer(size.value)
winproxy.GetComputerNameA(buf, ctypes.byref(size))
return buf[:size.value]
def windir(self):
buffer = ctypes.c_buffer(0x100)
reslen = winproxy.GetWindowsDirectoryA(buffer)
return buffer[:reslen]
def _get_object_name(self):
lh = self.local_handle
size_needed = DWORD()
yyy = ctypes.c_buffer(0x1000)
winproxy.NtQueryObject(lh, ObjectNameInformation, ctypes.byref(yyy), ctypes.sizeof(yyy), ctypes.byref(size_needed))
return WinUnicodeString.from_buffer_copy(yyy[:size_needed.value]).str
def _get_object_type(self):
lh = self.local_handle
xxx = EPUBLIC_OBJECT_TYPE_INFORMATION()
size_needed = DWORD()
try:
winproxy.NtQueryObject(lh, ObjectTypeInformation, ctypes.byref(xxx), ctypes.sizeof(xxx), ctypes.byref(size_needed))
except WindowsError as e:
if e.code != STATUS_INFO_LENGTH_MISMATCH:
# print("ERROR WITH {0:x}".format(lh))
raise
size = size_needed.value
buffer = ctypes.c_buffer(size)
winproxy.NtQueryObject(lh, ObjectTypeInformation, buffer, size, ctypes.byref(size_needed))
xxx = EPUBLIC_OBJECT_TYPE_INFORMATION.from_buffer_copy(buffer)
return xxx.TypeName.str
def name(self):
"""Name of the process
:type: :class:`str`
"""
buffer = ctypes.c_buffer(0x1024)
rsize = winproxy.GetProcessImageFileNameA(self.handle, buffer)
# GetProcessImageFileNameA returns the fullpath
return buffer[:rsize].decode().split("\\")[-1]
def token_user(self):
buffer_size = self.get_required_information_size(TokenUser)
buffer = ctypes.c_buffer(buffer_size)
self.get_informations(TokenUser, buffer)
return ctypes.cast(buffer, POINTER(TOKEN_USER))[0]
def _user_and_computer_name(self):
tok_usr = self.token_user
sid = tok_usr.User.Sid
usernamesize = DWORD(0x1000)
computernamesize = DWORD(0x1000)
username = ctypes.c_buffer(usernamesize.value)
computername = ctypes.c_buffer(computernamesize.value)
peUse = SID_NAME_USE()
winproxy.LookupAccountSidA(None, sid, username, usernamesize, computername, computernamesize, peUse)
return username[:usernamesize.value], computername[:computernamesize.value]
def name(self):
size = 0x1024
buffer = ctypes.c_buffer(size)
res = windows.winproxy.GetWindowTextA(self.handle, buffer, size)
return buffer[:res]
def at_point(cls, point):
handle = winproxy.WindowFromPoint(point)
return cls(handle)
# I don't understand the interest:
# Either return "" or C:\Python27\python.exe
#def module(self):
# size = 0x1024
# buffer = ctypes.c_buffer(size)
# res = windows.winproxy.GetWindowModuleFileNameA(self.handle, buffer, size)
# return buffer[:res]
def get_logical_drive_names():
size = 0x100
buffer = ctypes.c_buffer(size)
rsize = winproxy.GetLogicalDriveStringsA(0x1000, buffer)
return buffer[:rsize].rstrip("\x00").split("\x00")
def query_dos_device(name):
size = 0x1000
buffer = ctypes.c_buffer(size)
rsize = winproxy.QueryDosDeviceA(name, buffer, size)
return buffer[:rsize].rstrip("\x00").split("\x00")
def entries(self):
"""Todo: better name ?"""
path = self.fullname
utf16_len = len(path) * 2
obj_attr = OBJECT_ATTRIBUTES()
obj_attr.Length = ctypes.sizeof(obj_attr)
obj_attr.RootDirectory = None
obj_attr.ObjectName = pointer(LSA_UNICODE_STRING(utf16_len, utf16_len, path))
obj_attr.Attributes = OBJ_CASE_INSENSITIVE
obj_attr.SecurityDescriptor = 0
obj_attr.SecurityQualityOfService = 0
res = HANDLE()
x = winproxy.NtOpenDirectoryObject(res, DIRECTORY_QUERY | READ_CONTROL , obj_attr)
size = 0x1000
buf = ctypes.c_buffer(size)
rres = ULONG()
ctx = ULONG()
while True:
try:
winproxy.NtQueryDirectoryObject(res, buf, size, False, False, ctx, rres)
break
except windows.generated_def.ntstatus.NtStatusException as e:
if e.code == STATUS_NO_MORE_ENTRIES:
return {}
if e.code == STATUS_MORE_ENTRIES:
size *= 2
buf = ctypes.c_buffer(size)
continue
raise
t = OBJECT_DIRECTORY_INFORMATION.from_buffer(buf)
t = POBJECT_DIRECTORY_INFORMATION(t)
res = {}
for v in t:
if v.Name.Buffer is None:
break
x = KernelObject(path, v.Name.Buffer, v.TypeName.Buffer)
res[x.name] = x
return res