def testFromParamWithUInt16Array(self):
class UInt16ArrayArg():
def __init__(self, value):
self._ret= (ctypes.c_uint16 * len(value))()
for i in range(len(value)):
self._ret[i]= value[i]
def from_param(self):
return self._ret
def array(self):
return np.array([x for x in self._ret])
xsubi1= UInt16ArrayArg([1, 2, 4092])
self.assertTrue(np.allclose(np.array([1, 2, 4092]),
xsubi1.array()))
xsubi2= UInt16ArrayArg([1, 2, 4092])
self.libc.nrand48.argtypes= [UInt16ArrayArg]
ret1= self.libc.nrand48(xsubi1)
ret2= self.libc.nrand48(xsubi2)
self.assertEqual(ret1, ret2)
self.assertFalse(np.allclose(np.array([1, 2, 4092]),
xsubi1.array()))
# Example source code using Python's ctypes c_uint16() class
def _return_ctype(self):
    """Look up the ctypes type that corresponds to this datatype.

    Raises KeyError when ``self`` is not one of the listed DataType members.
    """
    pairs = (
        (DataType.Bool, ctypes.c_uint8),
        (DataType.I8, ctypes.c_int8),
        (DataType.U8, ctypes.c_uint8),
        (DataType.I16, ctypes.c_int16),
        (DataType.U16, ctypes.c_uint16),
        (DataType.I32, ctypes.c_int32),
        (DataType.U32, ctypes.c_uint32),
        (DataType.I64, ctypes.c_int64),
        (DataType.U64, ctypes.c_uint64),
        (DataType.Sgl, ctypes.c_float),
        (DataType.Dbl, ctypes.c_double),
    )
    ctype_by_datatype = dict(pairs)
    return ctype_by_datatype[self]
def add_tcp_relay(self, address, port, public_key):
    """
    Register an additional host:port pair as a TCP relay.

    Useful for opening TCP connections to other ports of the same bootstrap node, or for adding TCP relays that
    are not used as bootstrap nodes.

    :param address: The hostname or IP address (IPv4 or IPv6) of the TCP relay.
    :param port: The port on the host on which the TCP relay is listening.
    :param public_key: The long term public key of the TCP relay (TOX_PUBLIC_KEY_SIZE bytes).
    :return: True on success; None if the library reports an error code not handled below.
    :raises ArgumentError: for the NULL, BAD_HOST and BAD_PORT error codes.
    """
    error_slot = c_int()
    relay_added = Tox.libtoxcore.tox_add_tcp_relay(
        self._tox_pointer,
        c_char_p(address),
        c_uint16(port),
        string_to_bin(public_key),
        byref(error_slot),
    )
    error_code = error_slot.value

    if error_code == TOX_ERR_BOOTSTRAP['OK']:
        return bool(relay_added)
    if error_code == TOX_ERR_BOOTSTRAP['NULL']:
        raise ArgumentError('One of the arguments to the function was NULL when it was not expected.')
    if error_code == TOX_ERR_BOOTSTRAP['BAD_HOST']:
        raise ArgumentError('The address could not be resolved to an IP '
                            'address, or the IP address passed was invalid.')
    if error_code == TOX_ERR_BOOTSTRAP['BAD_PORT']:
        raise ArgumentError('The port passed was invalid. The valid port range is (1, 65535).')
def __SetDLLReturnTypes(self):
    """Declare the ctypes return type of every nnotes DLL entry point used.

    Entries are applied in table order; ``None`` marks a function with no
    return value.
    """
    status = ctypes.c_uint16
    flag = ctypes.c_bool
    return_types = (
        ('NotesInitExtended', status),
        ('NotesTerm', status),
        ('NSFDbOpen', status),
        ('NSFDbClose', status),
        ('NSFNoteOpenExt', status),
        ('NSFNoteOpenByUNID', status),
        ('NSFNoteClose', status),
        ('NSFNoteCopy', status),
        ('NSFNoteGetInfo', None),
        ('NSFNoteIsSignedOrSealed', flag),
        ('NSFNoteDecrypt', status),
        ('NSFItemDelete', status),
        ('NSFNoteHasMIMEPart', flag),
        ('NSFNoteHasMIME', flag),
        ('NSFNoteHasComposite', flag),
        ('MMCreateConvControls', status),
        ('MMDestroyConvControls', status),
        ('MMSetMessageContentEncoding', None),
        ('MIMEConvertCDParts', status),
        ('MIMEConvertMIMEPartCC', status),
        ('NSFNoteUpdate', status),
    )
    for symbol, restype in return_types:
        getattr(self.nnotesdll, symbol).restype = restype
