def _build_button_map(self, config):
    """
    Returns button map read from configuration, in a format suitable
    for the HIDDecoder.buttons.button_map field.
    Generates a default map if config is not available.

    Entries of config['buttons'] whose keycode falls outside of
    [FIRST_BUTTON, FIRST_BUTTON + BUTTON_COUNT) and entries whose value
    is listed in TRIGGERS are skipped.
    """
    if config:
        # Last possible value is default "maps-to-nothing" mapping
        buttons = [BUTTON_COUNT - 1] * BUTTON_COUNT
        for keycode, value in config.get('buttons', {}).items():
            index = int(keycode) - FIRST_BUTTON
            if index < 0 or index >= BUTTON_COUNT:
                # Out of range
                continue
            if value in TRIGGERS:
                # Not used here
                continue
            buttons[index] = self.button_to_bit(getattr(SCButtons, value))
    else:
        # Default: slot i holds the value i. range() is used instead of
        # xrange() so this works on both Python 2 and Python 3.
        buttons = list(range(BUTTON_COUNT))
    return (ctypes.c_uint8 * BUTTON_COUNT)(*buttons)
# python类c_uint8()的实例源码 (source-code examples that use Python's ctypes c_uint8 class)
def Pokearray(length):
    """Return the ``ctypes.c_uint8 * length`` array type with two helpers attached.

    The returned type gains a ``fromBytes`` classmethod, which builds an
    instance from an iterable of byte values, and a ``bytes`` method, which
    returns the array content as a ``bytes`` object.
    """
    # okay, I learned.
    # It's not possible to use a custom base class
    # in a ctypes.Structure field. Forget about it
    array_type = ctypes.c_uint8 * length

    def as_bytes(self):
        return bytes(iter(self))

    def from_bytes(cls, data):
        return cls(*data)

    # The helpers are attached to the array type itself instead of a subclass.
    array_type.bytes = as_bytes
    array_type.fromBytes = classmethod(from_bytes)
    return array_type
def setUp(self):
    """Create the ``Fruit`` enum and a ``TestStruct`` that adapts a byte field to it."""
    import ctypes
    import enum

    class Fruit(enum.Enum):
        Apple = 0
        Banana = 1
        Citron = 2

    class TestStruct(PokeStructure):
        # One unsigned byte of storage ...
        _fields_ = [("_fruit", ctypes.c_uint8)]
        # ... paired with the Fruit enum through the _adapters_ list.
        _adapters_ = [("_fruit", Fruit)]

    self.TestStruct = TestStruct
    self.Fruit = Fruit
def _return_ctype(self):
    """ Returns the ctypes type associated with this datatype.

    Raises KeyError when ``self`` is not one of the DataType members
    listed in the table below.
    """
    ctype_by_datatype = dict((
        (DataType.Bool, ctypes.c_uint8),
        (DataType.I8, ctypes.c_int8),
        (DataType.U8, ctypes.c_uint8),
        (DataType.I16, ctypes.c_int16),
        (DataType.U16, ctypes.c_uint16),
        (DataType.I32, ctypes.c_int32),
        (DataType.U32, ctypes.c_uint32),
        (DataType.I64, ctypes.c_int64),
        (DataType.U64, ctypes.c_uint64),
        (DataType.Sgl, ctypes.c_float),
        (DataType.Dbl, ctypes.c_double),
    ))
    return ctype_by_datatype[self]
def testCallReadSuccess(self, thread, iokit, cf, GetDeviceIntProperty):  # pylint: disable=invalid-name
    """Data handed to HidReadCallback must come back out of device.Read()."""
    init_mock_iokit(iokit)
    init_mock_cf(cf)
    init_mock_get_int_property(GetDeviceIntProperty)
    device = macos.MacOsHidDevice('fakepath')
    # Call callback for IN report
    expected = list(range(64))
    report = (ctypes.c_uint8 * 64)(*expected)
    q = device.read_queue
    macos.HidReadCallback(q, None, None, None, 0, report, 64)
    # Device read should return the callback data.
    # assertEqual replaces the deprecated assertEquals alias, and the
    # expectation is a real list so the comparison also holds on Python 3,
    # where range() no longer returns a list.
    read_result = device.Read()
    self.assertEqual(read_result, expected, 'Read data should match data '
                     'passed into the callback')
