def teb_base(self):
"""The address of the thread's TEB
:type: :class:`int`
"""
if windows.current_process.bitness == 32 and self.owner.bitness == 64:
restype = rctypes.transform_type_to_remote64bits(THREAD_BASIC_INFORMATION)
ressize = (ctypes.sizeof(restype))
# Manual aligned allocation :DDDD
nb_qword = (ressize + 8) / ctypes.sizeof(ULONGLONG)
buffer = (nb_qword * ULONGLONG)()
struct_address = ctypes.addressof(buffer)
if (struct_address & 0xf) not in [0, 8]:
raise ValueError("ULONGLONG array not aligned on 8")
windows.syswow64.NtQueryInformationThread_32_to_64(self.handle, ThreadBasicInformation, struct_address, ressize)
return restype(struct_address, windows.current_process).TebBaseAddress
res = THREAD_BASIC_INFORMATION()
windows.winproxy.NtQueryInformationThread(self.handle, ThreadBasicInformation, byref(res), ctypes.sizeof(res))
return res.TebBaseAddress
python类addressof()的实例源码
def check_args(self, call_flags,
shape, typekind, strides, length, bufsize, itemsize,
offset=0):
if call_flags & 1:
typekind_arg = typekind
else:
typekind_arg = None
if call_flags & 2:
strides_arg = strides
else:
strides_arg = None
a = Exporter(shape, itemsize=itemsize, strides=strides_arg)
self.assertEqual(sizeof(a._data), bufsize)
self.assertEqual(a.data, ctypes.addressof(a._data) + offset)
m = ArrayInterface(a)
self.assertEqual(m.data, a.data)
self.assertEqual(m.itemsize, itemsize)
self.assertEqual(tuple(m.shape[0:m.nd]), shape)
self.assertEqual(tuple(m.strides[0:m.nd]), strides)
def __delitem__(self, i):
size = self.size
if i >= size:
raise IndexError("list index out of range")
if i < 0:
i = size + i
if i < 0:
raise IndexError("list index out of range")
# shift everything left by one
address = ctypes.addressof(self.data)
typesize = self._typesize
to_address = address + i*typesize
from_address = to_address + typesize
ctypes.memmove(to_address, from_address, typesize*(size-i-1))
self.size = size = size-1
if self.prealloc_size > size*2:
self.compact()
def pcpfastExtractValues(result_p, vsetidx, vlistidx, dtype):
""" quicker implementation of pmExtractValue than the default provided with the pcp python bindings
this version saves converting the C indexes to python and back again
"""
inst = c_int()
outAtom = pmapi.pmAtomValue()
status = LIBPCPFAST.pcpfastExtractValues(result_p, byref(inst), byref(outAtom), vsetidx, vlistidx, dtype)
if status < 0:
raise pmapi.pmErr(status)
if dtype == c_api.PM_TYPE_STRING:
# Get pointer to C string
c_str = c_char_p()
memmove(byref(c_str), addressof(outAtom) + pmapi.pmAtomValue.cp.offset, sizeof(c_char_p))
# Convert to a python string and have result point to it
outAtom.cp = outAtom.cp
# Free the C string
LIBC.free(c_str)
return outAtom.dref(dtype), inst.value
def ppid(self):
"""Parent Process ID
:type: :class:`int`
"""
if windows.current_process.bitness == 32 and self.bitness == 64:
xtype = windows.remotectypes.transform_type_to_remote64bits(PROCESS_BASIC_INFORMATION)
# Fuck-it <3
data = (ctypes.c_char * ctypes.sizeof(xtype))()
windows.syswow64.NtQueryInformationProcess_32_to_64(self.handle, ProcessInformation=data, ProcessInformationLength=ctypes.sizeof(xtype))
# Map a remote64bits(PROCESS_BASIC_INFORMATION) at the address of 'data'
x = xtype(ctypes.addressof(data), windows.current_process)
else:
information_type = 0
x = PROCESS_BASIC_INFORMATION()
winproxy.NtQueryInformationProcess(self.handle, information_type, x)
return x.InheritedFromUniqueProcessId
def virtual_protect(self, addr, size, protect, old_protect):
"""Change the access right of one or more page of the process"""
if windows.current_process.bitness == 32 and self.bitness == 64:
#addr = (addr >> 12) << 12
#addr = ULONG64(addr)
if size & 0x0fff:
size = ((size >> 12) + 1) << 12
#ssize = ULONG(size)
old_protect = ctypes.addressof(old_protect)
xaddr = ULONG64(addr)
addr = ctypes.addressof(xaddr)
xsize = ULONG(size)
size = ctypes.addressof(xsize)
return windows.syswow64.NtProtectVirtualMemory_32_to_64(self.handle, addr, size, protect, old_protect)
else:
winproxy.VirtualProtectEx(self.handle, addr, size, protect, old_protect)
def get_mapped_filename(self, addr):
"""The filename mapped at address ``addr`` or ``None``
:rtype: :class:`str` or ``None``
"""
buffer_size = 0x1000
buffer = ctypes.c_buffer(buffer_size)
if windows.current_process.bitness == 32 and self.bitness == 64:
target_size = ctypes.c_buffer(buffer_size)
try:
windows.syswow64.NtQueryVirtualMemory_32_to_64(self.handle, addr, MemorySectionName, buffer, buffer_size, target_size)
except NtStatusException as e:
if e.code not in [STATUS_FILE_INVALID, STATUS_INVALID_ADDRESS, STATUS_TRANSACTION_NOT_ACTIVE]:
raise
return None
remote_winstring = rctypes.transform_type_to_remote64bits(WinUnicodeString)
mapped_filename = remote_winstring(ctypes.addressof(buffer), windows.current_process)
return mapped_filename.str
try:
size = winproxy.GetMappedFileNameA(self.handle, addr, buffer, buffer_size)
except winproxy.Kernel32Error as e:
return None
return buffer[:size]
def modules(self):
"""The loaded modules present in the PEB
:type: [:class:`LoadedModule`] -- List of loaded modules
"""
res = []
list_entry_ptr = ctypes.cast(self.Ldr.contents.InMemoryOrderModuleList.Flink, LIST_ENTRY_PTR)
current_dll = list_entry_ptr.TO_LDR_ENTRY()
while current_dll.DllBase:
res.append(current_dll)
list_entry_ptr = ctypes.cast(current_dll.InMemoryOrderLinks.Flink, LIST_ENTRY_PTR)
current_dll = list_entry_ptr.TO_LDR_ENTRY()
return [LoadedModule.from_address(addressof(LDR)) for LDR in res]
# Memory stuff
def xfer(self, txbuff=None):
length = len(txbuff)
if PYTHON_MAJOR >= 3:
_txbuff = bytes(txbuff)
_txptr = ctypes.create_string_buffer(_txbuff)
else:
_txbuff = str(bytearray(txbuff))
_txptr = ctypes.create_string_buffer(_txbuff)
_rxptr = ctypes.create_string_buffer(length)
data = struct.pack("QQLLHBBL", #64 64 32 32 16 8 8 32 b = 32B
ctypes.addressof(_txptr),
ctypes.addressof(_rxptr),
length,
self.speed,
0, #delay
self.bits,
0, # cs_change,
0 # pad
)
fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data)
_rxbuff = ctypes.string_at(_rxptr, length)
return bytearray(_rxbuff)
def next(self):
"""Return the next event in the trail."""
event = lib.tdb_cursor_next(self.cursor)
if not event:
raise StopIteration()
address = addressof(event.contents.items)
items = (tdb_item*event.contents.num_items).from_address(address)
timestamp = event.contents.timestamp
if self.parsetime:
timestamp = datetime.fromtimestamp(event.contents.timestamp)
if self.only_timestamp:
return timestamp
elif self.valuefun:
return self.cls(timestamp, *(self.valuefun(item) for item in items))
else:
return self.cls(timestamp, *items)
def arg_split(commandline, posix=False, strict=True):
"""Split a command line's arguments in a shell-like manner.
This is a special version for windows that use a ctypes call to CommandLineToArgvW
to do the argv splitting. The posix paramter is ignored.
If strict=False, process_common.arg_split(...strict=False) is used instead.
"""
#CommandLineToArgvW returns path to executable if called with empty string.
if commandline.strip() == "":
return []
if not strict:
# not really a cl-arg, fallback on _process_common
return py_arg_split(commandline, posix=posix, strict=strict)
argvn = c_int()
result_pointer = CommandLineToArgvW(py3compat.cast_unicode(commandline.lstrip()), ctypes.byref(argvn))
result_array_type = LPCWSTR * argvn.value
result = [arg for arg in result_array_type.from_address(ctypes.addressof(result_pointer.contents))]
retval = LocalFree(result_pointer)
return result
def save( blenderobject, path ):
cmesh = Mesh( blenderobject.data )
start = time.time()
dotmesh(
path,
ctypes.addressof( cmesh.faces ),
ctypes.addressof( cmesh.faces_smooth ),
ctypes.addressof( cmesh.faces_material_index ),
ctypes.addressof( cmesh.vertex_positions ),
ctypes.addressof( cmesh.vertex_normals ),
cmesh.numFaces,
cmesh.numVerts,
)
print('Mesh dumped in %s seconds' % (time.time()-start))
## Make sure we can import from same directory
def xfer(self, txbuff=None):
length = len(txbuff)
if PYTHON_MAJOR >= 3:
_txbuff = bytes(txbuff)
_txptr = ctypes.create_string_buffer(_txbuff)
else:
_txbuff = str(bytearray(txbuff))
_txptr = ctypes.create_string_buffer(_txbuff)
_rxptr = ctypes.create_string_buffer(length)
data = struct.pack("QQLLHBBL", #64 64 32 32 16 8 8 32 b = 32B
ctypes.addressof(_txptr),
ctypes.addressof(_rxptr),
length,
self.speed,
0, #delay
self.bits,
0, # cs_change,
0 # pad
)
fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data)
_rxbuff = ctypes.string_at(_rxptr, length)
return bytearray(_rxbuff)
def create_read_console(get_text):
code = [None]
def _read_console(p, buf, buflen, add_history):
if not code[0]:
text = get_text(p.decode(ENCODING), add_history)
if text is None:
return 0
code[0] = text.encode(ENCODING)
addr = ctypes.addressof(buf.contents)
c2 = (ctypes.c_char * buflen).from_address(addr)
nb = min(len(code[0]), buflen - 2)
c2[:nb] = code[0][:nb]
if nb < buflen - 2:
c2[nb:(nb + 2)] = b'\n\0'
code[0] = code[0][nb:]
return 1
return _read_console
def nprocessors():
try:
try:
# Mac OS
libc=ctypes.cdll.LoadLibrary(ctypes.util.find_library('libc'))
v=ctypes.c_int(0)
size=ctypes.c_size_t(ctypes.sizeof(v))
libc.sysctlbyname('hw.ncpu', ctypes.c_voidp(ctypes.addressof(v)), ctypes.addressof(size), None, 0)
return v.value
except:
# Cygwin (Windows) and Linuxes
# Could try sysconf(_SC_NPROCESSORS_ONLN) (LSB) next. Instead, count processors in cpuinfo.
s = open('/proc/cpuinfo', 'r').read()
return s.replace(' ', '').replace('\t', '').count('processor:')
except:
return 1
def arg_split(commandline, posix=False, strict=True):
"""Split a command line's arguments in a shell-like manner.
This is a special version for windows that use a ctypes call to CommandLineToArgvW
to do the argv splitting. The posix paramter is ignored.
If strict=False, process_common.arg_split(...strict=False) is used instead.
"""
#CommandLineToArgvW returns path to executable if called with empty string.
if commandline.strip() == "":
return []
if not strict:
# not really a cl-arg, fallback on _process_common
return py_arg_split(commandline, posix=posix, strict=strict)
argvn = c_int()
result_pointer = CommandLineToArgvW(py3compat.cast_unicode(commandline.lstrip()), ctypes.byref(argvn))
result_array_type = LPCWSTR * argvn.value
result = [arg for arg in result_array_type.from_address(ctypes.addressof(result_pointer.contents))]
retval = LocalFree(result_pointer)
return result
def marshal(self) :
"serializes this Message into the wire protocol format and returns a bytes object."
buf = ct.POINTER(ct.c_ubyte)()
nr_bytes = ct.c_int()
if not dbus.dbus_message_marshal(self._dbobj, ct.byref(buf), ct.byref(nr_bytes)) :
raise CallFailed("dbus_message_marshal")
#end if
result = bytearray(nr_bytes.value)
ct.memmove \
(
ct.addressof((ct.c_ubyte * nr_bytes.value).from_buffer(result)),
buf,
nr_bytes.value
)
dbus.dbus_free(buf)
return \
result
#end marshal
def demarshal(celf, buf, error = None) :
"deserializes a bytes or array-of-bytes object from the wire protocol" \
" format into a Message object."
error, my_error = _get_error(error)
if isinstance(buf, bytes) :
baseadr = ct.cast(buf, ct.c_void_p).value
elif isinstance(buf, bytearray) :
baseadr = ct.addressof((ct.c_ubyte * len(buf)).from_buffer(buf))
elif isinstance(buf, array.array) and buf.typecode == "B" :
baseadr = buf.buffer_info()[0]
else :
raise TypeError("buf is not bytes, bytearray or array.array of bytes")
#end if
msg = dbus.dbus_message_demarshal(baseadr, len(buf), error._dbobj)
my_error.raise_if_set()
if msg != None :
msg = celf(msg)
#end if
return \
msg
#end demarshal
def buffer_info(self):
return (addressof(self.buffer), self.shape[0])
def _get_buffer(self, view, flags):
from ctypes import addressof
if (flags & PyBUF_WRITABLE) == PyBUF_WRITABLE and self.readonly:
raise BufferError("buffer is read-only")
if ((flags & PyBUF_C_CONTIGUOUS) == PyBUF_C_CONTIGUOUS and
not self.is_contiguous('C')):
raise BufferError("data is not C contiguous")
if ((flags & PyBUF_F_CONTIGUOUS) == PyBUF_F_CONTIGUOUS and
not self.is_contiguous('F')):
raise BufferError("data is not F contiguous")
if ((flags & PyBUF_ANY_CONTIGUOUS) == PyBUF_ANY_CONTIGUOUS and
not self.is_contiguous('A')):
raise BufferError("data is not contiguous")
view.buf = self.buf
view.readonly = self.readonly
view.len = self.len
if flags | PyBUF_WRITABLE == PyBUF_WRITABLE:
view.ndim = 0
else:
view.ndim = self.ndim
view.itemsize = self.itemsize
if (flags & PyBUF_FORMAT) == PyBUF_FORMAT:
view.format = addressof(self._format)
else:
view.format = None
if (flags & PyBUF_ND) == PyBUF_ND:
view.shape = addressof(self._shape)
elif self.is_contiguous('C'):
view.shape = None
else:
raise BufferError(
"shape required for {} dimensional data".format(self.ndim))
if (flags & PyBUF_STRIDES) == PyBUF_STRIDES:
view.strides = ctypes.addressof(self._strides)
elif view.shape is None or self.is_contiguous('C'):
view.strides = None
else:
raise BufferError("strides required for none C contiguous data")
view.suboffsets = None
view.internal = None
view.obj = self
def test_attributes(self):
a = Exporter((13, 5, 11, 3), '=h', (440, 88, 8, 2))
self.assertEqual(a.ndim, 4)
self.assertEqual(a.itemsize, 2)
self.assertFalse(a.readonly)
self.assertEqual(a.shape, (13, 5, 11, 3))
self.assertEqual(a.format, '=h')
self.assertEqual(a.strides, (440, 88, 8, 2))
self.assertEqual(a.len, 4290)
self.assertEqual(a.buflen, 5720)
self.assertEqual(a.buf, ctypes.addressof(a._buf))
a = Exporter((8,))
self.assertEqual(a.ndim, 1)
self.assertEqual(a.itemsize, 1)
self.assertFalse(a.readonly)
self.assertEqual(a.shape, (8,))
self.assertEqual(a.format, 'B')
self.assertTrue(isinstance(a.strides, tuple))
self.assertEqual(a.strides, (1,))
self.assertEqual(a.len, 8)
self.assertEqual(a.buflen, 8)
a = Exporter([13, 5, 11, 3], '=h', [440, 88, 8, 2])
self.assertTrue(isinstance(a.shape, tuple))
self.assertTrue(isinstance(a.strides, tuple))
self.assertEqual(a.shape, (13, 5, 11, 3))
self.assertEqual(a.strides, (440, 88, 8, 2))
def capsule_new(p):
return PyCapsule_New(addressof(p), None, None)
def capsule_new(p):
return PyCObject_FromVoidPtr(addressof(p), None)
def test_byteswapped(self):
a = Array((1,), 'u', 4, flags=(PAI_ALIGNED | PAI_WRITEABLE))
ct = a._ctype
self.assertTrue(ct is not c_uint32)
if sys.byteorder == 'little':
self.assertTrue(ct is c_uint32.__ctype_be__)
else:
self.assertTrue(ct is c_uint32.__ctype_le__)
i = 0xa0b0c0d
n = c_uint32(i)
a[0] = i
self.assertEqual(a[0], i)
self.assertEqual(a._data[0:4],
cast(addressof(n), POINTER(c_uint8))[3:-1:-1])
def NEWBUF_test_bad_format(self):
from pygame.bufferproxy import BufferProxy
from pygame.newbuffer import BufferMixin
from ctypes import create_string_buffer, addressof
buftools = self.buftools
Exporter = buftools.Exporter
Importer = buftools.Importer
PyBUF_FORMAT = buftools.PyBUF_FORMAT
for format in ['', '=', '1', ' ', '2h', '=2h',
'0x', '11x', '=!', 'h ', ' h', 'hh', '?']:
exp = Exporter((1,), format, itemsize=2)
b = BufferProxy(exp)
self.assertRaises(ValueError, Importer, b, PyBUF_FORMAT)
def _get_own_repr(self):
return self._addr_repr(ctypes.addressof(self._blob))
def _get_own_repr(self):
return self._addr_repr(ctypes.addressof(self._blob))
def _convert_to_address(self, BClass):
if getattr(BClass, '_BItem', None) is self.__class__:
return ctypes.addressof(self._blob)
else:
return CTypesData._convert_to_address(self, BClass)
def write_variable(self, BType, name, value):
new_ctypes_obj = BType._to_ctypes(value)
ctypes_obj = BType._ctype.in_dll(self.cdll, name)
ctypes.memmove(ctypes.addressof(ctypes_obj),
ctypes.addressof(new_ctypes_obj),
ctypes.sizeof(BType._ctype))
def read(cls, byte_object):
a = cls()
ctypes.memmove(ctypes.addressof(a), bytes(byte_object),
min(len(byte_object), ctypes.sizeof(cls)))
return a
# Mixin to allow conversion of a ctypes structure to and from a dictionary.