def get_color():
    """Return the color measurement as a (clear, red, green, blue) tuple.

    The Color Click reports the light intensity per component together with
    the overall intensity. The Color Click must be enabled before calling
    this function.

    Note: An exception is thrown if the measurement fails (negative status).
    """
    # One 16-bit output slot per channel, filled in by the library call.
    channels = [ctypes.c_uint16(0) for _ in range(4)]
    clear, red, green, blue = channels

    status = _LIB.color_click_get_color(ctypes.byref(clear),
                                        ctypes.byref(red),
                                        ctypes.byref(green),
                                        ctypes.byref(blue))
    if status < 0:
        raise Exception("color click get color failed")

    return tuple(channel.value for channel in channels)
def get_measure(mikrobus_index, use_spi):
    """Read one measure from the Light Click.

    mikrobus_index: must be 0 (MIKROBUS_1) or 1 (MIKROBUS_2)
    use_spi: 1 if you use SPI (on board ADC) or 0 if you use ADC output
    directly.

    Note: An exception is thrown if the library reports a negative status.
    """
    reading = ctypes.c_uint16(0)
    reading_ref = ctypes.byref(reading)

    status = _LIB.light_click_get_measure(mikrobus_index, reading_ref, use_spi)
    if status < 0:
        raise Exception("light click get measure failed")

    return reading.value
def bootstrap(self, address, port, public_key):
    """
    Send a "get nodes" request to the given bootstrap node (IP, port and public key) to set up connections.

    The connection attempt uses UDP. This function must be called even if Tox_Options.udp_enabled was set to
    false.

    :param address: The hostname or IP address (IPv4 or IPv6) of the node.
    :param port: The port on the host on which the bootstrap Tox instance is listening.
    :param public_key: The long term public key of the bootstrap node (TOX_PUBLIC_KEY_SIZE bytes).
    :return: True on success; None if the library reports an error code not handled below.
    :raises ArgumentError: for the NULL, BAD_HOST and BAD_PORT error codes.
    """
    error_out = c_int()
    call_args = (
        self._tox_pointer,
        c_char_p(address),
        c_uint16(port),
        string_to_bin(public_key),
        byref(error_out),
    )
    outcome = Tox.libtoxcore.tox_bootstrap(*call_args)
    status = error_out.value

    if status == TOX_ERR_BOOTSTRAP['OK']:
        return bool(outcome)
    if status == TOX_ERR_BOOTSTRAP['NULL']:
        raise ArgumentError('One of the arguments to the function was NULL when it was not expected.')
    if status == TOX_ERR_BOOTSTRAP['BAD_HOST']:
        raise ArgumentError('The address could not be resolved to an IP '
                            'address, or the IP address passed was invalid.')
    if status == TOX_ERR_BOOTSTRAP['BAD_PORT']:
        raise ArgumentError('The port passed was invalid. The valid port range is (1, 65535).')
