def lock_and_call(func, lock_path):
    """Grab a lock for lock_path and call func.

    :param callable func: object to call after acquiring the lock
    :param str lock_path: path to file or directory to lock
    """
    # Reload module to reset internal _LOCKS dictionary
    reload_module(util)
    # start child and wait for it to grab the lock
    cv = multiprocessing.Condition()
    cv.acquire()
    child_args = (cv, lock_path,)
    child = multiprocessing.Process(target=hold_lock, args=child_args)
    child.start()
    cv.wait()
    # call func and terminate the child
    func()
    cv.notify()
    cv.release()
    child.join()
    assert child.exitcode == 0
def hold_lock(cv, lock_path):  # pragma: no cover
    """Acquire a file lock at lock_path and wait to release it.

    :param multiprocessing.Condition cv: condition for synchronization
    :param str lock_path: path to the file lock
    """
    from certbot import lock
    if os.path.isdir(lock_path):
        my_lock = lock.lock_dir(lock_path)
    else:
        my_lock = lock.LockFile(lock_path)
    cv.acquire()
    cv.notify()
    cv.wait()
    my_lock.release()
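
# A minimal, self-contained sketch of the parent/child handshake used by
# lock_and_call/hold_lock above. The names here are illustrative; only the
# Condition protocol (acquire/wait/notify/release on both sides) matters.
import multiprocessing

def _child(cv):
    with cv:
        cv.notify()  # tell the parent we are ready
        cv.wait()    # block until the parent says we may exit

if __name__ == '__main__':
    cv = multiprocessing.Condition()
    cv.acquire()
    child = multiprocessing.Process(target=_child, args=(cv,))
    child.start()
    cv.wait()     # wait() releases cv, so the child can acquire it
    cv.notify()   # the child is guaranteed to be waiting at this point
    cv.release()
    child.join()
    assert child.exitcode == 0
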
def __init__(self):
    '''Execute a function asynchronously in another thread.'''
    # management of execution queue
    res = multiprocessing.Lock()
    self.queue = multiprocessing.Condition(res)
    self.state = []

    # results
    self.result = Queue.Queue()

    # thread management
    self.ev_unpaused = multiprocessing.Event()
    self.ev_terminating = multiprocessing.Event()
    self.thread = threading.Thread(target=self.__run__, name="Thread-{:s}-{:x}".format(self.__class__.__name__, id(self)))

    # FIXME: we can support multiple threads, but since this is
    #        being bound by a single lock due to my distrust for IDA
    #        and race-conditions...we only use one.
    self.lock = multiprocessing.Lock()
    self.__start()  # __init__ must return None, so __start is called purely for its side effects
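
# Hypothetical companion to the constructor above (the method name is assumed,
# not from the original class): hand work to the worker thread by appending
# under the Condition's lock and notifying the consumer blocked in __run__.
def push(self, func, *args):
    with self.queue:                 # acquires the underlying Lock
        self.state.append((func, args))
        self.queue.notify()          # wake the worker waiting on self.queue
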
def __init__(self, j=os.cpu_count(), max_utilization=100):
    """
    Initialize a parallel loop object.

    :param j: The maximum number of parallel jobs.
    :param max_utilization: The maximum CPU utilization, in percent. Above this
        threshold, no new jobs will be started.
    """
    self._j = j
    self._max_utilization = max_utilization
    # This gets initialized to 0 and may be set to 1 at any time, but must never
    # be reset to 0; thus, no locking is needed when accessing it.
    self._break = multiprocessing.sharedctypes.Value('i', 0, lock=False)
    self._lock = multiprocessing.Condition()
    self._slots = multiprocessing.sharedctypes.Array('i', j, lock=False)
    psutil.cpu_percent(None)
    # Beware! This is now running in a new process. State is shared via fork,
    # but only changes to shared objects will be visible in the parent.
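
# Illustrative sketch, not the original implementation: before spawning a new
# job, wait on the Condition until a slot is free and CPU utilization has
# dropped below the configured ceiling. The slot convention (0 = free) is an
# assumption made for this sketch.
def _wait_for_slot(self):
    with self._lock:
        while (all(self._slots) or
               psutil.cpu_percent(None) > self._max_utilization):
            self._lock.wait(timeout=1)  # time out so utilization is re-checked
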
def __init__(self, num_processor, batch_size, phase,
             batch_idx_init=0, data_ids_init=train_ids, capacity=10):
    self.num_processor = num_processor
    self.batch_size = batch_size
    self.data_load_capacity = capacity
    self.manager = Manager()
    self.batch_lock = Lock()
    self.mutex = Lock()
    self.cv_full = Condition(self.mutex)
    self.cv_empty = Condition(self.mutex)
    self.data_load_queue = self.manager.list()
    self.cur_batch = self.manager.list([batch_idx_init])
    self.processors = []
    if phase == 'train':
        self.data_ids = self.manager.list(data_ids_init)
    elif phase == 'test':
        self.data_ids = self.manager.list(test_ids)
    else:
        raise ValueError('Could not set phase to %s' % phase)
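
# Hedged sketch of the producer side implied by cv_full/cv_empty above: block
# while the shared list is at capacity, then append and wake a waiting
# consumer. The method name is illustrative. Because both Conditions share
# self.mutex, holding the mutex also permits notifying cv_empty.
def put(self, batch):
    with self.mutex:
        while len(self.data_load_queue) >= self.data_load_capacity:
            self.cv_full.wait()
        self.data_load_queue.append(batch)
        self.cv_empty.notify()
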
def test_waitfor(self):
    # based on test in test/lock_tests.py
    cond = self.Condition()
    state = self.Value('i', -1)

    p = self.Process(target=self._test_waitfor_f, args=(cond, state))
    p.daemon = True
    p.start()

    with cond:
        result = cond.wait_for(lambda: state.value == 0)
        self.assertTrue(result)
        self.assertEqual(state.value, 0)

    for i in range(4):
        time.sleep(0.01)
        with cond:
            state.value += 1
            cond.notify()

    p.join(5)
    self.assertFalse(p.is_alive())
    self.assertEqual(p.exitcode, 0)
def test_waitfor_timeout(self):
    # based on test in test/lock_tests.py
    cond = self.Condition()
    state = self.Value('i', 0)
    success = self.Value('i', False)
    sem = self.Semaphore(0)

    p = self.Process(target=self._test_waitfor_timeout_f,
                     args=(cond, state, success, sem))
    p.daemon = True
    p.start()
    self.assertTrue(sem.acquire(timeout=10))

    # Only increment 3 times, so state == 4 is never reached.
    for i in range(3):
        time.sleep(0.01)
        with cond:
            state.value += 1
            cond.notify()

    p.join(5)
    self.assertTrue(success.value)
def test_wait_result(self):
    if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
        pid = os.getpid()
    else:
        pid = None

    c = self.Condition()
    with c:
        self.assertFalse(c.wait(0))
        self.assertFalse(c.wait(0.1))

        p = self.Process(target=self._test_wait_result, args=(c, pid))
        p.start()

        self.assertTrue(c.wait(10))
        if pid is not None:
            self.assertRaises(KeyboardInterrupt, c.wait, 10)

    p.join()
def __init__(self, maxsize=0):
    '''initialize the queue'''
    self.mutex = multiprocessing.Lock()
    self.not_empty = multiprocessing.Condition(self.mutex)
    self.not_full = multiprocessing.Condition(self.mutex)
    self.maxsize = maxsize
    self._tags = {}      # list of refids for each tag
    self._queue = {}     # the actual queue data
    self._refcount = {}  # how many tags refer to a given refid in the queue
    self.id_generator = id_generator()
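
# Illustrative put() for the structure above (the refid bookkeeping is
# simplified and the id_generator protocol is assumed): the two Conditions
# share one mutex, so holding not_full also permits notifying not_empty.
def put(self, tag, item):
    with self.not_full:
        while self.maxsize and len(self._queue) >= self.maxsize:
            self.not_full.wait()
        refid = next(self.id_generator)
        self._queue[refid] = item
        self._tags.setdefault(tag, []).append(refid)
        self._refcount[refid] = 1
        self.not_empty.notify()
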
def __init__(self):
    self.lock = multiprocessing.Lock()
    self.readers_condition = multiprocessing.Condition(self.lock)
    self.writer_condition = multiprocessing.Condition(self.lock)
    self.readers = multiprocessing.RawValue(ctypes.c_uint, 0)
    self.writer = multiprocessing.RawValue(ctypes.c_bool, False)
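
# Illustrative reader entry/exit for the reader-writer state above (method
# names assumed): readers wait out an active writer, and the last reader to
# leave wakes a waiting writer.
def acquire_read(self):
    with self.lock:
        while self.writer.value:
            self.readers_condition.wait()
        self.readers.value += 1

def release_read(self):
    with self.lock:
        self.readers.value -= 1
        if self.readers.value == 0:
            self.writer_condition.notify()
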
def __init__(self, base, suffix='', freq="daily", cacheline=200, flushnow=0):
    self.base = base
    self.suffix = suffix
    self.freq = freq
    pathtool.mkdir(base)
    self.file = "%s/%s.log" % (self.base, self.suffix)
    base_logger.__init__(self, codecs.open(self.file, "a", "utf8"), cacheline, flushnow)
    self.cv = multiprocessing.Condition(multiprocessing.RLock())
    self.using = 0
    self.numlog = 0
    self.maintern()
    self.rotate_when = self.get_next_rotate(self.freq)
def process_condition(self):
    if self._condition is None:
        self._condition = multiprocessing.Condition()
    return self._condition
def __init__(self, data_flow, nr=2, cache=2):
    self._is_running = False
    self._nr = nr
    self._queue = multiprocessing.Queue(nr * cache)
    self._condition = multiprocessing.Condition()
    self._processes = [multiprocessing.Process(target=GatherMultiProcs.process_func,
                                               args=(data_flow, self._queue, self._condition))
                       for _ in range(nr)]
    for p in self._processes:
        p.daemon = True
def __init__(self, target, num_workers, description=None):
    # type: (_MultiprocessOffload, function, int, str) -> None
    """Ctor for _MultiprocessOffload.

    :param _MultiprocessOffload self: this
    :param function target: target function for process
    :param int num_workers: number of worker processes
    :param str description: description
    """
    self._task_queue = multiprocessing.Queue()
    self._done_queue = multiprocessing.Queue()
    self._done_cv = multiprocessing.Condition()
    self._term_signal = multiprocessing.Value('i', 0)
    self._procs = []
    self._check_thread = None
    self._initialize_processes(target, num_workers, description)
def done_cv(self):
    # type: (_MultiprocessOffload) -> multiprocessing.Condition
    """Get the done condition variable.

    :param _MultiprocessOffload self: this
    :rtype: multiprocessing.Condition
    :return: cv for download done
    """
    return self._done_cv
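
# Hedged sketch of the consuming side of done_cv (it mirrors the calls mocked
# in the test near the end of this section, but is not the verbatim library
# code): wait briefly for a completion signal, then try to pop a result.
def _poll_done(offload):
    offload.done_cv.acquire()
    offload.done_cv.wait(timeout=0.1)
    offload.done_cv.release()
    return offload.pop_done_queue()
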
def test_pipespeed():
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = multiprocessing.Process(target=pipe_func,
                                    args=(d, cond, iterations))
        cond.acquire()
        p.start()
        cond.wait()
        cond.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = c.recv()

        elapsed = _timer() - t
        p.join()

    print(iterations, 'objects passed through connection in', elapsed, 'seconds')
    print('average number/sec:', iterations / elapsed)


#### TEST_SEQSPEED
def __init__(self, *args):
    TServer.__init__(self, *args)
    self.numWorkers = 10
    self.workers = []
    self.isRunning = Value('b', False)
    self.stopCondition = Condition()
    self.postForkCallback = None
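
# A plausible stop() counterpart for the fields above (hedged; based on the
# common Thrift TProcessPoolServer pattern rather than verified against this
# exact fork): clear the shared running flag, then wake the serve() loop
# blocked on stopCondition.
def stop(self):
    self.isRunning.value = False
    self.stopCondition.acquire()
    self.stopCondition.notify()
    self.stopCondition.release()
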
def test_timeout(self):
    cond = self.Condition()
    wait = TimingWrapper(cond.wait)
    cond.acquire()
    res = wait(TIMEOUT1)
    cond.release()
    self.assertEqual(res, False)
    self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
def __init__(self, n_procs):
    """
    Create a barrier that waits for n_procs processes.

    :param n_procs: The number of processes to wait for.
    """
    self.n_procs = n_procs
    self.count = multiprocessing.Value('i', 0, lock=False)
    self.cvar = multiprocessing.Condition()
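
# Illustrative single-use wait() for this barrier (the method name is
# assumed): each arrival increments the shared counter under the Condition;
# the last arrival's notify_all releases everyone.
def wait(self):
    with self.cvar:
        self.count.value += 1
        self.cvar.notify_all()
        while self.count.value < self.n_procs:
            self.cvar.wait()
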
def __init__(self):
    """
    Create an ordered barrier. When processes wait on this barrier, they are let through one at
    a time based on the provided index. The first process to be let through should provide an
    index of zero. Each subsequent process to be let through should provide an index equal to
    the current value of the internal counter.
    """
    import multiprocessing.sharedctypes
    self.cvar = multiprocessing.Condition()
    self.sval = multiprocessing.sharedctypes.RawValue('L')
    self.sval.value = 0
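
# Illustrative wait()/done() pair for the ordered barrier (method names
# assumed): a process blocks until the shared counter reaches its index, does
# its work, then advances the counter to admit the next process.
def wait(self, index):
    with self.cvar:
        while self.sval.value != index:
            self.cvar.wait()

def done(self):
    with self.cvar:
        self.sval.value += 1
        self.cvar.notify_all()
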
def __init__(self, size):
    # The size of the queue is increased by one to give space for a QueueClosed signal.
    size += 1
    import multiprocessing.sharedctypes
    # The condition variable both locks access to the internal resources and signals that new
    # items are ready.
    self.cvar = multiprocessing.Condition()
    # A shared array is used to store items in the queue.
    sary = multiprocessing.sharedctypes.RawArray('b', 8 * size)
    self.vals = np.frombuffer(sary, dtype=np.int64, count=size)
    self.vals[:] = -1
    # tail is the next item to be read from the queue.
    self.tail = multiprocessing.sharedctypes.RawValue('l', 0)
    # size is the current number of items in the queue; head = tail + size.
    self.size = multiprocessing.sharedctypes.RawValue('l', 0)
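
# Hedged sketch of put()/get() over the ring buffer above (method names
# assumed): one Condition serves as both the mutex and the signal, and
# notify_all covers producers and consumers alike.
def put(self, item):
    with self.cvar:
        while self.size.value >= len(self.vals):
            self.cvar.wait()
        head = (self.tail.value + self.size.value) % len(self.vals)
        self.vals[head] = item
        self.size.value += 1
        self.cvar.notify_all()

def get(self):
    with self.cvar:
        while self.size.value == 0:
            self.cvar.wait()
        item = int(self.vals[self.tail.value])
        self.tail.value = (self.tail.value + 1) % len(self.vals)
        self.size.value -= 1
        self.cvar.notify_all()
        return item
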
def __init__(self, ServerClass=None):
    self.width, self.height = 50, 50
    self.renderer = Renderer(self.width, self.height)
    self.controller = Controller()
    self.init()
    if ServerClass is not None:
        self.server = ServerClass()
        self.server.http_server.counter = Counter()
        self.server.http_server.condition_refresh = Condition()
        self.server.http_server.current_screen = None
        self.server.run()
def test_check_for_downloads_from_md5():
    lpath = 'lpath'
    rfile = azmodels.StorageEntity('cont')
    rfile._md5 = 'abc'
    rfile._client = mock.MagicMock()
    rfile._client.primary_endpoint = 'ep'
    rfile._name = 'name'
    rfile._vio = None
    rfile._size = 256
    key = ops.Downloader.create_unique_transfer_operation_id(rfile)
    d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    d._md5_map[key] = rfile
    d._transfer_set.add(key)
    d._md5_offload = mock.MagicMock()
    d._md5_offload.done_cv = multiprocessing.Condition()
    d._md5_offload.pop_done_queue.side_effect = [
        None,
        (key, lpath, rfile._size, False),
    ]
    d._add_to_download_queue = mock.MagicMock()
    d._all_remote_files_processed = False
    d._download_terminate = True
    d._check_for_downloads_from_md5()
    assert d._add_to_download_queue.call_count == 0

    with mock.patch(
            'blobxfer.operations.download.Downloader.'
            'termination_check_md5',
            new_callable=mock.PropertyMock) as patched_tc:
        d = ops.Downloader(
            mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
        d._md5_map[key] = rfile
        d._transfer_set.add(key)
        d._md5_offload = mock.MagicMock()
        d._md5_offload.done_cv = multiprocessing.Condition()
        d._md5_offload.pop_done_queue.side_effect = [
            None,
            (key, lpath, rfile._size, False),
        ]
        d._add_to_download_queue = mock.MagicMock()
        patched_tc.side_effect = [False, False, True]
        d._check_for_downloads_from_md5()
        assert d._add_to_download_queue.call_count == 1

    with mock.patch(
            'blobxfer.operations.download.Downloader.'
            'termination_check_md5',
            new_callable=mock.PropertyMock) as patched_tc:
        d = ops.Downloader(
            mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
        d._md5_map[key] = rfile
        d._transfer_set.add(key)
        d._md5_offload = mock.MagicMock()
        d._md5_offload.done_cv = multiprocessing.Condition()
        d._md5_offload.pop_done_queue.side_effect = [None]
        d._add_to_download_queue = mock.MagicMock()
        patched_tc.side_effect = [False, True, True]
        d._check_for_downloads_from_md5()
        assert d._add_to_download_queue.call_count == 0
def test():
    manager = multiprocessing.Manager()

    gc.disable()

    print('\n\t######## testing Queue.Queue\n')
    test_queuespeed(threading.Thread, Queue.Queue(),
                    threading.Condition())
    print('\n\t######## testing multiprocessing.Queue\n')
    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                    multiprocessing.Condition())
    print('\n\t######## testing Queue managed by server process\n')
    test_queuespeed(multiprocessing.Process, manager.Queue(),
                    manager.Condition())
    print('\n\t######## testing multiprocessing.Pipe\n')
    test_pipespeed()

    print()
    print('\n\t######## testing list\n')
    test_seqspeed(range(10))
    print('\n\t######## testing list managed by server process\n')
    test_seqspeed(manager.list(range(10)))
    print('\n\t######## testing Array("i", ..., lock=False)\n')
    test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
    print('\n\t######## testing Array("i", ..., lock=True)\n')
    test_seqspeed(multiprocessing.Array('i', range(10), lock=True))

    print()
    print('\n\t######## testing threading.Lock\n')
    test_lockspeed(threading.Lock())
    print('\n\t######## testing threading.RLock\n')
    test_lockspeed(threading.RLock())
    print('\n\t######## testing multiprocessing.Lock\n')
    test_lockspeed(multiprocessing.Lock())
    print('\n\t######## testing multiprocessing.RLock\n')
    test_lockspeed(multiprocessing.RLock())
    print('\n\t######## testing lock managed by server process\n')
    test_lockspeed(manager.Lock())
    print('\n\t######## testing rlock managed by server process\n')
    test_lockspeed(manager.RLock())

    print()
    print('\n\t######## testing threading.Condition\n')
    test_conditionspeed(threading.Thread, threading.Condition())
    print('\n\t######## testing multiprocessing.Condition\n')
    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
    print('\n\t######## testing condition managed by a server process\n')
    test_conditionspeed(multiprocessing.Process, manager.Condition())

    gc.enable()
def test_notify(self):
    cond = self.Condition()
    sleeping = self.Semaphore(0)
    woken = self.Semaphore(0)

    p = self.Process(target=self.f, args=(cond, sleeping, woken))
    p.daemon = True
    p.start()

    # use a distinct name for the thread so the process handle is not lost
    t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
    t.daemon = True
    t.start()

    # wait for both children to start sleeping
    sleeping.acquire()
    sleeping.acquire()

    # check no process/thread has woken up
    time.sleep(DELTA)
    self.assertReturnsIfImplemented(0, get_value, woken)

    # wake up one process/thread
    cond.acquire()
    cond.notify()
    cond.release()

    # check one process/thread has woken up
    time.sleep(DELTA)
    self.assertReturnsIfImplemented(1, get_value, woken)

    # wake up another
    cond.acquire()
    cond.notify()
    cond.release()

    # check other has woken up
    time.sleep(DELTA)
    self.assertReturnsIfImplemented(2, get_value, woken)

    # check state is not mucked up
    self.check_invariant(cond)

    p.join()
    t.join()