def test_filesystem_lock_and_mutex(self):
    """Check that balanced acquire/release calls leave the lock unlocked.

    A ``FileSystemlockAndMutex`` is created on a path inside a fresh
    temporary directory, and a symlink is then placed at that same path.
    The lock is acquired ``lock_count`` times; after each acquisition it is
    released with probability 1/2, and the outstanding releases are issued
    afterwards so that the number of releases equals the number of
    acquisitions. Once everything returned by ``acquire()`` has been waited
    on, ``lock.locked`` must be falsy.

    The temporary directory is removed in a ``finally`` clause so it does
    not leak when an assertion or any intermediate call fails.

    NOTE(review): this function is a generator (it yields a
    ``DeferredList``); it presumably relies on an ``inlineCallbacks``-style
    decorator applied above the ``def`` -- confirm.
    """
    lock_dir = tempfile.mkdtemp()
    try:
        lock_path = os.path.join(lock_dir, 'lock')
        lock = FileSystemlockAndMutex(lock_path)
        # The symlink target stands in for a PID that certainly does not
        # exist (presumably simulating a stale lock left by a dead process).
        os.symlink(str(2**30), lock_path)
        lock_count = 100
        unlock_count = 0
        dl = []
        for _ in range(lock_count):
            dl.append(lock.acquire())
            # Randomly interleave releases with acquisitions.
            if random.choice([0, 1]) == 0:
                unlock_count += 1
                lock.release()
        # Issue the releases still owed so every acquire is matched.
        for _ in range(lock_count - unlock_count):
            lock.release()
        # Wait for everything returned by acquire() before checking state.
        yield defer.DeferredList(dl)
        self.assertFalse(lock.locked)
    finally:
        # Always clean up, even if the test body raised.
        shutil.rmtree(lock_dir)
评论列表
文章目录