python类addressof()的实例源码

process.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def teb_base(self):
    """The address of the thread's TEB

    When the current process is 32-bit and the thread's owner is 64-bit,
    the query goes through ``windows.syswow64`` and the result is read
    through the type returned by
    ``rctypes.transform_type_to_remote64bits(THREAD_BASIC_INFORMATION)``.
    Otherwise ``NtQueryInformationThread`` is called directly.

    :type: :class:`int`
    :raises ValueError: if the scratch buffer is not aligned on 8 bytes
    """
    if windows.current_process.bitness == 32 and self.owner.bitness == 64:
        restype = rctypes.transform_type_to_remote64bits(THREAD_BASIC_INFORMATION)
        ressize = ctypes.sizeof(restype)
        # Manual aligned allocation: back the structure with an array of
        # ULONGLONG.  Floor division keeps the element count an int; true
        # division returns a float on Python 3 and `float * ULONGLONG`
        # is not a valid array type.
        nb_qword = (ressize + 8) // ctypes.sizeof(ULONGLONG)
        qword_buffer = (nb_qword * ULONGLONG)()
        struct_address = ctypes.addressof(qword_buffer)
        if (struct_address & 0xf) not in [0, 8]:
            raise ValueError("ULONGLONG array not aligned on 8")
        windows.syswow64.NtQueryInformationThread_32_to_64(self.handle, ThreadBasicInformation, struct_address, ressize)
        # `qword_buffer` stays referenced until the field has been read.
        return restype(struct_address, windows.current_process).TebBaseAddress

    res = THREAD_BASIC_INFORMATION()
    windows.winproxy.NtQueryInformationThread(self.handle, ThreadBasicInformation, byref(res), ctypes.sizeof(res))
    return res.TebBaseAddress
arrinter.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_args(self, call_flags,
               shape, typekind, strides, length, bufsize, itemsize,
               offset=0):
    """Build an ``Exporter`` and verify it and its ``ArrayInterface`` view.

    Bit 0x1 of *call_flags* selects *typekind* and bit 0x2 selects
    *strides*; an unselected value is replaced by ``None``.  Only the
    strides selection is forwarded to ``Exporter``; *length* is not used.
    """
    typekind_arg = typekind if call_flags & 1 else None  # computed, not consumed
    strides_arg = strides if call_flags & 2 else None

    exporter = Exporter(shape, itemsize=itemsize, strides=strides_arg)
    self.assertEqual(sizeof(exporter._data), bufsize)
    # The exported data pointer is expected `offset` bytes into the buffer.
    self.assertEqual(exporter.data, ctypes.addressof(exporter._data) + offset)

    interface = ArrayInterface(exporter)
    self.assertEqual(interface.data, exporter.data)
    self.assertEqual(interface.itemsize, itemsize)
    ndim = interface.nd
    self.assertEqual(tuple(interface.shape[0:ndim]), shape)
    self.assertEqual(tuple(interface.strides[0:interface.nd]), strides)
recipe-475199.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __delitem__(self, i):
        size = self.size
        if i >= size:
            raise IndexError("list index out of range")

        if i < 0:
            i = size + i
            if i < 0:
                raise IndexError("list index out of range")

        # shift everything left by one
        address = ctypes.addressof(self.data)
        typesize = self._typesize
        to_address = address + i*typesize
        from_address = to_address + typesize
        ctypes.memmove(to_address, from_address, typesize*(size-i-1))

        self.size = size = size-1

        if self.prealloc_size > size*2:
            self.compact()