def _decode_video_packet(self, packet):
    """Decode one video packet and attach the result to ``packet.image``.

    A buffer of ``width * 3 * height`` bytes is handed to
    ``av.avbin_decode_video``. When the decoder returns a negative value,
    ``packet.image`` is set to None; otherwise it is set to an
    ``image.ImageData`` in 'RGB' format built from that buffer.
    Finally, one waiter on ``self._condition`` is notified.
    """
    width = self.video_format.width
    height = self.video_format.height
    pitch = width * 3  # bytes per row for the 'RGB' format used below
    frame_buffer = (ctypes.c_uint8 * (pitch * height))()
    result = av.avbin_decode_video(self._video_stream,
                                   packet.data, packet.size,
                                   frame_buffer)
    if result < 0:
        image_data = None
    else:
        image_data = image.ImageData(width, height, 'RGB', frame_buffer, pitch)
    packet.image = image_data
    # Notify get_next_video_frame() that another one is ready.
    # The with-block releases the condition even if notify() raises.
    with self._condition:
        self._condition.notify()
def update(self, source):
    """
    Update the overlay with a new source of data.

    The new *source* buffer must have the same size as the original buffer
    used to create the overlay. There is currently no method for changing
    the size of an existing overlay (remove and recreate the overlay if you
    require this).

    Raises PiCameraRuntimeError when no buffer can be obtained from the
    overlay's pool.
    """
    input_port = self.renderer[0].input[0]
    video = input_port[0].format[0].es[0].video
    # Array type sized as width * height * 3 bytes.
    frame_type = ct.c_uint8 * (video.width * video.height * 3)
    try:
        frame = frame_type.from_buffer(source)
    except TypeError:
        # from_buffer() rejected the source object; fall back to a copy.
        frame = frame_type.from_buffer_copy(source)
    pool_buffer = mmal.mmal_queue_get(self.pool[0].queue)
    if not pool_buffer:
        raise PiCameraRuntimeError(
            "Couldn't get a buffer from the overlay's pool")
    header = pool_buffer[0]
    ct.memmove(header.data, frame, header.alloc_size)
    header.length = header.alloc_size
    mmal_check(
        mmal.mmal_port_send_buffer(input_port, pool_buffer),
        prefix="Unable to send a buffer to the overlay's port")
def _decode_video_packet(self, packet):
    """Decode one video packet and attach the result to ``packet.image``.

    A buffer of ``width * 3 * height`` bytes is handed to
    ``av.avbin_decode_video``. When the decoder returns a negative value,
    ``packet.image`` is set to None; otherwise it is set to an
    ``image.ImageData`` in 'RGB' format built from that buffer.
    Finally, one waiter on ``self._condition`` is notified.
    """
    width = self.video_format.width
    height = self.video_format.height
    pitch = width * 3  # bytes per row for the 'RGB' format used below
    frame_buffer = (ctypes.c_uint8 * (pitch * height))()
    result = av.avbin_decode_video(self._video_stream,
                                   packet.data, packet.size,
                                   frame_buffer)
    if result < 0:
        image_data = None
    else:
        image_data = image.ImageData(width, height, 'RGB', frame_buffer, pitch)
    packet.image = image_data
    # Notify get_next_video_frame() that another one is ready.
    # The with-block releases the condition even if notify() raises.
    with self._condition:
        self._condition.notify()
def _read_block(self, block):
    """Reads a block from a Mifare Card after authentication

    Returns the received bytes as a string (one character per byte).
    Raises Exception if the Easy Framing property cannot be set and
    IOError if the transceive call reports a negative result.
    """
    framing = nfc.nfc_device_set_property_bool(self.__device, nfc.NP_EASY_FRAMING, True)
    if framing < 0:
        raise Exception("Error setting Easy Framing property")
    # Two-byte command: read opcode followed by the block number.
    command = (ctypes.c_uint8 * 2)(self.MC_READ, block)
    response = (ctypes.c_uint8 * 250)()
    received = nfc.nfc_initiator_transceive_bytes(
        self.__device, ctypes.pointer(command), len(command),
        ctypes.pointer(response), len(response), 0)
    if received < 0:
        raise IOError("Error reading data")
    return "".join(map(chr, (response[i] for i in range(received))))
