def test_concurrent_access(self):
    """Check that lock-guarded increments from several processes all count.

    Five ``Process`` workers each add 1 to ``table['cnt']`` fifty times while
    holding ``table.get_lock()``. Once every worker has been joined, the
    counter must equal 5 * 50 = 250.
    """
    num_workers = 5
    bumps_per_worker = 50
    table = SharedTable({'cnt': 0})

    # NOTE(review): if Process is multiprocessing.Process, a nested target
    # function like this one only works with the "fork" start method -- confirm.
    def bump():
        for _ in range(bumps_per_worker):
            # The read-modify-write on the counter happens under the lock.
            with table.get_lock():
                table['cnt'] += 1
            # NOTE(review): the short random pause (0.1-0.5 ms) is placed after
            # the lock is released; confirm this matches the intended placement.
            time.sleep(random.randint(1, 5) / 10000)

    workers = [Process(target=bump) for _ in range(num_workers)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # Every increment from every worker must be reflected in the final value.
    assert table['cnt'] == num_workers * bumps_per_worker
评论列表
文章目录