daemon.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:creator-system-test-framework 作者: CreatorDev 项目源码 文件源码
def waitForIPC(ipcPort, timeout, request):
    """Poll a local UDP IPC port until it answers or the timeout elapses.

    The request datagram is re-sent to 127.0.0.1:<ipcPort> once per attempt;
    each attempt waits up to 10 ms for a reply, and attempts continue until a
    non-empty datagram is received or the attempt budget derived from
    ``timeout`` is used up.

    Args:
        ipcPort: UDP port of the IPC endpoint (anything ``int()`` accepts).
        timeout: Overall timeout in seconds.
        request: Datagram payload to send on every attempt.

    Returns:
        True if a non-empty datagram was received, otherwise False.

    Raises:
        socket.error: For any socket failure other than the per-attempt
            receive timeout (EAGAIN / EWOULDBLOCK).
    """
    # assume 127.0.0.1 for now
    address = "127.0.0.1"
    port = int(ipcPort)

    # Per-attempt receive timeout (10 ms), packed as two native longs.
    # NOTE(review): assumes the platform's struct timeval is laid out as
    # two native longs, as the original code did -- confirm on non-Linux.
    attempt_sec = 0
    attempt_usec = 10000
    timeval = struct.pack('ll', attempt_sec, attempt_usec)

    # Number of attempts that fit into the overall timeout.
    timeout_us = timeout * 1000000
    maxCount = timeout_us / (attempt_sec * 1000000 + attempt_usec)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval)

        count = 0
        response = False
        while not response and count < maxCount:
            sock.sendto(request, (address, port))
            try:
                data, _ = sock.recvfrom(65536)
            except socket.error as serr:
                # A receive timeout surfaces as EAGAIN/EWOULDBLOCK
                # ("Resource temporarily unavailable"); just retry.
                if serr.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                    raise
            else:
                response = len(data) > 0
            count += 1

        return response
    finally:
        # Always release the socket, including when an error propagates.
        sock.close()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号