pcpfast.py 文件源码 项目:supremm 作者: ubccr 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def pcpfastExtractValues(result_p, vsetidx, vlistidx, dtype):
    """Extract one value through ``LIBPCPFAST.pcpfastExtractValues``.

    Returns a ``(value, instance)`` tuple, where the value comes from
    ``dref(dtype)`` on the extracted atom.  Raises ``pmapi.pmErr`` when the
    library call returns a negative status.
    """
    instance = c_int()
    atom = pmapi.pmAtomValue()
    rc = LIBPCPFAST.pcpfastExtractValues(result_p, byref(instance), byref(atom), vsetidx, vlistidx, dtype)
    if rc < 0:
        raise pmapi.pmErr(rc)

    if dtype == c_api.PM_TYPE_STRING:
        # Copy the raw pointer stored in the atom's `cp` member so that it
        # can still be freed after `cp` has been reassigned.
        raw_str = c_char_p()
        cp_address = addressof(atom) + pmapi.pmAtomValue.cp.offset
        memmove(byref(raw_str), cp_address, sizeof(c_char_p))
        # Round-trip `cp` through ctypes; per the original author this makes
        # the atom point at a Python-owned string (NOTE(review): confirm).
        atom.cp = atom.cp
        # Release the C string
        LIBC.free(raw_str)

    return atom.dref(dtype), instance.value
process.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def ppid(self):
    """Parent Process ID

    Read from the ``InheritedFromUniqueProcessId`` field of the process
    basic information.

    :type: :class:`int`
    """
    cross_bitness = windows.current_process.bitness == 32 and self.bitness == 64
    if not cross_bitness:
        basic_info = PROCESS_BASIC_INFORMATION()
        winproxy.NtQueryInformationProcess(self.handle, 0, basic_info)
        return basic_info.InheritedFromUniqueProcessId

    remote_type = windows.remotectypes.transform_type_to_remote64bits(PROCESS_BASIC_INFORMATION)
    info_size = ctypes.sizeof(remote_type)
    # Raw byte buffer that receives the 64-bit layout of the structure.
    raw = (ctypes.c_char * info_size)()
    windows.syswow64.NtQueryInformationProcess_32_to_64(self.handle, ProcessInformation=raw, ProcessInformationLength=info_size)
    # View the bytes in `raw` as the remote-64-bit structure type.
    basic_info = remote_type(ctypes.addressof(raw), windows.current_process)
    return basic_info.InheritedFromUniqueProcessId
process.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def virtual_protect(self, addr, size, protect, old_protect):
    """Change the access right of one or more page of the process

    :param addr: base address of the region to change
    :param size: size of the region in bytes
    :param protect: new protection value
    :param old_protect: receives the previous protection; in the
        32-bit-to-64-bit path it must be a ctypes object because its
        address is taken with ``ctypes.addressof``
    :return: the result of the underlying call
    """
    if windows.current_process.bitness == 32 and self.bitness == 64:
        # Round the size up to a whole number of 0x1000-byte pages.
        if size & 0x0fff:
            size = ((size >> 12) + 1) << 12
        # The syscall receives the addresses of the base-address and
        # region-size values.  Both are held in 64-bit storage so the
        # 64-bit side never touches memory beyond the ctypes objects
        # (NtProtectVirtualMemory takes PVOID* / PSIZE_T).
        base_address = ULONG64(addr)
        region_size = ULONG64(size)
        return windows.syswow64.NtProtectVirtualMemory_32_to_64(
            self.handle,
            ctypes.addressof(base_address),
            ctypes.addressof(region_size),
            protect,
            ctypes.addressof(old_protect))
    # Return the result here too, so both paths behave alike for callers.
    return winproxy.VirtualProtectEx(self.handle, addr, size, protect, old_protect)
process.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_mapped_filename(self, addr):
    """The filename mapped at address ``addr`` or ``None``

    ``None`` is returned when the underlying query fails with one of the
    handled error codes (32-to-64 path) or with ``Kernel32Error``.

    :rtype: :class:`str` or ``None``
    """
    buffer_size = 0x1000
    name_buffer = ctypes.c_buffer(buffer_size)

    if windows.current_process.bitness == 32 and self.bitness == 64:
        target_size = ctypes.c_buffer(buffer_size)
        try:
            windows.syswow64.NtQueryVirtualMemory_32_to_64(self.handle, addr, MemorySectionName, name_buffer, buffer_size, target_size)
        except NtStatusException as e:
            # These statuses mean "nothing mapped here"; anything else propagates.
            if e.code in [STATUS_FILE_INVALID, STATUS_INVALID_ADDRESS, STATUS_TRANSACTION_NOT_ACTIVE]:
                return None
            raise
        # Read the result in place as a remote-64-bit WinUnicodeString.
        remote_winstring = rctypes.transform_type_to_remote64bits(WinUnicodeString)
        return remote_winstring(ctypes.addressof(name_buffer), windows.current_process).str

    try:
        size = winproxy.GetMappedFileNameA(self.handle, addr, name_buffer, buffer_size)
    except winproxy.Kernel32Error:
        return None
    return name_buffer[:size]
