def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float],
            options: dict = None, **kwargs: Any) -> Awaitable:
    """Wait for a timeout, a JavaScript function, or a selector.

    The behavior depends on ``selectorOrFunctionOrTimeout``:

    * ``int`` / ``float``: the value is handed to ``asyncio.sleep`` and the
      resulting future is returned.
    * ``str`` containing ``=>`` or starting with ``function`` (after
      stripping whitespace): forwarded to ``self.waitForFunction``.
    * any other ``str``: forwarded to ``self.waitForSelector``.
    * any other type: a future that already holds a ``TypeError`` is
      returned (the error surfaces when the future is awaited).

    :arg options: optional settings dict; it is copied, never modified.
    :arg kwargs: extra settings, merged over ``options``.
    :return: an awaitable for the selected kind of wait.
    """
    # Merge into a fresh dict so the caller's ``options`` is not mutated.
    merged = dict(options) if options else {}
    merged.update(kwargs)

    if isinstance(selectorOrFunctionOrTimeout, (int, float)):
        fut: Awaitable[None] = asyncio.ensure_future(
            asyncio.sleep(selectorOrFunctionOrTimeout))
        return fut

    if not isinstance(selectorOrFunctionOrTimeout, str):
        # Report the bad argument through the returned future rather than
        # raising synchronously.
        fut = asyncio.get_event_loop().create_future()
        fut.set_exception(TypeError(
            'Unsupported target type: ' +
            str(type(selectorOrFunctionOrTimeout))
        ))
        return fut

    looks_like_function = (
        '=>' in selectorOrFunctionOrTimeout or
        selectorOrFunctionOrTimeout.strip().startswith('function'))
    if looks_like_function:
        return self.waitForFunction(selectorOrFunctionOrTimeout, merged)
    return self.waitForSelector(selectorOrFunctionOrTimeout, merged)
# Example source code for the Python class Awaitable()
def __getitem__(
        self, name: Union[str, Tuple[str, bool]]) -> Callable[
            ..., Awaitable[str]]:
    """Return a coroutine function that draws the named block.

    ``name`` is either a block name, or a ``(block_name, defined_here)``
    tuple. A bare name implies ``defined_here=False``.

    :raises KeyError: if the block name is not present in ``self._blocks``.
    :return: a zero-argument coroutine function; awaiting its result
        instantiates the block runtime, awaits its ``_draw()`` and returns
        its ``_block_result``.
    """
    block_name, defined_here = (
        name if isinstance(name, tuple) else (name, False))

    if block_name not in self._blocks:
        raise KeyError(f"Unknown Block Name {block_name}.")

    runtime_cls = self._blocks[block_name]

    async def wrapper() -> str:
        runtime = runtime_cls(self._skt_rt, _defined_here=defined_here)
        await runtime._draw()
        return runtime._block_result

    return wrapper
async def _validate_response(cls, response: aiohttp.client.ClientResponse) -> Awaitable[aiohttp.client.ClientResponse]:
    """
    Check an HTTP response and raise via ``cls._raise_error`` on 4xx/5xx.

    For an error status the body is read and handed to ``cls._raise_error``:
    the ``errors`` entry of the JSON body when the response is JSON,
    otherwise the body text as ``message``. Any other status returns the
    response unchanged.

    :param response: aiohttp response
    :return: aiohttp response
    """
    if not 400 <= response.status < 600:
        return response

    # Compare only the media type so that parameters such as
    # "; charset=utf-8" do not hide a JSON error body.
    content_type = response.headers.get('Content-Type') or ''
    media_type = content_type.split(';', 1)[0].strip().lower()

    if media_type == 'application/json':
        json_data = await response.json()
        cls._raise_error(response.status, errors=json_data.get('errors'))
    else:
        text = await response.text()
        cls._raise_error(response.status, message=text)
