def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
# Example source code using Python's ctypes string_at()
def inet_pton(address_family, ip_string):
    """Convert a textual IP address into its packed binary form.

    IPv4 addresses are handed straight to ``socket.inet_aton``.  Any other
    family is parsed by ``WSAStringToAddressA`` into a ``sockaddr``; of those,
    only ``AF_INET6`` yields a result (16 bytes read from ``ipv6_addr``).

    Raises:
        socket.error: if ``WSAStringToAddressA`` returns non-zero (message
            taken from ``ctypes.FormatError()``), or if the family is
            neither ``AF_INET`` nor ``AF_INET6``.
    """
    if address_family == socket.AF_INET:
        return socket.inet_aton(ip_string)

    sa = sockaddr()
    sa.sa_family = address_family
    sa_len = ctypes.c_int(ctypes.sizeof(sa))

    status = WSAStringToAddressA(
        ip_string,
        address_family,
        None,
        ctypes.byref(sa),
        ctypes.byref(sa_len)
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    if address_family != socket.AF_INET6:
        raise socket.error('unknown address family')
    return ctypes.string_at(sa.ipv6_addr, 16)
def xfer(self, txbuff=None):
    """Run one SPI transfer via ``ioctl`` and return the received bytes.

    Args:
        txbuff: sequence of byte values to transmit.  Despite the ``None``
            default, a sized sequence is required: ``len(txbuff)`` is taken
            unconditionally, so ``None`` raises ``TypeError``.

    Returns:
        bytearray: ``len(txbuff)`` bytes read back from the receive buffer.
    """
    length = len(txbuff)
    if PYTHON_MAJOR >= 3:
        tx_bytes = bytes(txbuff)
    else:
        tx_bytes = str(bytearray(txbuff))
    tx_buf = ctypes.create_string_buffer(tx_bytes)
    rx_buf = ctypes.create_string_buffer(length)

    # Transfer descriptor: 64 64 32 32 16 8 8 32 bits = 32 bytes.
    # "I" (always 4 bytes) is used for the 32-bit fields instead of "L":
    # native "L" is 8 bytes on LP64 platforms, which would inflate the
    # record to 48 bytes and misplace every field after the two pointers.
    data = struct.pack("QQIIHBBI",
                       ctypes.addressof(tx_buf),
                       ctypes.addressof(rx_buf),
                       length,
                       self.speed,
                       0,  # delay
                       self.bits,
                       0,  # cs_change
                       0   # pad
                       )
    fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data)
    # The descriptor only carries the address of rx_buf, so the received
    # data is read back from that buffer rather than from the ioctl result.
    return bytearray(ctypes.string_at(rx_buf, length))
def utf16tostr(addr, size=-1):
    """Read a UTF-16LE string from memory and return it as a Python string.

    Args:
        addr: integer address of the first byte of the string.
        size: number of BYTES to read.  When negative (the default) the
            string is treated as NUL-terminated and memory is scanned in
            32-byte chunks until a 16-bit zero code unit is found.

    Returns:
        The decoded text.  ``size == 0`` yields an empty string.

    Raises:
        UnicodeDecodeError: if the bytes read are not valid UTF-16LE.
    """
    if size >= 0:
        # string_at() with an explicit length of 0 returns b'' without
        # reading anything, so the zero-size case needs no special branch.
        return ctypes.string_at(addr, size).decode('utf-16le')

    chunk_size = 32  # even, so chunks always split on code-unit boundaries
    terminator = b'\x00\x00'
    pieces = []
    while True:
        chunk = ctypes.string_at(addr, chunk_size)
        end = chunk_size
        for i in range(0, chunk_size, 2):
            # Compare the whole 16-bit unit: testing only the low byte would
            # mistake characters such as U+0100 (bytes 00 01) for the end.
            if chunk[i:i + 2] == terminator:
                end = i
                break
        pieces.append(chunk[:end])
        if end != chunk_size:
            break
        addr += chunk_size
    # Decode once over the joined bytes so a surrogate pair that straddles a
    # chunk boundary is still decoded correctly.
    return b"".join(pieces).decode('utf-16le')
