def test_sharedctypes(self, lock=False):
    """Check that shared ctypes objects changed by a child are seen here.

    A child process runs ``self._double`` on every shared object. The
    assertions expect each numeric value to come back doubled and the
    byte string to come back repeated twice.
    """
    shared_int = Value('i', 7, lock=lock)
    shared_double = Value(c_double, 1.0/3.0, lock=lock)
    shared_struct = Value(_Foo, 3, 2, lock=lock)
    shared_floats = self.Array('d', list(range(10)), lock=lock)
    shared_bytes = self.Array('c', 20, lock=lock)
    shared_bytes.value = latin('hello')

    child = self.Process(
        target=self._double,
        args=(shared_int, shared_double, shared_struct, shared_floats,
              shared_bytes))
    child.daemon = True
    child.start()
    child.join()

    self.assertEqual(shared_int.value, 14)
    self.assertAlmostEqual(shared_double.value, 2.0/3.0)
    self.assertEqual(shared_struct.x, 6)
    self.assertAlmostEqual(shared_struct.y, 4.0)
    for position in range(10):
        self.assertAlmostEqual(shared_floats[position], position*2)
    self.assertEqual(shared_bytes.value, latin('hellohello'))
python类Array()的实例源码
def _create_cache_shared(xs, hparamss):
    """Copy every cache-source array into a shared-memory buffer.

    Returns a dict mapping each key of ``create_cache_source(xs, hparamss)``
    to a ``(shared_buffer, shape)`` pair: ``shared_buffer`` is a
    ``multiprocessing.Array`` of C doubles holding the flattened values and
    ``shape`` is the shape of the source array.
    """
    shared_cache = {}
    for key, source in six.iteritems(create_cache_source(xs, hparamss)):
        # Only floating-point sources are accepted; they are stored as doubles.
        is_float = any(source.dtype == kind
                       for kind in (np.float32, np.float64, float))
        assert is_float
        shared_buf = multiprocessing.Array(ctypes.c_double, source.size)
        source_shape = source.shape
        # Temporary numpy view over the shared buffer, used only for the copy.
        target = np.ctypeslib.as_array(shared_buf.get_obj())
        target = target.reshape(source_shape)
        target[:] = source[:]
        del target
        shared_cache[key] = (shared_buf, source_shape)
    return shared_cache
def array(shape, dtype=_np.float64, autolock=False):
    """Factory method for shared memory arrays supporting all numpy dtypes.

    Args:
        shape: Shape of the array to create (anything ``numpy.prod`` and
            ``ndarray.reshape`` accept).
        dtype: Element type; a ``numpy.dtype`` or anything ``numpy.dtype()``
            can convert. Defaults to ``numpy.float64``.
        autolock: Passed through as the ``lock`` argument of
            ``multiprocessing.Array``.

    Returns:
        A numpy array with the requested shape and dtype, viewing the byte
        buffer allocated through ``multiprocessing.Array``.
    """
    assert _NP_AVAILABLE, (
        "To use the shared array object, numpy must be available!")
    if not isinstance(dtype, _np.dtype):
        dtype = _np.dtype(dtype)
    # Not bothering to translate the numpy dtypes to ctype types directly,
    # because they're only partially supported. Instead, create a byte ctypes
    # array of the right size and use a view of the appropriate datatype.
    # The size must be based on ``itemsize`` (bytes per element);
    # ``alignment`` is only the required address alignment and is smaller
    # than ``itemsize`` for e.g. complex, string and structured dtypes.
    n_bytes = int(_np.prod(shape)) * dtype.itemsize
    shared_arr = _multiprocessing.Array('b', n_bytes, lock=autolock)
    with _warnings.catch_warnings():
        # For more information on why this is necessary, see
        # https://www.reddit.com/r/Python/comments/j3qjb/parformatlabpool_replacement
        _warnings.simplefilter('ignore', RuntimeWarning)
        # NOTE(review): with autolock=True the object passed here is the
        # synchronized wrapper, not the raw ctypes array - confirm that
        # as_array() still yields a shared view in that case.
        data = _np.ctypeslib.as_array(shared_arr).view(dtype).reshape(shape)
    return data
def initSharedMemoryState(self):
    """Run ProjectBuildTask's shared-state setup, then allocate ``_newestBox``.

    ``_newestBox`` is a shared character array with 2048 elements.
    """
    ProjectBuildTask.initSharedMemoryState(self)
    box_capacity = 2048
    self._newestBox = multiprocessing.Array('c', box_capacity)
