def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
python类_SimpleCData()的实例源码
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None, ctx=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
ctx = ctx or get_context()
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock, ctx)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock, ctx)
return SynchronizedArray(obj, lock, ctx)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock, ctx)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None, ctx=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
ctx = ctx or get_context()
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock, ctx)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock, ctx)
return SynchronizedArray(obj, lock, ctx)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock, ctx)
#
# Functions for pickling/unpickling
#
def test_itemsize(self):
for size in [1, 2, 4, 8]:
a = Array((1,), 'i', size)
ct = a._ctype
self.assertTrue(issubclass(ct, ctypes._SimpleCData))
self.assertEqual(sizeof(ct), size)
def _ioc_type_size(size):
if isinstance(size, type) and issubclass(size, ctypes._SimpleCData):
return ctypes.sizeof(size)
elif isinstance(size, int):
return size
else:
raise TypeError('Invalid type for size: {size_type}'.format(size_type=size.__class__.__name__))
def check_ctypes_datatype(datatype):
valid_datatypes = (
ctypes._SimpleCData,
ctypes.Union,
ctypes.BigEndianStructure,
ctypes.LittleEndianStructure,
ctypes.Structure,
)
for t in valid_datatypes:
if issubclass(datatype, t):
return
raise TypeError('datatype must be a ctypes data type, but was {}'.format(datatype.__name__))
def ioctl_fn_ptr_r(request, datatype, return_python=None):
""" Create a helper function for invoking a ioctl() read call.
This function creates a helper function for creating a ioctl() read function.
It will call the ioctl() function with a pointer to data, and return the contents
of the data after the call.
If the datatype is a integer type (int, long, etc), it will be returned as a python int or long.
:param request: The ioctl request to call.
:param datatype: The data type of the data returned by the ioctl() call.
:param return_python: Whether we should attempt to convert the return data to a Python value. Defaults to True for fundamental ctypes data types.
:return: A function for invoking the specified ioctl().
:Example:
::
import ctypes
import os
import ioctl
import ioctl.linux
RNDGETENTCNT = ioctl.linux.IOR('R', 0x00, ctypes.c_int)
rndgetentcnt = ioctl.ioctl_fn_ptr_r(RNDGETENTCNT, ctypes.c_int)
fd = os.open('/dev/random', os.O_RDONLY)
entropy_avail = rndgetentcnt(fd)
"""
check_request(request)
check_ctypes_datatype(datatype)
if return_python is not None and not isinstance(return_python, bool):
raise TypeError('return_python must be None or a boolean, but was {}'.format(return_python.__class__.__name__))
if return_python is None:
return_python = issubclass(datatype, ctypes._SimpleCData)
def fn(fd):
check_fd(fd)
value = datatype()
ioctl(fd, request, ctypes.byref(value))
if return_python:
return value.value
else:
return value
return fn
def ioctl_fn_ptr_wr(request, datatype, return_python=None):
""" Create a helper function for invoking a ioctl() read & write call.
This function creates a helper function for a ioctl() operation that both reads and writes data.
Typically the case is a ioctl() operation that receives a pointer to data, performs an operation and updates the data at the pointer.
E.g. if you have a ioctl() call like::
int timeout = 45;
ioctl(fd, WDIOF_SETTIMEOUT, &timeout);
printf("Actual timeout: %d\n", timeout);
:param request: The ioctl request to call.
:param datatype: The data type of the data to be passed to the ioctl() call.
:param return_python: Whether we should attempt to convert the return data to a Python value. Defaults to True for fundamental ctypes data types.
:return: A function for invoking the specified ioctl().
:Example:
::
import ctypes
import os
import ioctl
WDIOF_SETTIMEOUT = 0x0080
wdiof_settimeout = ioctl.ioctl_fn_ptr_wr(WDIOF_SETTIMEOUT, ctypes.c_int)
fd = os.open('/dev/watchdog', os.O_RDONLY)
actual_timeout = wdiof_settimeout(fd, 60)
"""
check_request(request)
check_ctypes_datatype(datatype)
if return_python is not None and not isinstance(return_python, bool):
raise TypeError('return_python must be None or a boolean, but was {}'.format(return_python.__class__.__name__))
if return_python is None:
return_python = issubclass(datatype, ctypes._SimpleCData)
def fn(fd, value):
check_fd(fd)
value = datatype(value)
ioctl(fd, request, ctypes.byref(value))
if return_python:
return value.value
else:
return value
return fn