def __init__(self, json_dict: dict, request_func: Callable[[str, Union[str, List[str]], Optional[dict], Optional[int]],
                                                           Awaitable[aiohttp.client.ClientResponse]]) -> None:
    """Store the request callable, reset all fields, then apply ``json_dict``.

    :param json_dict: data passed on to ``self._set``
    :param request_func: callable kept as ``self._request``
    """
    self._request = request_func

    # Every public field starts out as None before ``_set`` is called.
    field_names = (
        'warnings', 'auth', 'renewable', 'lease_duration', 'data',
        'wrap_info', 'lease_id', 'request_id', 'wrapped_at', 'expires_at',
    )
    for field_name in field_names:
        setattr(self, field_name, None)

    self._set(json_dict)
async def wait(self,
               timeout: Union[float, int],
               func: Callable[[], Awaitable[Any]],
               *exceptions: Exception) -> Any:
    """Poll ``func`` until it returns a truthy value or ``timeout`` elapses.

    ``func`` is awaited repeatedly, with a 0.2 second pause after every
    falsy result or caught exception.

    :param timeout: maximum number of seconds to keep polling.
    :param func: zero-argument callable returning an awaitable.
    :param exceptions: exception classes that are caught and retried; any
        other exception propagates immediately.
    :return: the first truthy value produced by ``func``.
    :raises ArsenicTimeout: when the deadline passes; chained from the last
        caught exception, if any.
    """
    deadline = time.time() + timeout
    err = None
    while deadline > time.time():
        try:
            result = await func()
            if result:
                return result
            else:
                await asyncio.sleep(0.2)
        except exceptions as exc:
            # Remember the latest failure so the timeout can be chained to it.
            err = exc
            await asyncio.sleep(0.2)
    raise ArsenicTimeout() from err
async def schedule_handler(cls: Any, obj: Any, context: Dict, func: Any, interval: Optional[Union[str, int]]=None, timestamp: Optional[str]=None, timezone: Optional[str]=None) -> Any:
    """Register ``func`` as a scheduled function and start the scheduler.

    A ``(interval, timestamp, timezone, func, handler)`` tuple is appended
    to ``context['_schedule_scheduled_functions']``. ``handler`` calls
    ``func(obj, ...)`` with the function's own default values passed as
    keyword arguments, and awaits the result when it is awaitable.

    :return: the awaited result of ``cls.start_scheduler(cls, obj, context)``,
        or ``None`` when that call returns a falsy value.
    """
    async def handler() -> None:
        values = inspect.getfullargspec(func)
        # Defaults line up with the trailing positional argument names.
        kwargs = (dict(zip(values.args[-len(values.defaults):], values.defaults))
                  if values.defaults else {})
        routine = func(obj, **kwargs)
        try:
            if isinstance(routine, Awaitable):
                await routine
        except Exception:
            # Still best-effort: a failing run must not propagate out of the
            # handler, but it should leave a trace instead of vanishing.
            import logging
            logging.getLogger(__name__).exception(
                'Scheduled function %r raised an exception', func)

    context.setdefault('_schedule_scheduled_functions', []).append(
        (interval, timestamp, timezone, func, handler))

    start_func = cls.start_scheduler(cls, obj, context)
    return (await start_func) if start_func else None
def __init__(self,
             name: str,
             request: Type[Req],
             response: Type[Res],
             service: Type[S],
             implementation: Callable[[S, Req], Awaitable[Res]],
             *,
             http_path: str = None,
             http_method: HTTPVerb = None,
             http_status: int = None,
             **options: Dict[str, Any]) -> None:
    """Initialise the base class and remember the implementation callable.

    Everything except ``implementation`` is forwarded unchanged to the
    parent ``__init__``; ``implementation`` is stored on the instance
    afterwards as ``self.implementation``.
    """
    http_settings = {
        'http_path': http_path,
        'http_method': http_method,
        'http_status': http_status,
    }
    super().__init__(name, request, response, service,
                     **http_settings, **options)
    self.implementation = implementation