def __write_block(self, block, data):
    """Writes a block of data to a Mifare Card after authentication

    ``data`` may hold at most 16 characters; shorter values are padded
    with NUL characters up to 16. Returns the result of the transceive call.
    Raises Exception if the Easy Framing property cannot be set and
    ValueError if ``data`` is longer than 16 characters.
    """
    framing = nfc.nfc_device_set_property_bool(self.__device, nfc.NP_EASY_FRAMING, True)
    if framing < 0:
        raise Exception("Error setting Easy Framing property")
    if len(data) > 16:
        raise ValueError("Data value to be written cannot be more than 16 characters.")
    padded = data + "\x00" * (16 - len(data))
    # 18-byte command: write opcode, block number, then the 16 payload bytes.
    command = (ctypes.c_uint8 * 18)()
    command[0] = self.MC_WRITE
    command[1] = block
    for offset, char in enumerate(padded):
        command[offset + 2] = ord(char)
    response = (ctypes.c_uint8 * 250)()
    return nfc.nfc_initiator_transceive_bytes(
        self.__device, ctypes.pointer(command), len(command),
        ctypes.pointer(response), len(response), 0)
def write(self, addr, data):
    """
    Writes data from the array into the device starting at the given address.

    @param int addr: Start address of the memory block to write.
    @param sequence data: Data to write. Each item is unpacked into a c_uint8 array element.

    Raises ValueError when self._is_u32(addr) or self._is_valid_buf(data) is false.
    """
    addr_ok = self._is_u32(addr)
    if not addr_ok:
        raise ValueError('The addr parameter must be an unsigned 32-bit value.')
    data_ok = self._is_valid_buf(data)
    if not data_ok:
        raise ValueError('The data parameter must be a sequence type with at least one item.')
    c_addr = ctypes.c_uint32(addr)
    c_count = ctypes.c_uint32(len(data))
    payload_type = ctypes.c_uint8 * c_count.value
    payload = payload_type(*data)
    self.jlink.JLINKARM_WriteMem(c_addr, c_count, ctypes.byref(payload))
def read(self, addr, data_len):
    """
    Reads data_len bytes from the device starting at the given address.

    @param int addr: Start address of the memory block to read.
    @param int data_len: Number of bytes to read.
    @return: The result of calling bytes() on the c_uint8 buffer that was passed to JLINKARM_ReadMem.

    Raises ValueError when self._is_u32() rejects addr or data_len.
    """
    for value, message in (
            (addr, 'The addr parameter must be an unsigned 32-bit value.'),
            (data_len, 'The data_len parameter must be an unsigned 32-bit value.')):
        if not self._is_u32(value):
            raise ValueError(message)
    c_addr = ctypes.c_uint32(addr)
    c_count = ctypes.c_uint32(data_len)
    buffer_type = ctypes.c_uint8 * c_count.value
    received = buffer_type()
    self.jlink.JLINKARM_ReadMem(c_addr, c_count, ctypes.byref(received))
    return bytes(received)
def callback_friend_request(self, callback, user_data):
    """
    Set the callback for the `friend_request` event. Pass None to unset.

    This event is triggered when a friend request is received.

    :param callback: Python function. Should take pointer (c_void_p) to Tox object,
    The Public Key (c_uint8 array) of the user who sent the friend request,
    The message (c_char_p) they sent along with the request,
    The size (c_size_t) of the message byte array,
    pointer (c_void_p) to user_data
    :param user_data: pointer (c_void_p) to user data
    """
    # NOTE(review): the callback is wrapped unconditionally below; confirm
    # that passing None really works as the "unset" path described above.
    prototype = CFUNCTYPE(None, c_void_p, POINTER(c_uint8), c_char_p, c_size_t, c_void_p)
    wrapped = prototype(callback)
    # The wrapper is stored on the instance, which keeps it referenced after this call returns.
    self.friend_request_cb = wrapped
    register = Tox.libtoxcore.tox_callback_friend_request
    register(self._tox_pointer, wrapped, c_void_p(user_data))
def callback_friend_lossless_packet(self, callback, user_data):
    """
    Set the callback for the `friend_lossless_packet` event. Pass NULL to unset.

    :param callback: Python function.
    Should take pointer (c_void_p) to Tox object,
    friend_number (c_uint32) - The friend number of the friend who sent a lossless packet,
    A byte array (c_uint8 array) containing the received packet data,
    length (c_size_t) - The length of the packet data byte array,
    pointer (c_void_p) to user_data
    :param user_data: pointer (c_void_p) to user data
    """
    prototype = CFUNCTYPE(None, c_void_p, c_uint32, POINTER(c_uint8), c_size_t, c_void_p)
    wrapped = prototype(callback)
    # The wrapper is stored on the instance, which keeps it referenced after this call returns.
    self.friend_lossless_packet_cb = wrapped
    register = self.libtoxcore.tox_callback_friend_lossless_packet
    register(self._tox_pointer, wrapped, user_data)
