wakeup-fd-racer.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:trio 作者: python-trio 项目源码 文件源码
def main():
    writer, reader = socket.socketpair()
    writer.setblocking(False)
    reader.setblocking(False)

    signal.set_wakeup_fd(writer.fileno())

    # Keep trying until we lose the race...
    for attempt in itertools.count():
        print(f"Attempt {attempt}: start")

        # Make sure the socket is empty
        drained = drain(reader)
        if drained:
            print(f"Attempt {attempt}: ({drained} residual bytes discarded)")

        # Arrange for SIGINT to be delivered 1 second from now
        thread = threading.Thread(target=raise_SIGINT_soon)
        thread.start()

        # Fake an IO loop that's trying to sleep for 10 seconds (but will
        # hopefully get interrupted after just 1 second)
        start = time.monotonic()
        target = start + 10
        try:
            select_calls = 0
            drained = 0
            while True:
                now = time.monotonic()
                if now > target:
                    break
                select_calls += 1
                r, _, _ = select.select([reader], [], [], target - now)
                if r:
                    # In theory we should loop to fully drain the socket but
                    # honestly there's 1 byte in there at most and it'll be
                    # fine.
                    drained += drain(reader)
        except KeyboardInterrupt:
            pass
        else:
            print(f"Attempt {attempt}: no KeyboardInterrupt?!")

        # We expect a successful run to take 1 second, and a failed run to
        # take 10 seconds, so 2 seconds is a reasonable cutoff to distinguish
        # them.
        duration = time.monotonic() - start
        if duration < 2:
            print(f"Attempt {attempt}: OK, trying again "
                  f"(select_calls = {select_calls}, drained = {drained})")
        else:
            print(f"Attempt {attempt}: FAILED, took {duration} seconds")
            print(f"select_calls = {select_calls}, drained = {drained}")
            break

        thread.join()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号