process.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def modules(self):
    """The loaded modules present in the PEB

    Walks the list starting at ``Ldr.InMemoryOrderModuleList.Flink`` and
    stops at the first entry whose ``DllBase`` is falsy.

    :type: [:class:`LoadedModule`] -- List of loaded modules
    """
    def entry_at(flink):
        # Reinterpret a list link as a LIST_ENTRY_PTR and get its LDR entry.
        return ctypes.cast(flink, LIST_ENTRY_PTR).TO_LDR_ENTRY()

    entries = []
    entry = entry_at(self.Ldr.contents.InMemoryOrderModuleList.Flink)
    while entry.DllBase:
        entries.append(entry)
        entry = entry_at(entry.InMemoryOrderLinks.Flink)
    return [LoadedModule.from_address(addressof(item)) for item in entries]


# Memory stuff
spi.py 文件源码 项目:rpi3-webiopi 作者: thortex 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def xfer(self, txbuff=None):
    """Perform one SPI transfer and return the bytes read back.

    :param txbuff: sequence of byte values to send.  It must be supplied:
        ``len()`` is taken of it, so the ``None`` default raises
        ``TypeError``.
    :return: ``bytearray`` holding ``len(txbuff)`` received bytes
    """
    length = len(txbuff)
    if PYTHON_MAJOR >= 3:
        _txbuff = bytes(txbuff)
    else:
        _txbuff = str(bytearray(txbuff))
    _txptr = ctypes.create_string_buffer(_txbuff)
    _rxptr = ctypes.create_string_buffer(length)

    # Transfer descriptor: 64 64 32 32 16 8 8 32 bits = 32 bytes.
    # "I" is 4 bytes on every platform, whereas native "L" is 8 bytes on
    # LP64 systems and would inflate the descriptor to 48 bytes.
    data = struct.pack("QQIIHBBI",
                ctypes.addressof(_txptr),
                ctypes.addressof(_rxptr),
                length,
                self.speed,
                0, #delay
                self.bits,
                0, # cs_change,
                0  # pad
                )

    fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data)
    _rxbuff = ctypes.string_at(_rxptr, length)
    return bytearray(_rxbuff)
traildb.py 文件源码 项目:traildb-python 作者: traildb 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def next(self):
    """Return the next event in the trail.

    Raises ``StopIteration`` when the cursor yields no further event.
    Depending on ``self.only_timestamp`` and ``self.valuefun`` the result is
    the timestamp alone, or ``self.cls`` built from the timestamp and the
    (optionally transformed) items.
    """
    event = lib.tdb_cursor_next(self.cursor)
    if not event:
        raise StopIteration()

    record = event.contents
    # View the event's items as an array of `num_items` tdb_item values.
    items = (tdb_item * record.num_items).from_address(addressof(record.items))

    if self.parsetime:
        timestamp = datetime.fromtimestamp(record.timestamp)
    else:
        timestamp = record.timestamp

    if self.only_timestamp:
        return timestamp
    if self.valuefun:
        return self.cls(timestamp, *(self.valuefun(item) for item in items))
    return self.cls(timestamp, *items)
