def teb_base(self):
"""The address of the thread's TEB
:type: :class:`int`
"""
if windows.current_process.bitness == 32 and self.owner.bitness == 64:
restype = rctypes.transform_type_to_remote64bits(THREAD_BASIC_INFORMATION)
ressize = (ctypes.sizeof(restype))
# Manual aligned allocation :DDDD
nb_qword = (ressize + 8) / ctypes.sizeof(ULONGLONG)
buffer = (nb_qword * ULONGLONG)()
struct_address = ctypes.addressof(buffer)
if (struct_address & 0xf) not in [0, 8]:
raise ValueError("ULONGLONG array not aligned on 8")
windows.syswow64.NtQueryInformationThread_32_to_64(self.handle, ThreadBasicInformation, struct_address, ressize)
return restype(struct_address, windows.current_process).TebBaseAddress
res = THREAD_BASIC_INFORMATION()
windows.winproxy.NtQueryInformationThread(self.handle, ThreadBasicInformation, byref(res), ctypes.sizeof(res))
return res.TebBaseAddress
python类sizeof()的实例源码
def list(self):
"""
return a list of <PROCESSENTRY32>
"""
processes = []
hProcessSnap = CreateToolhelp32Snapshot(TH32CS_CLASS.SNAPPROCESS, 0)
pe32 = PROCESSENTRY32()
pe32.dwSize = sizeof(PROCESSENTRY32)
ret = Process32First(hProcessSnap, pointer(pe32))
while ret:
ret = Process32Next(hProcessSnap, pointer(pe32))
if pe32.dwFlags == 0:
processes.append(copy.copy(pe32))
else:
break
CloseHandle(hProcessSnap)
return processes
def asString(self):
return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
def inet_pton(address_family, ip_string):
if address_family == socket.AF_INET:
return socket.inet_aton(ip_string)
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
if WSAStringToAddressA(
ip_string,
address_family,
None,
ctypes.byref(addr),
ctypes.byref(addr_size)
) != 0:
raise socket.error(ctypes.FormatError())
if address_family == socket.AF_INET6:
return ctypes.string_at(addr.ipv6_addr, 16)
raise socket.error('unknown address family')
def append(self, obj):
"append to the list; the object must be assignable to the ctype"
size = self.size
if size >= self.prealloc_size:
# Need to make new space. There's no 'realloc' for
# ctypes so this isn't as nice as it is in C.
# I'm using Python's growth pattern, which is
# 0, 4, 8, 16, 25, 35, 46, 58, 72, 88, ...
if size < 9:
newsize = (size>>3) + 3 + size
else:
newsize = (size>>3) + 6 + size
newdata = (self.c_type * newsize)()
ctypes.memmove(newdata, self.data, ctypes.sizeof(self.data))
self.data = newdata
self.prealloc_size = newsize
self.data[size] = obj
self.size = size+1
def append_kwargs(self, **kwargs):
"append to the list; assign each key/value to the new item"
size = self.size
if size >= self.prealloc_size:
if size < 9:
newsize = (size>>3) + 3 + size
else:
newsize = (size>>3) + 6 + size
newdata = (self.c_type * newsize)()
ctypes.memmove(newdata, self.data, ctypes.sizeof(self.data))
self.data = newdata
self.prealloc_size = newsize
obj = self.data[size]
for k, v in kwargs.iteritems():
setattr(obj, k, v)
self.size = size+1
def wait_until_idle(idle_time=60):
"""Wait until no more user activity is detected.
This function won't return until `idle_time` seconds have elapsed
since the last user activity was detected.
"""
idle_time_ms = int(idle_time*1000)
liinfo = LASTINPUTINFO()
liinfo.cbSize = ctypes.sizeof(liinfo)
while True:
GetLastInputInfo(ctypes.byref(liinfo))
elapsed = GetTickCount() - liinfo.dwTime
if elapsed>=idle_time_ms:
break
Sleep(idle_time_ms - elapsed or 1)
def wait_until_active(tol=5):
"""Wait until awakened by user activity.
This function will block and wait until some user activity
is detected. Because of the polling method used, it may return
`tol` seconds (or less) after user activity actually began.
"""
liinfo = LASTINPUTINFO()
liinfo.cbSize = ctypes.sizeof(liinfo)
lasttime = None
delay = 1 # ms
maxdelay = int(tol*1000)
while True:
GetLastInputInfo(ctypes.byref(liinfo))
if lasttime is None: lasttime = liinfo.dwTime
if lasttime != liinfo.dwTime:
break
delay = min(2*delay, maxdelay)
Sleep(delay)
def pcpfastExtractValues(result_p, vsetidx, vlistidx, dtype):
""" quicker implementation of pmExtractValue than the default provided with the pcp python bindings
this version saves converting the C indexes to python and back again
"""
inst = c_int()
outAtom = pmapi.pmAtomValue()
status = LIBPCPFAST.pcpfastExtractValues(result_p, byref(inst), byref(outAtom), vsetidx, vlistidx, dtype)
if status < 0:
raise pmapi.pmErr(status)
if dtype == c_api.PM_TYPE_STRING:
# Get pointer to C string
c_str = c_char_p()
memmove(byref(c_str), addressof(outAtom) + pmapi.pmAtomValue.cp.offset, sizeof(c_char_p))
# Convert to a python string and have result point to it
outAtom.cp = outAtom.cp
# Free the C string
LIBC.free(c_str)
return outAtom.dref(dtype), inst.value
def oni_call(func):
@functools.wraps(func)
def wrapper(*args):
res = func(*args)
if res != OniStatus.ONI_STATUS_OK:
msg = oniGetExtendedError()
if not msg:
msg = ''
buf = ctypes.create_string_buffer(1024)
rc = _oniGetLogFileName(buf, ctypes.sizeof(buf))
if rc == OniStatus.ONI_STATUS_OK:
logfile = buf.value
else:
logfile = None
raise OpenNIError(res, msg.strip(), logfile)
return res
return wrapper
def new_aligned(cls):
"""Return a new :class:`ECONTEXT64` aligned on 16 bits
temporary workaround or horrible hack ? choose your side
"""
size = ctypes.sizeof(cls)
nb_qword = (size + 8) / ctypes.sizeof(ULONGLONG)
buffer = (nb_qword * ULONGLONG)()
struct_address = ctypes.addressof(buffer)
if (struct_address & 0xf) not in [0, 8]:
raise ValueError("ULONGLONG array not aligned on 8")
if (struct_address & 0xf) == 8:
struct_address += 8
self = cls.from_address(struct_address)
# Keep the raw buffer alive
self._buffer = buffer
return self
def start_address(self):
"""The start address of the thread
:type: :class:`int`
"""
if windows.current_process.bitness == 32 and self.owner.bitness == 64:
res = ULONGLONG()
windows.syswow64.NtQueryInformationThread_32_to_64(self.handle, ThreadQuerySetWin32StartAddress, byref(res), ctypes.sizeof(res))
return res.value
res_size = max(self.owner.bitness, windows.current_process.bitness)
if res_size == 32:
res = ULONG()
else:
res = ULONGLONG()
winproxy.NtQueryInformationThread(self.handle, ThreadQuerySetWin32StartAddress, byref(res), ctypes.sizeof(res))
return res.value
def ppid(self):
"""Parent Process ID
:type: :class:`int`
"""
if windows.current_process.bitness == 32 and self.bitness == 64:
xtype = windows.remotectypes.transform_type_to_remote64bits(PROCESS_BASIC_INFORMATION)
# Fuck-it <3
data = (ctypes.c_char * ctypes.sizeof(xtype))()
windows.syswow64.NtQueryInformationProcess_32_to_64(self.handle, ProcessInformation=data, ProcessInformationLength=ctypes.sizeof(xtype))
# Map a remote64bits(PROCESS_BASIC_INFORMATION) at the address of 'data'
x = xtype(ctypes.addressof(data), windows.current_process)
else:
information_type = 0
x = PROCESS_BASIC_INFORMATION()
winproxy.NtQueryInformationProcess(self.handle, information_type, x)
return x.InheritedFromUniqueProcessId
def query_memory(self, addr):
"""Query the memory informations about page at ``addr``
:rtype: :class:`~windows.generated_def.MEMORY_BASIC_INFORMATION`
"""
if windows.current_process.bitness == 32 and self.bitness == 64:
res = MEMORY_BASIC_INFORMATION64()
try:
v = windows.syswow64.NtQueryVirtualMemory_32_to_64(ProcessHandle=self.handle, BaseAddress=addr, MemoryInformationClass=MemoryBasicInformation, MemoryInformation=res)
except NtStatusException as e:
if e.code & 0xffffffff == 0XC000000D:
raise winproxy.Kernel32Error("NtQueryVirtualMemory_32_to_64")
raise
return res
info_type = {32 : MEMORY_BASIC_INFORMATION32, 64 : MEMORY_BASIC_INFORMATION64}
res = info_type[windows.current_process.bitness]()
ptr = ctypes.cast(byref(res), POINTER(MEMORY_BASIC_INFORMATION))
winproxy.VirtualQueryEx(self.handle, addr, ptr, sizeof(res))
return res
def peb_syswow_addr(self):
if not self.is_wow_64:
raise ValueError("Not a syswow process")
if windows.current_process.bitness == 64:
information_type = 0
x = PROCESS_BASIC_INFORMATION()
winproxy.NtQueryInformationProcess(self.handle, information_type, x)
peb_addr = ctypes.cast(x.PebBaseAddress, PVOID).value
return peb_addr
else: #current is 32bits
x = windows.remotectypes.transform_type_to_remote64bits(PROCESS_BASIC_INFORMATION)
# Fuck-it <3
data = (ctypes.c_char * ctypes.sizeof(x))()
windows.syswow64.NtQueryInformationProcess_32_to_64(self.handle, ProcessInformation=data, ProcessInformationLength=ctypes.sizeof(x))
peb_offset = x.PebBaseAddress.offset
peb_addr = struct.unpack("<Q", data[x.PebBaseAddress.offset: x.PebBaseAddress.offset+8])[0]
return peb_addr
# Not a fixedpropety to prevent ref-cycle and uncollectable WinProcess
# Try with a weakref ?
def get_INT(self):
if not self.OriginalFirstThunk:
return None
int_addr = self.OriginalFirstThunk + self.baseaddr
int_entry = self.transformers.create_structure_at(THUNK_DATA, int_addr)
res = []
while int_entry.Ordinal:
if int_entry.Ordinal & self.IMAGE_ORDINAL_FLAG:
res += [(int_entry.Ordinal & 0x7fffffff, None)]
else:
import_by_name = self.transformers.create_structure_at(IMPORT_BY_NAME, self.baseaddr + int_entry.AddressOfData)
name_address = self.baseaddr + int_entry.AddressOfData + type(import_by_name).Name.offset
if self.target is None:
name = get_string(self.target, name_address)
else:
name = get_string(self.target, name_address).decode()
res.append((import_by_name.Hint, name))
int_addr += ctypes.sizeof(type(int_entry))
int_entry = self.transformers.create_structure_at(THUNK_DATA, int_addr)
return res
def get_raw_certificate_chains(self): # Rename to all_chains ?
chain_context = EPCCERT_CHAIN_CONTEXT()
enhkey_usage = gdef.CERT_ENHKEY_USAGE()
enhkey_usage.cUsageIdentifier = 0
enhkey_usage.rgpszUsageIdentifier = None
cert_usage = gdef.CERT_USAGE_MATCH()
cert_usage.dwType = gdef.USAGE_MATCH_TYPE_AND
cert_usage.Usage = enhkey_usage
chain_para = gdef.CERT_CHAIN_PARA()
chain_para.cbSize = ctypes.sizeof(chain_para)
chain_para.RequestedUsage = cert_usage
winproxy.CertGetCertificateChain(None, self, None, self[0].hCertStore, ctypes.byref(chain_para), 0, None, ctypes.byref(chain_context))
#return CertficateChain(chain_context)
return chain_context
def enable_privilege(lpszPrivilege, bEnablePrivilege):
"""
Enable or disable a privilege::
enable_privilege(SE_DEBUG_NAME, True)
"""
tp = TOKEN_PRIVILEGES()
luid = LUID()
hToken = HANDLE()
winproxy.OpenProcessToken(winproxy.GetCurrentProcess(), TOKEN_ALL_ACCESS, byref(hToken))
winproxy.LookupPrivilegeValueA(None, lpszPrivilege, byref(luid))
tp.PrivilegeCount = 1
tp.Privileges[0].Luid = luid
if bEnablePrivilege:
tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED
else:
tp.Privileges[0].Attributes = 0
winproxy.AdjustTokenPrivileges(hToken, False, byref(tp), sizeof(TOKEN_PRIVILEGES))
winproxy.CloseHandle(hToken)
if winproxy.GetLastError() == windef.ERROR_NOT_ALL_ASSIGNED:
raise ValueError("Failed to get privilege {0}".format(lpszPrivilege))
return True
def _ram(self):
kernel32 = ctypes.windll.kernel32
c_ulong = ctypes.c_ulong
class MEMORYSTATUS(ctypes.Structure):
_fields_ = [
('dwLength', c_ulong),
('dwMemoryLoad', c_ulong),
('dwTotalPhys', c_ulong),
('dwAvailPhys', c_ulong),
('dwTotalPageFile', c_ulong),
('dwAvailPageFile', c_ulong),
('dwTotalVirtual', c_ulong),
('dwAvailVirtual', c_ulong)
]
memoryStatus = MEMORYSTATUS()
memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUS)
kernel32.GlobalMemoryStatus(ctypes.byref(memoryStatus))
return (memoryStatus.dwTotalPhys, memoryStatus.dwAvailPhys)
def write_bytes(self, address, data):
address = int(address)
if not self.isProcessOpen:
raise ProcessException("Can't write_bytes(%s, %s), process %s is not open" % (address, data, self.pid))
buffer = create_string_buffer(data)
sizeWriten = c_ulong(0)
bufferSize = sizeof(buffer) - 1
_address = address
_length = bufferSize + 1
try:
old_protect = self.VirtualProtectEx(_address, _length, PAGE_EXECUTE_READWRITE)
except:
pass
res = windll.kernel32.WriteProcessMemory(self.h_process, address, buffer, bufferSize, byref(sizeWriten))
try:
self.VirtualProtectEx(_address, _length, old_protect)
except:
pass
return res
def list_modules(self):
"""
return a list of <MODULEENTRY32>
"""
module_list = []
if self.process32 is not None:
hModuleSnap = CreateToolhelp32Snapshot(TH32CS_CLASS.SNAPMODULE, self.process32.th32ProcessID)
if hModuleSnap is not None:
module_entry = MODULEENTRY32()
module_entry.dwSize = sizeof(module_entry)
success = Module32First(hModuleSnap, byref(module_entry))
while success:
if module_entry.th32ProcessID == self.process32.th32ProcessID:
module_list.append(copy.copy(module_entry))
success = Module32Next(hModuleSnap, byref(module_entry))
CloseHandle(hModuleSnap)
return module_list
def test_bitfield(self):
# struct bitfield { int a:10, b:20, c:3; };
assert ffi.sizeof("struct bitfield") == 8
s = ffi.new("struct bitfield *")
s.a = 511
py.test.raises(OverflowError, "s.a = 512")
py.test.raises(OverflowError, "s[0].a = 512")
assert s.a == 511
s.a = -512
py.test.raises(OverflowError, "s.a = -513")
py.test.raises(OverflowError, "s[0].a = -513")
assert s.a == -512
s.c = 3
assert s.c == 3
py.test.raises(OverflowError, "s.c = 4")
py.test.raises(OverflowError, "s[0].c = 4")
s.c = -4
assert s.c == -4
def test_ffi_buffer_with_file(self):
import tempfile, os, array
fd, filename = tempfile.mkstemp()
f = os.fdopen(fd, 'r+b')
a = ffi.new("int[]", list(range(1005)))
try:
ffi.buffer(a, 512)
except NotImplementedError as e:
py.test.skip(str(e))
f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
f.seek(0)
assert f.read() == array.array('i', range(1000)).tostring()
f.seek(0)
b = ffi.new("int[]", 1005)
f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
f.close()
os.unlink(filename)
def test_nested_anonymous_struct(self):
# struct nested_anon {
# struct { int a, b; };
# union { int c, d; };
# };
assert ffi.sizeof("struct nested_anon") == 3 * SIZE_OF_INT
p = ffi.new("struct nested_anon *", [1, 2, 3])
assert p.a == 1
assert p.b == 2
assert p.c == 3
assert p.d == 3
p.d = 17
assert p.c == 17
p.b = 19
assert p.a == 1
assert p.b == 19
assert p.c == 17
assert p.d == 17
p = ffi.new("struct nested_anon *", {'b': 12, 'd': 14})
assert p.a == 0
assert p.b == 12
assert p.c == 14
assert p.d == 14
def test_struct_packed(self):
# struct nonpacked { char a; int b; };
# struct is_packed { char a; int b; } __attribute__((packed));
assert ffi.sizeof("struct nonpacked") == 8
assert ffi.sizeof("struct is_packed") == 5
assert ffi.alignof("struct nonpacked") == 4
assert ffi.alignof("struct is_packed") == 1
s = ffi.new("struct is_packed[2]")
s[0].b = 42623381
s[0].a = b'X'
s[1].b = -4892220
s[1].a = b'Y'
assert s[0].b == 42623381
assert s[0].a == b'X'
assert s[1].b == -4892220
assert s[1].a == b'Y'
def test_bitfield(self):
ffi = FFI(backend=self.Backend())
ffi.cdef("struct foo { int a:10, b:20, c:3; };")
assert ffi.sizeof("struct foo") == 8
s = ffi.new("struct foo *")
s.a = 511
py.test.raises(OverflowError, "s.a = 512")
py.test.raises(OverflowError, "s[0].a = 512")
assert s.a == 511
s.a = -512
py.test.raises(OverflowError, "s.a = -513")
py.test.raises(OverflowError, "s[0].a = -513")
assert s.a == -512
s.c = 3
assert s.c == 3
py.test.raises(OverflowError, "s.c = 4")
py.test.raises(OverflowError, "s[0].c = 4")
s.c = -4
assert s.c == -4
def test_ffi_buffer_with_file(self):
ffi = FFI(backend=self.Backend())
import tempfile, os, array
fd, filename = tempfile.mkstemp()
f = os.fdopen(fd, 'r+b')
a = ffi.new("int[]", list(range(1005)))
try:
ffi.buffer(a, 512)
except NotImplementedError as e:
py.test.skip(str(e))
f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
f.seek(0)
assert f.read() == array.array('i', range(1000)).tostring()
f.seek(0)
b = ffi.new("int[]", 1005)
f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
f.close()
os.unlink(filename)
def test_ffi_buffer_with_io(self):
ffi = FFI(backend=self.Backend())
import io, array
f = io.BytesIO()
a = ffi.new("int[]", list(range(1005)))
try:
ffi.buffer(a, 512)
except NotImplementedError as e:
py.test.skip(str(e))
f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
f.seek(0)
assert f.read() == array.array('i', range(1000)).tostring()
f.seek(0)
b = ffi.new("int[]", 1005)
f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
f.close()
def test_struct_packed(self):
ffi = FFI(backend=self.Backend())
ffi.cdef("struct nonpacked { char a; int b; };")
ffi.cdef("struct is_packed { char a; int b; };", packed=True)
assert ffi.sizeof("struct nonpacked") == 8
assert ffi.sizeof("struct is_packed") == 5
assert ffi.alignof("struct nonpacked") == 4
assert ffi.alignof("struct is_packed") == 1
s = ffi.new("struct is_packed[2]")
s[0].b = 42623381
s[0].a = b'X'
s[1].b = -4892220
s[1].a = b'Y'
assert s[0].b == 42623381
assert s[0].a == b'X'
assert s[1].b == -4892220
assert s[1].a == b'Y'
def get_struct(input_stream, start_offset, class_name, param_list = None) :
if param_list is None : param_list = []
structure = class_name(*param_list) # Unpack parameter list
struct_len = ctypes.sizeof(structure)
struct_data = input_stream[start_offset:start_offset + struct_len]
fit_len = min(len(struct_data), struct_len)
if (start_offset > file_end) or (fit_len < struct_len) :
print(col_r + "Error: Offset 0x%X out of bounds, possibly incomplete image!" % start_offset + col_e)
mce_exit(1)
ctypes.memmove(ctypes.addressof(structure), struct_data, fit_len)
return structure