def predict(self, event=None):
    """Predict classes for a frame range using several worker processes.

    Reads the first/last frame and process count from the text fields,
    checks that a converted frame has the expected length, then starts
    ``num_proc`` processes running ``self.predict_worker``. The workers
    share one character array with an entry per frame; after they finish
    it is exposed as ``self.predictions`` and the summary is regenerated.
    """
    try:
        first = int(self.predict_first.text())
        last = int(self.predict_last.text())
        num_proc = int(self.num_proc.text())
    except ValueError:
        sys.stderr.write('Integers only\n')
        return

    # A negative last frame means "up to the end".
    if last < 0:
        last = self.parent.num_frames

    if self.get_and_convert(first).shape[0] != self.parent.converted.shape[1]:
        mismatch = (self.parent.converted.shape[1],
                    self.get_and_convert(first).shape[0])
        sys.stderr.write('Wrong length for converted image (expected %d, got %d). You may need to update converter.\n' %
                         mismatch)
        return

    predictions = multiprocessing.Array(ctypes.c_char, self.parent.num_frames)
    workers = []
    for rank in range(num_proc):
        worker = multiprocessing.Process(
            target=self.predict_worker,
            args=(rank, num_proc, np.arange(first, last, dtype='i4'),
                  predictions))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()

    sys.stderr.write('\r%d/%d\n' % (last, last))
    self.predictions = np.frombuffer(predictions.get_obj(), dtype='S1')
    self.gen_predict_summary()
def convert_frames(self, event=None):
    """Convert the selected frames in parallel and store the result.

    The frame range and process count come from the text fields. When
    class characters are given, only frames whose class matches one of
    them are kept. The workers (``self.convert_worker``) write into one
    shared double array, which is then reshaped to one row per frame,
    stored as ``self.parent.converted`` and optionally saved with
    ``np.save``.
    """
    try:
        start = int(self.first_frame.text())
        end = int(self.last_frame.text())
        num_proc = int(self.num_proc.text())
    except ValueError:
        sys.stderr.write('Integers only for frame range and number of processors\n')
        return

    self.indices = np.arange(start, end, dtype='i4')
    clist = self.parent.classes.clist[start:end]
    if self.class_chars.text() != '':
        # Keep the frames whose class equals any of the requested characters.
        matches = [clist==c for c in self.class_chars.text()]
        sel = np.array(matches).any(axis=0)
        self.indices = self.indices[sel]

    n_frames = len(self.indices)
    if n_frames == 0:
        sys.stderr.write('No frames of class %s in frame range\n'%self.class_chars.text())
        return
    sys.stderr.write('Converting %d frames with %d processors\n' % (n_frames, num_proc))

    # Frame 0 is converted once to learn the size of a converted frame.
    arr = self.get_and_convert(0)
    converted = multiprocessing.Array(ctypes.c_double, arr.size*n_frames)
    workers = []
    for rank in range(num_proc):
        worker = multiprocessing.Process(
            target=self.convert_worker,
            args=(rank, num_proc, self.indices, arr.size, converted))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
    sys.stderr.write('\r%d/%d\n' % (n_frames, n_frames))

    flat = np.frombuffer(converted.get_obj())
    self.parent.converted = flat.reshape(n_frames, -1)
    if self.save_flag.isChecked():
        sys.stderr.write('Saving angular correlations to %s\n'%self.save_fname.text())
        np.save(self.save_fname.text(), self.parent.converted)
def class_powder(self, event=None):
    """Plot the powder sum for the currently selected class.

    The class number is the checked button id minus one. If it equals
    ``self.old_cnum`` the cached value is reused; ``-1`` takes the powder
    from ``self.emc_reader``; any other class is summed by ``num_proc``
    worker processes (``self.powder_worker``), each writing into its own
    slice of a shared double array that is then summed over the process
    axis.

    NOTE(review): ``self.class_powder = powder`` stores the cached array
    under the same name as this method, shadowing it on the instance after
    the first computation - confirm this is intended.
    """
    cnum = self.class_num.checkedId() - 1
    if cnum == self.old_cnum:
        powder = self.class_powder
    elif cnum == -1:
        powder = self.emc_reader.get_powder()
        self.class_powder = powder
        self.old_cnum = cnum
    else:
        points = np.where(self.classes.key_pos == cnum)[0]
        num_proc = int(self.num_proc.text())
        powders = multiprocessing.Array(ctypes.c_double, num_proc*self.parent.geom.mask.size)
        # One mask-shaped accumulator per worker process.
        pshape = (num_proc,) + self.parent.geom.mask.shape
        # Parenthesised single-argument form prints identically on
        # Python 2 and is valid syntax on Python 3.
        print('Calculating powder sum for class %s using %d threads' % (self.class_num.checkedButton().text(), num_proc))
        jobs = []
        for i in range(num_proc):
            # Worker i gets every num_proc-th point starting at offset i.
            p = multiprocessing.Process(target=self.powder_worker, args=(i, points[i::num_proc], pshape, powders))
            jobs.append(p)
            p.start()
        for j in jobs:
            j.join()
        sys.stderr.write('\r%d/%d\n'%(len(points), len(points)))
        powder = np.frombuffer(powders.get_obj()).reshape(pshape).sum(0)
        self.class_powder = powder
        self.old_cnum = cnum
    self.plot_frame(frame=powder)