_process_win32.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def arg_split(commandline, posix=False, strict=True):
    """Split a command line's arguments in a shell-like manner.

    This is a special version for windows that use a ctypes call to CommandLineToArgvW
    to do the argv splitting. The posix parameter is ignored in strict mode.

    If strict=False, process_common.arg_split(...strict=False) is used instead.

    An empty or whitespace-only command line yields an empty list.
    """
    # CommandLineToArgvW returns path to executable if called with empty string.
    if commandline.strip() == "":
        return []
    if not strict:
        # not really a cl-arg, fallback on _process_common
        return py_arg_split(commandline, posix=posix, strict=strict)
    argvn = c_int()
    result_pointer = CommandLineToArgvW(py3compat.cast_unicode(commandline.lstrip()), ctypes.byref(argvn))
    try:
        # View the returned block as an array of `argvn` wide strings and
        # copy the entries into a Python list.
        result_array_type = LPCWSTR * argvn.value
        return list(result_array_type.from_address(ctypes.addressof(result_pointer.contents)))
    finally:
        # Release the block from CommandLineToArgvW even if the copy above fails.
        LocalFree(result_pointer)
io_export_ogreDotScene.py 文件源码 项目:spqrel_tools 作者: LCAS 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def save( blenderobject, path ):
    """Dump the mesh built from ``blenderobject.data`` to *path*.

    The addresses of the mesh's face and vertex buffers, followed by the
    face and vertex counts, are handed to ``dotmesh``; the elapsed time is
    printed afterwards.
    """
    cmesh = Mesh( blenderobject.data )
    start = time.time()
    buffer_names = (
        'faces',
        'faces_smooth',
        'faces_material_index',
        'vertex_positions',
        'vertex_normals',
    )
    faces, smooth, material, positions, normals = [
        ctypes.addressof( getattr( cmesh, name ) ) for name in buffer_names
    ]
    dotmesh( path, faces, smooth, material, positions, normals,
             cmesh.numFaces, cmesh.numVerts )
    print('Mesh dumped in %s seconds' % (time.time()-start))

## Make sure we can import from same directory
spi.py 文件源码 项目:Cayenne-Agent 作者: myDevicesIoT 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def xfer(self, txbuff=None):
    """Perform one SPI transfer and return the bytes read back.

    :param txbuff: sequence of byte values to send.  It must be supplied:
        ``len()`` is taken of it, so the ``None`` default raises
        ``TypeError``.
    :return: ``bytearray`` holding ``len(txbuff)`` received bytes
    """
    length = len(txbuff)
    if PYTHON_MAJOR >= 3:
        _txbuff = bytes(txbuff)
    else:
        _txbuff = str(bytearray(txbuff))
    _txptr = ctypes.create_string_buffer(_txbuff)
    _rxptr = ctypes.create_string_buffer(length)

    # Transfer descriptor: 64 64 32 32 16 8 8 32 bits = 32 bytes.
    # "I" is 4 bytes on every platform, whereas native "L" is 8 bytes on
    # LP64 systems and would inflate the descriptor to 48 bytes.
    data = struct.pack("QQIIHBBI",
                ctypes.addressof(_txptr),
                ctypes.addressof(_rxptr),
                length,
                self.speed,
                0, #delay
                self.bits,
                0, # cs_change,
                0  # pad
                )

    fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data)
    _rxbuff = ctypes.string_at(_rxptr, length)
    return bytearray(_rxbuff)
callbacks.py 文件源码 项目:rice 作者: randy3k 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_read_console(get_text):
    """Build a read-console callback backed by *get_text*.

    The returned callback asks *get_text* for new text whenever nothing is
    pending, then copies the pending bytes into the caller's buffer, at most
    ``buflen - 2`` bytes per call.  It returns 0 when *get_text* returns
    ``None`` and 1 otherwise.
    """
    # One-element list so the nested function can rebind the pending bytes.
    pending = [None]

    def _read_console(p, buf, buflen, add_history):
        if not pending[0]:
            text = get_text(p.decode(ENCODING), add_history)
            if text is None:
                return 0
            pending[0] = text.encode(ENCODING)

        remaining = pending[0]
        # Two bytes are reserved for the trailing newline and NUL.
        capacity = buflen - 2
        nb = min(len(remaining), capacity)
        out = (ctypes.c_char * buflen).from_address(ctypes.addressof(buf.contents))
        out[:nb] = remaining[:nb]
        if nb < capacity:
            out[nb:(nb + 2)] = b'\n\0'
        # Whatever did not fit is kept for the next call.
        pending[0] = remaining[nb:]
        return 1

    return _read_console