def add_tcp_relay(self, address, port, public_key):
    """
    Add an extra host:port pair as a TCP relay.

    This can be used to initiate TCP connections to different ports on the same bootstrap node, or to add TCP
    relays without using them as bootstrap nodes.

    :param address: The hostname or IP address (IPv4 or IPv6) of the TCP relay.
    :param port: The port on the host on which the TCP relay is listening.
    :param public_key: The long term public key of the TCP relay (TOX_PUBLIC_KEY_SIZE bytes).
    :return: True on success; None if the library reports an error code not handled below.
    :raises ArgumentError: for the NULL, BAD_HOST and BAD_PORT error codes.
    """
    err = c_int()
    host_arg = c_char_p(address)
    port_arg = c_uint16(port)
    key_arg = c_char_p(public_key)
    added = Tox.libtoxcore.tox_add_tcp_relay(self._tox_pointer, host_arg, port_arg, key_arg, byref(err))
    code = err.value

    if code == TOX_ERR_BOOTSTRAP['OK']:
        return bool(added)
    if code == TOX_ERR_BOOTSTRAP['NULL']:
        raise ArgumentError('One of the arguments to the function was NULL when it was not expected.')
    if code == TOX_ERR_BOOTSTRAP['BAD_HOST']:
        raise ArgumentError('The address could not be resolved to an IP '
                            'address, or the IP address passed was invalid.')
    if code == TOX_ERR_BOOTSTRAP['BAD_PORT']:
        raise ArgumentError('The port passed was invalid. The valid port range is (1, 65535).')
def testArgsUInt16Array(self):
xsubi1= (ctypes.c_uint16 * 3)(1, 2, 3)
xsubi2= (ctypes.c_uint16 * 3)(1, 2, 3)
ret1= self.libc.nrand48(xsubi1)
ret2= self.libc.nrand48(xsubi2)
self.assertEqual(ret1, ret2)
def get_buffer_as_uint16(self):
    """Return ``self.get_buffer_as(...)`` called with ``ctypes.c_uint16``."""
    element_type = ctypes.c_uint16
    return self.get_buffer_as(element_type)
def keyEvent(self, key, val):
    """
    Generate a key or btn event

    @param int key  key or btn event code (KEY_* or BTN_*)
    @param int val  event value
    """
    emit = self._lib.uinput_key
    fd = self._fd
    # The code is passed as an unsigned 16-bit and the value as a signed 32-bit.
    code = ctypes.c_uint16(key)
    value = ctypes.c_int32(val)
    emit(fd, code, value)
def axisEvent(self, axis, val):
    """
    Generate an abs event (joystick/pad axes)

    @param int axis  abs event code (ABS_*)
    @param int val   event value
    """
    emit = self._lib.uinput_abs
    fd = self._fd
    # The code is passed as an unsigned 16-bit and the value as a signed 32-bit.
    code = ctypes.c_uint16(axis)
    value = ctypes.c_int32(val)
    emit(fd, code, value)
def relEvent(self, rel, val):
    """
    Generate a rel event (relative movement)

    @param int rel  rel event code (REL_*)
    @param int val  event value
    """
    emit = self._lib.uinput_rel
    fd = self._fd
    # The code is passed as an unsigned 16-bit and the value as a signed 32-bit.
    code = ctypes.c_uint16(rel)
    value = ctypes.c_int32(val)
    emit(fd, code, value)
def get_int_property(device_type, property, cf_number_type):
    """
    Search the given device for the specified integer property

    @param device_type Device to search
    @param property Name of the property to search for
    @param cf_number_type CFNumber type code; kCFNumberSInt32Type and
        kCFNumberSInt16Type are supported
    @return Python int containing the value, or None if not found.
    @raise ValueError if the property exists but cf_number_type is not one
        of the supported codes.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman)
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_type,
            key,
            kCFAllocatorDefault,
            0)
    finally:
        # The key string was obtained from a Create function, so this code
        # owns it and has to release it once the lookup is done.
        if key:
            cf.CFRelease(key)

    if not CFContainer:
        return None

    try:
        # The value is read into an unsigned buffer of matching width.
        if cf_number_type == kCFNumberSInt32Type:
            number = ctypes.c_uint32()
        elif cf_number_type == kCFNumberSInt16Type:
            number = ctypes.c_uint16()
        else:
            raise ValueError(
                "unsupported CFNumber type: %r" % (cf_number_type,))
        cf.CFNumberGetValue(CFContainer, cf_number_type, ctypes.byref(number))
        return number.value
    finally:
        # Release the property object on every path, including errors.
        cf.CFRelease(CFContainer)
def get_int_property(device_type, property, cf_number_type):
    """
    Search the given device for the specified integer property

    @param device_type Device to search
    @param property Name of the property to search for
    @param cf_number_type CFNumber type code; kCFNumberSInt32Type and
        kCFNumberSInt16Type are supported
    @return Python int containing the value, or None if not found.
    @raise ValueError if the property exists but cf_number_type is not one
        of the supported codes.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman)
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_type,
            key,
            kCFAllocatorDefault,
            0)
    finally:
        # The key string was obtained from a Create function, so this code
        # owns it and has to release it once the lookup is done.
        if key:
            cf.CFRelease(key)

    if not CFContainer:
        return None

    try:
        # The value is read into an unsigned buffer of matching width.
        if cf_number_type == kCFNumberSInt32Type:
            number = ctypes.c_uint32()
        elif cf_number_type == kCFNumberSInt16Type:
            number = ctypes.c_uint16()
        else:
            raise ValueError(
                "unsupported CFNumber type: %r" % (cf_number_type,))
        cf.CFNumberGetValue(CFContainer, cf_number_type, ctypes.byref(number))
        return number.value
    finally:
        # Release the property object on every path, including errors.
        cf.CFRelease(CFContainer)
