def __init__(self, frame: Frame, _type: str, expr: str, timeout: float,
             *args: Any, interval: float = 0) -> None:
    """Create a wait task, register it on ``frame`` and start it.

    The constructor adds the task to ``frame._waitTasks``, arms a timer on
    the current event loop that calls ``self.terminate`` with a
    ``BrowserError`` once ``timeout`` has elapsed, and schedules
    ``self.rerun(True)``.

    :arg Frame frame: frame the task is registered on.
    :arg str _type: kind of wait; ``'function'`` or ``'selector'``.
    :arg str expr: expression kept on the task as ``self.expr``.
    :arg float timeout: msec to wait before the task is terminated.
    :arg args: extra positional arguments; not used by this constructor.
    :arg float interval: msec between polls; a falsy value (the default
        ``0``) falls back to ``timeout / 100``.
    :raises ValueError: if ``_type`` is not one of the supported kinds.
    """
    supported_types = ('function', 'selector')
    if _type not in supported_types:
        raise ValueError('Unsupported type for WaitTask: ' + _type)
    super().__init__()

    self.__frame: Frame = frame
    self.__type = _type
    self.expr = expr

    # Callers pass milliseconds; the event-loop timer works in seconds.
    timeout_sec = timeout / 1000
    self.__timeout = timeout_sec
    poll_sec = interval / 1000
    if not poll_sec:
        # No explicit interval: poll a hundred times within the timeout.
        poll_sec = timeout_sec / 100
    self.__interval = poll_sec

    self.__runCount: int = 0
    self.__terminated = False
    self.__done = False
    frame._waitTasks.add(self)

    def _on_timeout() -> None:
        # The error (and its message) is only built if the timer fires.
        self.terminate(
            BrowserError(f'waiting failed: timeout {timeout}ms exceeded')
        )

    # Page navigation forces the page script to be re-installed, so the
    # timeout has to be tracked on this side rather than in the page.
    loop = asyncio.get_event_loop()
    self.__loop = loop
    self.__timeoutTimer = loop.call_later(timeout_sec, _on_timeout)
    asyncio.ensure_future(self.rerun(True))
评论列表
文章目录