frk.py 文件源码 项目:PyCS 作者: COSMOGRAIL 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def nprocessors():
    """Return the number of processors, or 1 when it cannot be determined.

    ``sysctlbyname("hw.ncpu")`` is tried first (Mac OS).  If that is
    unavailable or fails, ``processor`` entries in ``/proc/cpuinfo`` are
    counted (Cygwin and Linux).  The result is never less than 1.
    """
    try:
        try:
            # Mac OS
            libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
            ncpu = ctypes.c_int(0)
            size = ctypes.c_size_t(ctypes.sizeof(ncpu))
            # Pass real pointers (byref) and a bytes name: plain Python ints
            # are converted to C int (truncating 64-bit addresses) and str
            # is passed as wchar_t* on Python 3.
            status = libc.sysctlbyname(b'hw.ncpu', ctypes.byref(ncpu), ctypes.byref(size), None, 0)
            if status != 0 or ncpu.value < 1:
                raise OSError("sysctlbyname('hw.ncpu') failed")
            return ncpu.value
        except Exception:
            # Cygwin (Windows) and Linuxes
            # Could try sysconf(_SC_NPROCESSORS_ONLN) (LSB) next.  Instead, count processors in cpuinfo.
            with open('/proc/cpuinfo', 'r') as cpuinfo:
                text = cpuinfo.read()
            return text.replace(' ', '').replace('\t', '').count('processor:') or 1
    except Exception:
        return 1
_process_win32.py 文件源码 项目:Repobot 作者: Desgard 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def arg_split(commandline, posix=False, strict=True):
    """Split a command line's arguments in a shell-like manner.

    This is a special version for windows that use a ctypes call to CommandLineToArgvW
    to do the argv splitting. The posix parameter is ignored in strict mode.

    If strict=False, process_common.arg_split(...strict=False) is used instead.

    An empty or whitespace-only command line yields an empty list.
    """
    # CommandLineToArgvW returns path to executable if called with empty string.
    if commandline.strip() == "":
        return []
    if not strict:
        # not really a cl-arg, fallback on _process_common
        return py_arg_split(commandline, posix=posix, strict=strict)
    argvn = c_int()
    result_pointer = CommandLineToArgvW(py3compat.cast_unicode(commandline.lstrip()), ctypes.byref(argvn))
    try:
        # View the returned block as an array of `argvn` wide strings and
        # copy the entries into a Python list.
        result_array_type = LPCWSTR * argvn.value
        return list(result_array_type.from_address(ctypes.addressof(result_pointer.contents)))
    finally:
        # Release the block from CommandLineToArgvW even if the copy above fails.
        LocalFree(result_pointer)
dbussy.py 文件源码 项目:dbussy 作者: ldo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def marshal(self) :
    "serializes this Message into the wire protocol format and returns" \
    " the serialized data as a bytearray. Raises CallFailed if" \
    " dbus_message_marshal reports failure."
    buf = ct.POINTER(ct.c_ubyte)()
    nr_bytes = ct.c_int()
    marshalled = dbus.dbus_message_marshal(self._dbobj, ct.byref(buf), ct.byref(nr_bytes))
    if not marshalled :
        raise CallFailed("dbus_message_marshal")
    #end if
    length = nr_bytes.value
    result = bytearray(length)
    # copy the library's buffer into the bytearray before freeing it
    ct.memmove(ct.addressof((ct.c_ubyte * length).from_buffer(result)), buf, length)
    dbus.dbus_free(buf)
    return result
    #end marshal