async def stop(self) -> Awaitable:
    """Stop tracing and return the content read from the trace stream.

    A one-shot ``Tracing.tracingComplete`` listener is registered first.
    When it fires, ``self._readStream`` is scheduled with the event's
    ``stream`` value and ``self._path``. ``Tracing.end`` is then sent and
    the result of ``_readStream`` is awaited and returned.

    Errors and cancellation from ``_readStream`` are propagated to the
    caller instead of leaving this coroutine waiting forever.
    """
    contentPromise = asyncio.get_event_loop().create_future()

    def _forward(fut: Any) -> None:
        # Mirror the read task's outcome onto the future we are awaiting.
        if contentPromise.done():
            return
        if fut.cancelled():
            contentPromise.cancel()
            return
        error = fut.exception()
        if error is not None:
            contentPromise.set_exception(error)
        else:
            contentPromise.set_result(fut.result())

    def _on_complete(event: Any) -> None:
        asyncio.ensure_future(
            self._readStream(event.get('stream'), self._path)
        ).add_done_callback(_forward)

    self._client.once('Tracing.tracingComplete', _on_complete)
    await self._client.send('Tracing.end')
    self._recording = False
    return await contentPromise
def waitForSelector(self, selector: str, options: dict = None,
                    **kwargs: Any) -> Awaitable:
    """Wait for selector matches element.

    :arg selector: selector string handed to the ``WaitTask``.
    :arg options: optional settings; it is copied, never modified.
        Recognised keys are ``timeout`` (msec, default 30000) and
        ``interval`` (msec, default 0).
    :arg kwargs: extra settings, merged over ``options``.
    :return: the ``WaitTask`` created for this selector.
    """
    # Merge into a fresh dict so the caller's ``options`` is not mutated.
    merged = dict(options) if options else {}
    merged.update(kwargs)
    timeout = merged.get('timeout', 30_000)  # msec
    interval = merged.get('interval', 0)  # msec
    return WaitTask(self, 'selector', selector, timeout, interval=interval)
def waitForFunction(self, pageFunction: str, options: dict = None,
                    *args: str, **kwargs: Any) -> Awaitable:
    """Wait for js function return true.

    :arg pageFunction: function source string handed to the ``WaitTask``.
    :arg options: optional settings; it is copied, never modified.
        Recognised keys are ``timeout`` (msec, default 30000) and
        ``interval`` (msec, default 0).
    :arg args: extra positional arguments forwarded to the ``WaitTask``.
    :arg kwargs: extra settings, merged over ``options``.
    :return: the ``WaitTask`` created for this function.
    """
    # Merge into a fresh dict so the caller's ``options`` is not mutated.
    merged = dict(options) if options else {}
    merged.update(kwargs)
    timeout = merged.get('timeout', 30_000)  # msec
    interval = merged.get('interval', 0)  # msec
    return WaitTask(self, 'function', pageFunction, timeout, *args,
                    interval=interval)
def send(self, method: str, params: dict) -> Awaitable:
    """Send message via the connection.

    Allocates the next message id, serialises ``id``/``method``/``params``
    as JSON, schedules ``self._async_send`` with that text, and returns a
    new future stored in ``self._callbacks`` under the message id.
    """
    self._lastId += 1
    message_id = self._lastId

    payload = {'id': message_id, 'method': method, 'params': params}
    msg = json.dumps(payload)
    logger.debug(f'SEND?: {msg}')
    asyncio.ensure_future(self._async_send(msg))

    pending = asyncio.get_event_loop().create_future()
    # Keep the method name on the future for whoever looks it up later.
    pending.method = method  # type: ignore
    self._callbacks[message_id] = pending
    return pending
def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float],
            options: dict = None, **kwargs: Any) -> Awaitable:
    """Delegate ``waitFor`` to the main frame.

    :raises PageError: when ``self.mainFrame`` is falsy.
    :return: whatever the main frame's ``waitFor`` returns.
    """
    main_frame = self.mainFrame
    if main_frame:
        return main_frame.waitFor(
            selectorOrFunctionOrTimeout, options, **kwargs)
    raise PageError('no main frame.')
def waitForSelector(self, selector: str, options: dict = None,
                    **kwargs: Any) -> Awaitable:
    """Delegate ``waitForSelector`` to the main frame.

    :raises PageError: when ``self.mainFrame`` is falsy.
    :return: whatever the main frame's ``waitForSelector`` returns.
    """
    main_frame = self.mainFrame
    if main_frame:
        return main_frame.waitForSelector(selector, options, **kwargs)
    raise PageError('no main frame.')
