def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
python类Array()的实例源码
def __init__(self, *args, **kwds):
super(Array, self).__init__(*args, **kwds)
try:
if self.flags & PAI_NOTSWAPPED:
ct = self._ctypes[self.typekind, self.itemsize]
elif c_int.__ctype_le__ is c_int:
ct = self._ctypes[self.typekind, self.itemsize].__ctype_be__
else:
ct = self._ctypes[self.typekind, self.itemsize].__ctype_le__
except KeyError:
ct = c_uint8 * self.itemsize
self._ctype = ct
self._ctype_p = POINTER(ct)
def test_indices(self):
a = self.a
self.assertEqual(a[0, 0], 0)
self.assertEqual(a[19, 0], 0)
self.assertEqual(a[0, 14], 0)
self.assertEqual(a[19, 14], 0)
self.assertEqual(a[5, 8], 0)
a[0, 0] = 12
a[5, 8] = 99
self.assertEqual(a[0, 0], 12)
self.assertEqual(a[5, 8], 99)
self.assertRaises(IndexError, a.__getitem__, (-1, 0))
self.assertRaises(IndexError, a.__getitem__, (0, -1))
self.assertRaises(IndexError, a.__getitem__, (20, 0))
self.assertRaises(IndexError, a.__getitem__, (0, 15))
self.assertRaises(ValueError, a.__getitem__, 0)
self.assertRaises(ValueError, a.__getitem__, (0, 0, 0))
a = Array((3,), 'i', 4)
a[1] = 333
self.assertEqual(a[1], 333)
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None, ctx=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
ctx = ctx or get_context()
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock, ctx)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock, ctx)
return SynchronizedArray(obj, lock, ctx)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock, ctx)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def __eq__(self, other):
if not isinstance(other, ctypes.Array):
print("Unexpected type. Expected: {0}. Received: {1}".format(ctypes.Array, type(other)))
return False
if len(other) < len(self.expected_string_value) + 1: # +1 for NULL terminating character
print("Unexpected length in C string. Expected at least: {0}. Received {1}".format(len(other), len(self.expected_string_value) + 1))
return False
if not isinstance(other[0], bytes):
print("Unexpected type. Not a string. Received: {0}".format(type(other[0])))
return False
if other.value.decode("ascii") != self.expected_string_value:
print("Unexpected value. Expected {0}. Received: {1}".format(self.expected_string_value, other.value.decode))
return False
return True
# Custom Type
def __eq__(self, other):
if not isinstance(other, ctypes.Array):
print("Unexpected type. Expected: {0}. Received: {1}".format(ctypes.Array, type(other)))
return False
if len(other) < len(self.expected_string_value) + 1: # +1 for NULL terminating character
print("Unexpected length in C string. Expected at least: {0}. Received {1}".format(len(other), len(self.expected_string_value) + 1))
return False
if not isinstance(other[0], bytes):
print("Unexpected type. Not a string. Received: {0}".format(type(other[0])))
return False
if other.value.decode("ascii") != self.expected_string_value:
print("Unexpected value. Expected {0}. Received: {1}".format(self.expected_string_value, other.value.decode))
return False
return True
# Custom Type
def __eq__(self, other):
if not isinstance(other, ctypes.Array):
print("Unexpected type. Expected: {0}. Received: {1}".format(ctypes.Array, type(other)))
return False
if len(other) < len(self.expected_string_value) + 1: # +1 for NULL terminating character
print("Unexpected length in C string. Expected at least: {0}. Received {1}".format(len(other), len(self.expected_string_value) + 1))
return False
if not isinstance(other[0], bytes):
print("Unexpected type. Not a string. Received: {0}".format(type(other[0])))
return False
if other.value.decode("ascii") != self.expected_string_value:
print("Unexpected value. Expected {0}. Received: {1}".format(self.expected_string_value, other.value.decode))
return False
return True
# Custom Type
def __eq__(self, other):
if not isinstance(other, ctypes.Array):
print("Unexpected type. Expected: {0}. Received: {1}".format(ctypes.Array, type(other)))
return False
if len(other) < len(self.expected_string_value) + 1: # +1 for NULL terminating character
print("Unexpected length in C string. Expected at least: {0}. Received {1}".format(len(other), len(self.expected_string_value) + 1))
return False
if not isinstance(other[0], bytes):
print("Unexpected type. Not a string. Received: {0}".format(type(other[0])))
return False
if other.value.decode("ascii") != self.expected_string_value:
print("Unexpected value. Expected {0}. Received: {1}".format(self.expected_string_value, other.value.decode))
return False
return True
# Custom Type
def __eq__(self, other):
if not isinstance(other, ctypes.Array):
print("Unexpected type. Expected: {0}. Received: {1}".format(ctypes.Array, type(other)))
return False
if len(other) < len(self.expected_string_value) + 1: # +1 for NULL terminating character
print("Unexpected length in C string. Expected at least: {0}. Received {1}".format(len(other), len(self.expected_string_value) + 1))
return False
if not isinstance(other[0], bytes):
print("Unexpected type. Not a string. Received: {0}".format(type(other[0])))
return False
if other.value.decode("ascii") != self.expected_string_value:
print("Unexpected value. Expected {0}. Received: {1}".format(self.expected_string_value, other.value.decode))
return False
return True
# Custom Type
def __eq__(self, other):
if not isinstance(other, ctypes.Array):
print("Unexpected type. Expected: {0}. Received: {1}".format(ctypes.Array, type(other)))
return False
if len(other) < len(self.expected_string_value) + 1: # +1 for NULL terminating character
print("Unexpected length in C string. Expected at least: {0}. Received {1}".format(len(other), len(self.expected_string_value) + 1))
return False
if not isinstance(other[0], bytes):
print("Unexpected type. Not a string. Received: {0}".format(type(other[0])))
return False
if other.value.decode("ascii") != self.expected_string_value:
print("Unexpected value. Expected {0}. Received: {1}".format(self.expected_string_value, other.value.decode))
return False
return True
# Custom Type
def synchronized(obj, lock=None, ctx=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
ctx = ctx or get_context()
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock, ctx)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock, ctx)
return SynchronizedArray(obj, lock, ctx)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock, ctx)
#
# Functions for pickling/unpickling
#
def Array(typecode_or_type, size_or_initializer, **kwds):
'''
Return a synchronization wrapper for a RawArray
'''
lock = kwds.pop('lock', None)
if kwds:
raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
obj = RawArray(typecode_or_type, size_or_initializer)
if lock is False:
return obj
if lock in (True, None):
lock = RLock()
if not hasattr(lock, 'acquire'):
raise AttributeError("'%r' has no method 'acquire'" % lock)
return synchronized(obj, lock)
def reduce_ctype(obj):
assert_spawning(obj)
if isinstance(obj, ctypes.Array):
return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
else:
return rebuild_ctype, (type(obj), obj._wrapper, None)
def __init__(self, *args, **kwds):
unittest.TestCase.__init__(self, *args, **kwds)
self.a = Array((20, 15), 'i', 4)
def test_typekind(self):
a = Array((1,), 'i', 4)
self.assertTrue(a._ctype is c_int32)
self.assertTrue(a._ctype_p is POINTER(c_int32))
a = Array((1,), 'u', 4)
self.assertTrue(a._ctype is c_uint32)
self.assertTrue(a._ctype_p is POINTER(c_uint32))
a = Array((1,), 'f', 4) # float types unsupported: size system dependent
ct = a._ctype
self.assertTrue(issubclass(ct, ctypes.Array))
self.assertEqual(sizeof(ct), 4)
def test_itemsize(self):
for size in [1, 2, 4, 8]:
a = Array((1,), 'i', size)
ct = a._ctype
self.assertTrue(issubclass(ct, ctypes._SimpleCData))
self.assertEqual(sizeof(ct), size)
def test_byteswapped(self):
a = Array((1,), 'u', 4, flags=(PAI_ALIGNED | PAI_WRITEABLE))
ct = a._ctype
self.assertTrue(ct is not c_uint32)
if sys.byteorder == 'little':
self.assertTrue(ct is c_uint32.__ctype_be__)
else:
self.assertTrue(ct is c_uint32.__ctype_le__)
i = 0xa0b0c0d
n = c_uint32(i)
a[0] = i
self.assertEqual(a[0], i)
self.assertEqual(a._data[0:4],
cast(addressof(n), POINTER(c_uint8))[3:-1:-1])
def Array(typecode_or_type, size_or_initializer, **kwds):
'''
Return a synchronization wrapper for a RawArray
'''
lock = kwds.pop('lock', None)
if kwds:
raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
obj = RawArray(typecode_or_type, size_or_initializer)
if lock is False:
return obj
if lock in (True, None):
lock = RLock()
if not hasattr(lock, 'acquire'):
raise AttributeError("'%r' has no method 'acquire'" % lock)
return synchronized(obj, lock)
def reduce_ctype(obj):
assert_spawning(obj)
if isinstance(obj, ctypes.Array):
return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
else:
return rebuild_ctype, (type(obj), obj._wrapper, None)
def native(type_, value):
if isinstance(value, type_):
return value
if sys.version_info < (3,) and type_ == int and isinstance(value, int_types):
return value
if isinstance(value, ctypes.Array) and value._type_ == ctypes.c_byte:
return ctypes.string_at(ctypes.addressof(value), value._length_)
return type_(value.value)
def from_param(self, param):
typename = type(param).__name__
if hasattr(self, 'from_'+typename):
return getattr(self, 'from_'+typename)(param)
elif isinstance(param, ctypes.Array):
return param
else:
raise TypeError("Can't convert %s" % typename)
# Cast from array.array objects
def Array(typecode_or_type, size_or_initializer, **kwds):
'''
Return a synchronization wrapper for a RawArray
'''
lock = kwds.pop('lock', None)
if kwds:
raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys()))
obj = RawArray(typecode_or_type, size_or_initializer)
if lock is False:
return obj
if lock in (True, None):
lock = RLock()
if not hasattr(lock, 'acquire'):
raise AttributeError("'%r' has no method 'acquire'" % lock)
return synchronized(obj, lock)
def reduce_ctype(obj):
assert_spawning(obj)
if isinstance(obj, ctypes.Array):
return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
else:
return rebuild_ctype, (type(obj), obj._wrapper, None)