dbussy.py 文件源码 项目:dbussy 作者: ldo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def demarshal(celf, buf, error = None) :
    "builds a Message object from wire-protocol data held in a bytes," \
    " bytearray or array.array of typecode 'B'. Raises TypeError for any" \
    " other kind of buf."
    error, my_error = _get_error(error)
    # work out the address of the first byte for each supported container
    if isinstance(buf, bytes) :
        base_address = ct.cast(buf, ct.c_void_p).value
    elif isinstance(buf, bytearray) :
        byte_array_type = ct.c_ubyte * len(buf)
        base_address = ct.addressof(byte_array_type.from_buffer(buf))
    elif isinstance(buf, array.array) and buf.typecode == "B" :
        base_address = buf.buffer_info()[0]
    else :
        raise TypeError("buf is not bytes, bytearray or array.array of bytes")
    #end if
    raw_msg = dbus.dbus_message_demarshal(base_address, len(buf), error._dbobj)
    my_error.raise_if_set()
    # only wrap a real result; a None result is passed through unchanged
    return celf(raw_msg) if raw_msg != None else raw_msg
    #end demarshal
buftools.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def buffer_info(self):
    """Return ``(address, length)``: the address of ``self.buffer`` and
    the first entry of ``self.shape``."""
    address = addressof(self.buffer)
    length = self.shape[0]
    return (address, length)
buftools.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_buffer(self, view, flags):
    """Populate *view* from this object according to the request *flags*.

    Raises ``BufferError`` when the request cannot be honoured: a writable
    view of read-only data, a contiguity requirement the data does not
    meet, or shape/strides left out for data that needs them.
    """
    from ctypes import addressof

    def requested(mask):
        # True when every bit of `mask` is set in `flags`.
        return (flags & mask) == mask

    if requested(PyBUF_WRITABLE) and self.readonly:
        raise BufferError("buffer is read-only")
    contiguity_checks = (
        (PyBUF_C_CONTIGUOUS, 'C', "data is not C contiguous"),
        (PyBUF_F_CONTIGUOUS, 'F', "data is not F contiguous"),
        (PyBUF_ANY_CONTIGUOUS, 'A', "data is not contiguous"),
    )
    for mask, order, message in contiguity_checks:
        if requested(mask) and not self.is_contiguous(order):
            raise BufferError(message)

    view.buf = self.buf
    view.readonly = self.readonly
    view.len = self.len
    # ndim is 0 when flags carries no bit other than PyBUF_WRITABLE.
    view.ndim = 0 if (flags | PyBUF_WRITABLE) == PyBUF_WRITABLE else self.ndim
    view.itemsize = self.itemsize
    view.format = addressof(self._format) if requested(PyBUF_FORMAT) else None

    if requested(PyBUF_ND):
        view.shape = addressof(self._shape)
    elif self.is_contiguous('C'):
        view.shape = None
    else:
        raise BufferError(
            "shape required for {} dimensional data".format(self.ndim))

    if requested(PyBUF_STRIDES):
        view.strides = ctypes.addressof(self._strides)
    elif view.shape is None or self.is_contiguous('C'):
        # `view.shape` is read back from the view on purpose.
        view.strides = None
    else:
        raise BufferError("strides required for none C contiguous data")

    view.suboffsets = None
    view.internal = None
    view.obj = self
buftools.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_attributes(self):
    """Check Exporter attributes for explicit, default and list arguments."""
    shape = (13, 5, 11, 3)
    strides = (440, 88, 8, 2)

    # Explicit shape, format and strides given as tuples.
    a = Exporter(shape, '=h', strides)
    self.assertEqual(a.ndim, 4)
    self.assertEqual(a.itemsize, 2)
    self.assertFalse(a.readonly)
    self.assertEqual(a.shape, shape)
    self.assertEqual(a.format, '=h')
    self.assertEqual(a.strides, strides)
    self.assertEqual(a.len, 4290)
    self.assertEqual(a.buflen, 5720)
    self.assertEqual(a.buf, ctypes.addressof(a._buf))

    # Shape only: everything else takes its default.
    a = Exporter((8,))
    self.assertEqual(a.ndim, 1)
    self.assertEqual(a.itemsize, 1)
    self.assertFalse(a.readonly)
    self.assertEqual(a.shape, (8,))
    self.assertEqual(a.format, 'B')
    self.assertTrue(isinstance(a.strides, tuple))
    self.assertEqual(a.strides, (1,))
    self.assertEqual(a.len, 8)
    self.assertEqual(a.buflen, 8)

    # Lists are accepted and exposed as tuples.
    a = Exporter(list(shape), '=h', list(strides))
    self.assertTrue(isinstance(a.shape, tuple))
    self.assertTrue(isinstance(a.strides, tuple))
    self.assertEqual(a.shape, shape)
    self.assertEqual(a.strides, strides)