# -----------------------------------------------------------------------------------------------------------------
# Low-level network information
# -----------------------------------------------------------------------------------------------------------------
def __init__(self, name='color', width=640, height=480, fps=30, color_format='rgb'):
    """Set up a native color stream description.

    :param name: stream name forwarded to the parent initializer
    :param width: frame width forwarded to the parent initializer
    :param height: frame height forwarded to the parent initializer
    :param fps: frame rate forwarded to the parent initializer
    :param color_format: 'rgb', 'bgr' (3 channels each) or 'yuv' (2 channels)
    :raises ValueError: if color_format is none of the three values above
    """
    self.native = True
    self.stream = rs_stream.RS_STREAM_COLOR
    if color_format == 'rgb':
        self.format = rs_format.RS_FORMAT_RGB8
        n_channels = 3
    elif color_format == 'bgr':
        self.format = rs_format.RS_FORMAT_BGR8
        n_channels = 3
    elif color_format == 'yuv':
        self.format = rs_format.RS_FORMAT_YUYV
        n_channels = 2
    else:
        # .format() has to be applied to the message string; calling it on
        # the ValueError instance raised AttributeError instead.
        raise ValueError(
            'Unknown color format. Expected rgb, bgr, or yuv ({} given)'.format(color_format))
    self.shape = (height, width, n_channels)
    self.dtype = ctypes.c_uint8
    super(ColorStream, self).__init__(name, self.native, self.stream, width, height, self.format, fps)
def __init__(self, name='cad', width=640, height=480, fps=30, color_format='rgb'):
    """Set up a non-native color-aligned-to-depth stream description.

    :param name: stream name forwarded to the parent initializer
    :param width: frame width forwarded to the parent initializer
    :param height: frame height forwarded to the parent initializer
    :param fps: frame rate forwarded to the parent initializer
    :param color_format: 'rgb', 'bgr' (3 channels each) or 'yuv' (2 channels)
    :raises ValueError: if color_format is none of the three values above
    """
    self.native = False
    self.stream = rs_stream.RS_STREAM_COLOR_ALIGNED_TO_DEPTH
    if color_format == 'rgb':
        self.format = rs_format.RS_FORMAT_RGB8
        n_channels = 3
    elif color_format == 'bgr':
        self.format = rs_format.RS_FORMAT_BGR8
        n_channels = 3
    elif color_format == 'yuv':
        self.format = rs_format.RS_FORMAT_YUYV
        n_channels = 2
    else:
        # .format() has to be applied to the message string; calling it on
        # the ValueError instance raised AttributeError instead.
        raise ValueError(
            'Unknown color format. Expected rgb, bgr, or yuv ({} given)'.format(color_format))
    self.shape = (height, width, n_channels)
    self.dtype = ctypes.c_uint8
    super(CADStream, self).__init__(name, self.native, self.stream, width, height, self.format, fps)
def set_leds_state(pixels):
    """Set LED's

    Layout of the array:

        RGB RGB RGB RGB RGB RGB RGB RGB
        RGB RGB RGB RGB RGB RGB RGB RGB
        RGB RGB RGB RGB RGB RGB RGB RGB
        RGB RGB RGB RGB RGB RGB RGB RGB
        RGB RGB RGB RGB RGB RGB RGB RGB
        RGB RGB RGB RGB RGB RGB RGB RGB
        RGB RGB RGB RGB RGB RGB RGB RGB
        RGB RGB RGB RGB RGB RGB RGB RGB

    The first 24 bytes of the array stores the first line of the LED's, the
    following 24 bytes stores the second line, etc.

    Each value of the array should not be greater than 0x1F.

    Note: An exception is thrown if it fails.
    """
    array_type = ctypes.c_uint8 * len(pixels)
    led_values = array_type(*pixels)
    status = _LIB.rpisensehat_set_leds_state(led_values)
    if status >= 0:
        return
    raise Exception("rpisensehat set leds state failed")
def load_image(ptr, options, data):
    """Load a JPEG image in the FT800 chip memory.

    ptr: Starting address to decompress the image
    options: Can be FT800_OPT_RGB565 or FT800_OPT_MONO
    data: A list of bytes

    Note: An exception is thrown if it fails to load the image in the memory of
    the FT800 chip.
    """
    byte_count = len(data)
    array_type = ctypes.c_uint8 * byte_count
    payload = array_type(*data)
    status = _LIB.eve_click_load_image(ptr, options, payload, byte_count)
    if status >= 0:
        return
    raise Exception("eve click load image failed")
