semaphore_test.py 文件源码

python
阅读 17 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_semaphore_contention():
    g_mutex = eventlet.Semaphore()
    counts = [0, 0]

    def worker(no):
        while min(counts) < 200:
            with g_mutex:
                counts[no - 1] += 1
                eventlet.sleep(0.001)

    t1 = eventlet.spawn(worker, no=1)
    t2 = eventlet.spawn(worker, no=2)
    eventlet.sleep(0.5)
    t1.kill()
    t2.kill()

    assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号