def get_int_property(device_t, property):
    """ Search the given device for the specified 16-bit integer property

    @param device_t Device to search
    @param property Name of the property to search for.
    @return Python int containing the value; 0 if the property is not found.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman
    )
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_t,
            key,
            kCFAllocatorDefault,
            0
        )
    finally:
        # The key string came from a Create function, so it is owned here and
        # must be released. It is wrapped in c_void_p so the pointer is not
        # narrowed to a C int if CFRelease has no argtypes declared.
        # NOTE(review): assumes `key` is an integer address (c_void_p
        # restype) -- confirm against the library setup.
        if key:
            cf.CFRelease(ctypes.c_void_p(key))

    number = ctypes.c_uint16()
    if CFContainer:
        try:
            # 2 is the CFNumber type code for a signed 16-bit integer
            # (kCFNumberSInt16Type).
            cf.CFNumberGetValue(CFContainer, 2, ctypes.byref(number))
        finally:
            # The property object is also owned by this code.
            cf.CFRelease(ctypes.c_void_p(CFContainer))
    return number.value
def get_int_property(device_t, property):
    """ Search the given device for the specified 16-bit integer property

    @param device_t Device to search
    @param property Name of the property to search for.
    @return Python int containing the value; 0 if the property is not found.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman
    )
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_t,
            key,
            kCFAllocatorDefault,
            0
        )
    finally:
        # The key string came from a Create function, so it is owned here and
        # must be released. It is wrapped in c_void_p so the pointer is not
        # narrowed to a C int if CFRelease has no argtypes declared.
        # NOTE(review): assumes `key` is an integer address (c_void_p
        # restype) -- confirm against the library setup.
        if key:
            cf.CFRelease(ctypes.c_void_p(key))

    number = ctypes.c_uint16()
    if CFContainer:
        try:
            # 2 is the CFNumber type code for a signed 16-bit integer
            # (kCFNumberSInt16Type).
            cf.CFNumberGetValue(CFContainer, 2, ctypes.byref(number))
        finally:
            # The property object is also owned by this code.
            cf.CFRelease(ctypes.c_void_p(CFContainer))
    return number.value
