def test_default_timeout(self):
"""
Test the barrier's default timeout
"""
# create a barrier with a low default timeout
barrier = self.barriertype(self.N, timeout=0.3)
def f():
i = barrier.wait()
if i == self.N // 2:
# One thread is later than the default timeout of 0.3s.
time.sleep(1.0)
self.assertRaises(threading.BrokenBarrierError, barrier.wait)
self.run_threads(f)
python类BrokenBarrierError()的实例源码
def _test_timeout_f(cls, barrier, results):
i = barrier.wait()
if i == cls.N//2:
# One thread is late!
time.sleep(1.0)
try:
barrier.wait(0.5)
except threading.BrokenBarrierError:
results.append(True)
def _test_default_timeout_f(cls, barrier, results):
i = barrier.wait(cls.defaultTimeout)
if i == cls.N//2:
# One thread is later than the default timeout
time.sleep(1.0)
try:
barrier.wait()
except threading.BrokenBarrierError:
results.append(True)
def close():
if state.forked and not state.closed:
sync.quit.value = True
try:
sync.barrier_in.wait(1)
except BrokenBarrierError:
pass
state.closed = True
for p in processes: p.join()
def worker_error_close():
sync.workers_OK.value = False
try:
sync.barrier_out.wait(1)
except BrokenBarrierError:
pass
def do_extendable(self):
'''We need to leave threads ready in case the array is extended. Otherwise we can quit right after do() has completed. Also we can use this farm several times, the objects remain live, so we can extend and invoke another run() to gather the results as many times as we want.'''
o=self.init()
if not o.f: o.f=self # Set to the current farm
with self.cond: self.objects.append(o)
while True:
self.cond.acquire()
self.waiting+=1
lg.debug('waiting incremented: {}, len={}'.
format(self.waiting,len(self.arr)))
if not len(self.arr):
if self.waiting == self.num_threads:
# No threads left to replenish the array, we should all quit
# Adding poison pills
if 1:
lg.info('Killing all')
# Put poison pills for everyone including us
self.arr+=[None]*(self.num_threads)
self.cond.notify(self.num_threads) # Wake up other threads
else:
lg.info('Killing all')
self.arr+=[None]*(self.num_threads-1)
self.cond.notify(self.num_threads-1)
self.cond.release()
break
else:
self.cond.wait() # Someone else will kill us
lm=len(self.arr)
if lm: # Another check for those who have left cond.wait()
i=self.arr.pop()
self.waiting-=1
lg.debug('waiting decremented: '+str(self.waiting))
self.cond.release()
if not lm: continue # Someone has stolen our item, snap!
if i is None:
self.q.put(None) # Mark we're done
# Sleep on the condition to let other threads get their pills
lg.debug('Sleeping on barrier')
try:
self.barr.wait()
except BrokenBarrierError: # We're to quit
break
lg.debug('Continuing after barrier')
continue # then restart processing the queue
for j in farm.handle_item(o, i): self.q.put(j)
with self.cond: self.objects.remove(o)
del o
lg.info("has finished")