def test_multi_process(self):
    """Check that a key locked by one process cannot be locked by another.

    Two child processes are started. The first takes the mutex on
    ``TestLock._KEY`` and holds it for a while; the second tries to take
    the same mutex and expects ``is_lock()`` to report ``False``.

    Assertions made inside a child process only affect that child's exit
    status, so the parent checks every ``exitcode`` after joining.

    NOTE(review): this assumes ``Mutex.lock()`` returns without raising when
    the key is already held by someone else -- confirm against ``Mutex``.
    """
    # Local import: the enclosing file is only known to import ``Process``.
    from multiprocessing import Event

    # Set by the holder once it has called ``lock()``, so the contender
    # never races ahead of it.
    lock_taken = Event()

    def lock_success(self):
        # Each process opens its own connection instead of reusing the
        # parent's ``self.redis`` across the process boundary.
        conn = redis.from_url(TestLock._REDIS_URL)
        m = Mutex(conn, TestLock._KEY)
        m.lock()
        lock_taken.set()
        self.assertTrue(m.is_lock())
        # Keep holding the lock while the other process attempts to take it.
        time.sleep(10)
        m.unlock()

    def lock_error(self):
        # Bounded wait: if the holder died before signalling, carry on and
        # let the assertion below report the problem rather than hanging.
        lock_taken.wait(timeout=5)
        conn = redis.from_url(TestLock._REDIS_URL)
        m = Mutex(conn, TestLock._KEY)
        m.lock()
        self.assertFalse(m.is_lock())

    jobs = [
        Process(target=lock_success, args=(self, )),
        Process(target=lock_error, args=(self, )),
    ]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
    # A failed assertion in a child surfaces only as a non-zero exit code.
    for job in jobs:
        self.assertEqual(
            job.exitcode, 0,
            "child process %s exited with code %s" % (job.name, job.exitcode),
        )
评论列表
文章目录