def get_int_property(device_type, property, cf_number_type):
    """
    Search the given device for the specified integer property

    @param device_type Device to search
    @param property Name of the property to search for
    @param cf_number_type CFNumber type code; kCFNumberSInt32Type and
        kCFNumberSInt16Type are supported
    @return Python int containing the value, or None if not found.
    @raise ValueError if the property exists but cf_number_type is not one
        of the supported codes.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman)
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_type,
            key,
            kCFAllocatorDefault,
            0)
    finally:
        # The key string was obtained from a Create function, so this code
        # owns it and has to release it once the lookup is done.
        if key:
            cf.CFRelease(key)

    if not CFContainer:
        return None

    try:
        # The value is read into an unsigned buffer of matching width.
        if cf_number_type == kCFNumberSInt32Type:
            number = ctypes.c_uint32()
        elif cf_number_type == kCFNumberSInt16Type:
            number = ctypes.c_uint16()
        else:
            raise ValueError(
                "unsupported CFNumber type: %r" % (cf_number_type,))
        cf.CFNumberGetValue(CFContainer, cf_number_type, ctypes.byref(number))
        return number.value
    finally:
        # Release the property object on every path, including errors.
        cf.CFRelease(CFContainer)
def get_int_property(device_t, property):
    """ Search the given device for the specified 16-bit integer property

    @param device_t Device to search
    @param property Name of the property to search for.
    @return Python int containing the value; 0 if the property is not found.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman
    )
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_t,
            key,
            kCFAllocatorDefault,
            0
        )
    finally:
        # The key string came from a Create function, so it is owned here and
        # must be released. It is wrapped in c_void_p so the pointer is not
        # narrowed to a C int if CFRelease has no argtypes declared.
        # NOTE(review): assumes `key` is an integer address (c_void_p
        # restype) -- confirm against the library setup.
        if key:
            cf.CFRelease(ctypes.c_void_p(key))

    number = ctypes.c_uint16()
    if CFContainer:
        try:
            # 2 is the CFNumber type code for a signed 16-bit integer
            # (kCFNumberSInt16Type).
            cf.CFNumberGetValue(CFContainer, 2, ctypes.byref(number))
        finally:
            # The property object is also owned by this code.
            cf.CFRelease(ctypes.c_void_p(CFContainer))
    return number.value
def test_intfield_with_variable_bit_lenth(self):
    """
    Verify that integer fields of varying bit length are exported correctly by ``to_bytes``.

    The packet is populated through constructor keyword arguments, which also exercises setting values
    upon instantiation. The expected bytes come from an equivalent ctypes structure with the same field layout.
    """
    class int_packet_with_varied_sized_int_fields(models.Packet):
        int_field = models.IntField()
        int_field_signed = models.IntField(signed=True)
        int_field_4_bits = models.IntField(bit_len=4)
        int_field_12_bits = models.IntField(bit_len=12)

    pkt = int_packet_with_varied_sized_int_fields(
        int_field=0xbeef,
        int_field_signed=0xdead,
        int_field_4_bits=0xa,
        int_field_12_bits=0xbc
    )

    # Reference layout: two 16-bit fields followed by a 4-bit and a 12-bit
    # bitfield declared on c_uint16.
    class c_pkt_struct(ctypes.Structure):
        _fields_ = (
            ('int_field', ctypes.c_uint16),
            ('int_field_signed', ctypes.c_int16),
            ('int_field_4_bits', ctypes.c_uint16, 4),
            ('int_field_12_bits', ctypes.c_uint16, 12),
        )

    c_pkt = c_pkt_struct()
    c_pkt.int_field = 0xbeef
    c_pkt.int_field_signed = 0xdead
    c_pkt.int_field_4_bits = 0xa
    c_pkt.int_field_12_bits = 0xbc

    b_str = ctypes.string_at(ctypes.addressof(c_pkt), ctypes.sizeof(c_pkt))
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(b_str, pkt.to_bytes())
def get_int_property(device_type, property, cf_number_type):
    """
    Search the given device for the specified integer property

    @param device_type Device to search
    @param property Name of the property to search for
    @param cf_number_type CFNumber type code; kCFNumberSInt32Type and
        kCFNumberSInt16Type are supported
    @return Python int containing the value, or None if not found.
    @raise ValueError if the property exists but cf_number_type is not one
        of the supported codes.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman)
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_type,
            key,
            kCFAllocatorDefault,
            0)
    finally:
        # The key string was obtained from a Create function, so this code
        # owns it and has to release it once the lookup is done.
        if key:
            cf.CFRelease(key)

    if not CFContainer:
        return None

    try:
        # The value is read into an unsigned buffer of matching width.
        if cf_number_type == kCFNumberSInt32Type:
            number = ctypes.c_uint32()
        elif cf_number_type == kCFNumberSInt16Type:
            number = ctypes.c_uint16()
        else:
            raise ValueError(
                "unsupported CFNumber type: %r" % (cf_number_type,))
        cf.CFNumberGetValue(CFContainer, cf_number_type, ctypes.byref(number))
        return number.value
    finally:
        # Release the property object on every path, including errors.
        cf.CFRelease(CFContainer)
def validate_u16(val, p, key=None):
    """Accept ``val`` only if it is an int that survives a c_uint16 round trip.

    Returns None for a valid value; otherwise delegates to
    ``raise_validation_error`` with ``ErrInvalidU16``.
    """
    fits_u16 = isinstance(val, int) and ctypes.c_uint16(val).value == val
    if not fits_u16:
        raise_validation_error(ErrInvalidU16, val, p, key=key)
def get_int_property(device_type, property, cf_number_type):
    """
    Search the given device for the specified integer property

    @param device_type Device to search
    @param property Name of the property to search for
    @param cf_number_type CFNumber type code; kCFNumberSInt32Type and
        kCFNumberSInt16Type are supported
    @return Python int containing the value, or None if not found.
    @raise ValueError if the property exists but cf_number_type is not one
        of the supported codes.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman)
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_type,
            key,
            kCFAllocatorDefault,
            0)
    finally:
        # The key string was obtained from a Create function, so this code
        # owns it and has to release it once the lookup is done.
        if key:
            cf.CFRelease(key)

    if not CFContainer:
        return None

    try:
        # The value is read into an unsigned buffer of matching width.
        if cf_number_type == kCFNumberSInt32Type:
            number = ctypes.c_uint32()
        elif cf_number_type == kCFNumberSInt16Type:
            number = ctypes.c_uint16()
        else:
            raise ValueError(
                "unsupported CFNumber type: %r" % (cf_number_type,))
        cf.CFNumberGetValue(CFContainer, cf_number_type, ctypes.byref(number))
        return number.value
    finally:
        # Release the property object on every path, including errors.
        cf.CFRelease(CFContainer)
