test_run.py source code

python

Project: trio  Author: python-trio
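# Imports are not shown in the original snippet; these absolute imports are
# an assumption about how to reach the same names from outside the package.
from trio import _core
from trio.testing import wait_all_tasks_blocked


# Must be a coroutine function: the body awaits wait_all_tasks_blocked().
async def test_TrioToken_run_sync_soon_massive_queue():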
    # There are edge cases in the wakeup fd code when the wakeup fd overflows,
    # so let's try to make that happen. This is also just a good stress test
    # in general. (With the current-as-of-2017-02-14 code using a socketpair
    # with minimal buffer, Linux takes 6 wakeups to fill the buffer and MacOS
    # takes 1 wakeup. So 1000 is overkill if anything. Windows OTOH takes
    # ~600,000 wakeups, but has the same code paths...)
    COUNT = 1000
    token = _core.current_trio_token()
    counter = [0]

    def cb(i):
        # This also tests FIFO ordering of callbacks
        assert counter[0] == i
        counter[0] += 1

    for i in range(COUNT):
        token.run_sync_soon(cb, i)
    await wait_all_tasks_blocked()
    assert counter[0] == COUNT
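
The comment in the test refers to a wakeup socketpair whose buffer can fill up. The following is a minimal standard-library sketch of that idea, not Trio's actual implementation: the class `WakeupQueue`, its `overflowed` counter, and the helper `demo` are hypothetical names introduced only for illustration. It assumes that a failed wakeup write can be ignored because a full buffer already guarantees a pending wakeup, and that callbacks are kept in a FIFO queue separate from the socket.

```python
import collections
import socket


class WakeupQueue:
    """Hypothetical stand-in for a run loop's callback queue (not Trio's code)."""

    def __init__(self):
        self._callbacks = collections.deque()  # FIFO queue of (fn, args)
        self._rsock, self._wsock = socket.socketpair()
        self._rsock.setblocking(False)
        self._wsock.setblocking(False)
        self.overflowed = 0  # number of wakeup writes that hit a full buffer

    def run_sync_soon(self, fn, *args):
        # Queue the callback first, then try to signal the loop.
        self._callbacks.append((fn, args))
        try:
            self._wsock.send(b"\x00")
        except BlockingIOError:
            # Buffer is full, so a wakeup is already pending; skipping is safe.
            self.overflowed += 1

    def drain(self):
        # Empty the wakeup socket, then run every queued callback in order.
        try:
            while self._rsock.recv(4096):
                pass
        except BlockingIOError:
            pass
        while self._callbacks:
            fn, args = self._callbacks.popleft()
            fn(*args)

    def close(self):
        self._rsock.close()
        self._wsock.close()


def demo(count=5000):
    # Queue many callbacks before draining, mirroring the test's stress loop.
    queue = WakeupQueue()
    seen = []
    for i in range(count):
        queue.run_sync_soon(seen.append, i)
    queue.drain()
    queue.close()
    return seen, queue.overflowed
```

Calling `demo()` returns the observed callback order and how many wakeup writes were skipped. How many writes it takes to fill the buffer depends on the platform, which matches the varying wakeup counts mentioned in the test's comment.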