def skip_test_multiprocessing():
    """Serve one JSON route with two workers and check the response.

    After start-up a request is made to ``/``; the response text is stored
    in a 50-character shared array and the stop event is set. A separate
    "rescue" process sets the stop event after 5 seconds in case the
    request never completes.

    Raises:
        ValueError: if the stored response cannot be parsed as JSON.
    """
    app = Sanic('test_json')
    response = Array('c', 50)

    @app.route('/')
    async def handler(request):
        return json({"test": True})

    stop_event = Event()

    async def after_start(*args, **kwargs):
        http_response = await local_request('get', '/')
        response.value = http_response.text.encode()
        stop_event.set()

    def rescue_crew():
        sleep(5)
        stop_event.set()

    rescue_process = Process(target=rescue_crew)
    rescue_process.start()

    app.serve_multiple({
        'host': HOST,
        'port': PORT,
        'after_start': after_start,
        'request_handler': app.handle_request,
        'request_max_size': 100000,
    }, workers=2, stop_event=stop_event)

    rescue_process.terminate()

    try:
        results = json_loads(response.value)
    except Exception as exc:
        # Report the received payload itself (not the Array wrapper's repr)
        # and let KeyboardInterrupt/SystemExit propagate untouched.
        raise ValueError(
            "Expected JSON response but got '{}'".format(response.value)
        ) from exc

    assert results.get('test') is True
def __init__(self, data_list, leafsize=30):
    """Copy ``data_list`` into shared memory and build the tree on that copy.

    ``data_list`` must convert to a 2-D array; its values are stored in
    ``self.shmem_data`` (a shared array of doubles) and the parent
    initializer receives the numpy view over that shared storage.
    """
    points = np.array(data_list)
    n_points, n_dims = points.shape
    self.shmem_data = mp.Array(ctypes.c_double, n_points * n_dims)
    shared_points = shmem_as_nparray(self.shmem_data)
    shared_points = shared_points.reshape((n_points, n_dims))
    shared_points[:, :] = points
    self._leafsize = leafsize
    super(cKDTree_MP, self).__init__(shared_points, leafsize=leafsize)