def inflate(ptr, data):
    """Decompress some data in FT800 memory.

    The data must have been compressed using the DEFLATE algorithm.

    ptr: Starting address to decompress data
    data: A list of bytes.

    Note: An exception is thrown if it fails to decompress data.
    """
    byte_count = len(data)
    array_type = ctypes.c_uint8 * byte_count
    payload = array_type(*data)
    status = _LIB.eve_click_inflate(ptr, payload, byte_count)
    if status >= 0:
        return
    raise Exception("eve click inflate failed")
def write(self, data):
    """Writes samples of the given SampleSpec to the PulseAudio server.

    Only the PA_SAMPLE_U8 format is handled: the items of ``data`` are
    copied into a c_uint8 array and handed to pa_simple_write.

    Raises:
        Exception: for PA_SAMPLE_S16LE and PA_SAMPLE_FLOAT32LE (not
            implemented yet) and for any other sample format.
        PulseAudioException: if pa_simple_write returns a negative value.
    """
    self._fail_if_not_connected()
    # TODO: Use self.create_data_array...
    if self._sample_format == SampleFormat.PA_SAMPLE_U8:
        size = len(data)
        c_data = (ctypes.c_uint8 * size)(*data)
    elif self._sample_format == SampleFormat.PA_SAMPLE_S16LE:
        raise Exception('Not implemented yet.')
    elif self._sample_format == SampleFormat.PA_SAMPLE_FLOAT32LE:
        raise Exception('Not implemented yet.')
    else:
        # Without this branch an unknown format reached pa_simple_write with
        # c_data and size unbound and surfaced as an UnboundLocalError.
        raise Exception('Unsupported sample format.')
    error = ctypes.c_int(0)
    result = _libpulse_simple.pa_simple_write(self._client, c_data, size, error)
    if result < 0:
        raise PulseAudioException('Could not write data.', error.value)
def memalign(cty, align):
    """Allocate a ctype object on the specific byte alignment

    Returns a ``(buf, mem)`` tuple: ``buf`` is a ``cty`` instance created
    with ``from_address`` inside ``mem``, a c_uint8 array of
    ``sizeof(cty) + align`` bytes that provides the storage.
    """
    # Over-allocate by `align` bytes so an aligned start address always fits.
    backing = (c_uint8 * (sizeof(cty) + align))()
    base = addressof(backing)
    # Distance from `base` up to the next multiple of `align` (0 if already aligned).
    padding = (align - base % align) % align
    aligned = cty.from_address(base + padding)
    assert 0 == addressof(aligned) % align
    return aligned, backing
def callback_friend_request(self, callback, user_data):
    """
    Set the callback for the `friend_request` event. Pass None to unset.

    This event is triggered when a friend request is received.

    :param callback: Python function. Should take pointer (c_void_p) to Tox object,
    The Public Key (c_uint8 array) of the user who sent the friend request,
    The message (c_char_p) they sent along with the request,
    The size (c_size_t) of the message byte array,
    pointer (c_void_p) to user_data
    :param user_data: pointer (c_void_p) to user data
    """
    # NOTE(review): the callback is wrapped unconditionally below; confirm
    # that passing None really works as the "unset" path described above.
    prototype = CFUNCTYPE(None, c_void_p, POINTER(c_uint8), c_char_p, c_size_t, c_void_p)
    wrapped = prototype(callback)
    # The wrapper is stored on the instance, which keeps it referenced after this call returns.
    self.friend_request_cb = wrapped
    register = Tox.libtoxcore.tox_callback_friend_request
    register(self._tox_pointer, wrapped, c_void_p(user_data))
def __init__(self, initializer):
    """Create a (3, 5, 3) unsigned-byte array and fill it from ``initializer``.

    The parent initializer is called with format 'B' and strides
    (20, 4, 1); the items of ``initializer`` are then written one by one
    through a c_uint8 pointer cast of ``self.buf``.
    """
    import ctypes
    Array3D = PixelCopyTestWithArray.Array3D
    super(Array3D, self).__init__((3, 5, 3), format='B', strides=(20, 4, 1))
    byte_ptr = ctypes.cast(self.buf, ctypes.POINTER(ctypes.c_uint8))
    self.content = byte_ptr
    for index, value in enumerate(initializer):
        byte_ptr[index] = value