def waitForFunction(self, pageFunction: str, options: dict = None,
                    *args: str, **kwargs: Any) -> Awaitable:
    """Delegate ``waitForFunction`` to the main frame.

    :raises PageError: when ``self.mainFrame`` is falsy.
    :return: whatever the main frame's ``waitForFunction`` returns.
    """
    main_frame = self.mainFrame
    if main_frame:
        return main_frame.waitForFunction(
            pageFunction, options, *args, **kwargs)
    raise PageError('no main frame.')
def _wrap_coro_func(coro: Callable[[Event], Awaitable]
                    ) -> Callable[[Event], Awaitable]:
    """Wrap ``coro`` so that calling it schedules the coroutine.

    The returned function calls ``coro`` with the event and passes the
    result to ``ensure_future``, returning the resulting future.
    """
    def wrapper(e: Event) -> Future:
        pending = coro(e)
        return ensure_future(pending)

    return wrapper
def __call__(self, event: Event) -> Awaitable[None]:
    """Invoke the wrapped listener with ``event``.

    The event object is the only argument given to ``self.action``; its
    return value is passed straight back to the caller.
    """
    outcome = self.action(event)
    return outcome
def js_query(self, query: str) -> Awaitable:
    """Send query to related DOM on browser.

    When not connected, a future already resolved to ``None`` is returned.
    Otherwise the query is sent through ``self.js_exec`` together with the
    current request id, and a new pending future registered under that id
    is returned; the request id is then incremented.

    :param str query: single string which indicates query type.
    """
    if not self.connected:
        resolved = Future()  # type: Future[None]
        resolved.set_result(None)
        return resolved

    request_id = self.__reqid
    self.js_exec(query, request_id)
    pending = Future()  # type: Future[str]
    self.__tasks[request_id] = pending
    self.__reqid += 1
    return pending
def scrollX(self) -> Awaitable:
    """Return the awaitable produced by ``self.js_query('scrollX')``."""
    query_name = 'scrollX'
    return self.js_query(query_name)
def scrollY(self) -> Awaitable:
    """Return the awaitable produced by ``self.js_query('scrollY')``."""
    query_name = 'scrollY'
    return self.js_query(query_name)
def __getattr__(self, name: str) -> Callable[..., Awaitable[str]]:
    """Resolve a missing attribute through item lookup.

    ``self[name]`` is returned when it exists; a ``KeyError`` from that
    lookup is re-raised as ``AttributeError`` chained to the original.
    """
    try:
        found = self[name]
    except KeyError as missing:
        raise AttributeError from missing
    return found
def process(self, job: Job) -> Awaitable:
    """Create the subprocess coroutine for ``job``.

    The executable, arguments and extra keyword arguments come from
    ``job``; stdin, stdout and stderr are all pipes and the working
    directory is ``str(job.cwd)``.

    :return: the awaitable returned by ``asyncio.create_subprocess_exec``.
    """
    self.log.debug(f'creating subprocess for {job}')
    working_dir = str(job.cwd)
    # NOTE(review): the ``loop`` argument of create_subprocess_exec was
    # removed in Python 3.10 — confirm the supported interpreter range.
    return asyncio.create_subprocess_exec(
        job.exe,
        *job.args,
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
        cwd=working_dir,
        loop=self.loop,
        **job.kw,
    )
def run_coro_when_free(self, coro: Awaitable) -> None:
    """Wait until ``self._loop`` is not running, then run ``coro`` on it.

    The running state is polled with a blocking 10 ms sleep between
    checks; ``run_until_complete`` is called once the loop is idle.
    """
    while True:
        if not self._loop.is_running():
            break
        time.sleep(.01)
    self._loop.run_until_complete(coro)
def tpe(self) -> Type[Awaitable]:
    """Return the ``Awaitable`` type object itself."""
    awaitable_type = Awaitable
    return awaitable_type