def struct(cls, ea, **sid):
    """Return the structure_t at address ``ea`` as a dict of ctypes.

    If the structure ``sid`` is specified, then use that specific structure type.
    The structure may be given under any of the keywords ``sid``, ``struc``,
    ``structure`` or ``id``; otherwise its id is looked up from ``ea``.
    The result maps each member name to either a ctypes value or the raw
    data that was read for that member.
    """
    ea = interface.address.within(ea)
    if any(n in sid for n in ('sid','struc','structure','id')):
        # Keyword precedence: 'sid', then 'struc', then 'structure', then 'id'.
        res = sid['sid'] if 'sid' in sid else sid['struc'] if 'struc' in sid else sid['structure'] if 'structure' in sid else sid['id'] if 'id' in sid else None
        # Accept either a structure_t (use its id) or a value used as the id directly.
        sid = res.id if isinstance(res, structure.structure_t) else res
    else:
        sid = type.structure.id(ea)

    st = structure.instance(sid, offset=ea)
    # Keyed by (python type, size); for ints a negative size selects the
    # signed ctype and a positive size the unsigned one.
    typelookup = {
        (int,-1) : ctypes.c_int8, (int,1) : ctypes.c_uint8,
        (int,-2) : ctypes.c_int16, (int,2) : ctypes.c_uint16,
        (int,-4) : ctypes.c_int32, (int,4) : ctypes.c_uint32,
        (int,-8) : ctypes.c_int64, (int,8) : ctypes.c_uint64,
        (float,4) : ctypes.c_float, (float,8) : ctypes.c_double,
    }

    res = {}
    for m in st.members:
        # Fall back to an empty string when read() returns a falsy value.
        t, val = m.type, read(m.offset, m.size) or ''
        try:
            ct = typelookup[t]
        except KeyError:
            # Not a known scalar: split a tuple type into (element type, count).
            ty, sz = t if isinstance(t, __builtin__.tuple) else (m.type, 0)
            # NOTE(review): a list-valued ``t`` is unhashable, so the lookup
            # above would raise TypeError rather than KeyError -- confirm
            # whether this list branch is reachable.
            if isinstance(t, __builtin__.list):
                t = typelookup[tuple(ty)]
                ct = t*sz
            elif ty in (chr,str):
                # Character data becomes a fixed-size char array.
                ct = ctypes.c_char*sz
            else:
                ct = None
        finally:
            # Runs for every member (also while an exception propagates):
            # keep the raw data when no ctype was chosen, otherwise
            # reinterpret a buffer holding the data as ``ct``.
            res[m.name] = val if any(_ is None for _ in (ct,val)) else ctypes.cast(ctypes.pointer(ctypes.c_buffer(val)),ctypes.POINTER(ct)).contents
    return res