def pquery(self, x_list, k=1, eps=0, p=2,
           distance_upper_bound=np.inf):
    """Run a nearest-neighbour query across one process per CPU.

    The query points and both result buffers (distances and indices) live
    in shared arrays of doubles. Each worker process runs ``_pquery`` with
    the same argument tuple; ``ierr`` is a shared error counter.

    Returns:
        ``(distances, indices)`` as private copies; the indices are
        converted to ``int``. For ``k == 1`` the index array stays flat.

    Raises:
        RuntimeError: if the shared error counter is non-zero afterwards.
    """
    queries = np.array(x_list)
    nx, mx = queries.shape

    shmem_x = mp.Array(ctypes.c_double, nx * mx)
    shmem_d = mp.Array(ctypes.c_double, nx * k)
    shmem_i = mp.Array(ctypes.c_double, nx * k)

    shared_queries = shmem_as_nparray(shmem_x).reshape((nx, mx))
    distances = shmem_as_nparray(shmem_d).reshape((nx, k))
    indices = shmem_as_nparray(shmem_i)
    if k != 1:
        indices = indices.reshape((nx, k))
    shared_queries[:, :] = queries

    nprocs = num_cpus()
    scheduler = Scheduler(nx, nprocs)
    ierr = mp.Value(ctypes.c_int, 0)

    query_args = (
        scheduler,
        self.shmem_data, self.n, self.m, self.leafsize,
        shmem_x, nx, shmem_d, shmem_i,
        k, eps, p, distance_upper_bound,
        ierr,
    )
    workers = [mp.Process(target=_pquery, args=query_args)
               for _ in range(nprocs)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    if ierr.value != 0:
        raise RuntimeError('%d errors in worker processes' % (ierr.value))
    return distances.copy(), indices.astype(int).copy()
def test_server_extra_proc(set_timeout, restore_signal):
    """Check that extra processes run and finish when the server is interrupted.

    Each extra process records ``980 + key`` on start and ``990 + key`` in
    its ``finally`` clause; after SIGINT is sent to the process group the
    test expects the "finished" values in the shared array.
    """
    extras = mp.Array('i', [0, 0])

    def extra_proc(key, _, pidx, args):
        assert _ is None
        extras[key] = 980 + key
        try:
            while True:
                time.sleep(0.1)
        except KeyboardInterrupt:
            print(f'extra[{key}] interrupted', file=sys.stderr)
        except Exception as exc:
            print(f'extra[{key}] exception', exc, file=sys.stderr)
        finally:
            print(f'extra[{key}] finish', file=sys.stderr)
            extras[key] = 990 + key

    @aiotools.actxmgr
    async def myworker(loop, pidx, args):
        yield

    def interrupt():
        os.kill(0, signal.SIGINT)

    set_timeout(0.2, interrupt)
    extra_procs = [functools.partial(extra_proc, key) for key in (0, 1)]
    aiotools.start_server(myworker, extra_procs=extra_procs,
                          num_workers=3, args=(123, ))

    assert extras[0] == 990
    assert extras[1] == 991
def find_nearest_instances(training_data_instances, training_data_labels, test_data_instances, test_data_labels):
    """Classify the test instances in parallel and score the result.

    The test set is split into four contiguous index ranges, each handled
    by one process running ``find_nearest_instances_subprocess``; the
    predicted labels are written into a shared integer array.

    Returns:
        ``(classified_results, error_rate, confusion_matrix)`` where
        ``classified_results`` is the shared integer array, ``error_rate``
        is a percentage and ``confusion_matrix`` is a 10x10 integer array
        indexed as ``[true_label, classified_label]``.
    """
    start_time = time.time()
    # speed using multiple processes
    NUMBER_OF_PROCESSES = 4
    processes = []
    test_count = len(test_data_instances)
    # Shared between the processes: ordinary globals are only copied into
    # each child, so updates made there would not be visible here.
    classified_results = multiprocessing.Array('i', test_count, lock=False)
    # Exactly NUMBER_OF_PROCESSES + 1 boundaries covering [0, test_count].
    # The previous step-based range() could yield extra boundaries (leaving
    # trailing instances unclassified), failed for fewer instances than
    # processes, and could not be item-assigned on Python 3.
    # NOTE(review): assumes the subprocess treats (start, end) as a
    # half-open range and tolerates start == end - confirm.
    test_data_subdivisions = [test_count * part // NUMBER_OF_PROCESSES
                              for part in range(NUMBER_OF_PROCESSES + 1)]
    for process_index in range(NUMBER_OF_PROCESSES):
        process = multiprocessing.Process(
            target=find_nearest_instances_subprocess,
            args=(training_data_instances,
                  training_data_labels,
                  test_data_instances,
                  test_data_subdivisions[process_index],
                  test_data_subdivisions[process_index + 1],
                  classified_results))
        process.start()
        processes.append(process)
    print("Waiting...")
    # wait until all processes are finished
    for process in processes:
        process.join()
    print("Complete.")
    print("--- %s seconds ---" % (time.time() - start_time))
    error_count = 0
    # Builtin int: the np.int alias no longer exists in current NumPy.
    confusion_matrix = np.zeros((10, 10), dtype=int)
    for test_instance_index, classified_label in enumerate(classified_results):
        true_label = test_data_labels[test_instance_index]
        if true_label != classified_label:
            error_count += 1
        confusion_matrix[true_label][classified_label] += 1
    # An empty test set has no errors; avoid dividing by zero.
    error_rate = 100.0 * error_count / test_count if test_count else 0.0
    return classified_results, error_rate, confusion_matrix
def __init__(self, mgr, sampRate=128,
        chans=[str(n)+'x' for n in np.power(2, np.arange(8))/2.0],
        waveform='sinusoid', freq=1.0, mix='none', pollSize=2):
    """
    Construct a new wave generator source.

    Args:
        mgr:        Passed through to Source.__init__.
        sampRate:   Floating point value of the initial sampling
                    frequency.
        chans:      Sequence of strings containing the initial channel
                    configuration.
        waveform:   String describing the type of waveform to produce.
                    May be 'sinusoid' or 'sawtooth' or 'square'.
        freq:       Base frequency.  Each channel is a power-of-two
                    multiple of this frequency.
        mix:        Passed to setMix once the shared mixing matrix
                    has been allocated.
        pollSize:   Number of data samples collected during each poll.
                    Lower values result in better timing and marker
                    resolution but more CPU usage, while higher values
                    typically use less CPU but give worse timing.
    """
    # State shared between processes.
    self.lock = mp.Lock()
    self.waveform = mp.Value('I', 0)
    self.freq = mp.Value('d', freq)
    self.t0 = mp.Value('d', 0.0)
    self.t0.value = 0.0

    self.pollSize = pollSize

    Source.__init__(self, mgr=mgr, sampRate=sampRate, chans=chans,
                    configPanelClass=WaveGenConfigPanel)

    self.setWaveform(waveform)

    # Square channel-by-channel matrix stored in a shared array of doubles;
    # mixMat is a numpy view over that storage.
    self.mixArr = mp.Array('d', self.getNChan()*self.getNChan())
    flatMix = np.frombuffer(self.mixArr.get_obj())
    self.mixMat = flatMix.reshape((-1,self.getNChan()))
    self.setMix(mix)
def initWalk(self):
    """Allocate the shared random-walk start vector and set it to zero.

    ``walk0Array`` is a shared array with one double per channel and
    ``walk0`` is a numpy view over it.
    """
    self.walk0Array = mp.Array('d', self.getNChan())
    walkView = np.frombuffer(self.walk0Array.get_obj())
    self.walk0 = walkView
    walkView[:] = 0.0  # the random walk starts at zero
ilsvrc_cls_multithread_scipy.py 文件源码
项目:tensorflow_yolo2
作者: wenxichen
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def prepare_multithread(self):
    """Preparation for multithread processing.

    Resets the batch cursors, allocates the shared per-batch flag array
    and the prefetch buffers, creates one input and one output queue per
    child, starts every child and its prefetch, and finally collects the
    prefetched data from the first half of the children.
    """
    self.reset = False
    # num_batch_left should always be -1 until the last batch block of the epoch
    self.num_batch_left = -1
    self.num_child = 10
    self.child_processes = [None] * self.num_child
    self.batch_cursor_read = 0
    self.batch_cursor_fetched = 0
    # TODO: add this to cfg file
    self.prefetch_size = 5  # in terms of batch
    # TODO: may not need readed_batch after validating everything
    self.read_batch_array_size = self.total_batch + self.prefetch_size * self.batch_size
    self.readed_batch = Array('i', self.read_batch_array_size)
    for i in range(self.read_batch_array_size):
        self.readed_batch[i] = 0
    self.prefetched_images = np.zeros((self.batch_size * self.prefetch_size
                                       * self.num_child,
                                       self.image_size, self.image_size, 3))
    self.prefetched_labels = np.zeros(
        (self.batch_size * self.prefetch_size * self.num_child))
    self.queue_in = []
    self.queue_out = []
    for i in range(self.num_child):
        self.queue_in.append(Queue())
        self.queue_out.append(Queue())
        self.start_process(i)
        self.start_prefetch(i)
    # fetch the first one
    # Floor division keeps these values integers on Python 3 as well;
    # with "/" the loop bound would be a float there.
    desc = 'receive the first half: ' + \
        str(self.num_child * self.prefetch_size // 2) + ' batches'
    for i in trange(self.num_child // 2, desc=desc):
        self.collect_prefetch(i)
def prepare_multithread(self):
    """Preparation for multithread processing.

    Resets the batch cursors, allocates the shared per-batch flag array
    and the prefetch buffers, creates one input and one output queue per
    child, starts every child and its prefetch, and finally collects the
    prefetched data from the first half of the children.
    """
    self.reset = False
    # num_batch_left should always be -1 until the last batch block of the epoch
    self.num_batch_left = -1
    self.num_child = 10
    self.child_processes = [None] * self.num_child
    self.batch_cursor_read = 0
    self.batch_cursor_fetched = 0
    # TODO: add this to cfg file
    self.prefetch_size = 5  # in terms of batch
    # TODO: may not need readed_batch after validating everything
    self.read_batch_array_size = self.total_batch + self.prefetch_size * self.batch_size
    self.readed_batch = Array('i', self.read_batch_array_size)
    for i in range(self.read_batch_array_size):
        self.readed_batch[i] = 0
    self.prefetched_images = np.zeros((self.batch_size * self.prefetch_size
                                       * self.num_child,
                                       self.image_size, self.image_size, 3))
    self.prefetched_labels = np.zeros(
        (self.batch_size * self.prefetch_size * self.num_child))
    self.queue_in = []
    self.queue_out = []
    for i in range(self.num_child):
        self.queue_in.append(Queue())
        self.queue_out.append(Queue())
        self.start_process(i)
        self.start_prefetch(i)
    # fetch the first one
    # Floor division keeps these values integers on Python 3 as well;
    # with "/" the loop bound would be a float there.
    desc = 'receive the first half: ' + \
        str(self.num_child * self.prefetch_size // 2) + ' batches'
    for i in trange(self.num_child // 2, desc=desc):
        self.collect_prefetch(i)
def __init__(self, config):
    """Allocate the 8-character shared buffer, then run the parent initializer."""
    shared_chars = multiprocessing.Array(ctypes.c_char, 8)
    self._value = shared_chars
    super(Context, self).__init__(config)
def conv_single_image(image):
    """Return a copy of ``image`` stored as doubles in a shared buffer.

    The result is a numpy view with the same shape as ``image`` over a
    ``multiprocessing.Array`` of C doubles.
    """
    backing = Array(ctypes.c_double, image.size)
    shared_view = np.ctypeslib.as_array(backing.get_obj()).reshape(image.shape)
    shared_view[:] = image
    return shared_view
def _dense_distance_dual(lock, list1, list2, global_idx, shared_arr, dist_function):
    """Worker: fill rows of a distance matrix between two lists.

    Parameters
    ----------
    lock : multiprocessing.synchronize.Lock
        Value returned from multiprocessing.Lock(). Guards ``global_idx``.
    list1 : list
        Elements that index the rows of ``shared_arr``.
    list2 : list
        Elements that index the columns of ``shared_arr``.
    global_idx : object with a ``value`` attribute
        Shared counter holding the next unclaimed row index.
    shared_arr : array_like
        Array indexed as ``shared_arr[row, col]``, updated in place.
        Example:
        shared_array = np.frombuffer(mp.Array('d', n*m).get_obj()).reshape((n,m))
    dist_function : function
        Called as ``dist_function(list1[row], list2[col])``.

    Returns
    -------
    None
    """
    n_rows = len(list1)
    while global_idx.value < n_rows:
        # Claim the next row under the lock; the row itself is computed
        # after the lock has been released.
        with lock:
            if global_idx.value >= n_rows:
                return
            row = global_idx.value
            global_idx.value += 1
        left = list1[row]
        for col, right in enumerate(list2):
            shared_arr[row, col] = dist_function(left, right)
def dense_dm_dual(list1, list2, dist_function, condensed=False):
    """Compute in a parallel way the distance matrix between two lists.

    Parameters
    ----------
    list1 : list
        Elements indexing the rows of the result.
    list2 : list
        Elements indexing the columns of the result.
    dist_function : function
        Function to use for the distance computation.
    condensed : bool
        If true, the matrix is returned flattened to one dimension.

    Returns
    -------
    dist_matrix : array_like
        ``len(list1)`` x ``len(list2)`` matrix of distances (flattened
        when ``condensed`` is true).
    """
    n_rows, n_cols = len(list1), len(list2)
    n_workers = min(mp.cpu_count(), n_rows)
    next_row = mp.Value('i', 0)
    raw_buffer = mp.Array('d', n_rows*n_cols).get_obj()
    shared_array = np.frombuffer(raw_buffer).reshape((n_rows, n_cols))
    lock = mp.Lock()
    worker_args = (lock, list1, list2, next_row, shared_array, dist_function)
    ps = []
    try:
        for _ in range(n_workers):
            worker = mp.Process(target=_dense_distance_dual, args=worker_args)
            worker.start()
            ps.append(worker)
        for worker in ps:
            worker.join()
    except (KeyboardInterrupt, SystemExit):
        _terminate(ps,'Exit signal received\n')
    except Exception as e:
        _terminate(ps,'ERROR: %s\n' % e)
    except:
        _terminate(ps,'ERROR: Exiting with unknown exception\n')

    if condensed:
        dist_matrix = shared_array.flatten()
    else:
        dist_matrix = shared_array
    return dist_matrix
def _dense_distance(lock, input_list, global_idx, shared_arr, dist_function):
    """Worker: fill upper-triangle rows of a square distance matrix.

    Parameters
    ----------
    lock : multiprocessing.synchronize.Lock
        Value returned from multiprocessing.Lock(). Guards ``global_idx``.
    input_list : list
        List of values to compare to input_list[idx] (from 'idx' on).
    global_idx : object with a ``value`` attribute
        Shared counter holding the next unclaimed row index.
    shared_arr : array_like
        Numpy array created as a shared object. Iteratively updated with
        the result; only entries with column > row are written.
        Example:
        shared_array = np.frombuffer(mp.Array('d', n*n).get_obj()).reshape((n,n))
    dist_function : function
        Called as ``dist_function(input_list[row], input_list[col])``.

    Returns
    -------
    None
    """
    total = len(input_list)
    while global_idx.value < total:
        # Claim the next row (and report progress every 100 rows) under
        # the lock.
        with lock:
            if global_idx.value >= total:
                return
            row = global_idx.value
            global_idx.value += 1
            if row % 100 == 0:
                progressbar(row, total)
        left = input_list[row]
        for col in range(row+1, total):
            shared_arr[row, col] = dist_function(left, input_list[col])
def dense_dm(input_array, dist_function, condensed=False):
    """Compute in a parallel way a distance matrix for a 1-d array.

    Parameters
    ----------
    input_array : array_like
        1-dimensional array for which to compute the distance matrix.
    dist_function : function
        Function to use for the distance computation.
    condensed : bool
        If true, the result is converted with
        ``scipy.spatial.distance.squareform``.

    Returns
    -------
    dist_matrix : array_like
        Symmetric NxN distance matrix for each input_array element, built
        by adding the workers' matrix to its transpose.
    """
    size = len(input_array)
    n_workers = min(mp.cpu_count(), size)
    next_row = mp.Value('i', 0)
    raw_buffer = mp.Array('d', size*size).get_obj()
    shared_array = np.frombuffer(raw_buffer).reshape((size, size))
    lock = mp.Lock()
    worker_args = (lock, input_array, next_row, shared_array, dist_function)
    ps = []
    try:
        for _ in range(n_workers):
            worker = mp.Process(target=_dense_distance, args=worker_args)
            worker.start()
            ps.append(worker)
        for worker in ps:
            worker.join()
    except (KeyboardInterrupt, SystemExit):
        _terminate(ps,'Exit signal received\n')
    except Exception as e:
        _terminate(ps,'ERROR: %s\n' % e)
    except:
        _terminate(ps,'ERROR: Exiting with unknown exception\n')

    # Mirror the computed half onto the other half of the matrix.
    dist_matrix = shared_array + shared_array.T
    if condensed:
        dist_matrix = scipy.spatial.distance.squareform(dist_matrix)
    progressbar(size, size)
    return dist_matrix
def _sparse_distance(lock, input_list, global_idx, rows, cols, data, dist_function):
    """Parallelize a general computation of a sparse distance matrix.

    Parameters
    ----------
    lock : multiprocessing.synchronize.Lock
        Value returned from multiprocessing.Lock(). Guards ``global_idx``.
    input_list : list
        List of values to compare to input_list[idx] (from 'idx' on).
    global_idx : object with a ``value`` attribute
        Shared counter holding the next unclaimed row index.
    rows, cols, data : array_like
        Output arrays indexed by the condensed pair index; for every pair
        with a positive distance they receive the row index, the column
        index and the distance respectively.
    dist_function : function
        Called as ``dist_function(input_list[i], input_list[j])``.

    Returns
    -------
    None
    """
    list_len = len(input_list)
    while global_idx.value < list_len:
        with lock:
            if not global_idx.value < list_len: return
            idx = global_idx.value
            global_idx.value += 1
            if (idx) % 100 == 0: progressbar(idx, list_len)

        elem_1 = input_list[idx]
        for idx_j in range(idx+1, list_len):
            _res = dist_function(elem_1, input_list[idx_j])
            if _res > 0:
                i, j, d = idx, idx_j, list_len
                # Position of pair (i, j) in the condensed upper triangle.
                # Floor division keeps the index an int on Python 3; both
                # products are of consecutive integers, so it is exact.
                c_idx = d*(d-1)//2 - (d-i)*(d-i-1)//2 + j - i - 1
                data[c_idx] = _res
                rows[c_idx] = i
                cols[c_idx] = j
def _sparse_distance_opt(lock, input_list, global_idx, rows, cols, data, func):
    """Parallelize a sparse distance computation over adjacent elements.

    Parameters
    ----------
    lock : multiprocessing.synchronize.Lock
        Value returned from multiprocessing.Lock(). Guards ``global_idx``.
    input_list : array_like
        Array whose first dimension indexes the elements to compare.
    global_idx : object with a ``value`` attribute
        Shared counter holding the next unclaimed start index.
    rows, cols, data : array_like
        Output arrays indexed by the condensed pair index; for every
        adjacent pair ``(i, i + 1)`` with a positive result they receive
        the row index, the column index and the result respectively.
    func : function
        Called as ``func(input_list[i], input_list[i + 1])``.

    Returns
    -------
    None
    """
    list_len = input_list.shape[0]
    while global_idx.value < list_len:
        with lock:
            if not global_idx.value < list_len: return
            idx = global_idx.value
            global_idx.value += 1
            if (idx) % 100 == 0: progressbar(idx, list_len)

        for i in range(idx, list_len-1):
            _res = func(input_list[i], input_list[i + 1])
            if _res > 0:
                j, d = i+1, list_len
                # Position of pair (i, j) in the condensed upper triangle.
                # Floor division keeps the index an int on Python 3; both
                # products are of consecutive integers, so it is exact.
                c_idx = d*(d-1)//2 - (d-i)*(d-i-1)//2 + j - i - 1
                data[c_idx] = _res
                rows[c_idx] = i
                cols[c_idx] = j
def SetRhoUpdateFunc(Func=None):
    """Set the module-level ``rho_update_func``.

    A falsy ``Func`` (including the default ``None``) selects
    ``__default_rho_update_func``.
    """
    global rho_update_func
    if Func:
        rho_update_func = Func
    else:
        rho_update_func = __default_rho_update_func
# Tuple of indices to identify the information package for each node. Actual
# length of specific package (list) may vary depending on node degree.
# X_NID: Node ID
# X_OBJ: CVXPY Objective
# X_VARS: CVXPY Variables (entry from node_variables structure)
# X_CON: CVXPY Constraints
# X_IND: Starting index into shared node_vals Array
# X_LEN: Total length (sum of dimensions) of all variables
# X_DEG: Number of neighbors
# X_NEIGHBORS: Placeholder for information about each neighbor.
# Information for each neighbor is two entries, appended in order:
# the starting index of the corresponding z-value in edge_z_vals, then the
# corresponding starting index for u.
def getValue(arr, index, length):
    """Return a new numpy array with ``length`` items of ``arr`` from ``index``."""
    stop = index + length
    return numpy.array(arr[index:stop])
# Write value of numpy array nparr (with given length) to a shared Array at
# the given starting index.
def writeValue(sharedarr, index, nparr, length):
    """Store ``nparr`` into ``sharedarr[index:index + length]``.

    When ``length`` is 1, ``nparr`` is treated as a single value and is
    wrapped in a list before the slice assignment.
    """
    values = [nparr] if length == 1 else nparr
    stop = index + length
    sharedarr[index:stop] = values
# Write the values for all of the Variables involved in a given Objective to
# the given shared Array.
# variables should be an entry from the node_values structure.
def once_per_command(func=None, block=False, default=None):
    """Decorator: run ``func`` only when the command session has changed.

    The session is identified by an MD5 digest of ``fab.env.command``,
    ``fab.env.infrastructure`` and every entry of ``fab.env.all_hosts``.
    The last digest is kept in a shared character array whose lock is held
    while the wrapped function runs.

    Can be used bare (``@once_per_command``) or with keyword arguments
    (``@once_per_command(block=True)``); the latter returns a partial that
    expects the function.

    The wrapper returns ``func``'s result when the digest changed,
    ``default`` when it did not, and ``None`` when the lock could not be
    acquired.
    """
    if func is None:
        return functools.partial(once_per_command, block=block, default=default)

    last_hash = multiprocessing.Array(ctypes.c_char, hashlib.md5().digest_size)

    @functools.wraps(func)
    def _func(*args, **kwargs):
        lock = last_hash.get_lock()
        if not lock.acquire(block):
            return None
        try:
            session_parts = [fab.env.command or '',
                             fab.env.infrastructure or '']
            session_parts.extend(fab.env.all_hosts)
            digest = hashlib.md5()
            for part in session_parts:
                digest.update(part.encode('utf-16be'))
            current_hash = digest.digest()
            if current_hash == last_hash.raw:
                return default
            last_hash.raw = current_hash
            return func(*args, **kwargs)
        finally:
            lock.release()

    return _func
def test_array(self, raw=False):
    """Exercise length, indexing and slicing of a shared integer array.

    The array is built from ``seq`` (raw or synchronized, depending on
    ``raw``). ``self.f`` is applied to ``seq`` in this process and to the
    shared array in a child process, after which both must hold the same
    values.
    """
    seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
    make_array = self.RawArray if raw else self.Array
    arr = make_array('i', seq)

    self.assertEqual(len(arr), len(seq))
    self.assertEqual(arr[3], seq[3])
    self.assertEqual(list(arr[2:7]), list(seq[2:7]))

    # Slice assignment from an array.array, applied to both containers.
    replacement = array.array('i', [1, 2, 3, 4])
    arr[4:8] = replacement
    seq[4:8] = replacement
    self.assertEqual(list(arr[:]), seq)

    self.f(seq)
    child = self.Process(target=self.f, args=(arr,))
    child.daemon = True
    child.start()
    child.join()

    self.assertEqual(list(arr[:]), seq)
def test_array_from_size(self):
    """Check that an array created from a size starts out zero-filled."""
    size = 10
    all_zeros = [0] * size
    # Test for zeroing (see issue #11675).
    # Repeating the allocation raises the chance that the 2nd and 3rd
    # arrays reuse memory that previously held non-zero values.
    for _ in range(3):
        fresh = self.Array('i', size)
        self.assertEqual(len(fresh), size)
        self.assertEqual(list(fresh), all_zeros)
        fresh[:] = range(10)
        self.assertEqual(list(fresh), list(range(10)))
        del fresh