def extract(self, data: Awaitable, tail: List[TransEffect], in_state: bool) -> Either[R, N]:
    """Wrap the awaitable ``data`` into a lifted coroutine result.

    NOTE(review): ``cont``, ``lift``, ``Lift``, ``Propagate`` and
    ``CoroutineAlg`` are defined outside this block; the comments below
    describe only what is visible here.
    """
    async def coro_map(run: Callable[[R], TransStep]) -> TransStep:
        # Await the wrapped awaitable, apply ``run`` to its result and pass
        # that through ``lift`` together with ``in_state``.
        res = await data
        return lift(run(res), in_state)
    # Presumably ``|`` falls back to the bare ``data`` when ``cont`` yields
    # nothing for ``tail`` — TODO confirm against the definition of ``cont``.
    coro = cont(tail, False, coro_map) | data
    return Lift(Propagate.one(CoroutineAlg(coro).pub))
def _get(self, path: Union[str, List[str]], params: Optional[dict] = None,
         wrap_ttl: Optional[int] = None) -> Awaitable[aiohttp.client.ClientResponse]:
    """
    HTTP GET request

    :param path: Path components
    :param params: Optional parameters forwarded to ``self._request``
    :param wrap_ttl: Optional TTL
    :return: Awaitable resolving to a response object from aiohttp
    """
    # Hand back the awaitable itself: this is a plain ``def``, so the
    # caller awaits the result, as the return annotation declares.
    return self._request('get', path, payload=None, params=params, wrap_ttl=wrap_ttl)
def _delete(self, path: Union[str, List[str]], params: Optional[dict] = None,
            wrap_ttl: Optional[int] = None) -> Awaitable[aiohttp.client.ClientResponse]:
    """
    HTTP DELETE request

    :param path: Path components
    :param params: Optional parameters forwarded to ``self._request``
    :param wrap_ttl: Optional TTL
    :return: Awaitable resolving to a response object from aiohttp
    """
    # Hand back the awaitable itself: this is a plain ``def``, so the
    # caller awaits the result, as the return annotation declares.
    return self._request('delete', path, payload=None, params=params, wrap_ttl=wrap_ttl)
def _list(self, path: Union[str, List[str]], params: Optional[dict] = None,
          wrap_ttl: Optional[int] = None) -> Awaitable[aiohttp.client.ClientResponse]:
    """
    HTTP LIST request

    :param path: Path components
    :param params: Optional parameters forwarded to ``self._request``
    :param wrap_ttl: Optional TTL
    :return: Awaitable resolving to a response object from aiohttp
    """
    # Hand back the awaitable itself: this is a plain ``def``, so the
    # caller awaits the result, as the return annotation declares.
    return self._request('list', path, payload=None, params=params, wrap_ttl=wrap_ttl)
def _post(self, path: Union[str, List[str]], payload: Optional[dict] = None, params: Optional[dict] = None,
          wrap_ttl: Optional[int] = None) -> Awaitable[aiohttp.client.ClientResponse]:
    """
    HTTP POST request

    :param path: Path components
    :param payload: Dictionary of key value to be turned into JSON
    :param params: Optional parameters forwarded to ``self._request``
    :param wrap_ttl: Optional TTL
    :return: Awaitable resolving to a response object from aiohttp
    """
    # Hand back the awaitable itself: this is a plain ``def``, so the
    # caller awaits the result, as the return annotation declares.
    return self._request('post', path, payload=payload, params=params, wrap_ttl=wrap_ttl)
def _put(self, path: Union[str, List[str]], payload: Optional[dict] = None, params: Optional[dict] = None,
         wrap_ttl: Optional[int] = None) -> Awaitable[aiohttp.client.ClientResponse]:
    """
    HTTP PUT request

    :param path: Path components
    :param payload: Dictionary of key value to be turned into JSON
    :param params: Optional parameters forwarded to ``self._request``
    :param wrap_ttl: Optional TTL
    :return: Awaitable resolving to a response object from aiohttp
    """
    # Hand back the awaitable itself: this is a plain ``def``, so the
    # caller awaits the result, as the return annotation declares.
    return self._request('put', path, payload=payload, params=params, wrap_ttl=wrap_ttl)
def __init__(self, coro: typing.Awaitable[typing.List[typing.Any]]):
    """Keep ``coro`` and start with an empty deque and ``_filled`` unset."""
    self._filled = False
    self.items = collections.deque()
    self.coro = coro