arrinter.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def capsule_new(p):
    """Return ``PyCapsule_New`` called with the address of *p* and ``None``
    for its two remaining arguments."""
    address = addressof(p)
    return PyCapsule_New(address, None, None)
arrinter.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def capsule_new(p):
    """Return ``PyCObject_FromVoidPtr`` called with the address of *p* and
    ``None`` as its second argument."""
    address = addressof(p)
    return PyCObject_FromVoidPtr(address, None)
arrinter.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_byteswapped(self):
    """Check an Array created without the native-order flag stores its
    item in the byte order opposite to the host's."""
    a = Array((1,), 'u', 4, flags=(PAI_ALIGNED | PAI_WRITEABLE))
    item_type = a._ctype
    self.assertTrue(item_type is not c_uint32)
    # The element type must be the c_uint32 variant of the other endianness.
    if sys.byteorder == 'little':
        swapped_type = c_uint32.__ctype_be__
    else:
        swapped_type = c_uint32.__ctype_le__
    self.assertTrue(item_type is swapped_type)

    value = 0xa0b0c0d
    native = c_uint32(value)
    a[0] = value
    self.assertEqual(a[0], value)
    # Bytes 3, 2, 1, 0 of the native value: its representation reversed.
    native_bytes = cast(addressof(native), POINTER(c_uint8))
    self.assertEqual(a._data[0:4], native_bytes[3:-1:-1])
base_test.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def NEWBUF_test_bad_format(self):
    """Check that importing with PyBUF_FORMAT raises ValueError for each
    of a list of format strings."""
    from pygame.bufferproxy import BufferProxy
    from pygame.newbuffer import BufferMixin
    from ctypes import create_string_buffer, addressof

    buftools = self.buftools
    bad_formats = ['', '=', '1', ' ', '2h', '=2h',
                   '0x', '11x', '=!', 'h ', ' h', 'hh', '?']

    for bad_format in bad_formats:
        exporter = buftools.Exporter((1,), bad_format, itemsize=2)
        proxy = BufferProxy(exporter)
        self.assertRaises(ValueError, buftools.Importer, proxy,
                          buftools.PyBUF_FORMAT)
backend_ctypes.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_own_repr(self):
        return self._addr_repr(ctypes.addressof(self._blob))
backend_ctypes.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_own_repr(self):
        return self._addr_repr(ctypes.addressof(self._blob))
backend_ctypes.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _convert_to_address(self, BClass):
        if getattr(BClass, '_BItem', None) is self.__class__:
            return ctypes.addressof(self._blob)
        else:
            return CTypesData._convert_to_address(self, BClass)
backend_ctypes.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def write_variable(self, BType, name, value):
    """Overwrite the library variable *name* with *value*.

    *value* is converted with ``BType._to_ctypes`` and its bytes are copied
    over the object that ``BType._ctype.in_dll(self.cdll, name)`` returns.
    """
    source = BType._to_ctypes(value)
    target = BType._ctype.in_dll(self.cdll, name)
    nbytes = ctypes.sizeof(BType._ctype)
    ctypes.memmove(ctypes.addressof(target), ctypes.addressof(source), nbytes)
protocol.py 文件源码 项目:gps_track_pod 作者: iwanders 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def read(cls, byte_object):
    """Create a ``cls`` instance and fill it from *byte_object*.

    At most ``ctypes.sizeof(cls)`` bytes are copied; a shorter input fills
    only the leading bytes of the new instance.
    """
    instance = cls()
    payload = bytes(byte_object)
    # Copy no more than the source provides or the structure can hold.
    nbytes = min(len(byte_object), ctypes.sizeof(cls))
    ctypes.memmove(ctypes.addressof(instance), payload, nbytes)
    return instance


# Mixin to allow conversion of a ctypes structure to and from a dictionary.


问题


面经


文章

微信
公众号

扫码关注公众号