def get_int_property(device_t, property):
    """ Search the given device for the specified 16-bit integer property

    @param device_t Device to search
    @param property Name of the property to search for.
    @return Python int containing the value; 0 if the property is not found.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman
    )
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_t,
            key,
            kCFAllocatorDefault,
            0
        )
    finally:
        # The key string came from a Create function, so it is owned here and
        # must be released. It is wrapped in c_void_p so the pointer is not
        # narrowed to a C int if CFRelease has no argtypes declared.
        # NOTE(review): assumes `key` is an integer address (c_void_p
        # restype) -- confirm against the library setup.
        if key:
            cf.CFRelease(ctypes.c_void_p(key))

    number = ctypes.c_uint16()
    if CFContainer:
        try:
            # 2 is the CFNumber type code for a signed 16-bit integer
            # (kCFNumberSInt16Type).
            cf.CFNumberGetValue(CFContainer, 2, ctypes.byref(number))
        finally:
            # The property object is also owned by this code.
            cf.CFRelease(ctypes.c_void_p(CFContainer))
    return number.value
def write_char_with_color(char, fg_col, bg_col):
    """Write a single character using the given foreground/background colors.

    A newline first fills the rest of the line with the background color and
    is then written through ``sys.stdout``. On Windows every other character
    is written through the console API at the current cursor position.
    """
    set_color(fg_col, bg_col)
    if char == '\n':
        fill_to_eol_with_bg_color()  # insure bg_col covers rest of line

    if not (is_windows and char != '\n'):
        sys.stdout.write(char)
        return

    kernel32 = ctypes.windll.kernel32
    screen_info = CONSOLE_SCREEN_BUFFER_INFO()
    console = kernel32.GetStdHandle(ctypes.c_ulong(-11))
    kernel32.GetConsoleScreenBufferInfo(console, ctypes.byref(screen_info))
    position = screen_info.dwCursorPosition

    # we only write on the left for status, so not touching cursor is fine
    count_written = ctypes.c_uint(0)
    attribute = ctypes.c_uint16(screen_info.wAttributes)
    kernel32.WriteConsoleOutputAttribute(console,
                                         ctypes.byref(attribute),
                                         1,
                                         position,
                                         ctypes.byref(count_written))
    # NOTE(review): c_char_p needs bytes on Python 3 -- confirm the type of
    # `char` at the call sites.
    kernel32.WriteConsoleOutputCharacterA(console,
                                          ctypes.c_char_p(char),
                                          1,
                                          position,
                                          ctypes.byref(count_written))

    # Advance the cursor unless it is already next to the right window edge.
    if position.X < screen_info.srWindow.Right - 1:
        position.X += 1
        kernel32.SetConsoleCursorPosition(console, position)
def fill_to_eol_with_bg_color():
    """Paint the rest of the current line with the active background color.

    On Windows each cell from the cursor up to (but not including) the right
    window edge gets the current console attributes and a space character.
    Elsewhere the ANSI "erase to end of line" sequence is written to stdout.
    """
    if not is_windows:
        sys.stdout.write('\x1b[K')  # insure bg_col covers rest of line
        return

    kernel32 = ctypes.windll.kernel32
    # The console state is queried once; nothing changes it before the writes
    # below, so a second identical query is unnecessary.
    cbuf = CONSOLE_SCREEN_BUFFER_INFO()
    stdout_handle = kernel32.GetStdHandle(ctypes.c_ulong(-11))
    kernel32.GetConsoleScreenBufferInfo(stdout_handle, ctypes.byref(cbuf))
    cursor = cbuf.dwCursorPosition
    distance = cbuf.srWindow.Right - cursor.X

    # distance > 0 skips x == right to avoid default windows scroll-on-last-col-write behavior
    if distance <= 0:
        return

    temp_cursor = COORD()
    written = ctypes.c_uint(0)
    char_attr = ctypes.c_uint16(cbuf.wAttributes)
    # A bytes literal is required by c_char_p on Python 3 and is the same
    # object type as ' ' on Python 2.
    space = ctypes.c_char_p(b' ')
    for i in range(distance):
        temp_cursor.X = cursor.X + i
        temp_cursor.Y = cursor.Y
        kernel32.WriteConsoleOutputAttribute(stdout_handle,
                                             ctypes.byref(char_attr),
                                             1,
                                             temp_cursor,
                                             ctypes.byref(written))
        kernel32.WriteConsoleOutputCharacterA(stdout_handle,
                                              space,
                                              1,
                                              temp_cursor,
                                              ctypes.byref(written))
def _attribute_factory(code, data):
    """Build a structure instance holding ``len``, ``code`` and ``data``.

    The structure type is created per call so that the ``data`` field takes
    the type of the given ``data`` object; ``len`` is set to the size of the
    whole structure.
    """
    payload_type = type(data)

    class _Internal(NetlinkStructure):
        _fields_ = (
            ('len', c_uint16),
            ('code', c_uint16),
            ('data', payload_type),
        )

    field_values = dict(code=code, len=sizeof(_Internal), data=data)
    return _Internal(**field_values)
def get_int_property(device_t, property):
    """ Search the given device for the specified 16-bit integer property

    @param device_t Device to search
    @param property Name of the property to search for.
    @return Python int containing the value; 0 if the property is not found.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman
    )
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_t,
            key,
            kCFAllocatorDefault,
            0
        )
    finally:
        # The key string came from a Create function, so it is owned here and
        # must be released. It is wrapped in c_void_p so the pointer is not
        # narrowed to a C int if CFRelease has no argtypes declared.
        # NOTE(review): assumes `key` is an integer address (c_void_p
        # restype) -- confirm against the library setup.
        if key:
            cf.CFRelease(ctypes.c_void_p(key))

    number = ctypes.c_uint16()
    if CFContainer:
        try:
            # 2 is the CFNumber type code for a signed 16-bit integer
            # (kCFNumberSInt16Type).
            cf.CFNumberGetValue(CFContainer, 2, ctypes.byref(number))
        finally:
            # The property object is also owned by this code.
            cf.CFRelease(ctypes.c_void_p(CFContainer))
    return number.value