def __init__(self, blob):
    """Allocate ``self.data`` as a ctypes string buffer of ``len(blob)`` bytes.

    Only the length of ``blob`` is used here; its content is not copied.
    """
    size = len(blob)
    self.data = ctypes.create_string_buffer(size)
def fixed_length(cls, clskey, length):
    """Return a subclass of ``cls`` holding a fixed-size byte array.

    The subclass has ``key`` set to ``clskey`` and a single ``data`` field
    declared as ``ctypes.c_uint8 * length``.
    """
    data_type = ctypes.c_uint8 * length

    class foo(cls):
        _fields_ = [("data", data_type)]
        key = clskey

    return foo
def get_buffer_as_uint8(self):
    """Return ``self.get_buffer_as()`` called with ``ctypes.c_uint8``."""
    element_type = ctypes.c_uint8
    return self.get_buffer_as(element_type)
def get_buffer_as_triplet(self):
    """Return ``self.get_buffer_as()`` called with a 3-element c_uint8 array type."""
    triplet_type = ctypes.c_uint8 * 3
    return self.get_buffer_as(triplet_type)
def validate_u8(val, p, key=None):
    """Accept ``val`` only if it is an int that survives a c_uint8 round trip.

    Returns None for accepted values; otherwise calls
    raise_validation_error with ErrInvalidU8.
    """
    fits = isinstance(val, int) and ctypes.c_uint8(val).value == val
    if not fits:
        raise_validation_error(ErrInvalidU8, val, p, key=key)
def struct(cls, ea, **sid):
    """Return the structure_t at address ``ea`` as a dict of ctypes.
    If the structure ``sid`` is specified, then use that specific structure type.

    The structure may be given under any of the keywords ``sid``, ``struc``,
    ``structure`` or ``id``. The result maps each member name either to a
    ctypes object reinterpreting the member's bytes or, when no ctypes type
    could be resolved, to the raw value that was read.
    """
    ea = interface.address.within(ea)
    if any(n in sid for n in ('sid','struc','structure','id')):
        # Take the first keyword alias present, checked in the order sid, struc, structure, id.
        res = sid['sid'] if 'sid' in sid else sid['struc'] if 'struc' in sid else sid['structure'] if 'structure' in sid else sid['id'] if 'id' in sid else None
        # A structure_t is reduced to its id; any other value is used as given.
        sid = res.id if isinstance(res, structure.structure_t) else res
    else:
        # No structure given: obtain the id from type.structure.id() for this address.
        sid = type.structure.id(ea)
    st = structure.instance(sid, offset=ea)
    # Maps (python type, size) to a ctypes type; negative sizes select the signed integer types.
    typelookup = {
        (int,-1) : ctypes.c_int8, (int,1) : ctypes.c_uint8,
        (int,-2) : ctypes.c_int16, (int,2) : ctypes.c_uint16,
        (int,-4) : ctypes.c_int32, (int,4) : ctypes.c_uint32,
        (int,-8) : ctypes.c_int64, (int,8) : ctypes.c_uint64,
        (float,4) : ctypes.c_float, (float,8) : ctypes.c_double,
    }
    res = {}
    for m in st.members:
        # A falsy result from read() is replaced by an empty string.
        t, val = m.type, read(m.offset, m.size) or ''
        try:
            ct = typelookup[t]
        except KeyError:
            # Not a direct table entry: split a tuple type into (type, size),
            # otherwise treat the size as 0.
            ty, sz = t if isinstance(t, __builtin__.tuple) else (m.type, 0)
            if isinstance(t, __builtin__.list):
                t = typelookup[tuple(ty)]
                ct = t*sz
            elif ty in (chr,str):
                ct = ctypes.c_char*sz
            else:
                ct = None
        finally:
            # Runs on every path. The raw value is stored when either ct or val
            # is None; otherwise the bytes are copied into a buffer and viewed as ct.
            # NOTE(review): if the except branch itself raises, ct here is
            # either unbound or left over from the previous member - confirm
            # whether that case can occur.
            res[m.name] = val if any(_ is None for _ in (ct,val)) else ctypes.cast(ctypes.pointer(ctypes.c_buffer(val)),ctypes.POINTER(ct)).contents
    return res
def image_u8_get_array(img_ptr):
    """Return ``ptr_to_array2d`` applied to the image behind ``img_ptr``.

    The call receives c_uint8 as the element type, the dereferenced ``buf``
    pointer, and the image's ``height`` and ``stride`` fields.
    """
    image = img_ptr.contents
    pixels = image.buf.contents
    return ptr_to_array2d(c_uint8, pixels, image.height, image.stride)