def inet_pton(address_family, ip_string):
    """Convert a textual IP address into its packed binary form.

    The string is parsed by ``WSAStringToAddressA`` into a ``sockaddr``; the
    result is 4 bytes read from ``ipv4_addr`` for ``AF_INET`` or 16 bytes read
    from ``ipv6_addr`` for ``AF_INET6``.

    Raises:
        socket.error: if ``WSAStringToAddressA`` returns non-zero (message
            taken from ``ctypes.FormatError()``), or if the family is
            neither ``AF_INET`` nor ``AF_INET6``.
    """
    sa = sockaddr()
    sa.sa_family = address_family
    sa_len = ctypes.c_int(ctypes.sizeof(sa))

    status = WSAStringToAddressA(
        ip_string,
        address_family,
        None,
        ctypes.byref(sa),
        ctypes.byref(sa_len)
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        source, width = sa.ipv4_addr, 4
    elif address_family == socket.AF_INET6:
        source, width = sa.ipv6_addr, 16
    else:
        raise socket.error('unknown address family')
    return ctypes.string_at(source, width)
def xfer(self, txbuff=None):
    """Run one SPI transfer via ``ioctl`` and return the received bytes.

    Args:
        txbuff: sequence of byte values to transmit.  Despite the ``None``
            default, a sized sequence is required: ``len(txbuff)`` is taken
            unconditionally, so ``None`` raises ``TypeError``.

    Returns:
        bytearray: ``len(txbuff)`` bytes read back from the receive buffer.
    """
    length = len(txbuff)
    if PYTHON_MAJOR >= 3:
        tx_bytes = bytes(txbuff)
    else:
        tx_bytes = str(bytearray(txbuff))
    tx_buf = ctypes.create_string_buffer(tx_bytes)
    rx_buf = ctypes.create_string_buffer(length)

    # Transfer descriptor: 64 64 32 32 16 8 8 32 bits = 32 bytes.
    # "I" (always 4 bytes) is used for the 32-bit fields instead of "L":
    # native "L" is 8 bytes on LP64 platforms, which would inflate the
    # record to 48 bytes and misplace every field after the two pointers.
    data = struct.pack("QQIIHBBI",
                       ctypes.addressof(tx_buf),
                       ctypes.addressof(rx_buf),
                       length,
                       self.speed,
                       0,  # delay
                       self.bits,
                       0,  # cs_change
                       0   # pad
                       )
    fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data)
    # The descriptor only carries the address of rx_buf, so the received
    # data is read back from that buffer rather than from the ioctl result.
    return bytearray(ctypes.string_at(rx_buf, length))