def get_int_property(device_type, property, cf_number_type):
    """
    Search the given device for the specified integer property

    @param device_type Device to search
    @param property Name of the property to search for
    @param cf_number_type CFNumber type code; kCFNumberSInt32Type and
        kCFNumberSInt16Type are supported
    @return Python int containing the value, or None if not found.
    @raise ValueError if the property exists but cf_number_type is not one
        of the supported codes.
    """
    key = cf.CFStringCreateWithCString(
        kCFAllocatorDefault,
        property.encode("mac_roman"),
        kCFStringEncodingMacRoman)
    try:
        CFContainer = iokit.IORegistryEntryCreateCFProperty(
            device_type,
            key,
            kCFAllocatorDefault,
            0)
    finally:
        # The key string was obtained from a Create function, so this code
        # owns it and has to release it once the lookup is done.
        if key:
            cf.CFRelease(key)

    if not CFContainer:
        return None

    try:
        # The value is read into an unsigned buffer of matching width.
        if cf_number_type == kCFNumberSInt32Type:
            number = ctypes.c_uint32()
        elif cf_number_type == kCFNumberSInt16Type:
            number = ctypes.c_uint16()
        else:
            raise ValueError(
                "unsupported CFNumber type: %r" % (cf_number_type,))
        cf.CFNumberGetValue(CFContainer, cf_number_type, ctypes.byref(number))
        return number.value
    finally:
        # Release the property object on every path, including errors.
        cf.CFRelease(CFContainer)