def Write(self, packet):
"""See base class."""
report_id = 0
out_report_buffer = (ctypes.c_uint8 * self.internal_max_out_report_len)()
out_report_buffer[:] = packet[:]
result = iokit.IOHIDDeviceSetReport(self.device_handle,
K_IO_HID_REPORT_TYPE_OUTPUT,
report_id,
out_report_buffer,
self.internal_max_out_report_len)
# Non-zero status indicates failure
if result != K_IO_RETURN_SUCCESS:
raise errors.OsHidError('Failed to write report to device')
python类c_uint8()的实例源码
def __init__(self, packet):
self.timestamp = timestamp_from_avbin(packet.timestamp)
self.data = (ctypes.c_uint8 * packet.size)()
self.size = packet.size
ctypes.memmove(self.data, packet.data, self.size)
# Decoded image. 0 == not decoded yet; None == Error or discarded
self.image = 0
self.id = self._next_id
self.__class__._next_id += 1
def _encode(cls, payload):
""" Protects a stream of data with escape characters
The payload should be a bytestring
"""
out = b''
for byte in payload:
if byte >= cls.ESCAPE:
out += bytearray([cls.ESCAPE])
byte = ctypes.c_uint8(byte - cls.ESCAPE).value
out += bytearray([byte])
return out
def __init__(self, packet):
self.timestamp = timestamp_from_avbin(packet.timestamp)
self.data = (ctypes.c_uint8 * packet.size)()
self.size = packet.size
ctypes.memmove(self.data, packet.data, self.size)
# Decoded image. 0 == not decoded yet; None == Error or discarded
self.image = 0
self.id = self._next_id
self.__class__._next_id += 1
def _authenticate(self, block, uid, key = "\xff\xff\xff\xff\xff\xff", use_b_key = False):
"""Authenticates to a particular block using a specified key"""
if nfc.nfc_device_set_property_bool(self.__device, nfc.NP_EASY_FRAMING, True) < 0:
raise Exception("Error setting Easy Framing property")
abttx = (ctypes.c_uint8 * 12)()
abttx[0] = self.MC_AUTH_A if not use_b_key else self.MC_AUTH_B
abttx[1] = block
for i in range(6):
abttx[i + 2] = ord(key[i])
for i in range(4):
abttx[i + 8] = ord(uid[i])
abtrx = (ctypes.c_uint8 * 250)()
return nfc.nfc_initiator_transceive_bytes(self.__device, ctypes.pointer(abttx), len(abttx),
ctypes.pointer(abtrx), len(abtrx), 0)
def _worker_init(mjb_bytes, worker_id, device_ids,
shared_rgbs, shared_depths, modder):
"""
Initializes the global state for the workers.
"""
s = RenderPoolStorage()
with worker_id.get_lock():
proc_worker_id = worker_id.value
worker_id.value += 1
s.device_id = device_ids[proc_worker_id % len(device_ids)]
s.shared_rgbs_array = np.frombuffer(
shared_rgbs.get_obj(), dtype=ctypes.c_uint8)
s.shared_depths_array = np.frombuffer(
shared_depths.get_obj(), dtype=ctypes.c_float)
# avoid a circular import
from mujoco_py import load_model_from_mjb, MjRenderContext, MjSim
s.sim = MjSim(load_model_from_mjb(mjb_bytes))
# attach a render context to the sim (needs to happen before
# modder is called, since it might need to upload textures
# to the GPU).
MjRenderContext(s.sim, device_id=s.device_id)
if modder is not None:
s.modder = modder(s.sim, random_state=proc_worker_id)
s.modder.whiten_materials()
else:
s.modder = None
global _render_pool_storage
_render_pool_storage = s
def nonlinear_function(byte, constant):
byte ^= constant
state = (byte + 1) % 256
byte ^= eight_bit_integer(state << 5).value
byte ^= eight_bit_integer(state >> 3).value
return byte
def nonlinear_function2(state_and_constant):
state, constant = state_and_constant
state ^= constant
state += 1
state ^= word(state >> 8).value
state ^= word(state << 8).value
state = word(~state).value
# state ^= word(state << 5).value
# state ^= word(state >> 3).value
return (eight_bit_integer(state).value, state)
def read_8(self, addr, buf, len, status):
addr = ctypes.c_uint32(addr)
data = ctypes.c_uint8()
status = ctypes.c_byte()
self.jlink.JLINKARM_ReadMemU8(addr, 1, ctypes.byref(data), ctypes.byref(status))
return data.value
def write_8(self, addr, data):
addr = ctypes.c_uint32(addr)
data = ctypes.c_uint8(data)
self.jlink.JLINKARM_WriteU8(addr, data)
def callback_file_recv_chunk(self, callback, user_data):
"""
Set the callback for the `file_recv_chunk` event. Pass NULL to unset.
This event is first triggered when a file transfer request is received, and subsequently when a chunk of file
data for an accepted request was received.
:param callback: Python function.
When length is 0, the transfer is finished and the client should release the resources it acquired for the
transfer. After a call with length = 0, the file number can be reused for new file transfers.
If position is equal to file_size (received in the file_receive callback) when the transfer finishes, the file
was received completely. Otherwise, if file_size was UINT64_MAX, streaming ended successfully when length is 0.
Should take pointer (c_void_p) to Tox object,
The friend number (c_uint32) of the friend who is sending the file.
The friend-specific file number (c_uint32) the data received is associated with.
The file position (c_uint64) of the first byte in data.
A byte array (c_char_p) containing the received chunk.
The length (c_size_t) of the received chunk.
pointer (c_void_p) to user_data
:param user_data: pointer (c_void_p) to user data
"""
c_callback = CFUNCTYPE(None, c_void_p, c_uint32, c_uint32, c_uint64, POINTER(c_uint8), c_size_t, c_void_p)
self.file_recv_chunk_cb = c_callback(callback)
self.libtoxcore.tox_callback_file_recv_chunk(self._tox_pointer, self.file_recv_chunk_cb, user_data)
# -----------------------------------------------------------------------------------------------------------------
# Low-level custom packet sending and receiving
# -----------------------------------------------------------------------------------------------------------------
def callback_friend_lossy_packet(self, callback, user_data):
"""
Set the callback for the `friend_lossy_packet` event. Pass NULL to unset.
:param callback: Python function.
Should take pointer (c_void_p) to Tox object,
friend_number (c_uint32) - The friend number of the friend who sent a lossy packet,
A byte array (c_uint8 array) containing the received packet data,
length (c_size_t) - The length of the packet data byte array,
pointer (c_void_p) to user_data
:param user_data: pointer (c_void_p) to user data
"""
c_callback = CFUNCTYPE(None, c_void_p, c_uint32, POINTER(c_uint8), c_size_t, c_void_p)
self.friend_lossy_packet_cb = c_callback(callback)
self.libtoxcore.tox_callback_friend_lossy_packet(self._tox_pointer, self.friend_lossy_packet_cb, user_data)
def audio_send_frame(self, friend_number, pcm, sample_count, channels, sampling_rate):
"""
Send an audio frame to a friend.
The expected format of the PCM data is: [s1c1][s1c2][...][s2c1][s2c2][...]...
Meaning: sample 1 for channel 1, sample 1 for channel 2, ...
For mono audio, this has no meaning, every sample is subsequent. For stereo, this means the expected format is
LRLRLR... with samples for left and right alternating.
:param friend_number: The friend number of the friend to which to send an audio frame.
:param pcm: An array of audio samples. The size of this array must be sample_count * channels.
:param sample_count: Number of samples in this frame. Valid numbers here are
((sample rate) * (audio length) / 1000), where audio length can be 2.5, 5, 10, 20, 40 or 60 milliseconds.
:param channels: Number of audio channels. Sulpported values are 1 and 2.
:param sampling_rate: Audio sampling rate used in this frame. Valid sampling rates are 8000, 12000, 16000,
24000, or 48000.
"""
toxav_err_send_frame = c_int()
result = ToxAV.libtoxav.toxav_audio_send_frame(self._toxav_pointer, c_uint32(friend_number),
cast(pcm, c_void_p),
c_size_t(sample_count), c_uint8(channels),
c_uint32(sampling_rate), byref(toxav_err_send_frame))
toxav_err_send_frame = toxav_err_send_frame.value
if toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['OK']:
return bool(result)
elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['NULL']:
raise ArgumentError('The samples data pointer was NULL.')
elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend_number passed did not designate a valid friend.')
elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['FRIEND_NOT_IN_CALL']:
raise RuntimeError('This client is currently not in a call with the friend.')
elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['SYNC']:
raise RuntimeError('Synchronization error occurred.')
elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['INVALID']:
raise ArgumentError('One of the frame parameters was invalid. E.g. the resolution may be too small or too '
'large, or the audio sampling rate may be unsupported.')
elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['PAYLOAD_TYPE_DISABLED']:
raise RuntimeError('Either friend turned off audio or video receiving or we turned off sending for the said'
'payload.')
elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['RTP_FAILED']:
RuntimeError('Failed to push frame through rtp interface.')
def callback_video_receive_frame(self, callback, user_data):
"""
Set the callback for the `video_receive_frame` event. Pass None to unset.
:param callback: Python function.
The function type for the video_receive_frame callback.
Should take
toxAV pointer (c_void_p) to ToxAV object,
friend_number The friend number (c_uint32) of the friend who sent a video frame.
width Width (c_uint16) of the frame in pixels.
height Height (c_uint16) of the frame in pixels.
y
u
v Plane data (POINTER(c_uint8)).
The size of plane data is derived from width and height where
Y = MAX(width, abs(ystride)) * height,
U = MAX(width/2, abs(ustride)) * (height/2) and
V = MAX(width/2, abs(vstride)) * (height/2).
ystride
ustride
vstride Strides data (c_int32). Strides represent padding for each plane that may or may not be present. You must
handle strides in your image processing code. Strides are negative if the image is bottom-up
hence why you MUST abs() it when calculating plane buffer size.
user_data pointer (c_void_p) to user_data
:param user_data: pointer (c_void_p) to user data
"""
c_callback = CFUNCTYPE(None, c_void_p, c_uint32, c_uint16, c_uint16, POINTER(c_uint8), POINTER(c_uint8),
POINTER(c_uint8), c_int32, c_int32, c_int32, c_void_p)
self.video_receive_frame_cb = c_callback(callback)
ToxAV.libtoxav.toxav_callback_video_receive_frame(self._toxav_pointer, self.video_receive_frame_cb, user_data)
def setup(self):
# counter "ValueError: number of bits invalid for bit field"
monkeypatch_pyclibrary_ctypes_struct()
# register header- and library paths
# https://pyclibrary.readthedocs.org/en/latest/get_started/configuration.html#specifying-headers-and-libraries-locations
# TODO: this probably acts on a global basis; think about it
if self.include_path:
add_header_locations([self.include_path])
if self.library_path:
add_library_locations([self.library_path])
# define extra types suitable for embedded use
types = {
'uint8_t': c_uint8,
'uint16_t': c_uint16,
'uint32_t': c_uint32,
'int8_t': c_int8,
'int16_t': c_int16,
'int32_t': c_int32,
}
# TODO: this probably acts on a global basis; think about it
if not (CParser._init or CLibrary._init):
auto_init(extra_types=types)
def __init__(self, name='infrared', width=640, height=480, fps=30):
self.native = True
self.stream = rs_stream.RS_STREAM_INFRARED
self.format = rs_format.RS_FORMAT_Y8
self.shape = (height, width)
self.dtype = ctypes.c_uint8
super(InfraredStream, self).__init__(name, self.native, self.stream, width, height, self.format, fps)
def transfer(tx_data):
"""Transfers data using the current SPI bus. Returns a list of bytes.
tx_data: A list of bytes to send.
Note: An exception is thrown if an error occurs during the transfer.
"""
length = len(tx_data)
tx_buffer = (ctypes.c_uint8 * length)(*tx_data)
rx_buffer = (ctypes.c_uint8 * length)()
ret = _LIB.spi_transfer(tx_buffer, rx_buffer, length)
if ret < 0:
raise Exception("spi transfer failed")
return [rx_buffer[i] for i in range(length)]
def get_mode(index):
"""Returns the mode of a LED.
The LED must be initialised before calling this function.
index: must be an integer in range 0..7
Note: An exception is thrown if it fails to retrieve the mode of a LED.
"""
mode = ctypes.c_uint8(0)
ret = _LIB.led_get_mode(index, ctypes.byref(mode))
if ret < 0:
raise Exception("led get mode failed")
return mode.value
def get_pin(mikrobus_index, pin_type):
"""Returns GPIO index of a pin on provided mikrobus
Note: An exception is thrown if the gpio cannot be found.
"""
pin = ctypes.c_uint8(0)
ret = _LIB.gpio_get_pin(mikrobus_index, pin_type, ctypes.byref(pin))
if ret < 0:
raise Exception("gpio get pin failed")
return pin.value
def get_type(gpio_pin):
"""Returns the type of the GPIO
Some GPIO's on the Mikrobus has some type (AN, PWM, INT, RST or CS). Other
GPIO's don't have a type.
Note: An exception is thrown if the type of the gpio cannot be found.
"""
pin_type = ctypes.c_uint8(0)
ret = _LIB.gpio_get_type(gpio_pin, ctypes.byref(pin_type))
if ret < 0:
raise Exception("gpio get type failed")
return pin_type.value