def inet_pton(address_family, ip_string):
    """Convert a textual IP address into its packed binary form.

    The string is parsed by ``WSAStringToAddressA`` into a ``sockaddr``; the
    result is 4 bytes read from ``ipv4_addr`` for ``AF_INET`` or 16 bytes read
    from ``ipv6_addr`` for ``AF_INET6``.

    Raises:
        socket.error: if ``WSAStringToAddressA`` returns non-zero (message
            taken from ``ctypes.FormatError()``), or if the family is
            neither ``AF_INET`` nor ``AF_INET6``.
    """
    sa = sockaddr()
    sa.sa_family = address_family
    sa_len = ctypes.c_int(ctypes.sizeof(sa))

    status = WSAStringToAddressA(
        ip_string,
        address_family,
        None,
        ctypes.byref(sa),
        ctypes.byref(sa_len)
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        source, width = sa.ipv4_addr, 4
    elif address_family == socket.AF_INET6:
        source, width = sa.ipv6_addr, 16
    else:
        raise socket.error('unknown address family')
    return ctypes.string_at(source, width)
def _callback_write(self, buf, key=PiVideoFrameType.frame):
    """
    Overridden to strip alpha bytes when required.

    When ``self._strip_alpha`` is false the buffer is passed to the parent
    implementation untouched.  Otherwise every fourth byte (offsets 3, 7,
    11, ...) is removed from a copy of the payload and the parent is called
    with a copied header that points at the shortened data.
    """
    if not self._strip_alpha:
        return super(PiRawMixin, self)._callback_write(buf, key)

    pixels = bytearray(ct.string_at(buf[0].data, buf[0].length))
    del pixels[3::4]
    # All this messing around with buffers is to work around some issue
    # with MMAL or ctypes (I'm not sure which is at fault). Anyway, the
    # upshot is that if you fiddle with buf[0].data in any way
    # whatsoever (even if you make every attempt to restore its value
    # afterward), mmal_port_disable locks up when we call it in stop()
    stripped = mmal.MMAL_BUFFER_HEADER_T.from_buffer_copy(buf[0])
    stripped.length = len(pixels)
    stripped.data = ct.pointer(ct.c_uint8.from_buffer(pixels))
    return super(PiRawMixin, self)._callback_write(ct.pointer(stripped), key)
def update(self, dt):
    """Consume the pending sample, if any, and upload it to the texture.

    ``self.sample`` is swapped out for ``None`` first; when nothing was
    pending the method returns immediately.  Otherwise the sample's buffer is
    mapped for reading, its bytes are copied out and blitted to
    ``self.texture`` as ``'rgb'``, the buffer is unmapped, and the image
    canvas is asked to redraw.

    Args:
        dt: not used by this method.
    """
    sample, self.sample = self.sample, None
    if sample is None:
        return

    # Bind these before the try block so the cleanup below never hits an
    # unbound name (and masks the real error) when get_buffer() or map()
    # raises.
    buf = None
    mapinfo = None
    try:
        buf = sample.get_buffer()
        _, mapinfo = buf.map(Gst.MapFlags.READ)
        # The hash of mapinfo is used as the address of the underlying
        # structure, which is then re-read through ctypes.
        # NOTE(review): presumably a workaround for bindings that do not
        # expose mapinfo.data usably -- confirm.
        addr = mapinfo.__hash__()
        c_mapinfo = _MapInfo.from_address(addr)
        sbuf = string_at(c_mapinfo.data, mapinfo.size)
        self.texture.blit_buffer(sbuf, colorfmt='rgb')
    finally:
        if mapinfo is not None:
            buf.unmap(mapinfo)
    self.image.canvas.ask_update()
def TraceThreadProc(self):
    """
    Poll debug data while ``self.PLCStatus`` is "Started".

    On each pass, if ``self.PLClibraryLock`` can be taken without blocking,
    ``_GetDebugData`` is called; on a zero return with a non-zero size the
    data is copied out and ``_FreeDebugData`` is called.  Any copied buffer is
    pushed as ``(tick, bytes)`` through ``_TracesPush``, then
    ``_TracesAutoSuspend`` runs.  Once the status changes, ``_TracesFlush`` is
    called.  Nothing is returned.
    """
    while self.PLCStatus == "Started":
        tick = ctypes.c_uint32()
        size = ctypes.c_uint32()
        buff = ctypes.c_void_p()
        TraceBuffer = None
        if self.PLClibraryLock.acquire(False):
            # Release in a finally block so an exception raised while
            # fetching or copying the data cannot leave the lock held.
            try:
                if self._GetDebugData(ctypes.byref(tick),
                                      ctypes.byref(size),
                                      ctypes.byref(buff)) == 0:
                    if size.value:
                        TraceBuffer = ctypes.string_at(buff.value, size.value)
                    self._FreeDebugData()
            finally:
                self.PLClibraryLock.release()
        if TraceBuffer is not None:
            self._TracesPush((tick.value, TraceBuffer))
        self._TracesAutoSuspend()
    self._TracesFlush()
def inet_pton(address_family, ip_string):
    """Convert a textual IP address into its packed binary form.

    The string is parsed by ``WSAStringToAddressA`` into a ``sockaddr``; the
    result is 4 bytes read from ``ipv4_addr`` for ``AF_INET`` or 16 bytes read
    from ``ipv6_addr`` for ``AF_INET6``.

    Raises:
        socket.error: if ``WSAStringToAddressA`` returns non-zero (message
            taken from ``ctypes.FormatError()``), or if the family is
            neither ``AF_INET`` nor ``AF_INET6``.
    """
    sa = sockaddr()
    sa.sa_family = address_family
    sa_len = ctypes.c_int(ctypes.sizeof(sa))

    status = WSAStringToAddressA(
        ip_string,
        address_family,
        None,
        ctypes.byref(sa),
        ctypes.byref(sa_len)
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        source, width = sa.ipv4_addr, 4
    elif address_family == socket.AF_INET6:
        source, width = sa.ipv6_addr, 16
    else:
        raise socket.error('unknown address family')
    return ctypes.string_at(source, width)
def inet_pton(address_family, ip_string):
    """Convert a textual IP address into its packed binary form.

    The string is parsed by ``WSAStringToAddressA`` into a ``sockaddr``; the
    result is 4 bytes read from ``ipv4_addr`` for ``AF_INET`` or 16 bytes read
    from ``ipv6_addr`` for ``AF_INET6``.

    Raises:
        socket.error: if ``WSAStringToAddressA`` returns non-zero (message
            taken from ``ctypes.FormatError()``), or if the family is
            neither ``AF_INET`` nor ``AF_INET6``.
    """
    sa = sockaddr()
    sa.sa_family = address_family
    sa_len = ctypes.c_int(ctypes.sizeof(sa))

    status = WSAStringToAddressA(
        ip_string,
        address_family,
        None,
        ctypes.byref(sa),
        ctypes.byref(sa_len)
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        source, width = sa.ipv4_addr, 4
    elif address_family == socket.AF_INET6:
        source, width = sa.ipv6_addr, 16
    else:
        raise socket.error('unknown address family')
    return ctypes.string_at(source, width)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
####################################################################################
#
# SDDS Payload Containers
#
####################################################################################
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def asString(self):
    """Return a copy of the raw bytes that back this ctypes object.

    Reads ``ctypes.sizeof(self)`` bytes starting at ``ctypes.addressof(self)``.
    """
    start = ctypes.addressof(self)
    byte_count = ctypes.sizeof(self)
    return ctypes.string_at(start, byte_count)
def surf_same_image(a, b):
    """Return True if a's pixel buffer is identical to b's.

    Each buffer is taken to be ``get_height() * get_pitch()`` bytes starting
    at ``_pixels_address``.  Surfaces whose buffer sizes differ compare as
    different without their memory being read.
    """
    size_a = a.get_height() * a.get_pitch()
    size_b = b.get_height() * b.get_pitch()
    if size_a == size_b:
        pixels_a = ctypes.string_at(a._pixels_address, size_a)
        pixels_b = ctypes.string_at(b._pixels_address, size_b)
        return pixels_a == pixels_b
    return False
def string_result(result, func, arguments):
    """Errcheck function. Returns a string and frees the original pointer.

    It assumes the result is a char *.  A falsy ``result`` yields ``None``
    and nothing is freed.  ``func`` and ``arguments`` are accepted but not
    used.
    """
    if not result:
        return None
    # Copy into a Python string first; the original pointer is freed next.
    text = bytes_to_str(ctypes.string_at(result))
    libvlc_free(result)
    return text