def __call__(self, value: Any, *, indent: int=0, indent_first: bool=False, highlight: bool=False):
self._stream = io.StringIO()
self._format(value, indent_current=indent, indent_first=indent_first)
s = self._stream.getvalue()
if highlight and pyg_lexer:
# apparently highlight adds a trailing new line we don't want
s = pygments.highlight(s, lexer=pyg_lexer, formatter=pyg_formatter).rstrip('\n')
return s
python类Any()的实例源码
def _format(self, value: Any, indent_current: int, indent_first: bool):
if indent_first:
self._stream.write(indent_current * self._c)
value_repr = repr(value)
if len(value_repr) <= self._simple_cutoff and not isinstance(value, collections.Generator):
self._stream.write(value_repr)
else:
indent_new = indent_current + self._indent_step
for t, func in self._type_lookup:
if isinstance(value, t):
func(value, value_repr, indent_current, indent_new)
return
self._format_raw(value, value_repr, indent_current, indent_new)
def _format_raw(self, value: Any, value_repr: str, indent_current: int, indent_new: int):
lines = value_repr.splitlines(True)
if len(lines) > 1 or (len(value_repr) + indent_current) >= self._width:
self._stream.write('(\n')
wrap_at = self._width - indent_new
prefix = indent_new * self._c
for line in lines:
sub_lines = textwrap.wrap(line, wrap_at)
for sline in sub_lines:
self._stream.write(prefix + sline + '\n')
self._stream.write(indent_current * self._c + ')')
else:
self._stream.write(value_repr)
def __init__(self) -> None:
"""Make new multimap."""
# maybe defaultdict(set) is better
self._map: OrderedDict[str, List[Any]] = OrderedDict()
def set(self, key: str, value: Any) -> None:
"""Set value."""
_set = self._map.get(key)
if not _set:
_set = list()
self._map[key] = _set
if value not in _set:
_set.append(value)
def get(self, key: str) -> List[Any]:
"""Get values."""
return self._map.get(key, list())
def hasValue(self, key: str, value: Any) -> bool:
"""Chekc value is in this map."""
_set = self._map.get(key, list())
return value in _set
def firstValue(self, key: str) -> Any:
"""Get first value of the key."""
_set = self._map.get(key)
if not _set:
return None
return _set[0]
def valuesArray(self) -> List[Any]:
"""Get all values as list."""
result: List[Any] = list()
for values in self._map.values():
result.extend(values)
return result
def querySelectorEval(self, selector: str, pageFunction: str,
*args: Any) -> Optional[Any]:
"""Execute function on element which matches selector."""
elementHandle = await self.querySelector(selector)
if elementHandle is None:
raise PageError(
f'Error: failed to find element matching selector "{selector}"'
)
result = await elementHandle.evaluate(pageFunction, *args)
await elementHandle.dispose()
return result
def waitForSelector(self, selector: str, options: dict = None,
**kwargs: Any) -> Awaitable:
"""Wait for selector matches element."""
if options is None:
options = dict()
options.update(kwargs)
timeout = options.get('timeout', 30_000) # msec
interval = options.get('interval', 0) # msec
return WaitTask(self, 'selector', selector, timeout, interval=interval)
def __init__(self, frame: Frame, _type: str, expr: str, timeout: float,
*args: Any, interval: float = 0) -> None:
"""Make new wait task.
:arg float timeout: msec to wait for task [default 30_000 [msec]].
:arg float interval: msec to poll for task [default timeout / 1000].
"""
if _type not in ['function', 'selector']:
raise ValueError('Unsupported type for WaitTask: ' + _type)
super().__init__()
self.__frame: Frame = frame
self.__type = _type
self.expr = expr
self.__timeout = timeout / 1000 # sec
self.__interval = interval / 1000 or self.__timeout / 100 # sec
self.__runCount: int = 0
self.__terminated = False
self.__done = False
frame._waitTasks.add(self)
# Since page navigation requires us to re-install the pageScript,
# we should track timeout on our end.
self.__loop = asyncio.get_event_loop()
self.__timeoutTimer = self.__loop.call_later(
self.__timeout,
lambda: self.terminate(
BrowserError(f'waiting failed: timeout {timeout}ms exceeded')
)
)
asyncio.ensure_future(self.rerun(True))
def setUserAgent(self, userAgent: str) -> Any:
"""Set user agent."""
return await self._client.send('Network.setUserAgentOverride',
{'userAgent': userAgent})
def response(self) -> Any:
"""Get response."""
return self._response
def _onTargetCrashed(self, *args: Any, **kwargs: Any) -> None:
self.emit('error', PageError('Page crashed!'))
def querySelectorEval(self, selector: str, pageFunction: str,
*args: Any) -> Optional[Any]:
"""Execute function on element which matches selector."""
frame = self.mainFrame
if not frame:
raise PageError('no main frame.')
return await frame.querySelectorEval(selector, pageFunction, *args)
def _onDialog(self, event: Any) -> None:
dialogType = ''
_type = event.get('type')
if _type == 'alert':
dialogType = Dialog.Type.Alert
elif (_type == 'confirm'):
dialogType = Dialog.Type.Confirm
elif (_type == 'prompt'):
dialogType = Dialog.Type.Prompt
elif (_type == 'beforeunload'):
dialogType = Dialog.Type.BeforeUnload
dialog = Dialog(self._client, dialogType, event.get('message'),
event.get('defaultPrompt'))
self.emit(Page.Events.Dialog, dialog)
def goto(self, url: str, options: dict = None, **kwargs: Any
) -> Optional[Response]:
"""Got to url."""
if options is None:
options = dict()
options.update(kwargs)
watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
options)
responses: Dict[str, Response] = dict()
listener = helper.addEventListener(
self._networkManager, NetworkManager.Events.Response,
lambda response: responses.__setitem__(response.url, response)
)
result = asyncio.ensure_future(watcher.waitForNavigation())
referrer = self._networkManager.extraHTTPHeaders().get('referer', '')
try:
await self._client.send('Page.navigate',
dict(url=url, referrer=referrer))
except Exception:
watcher.cancel()
raise
await result
helper.removeEventListeners([listener])
if self._frameManager.isMainFrameLoadingFailed():
raise PageError('Failed to navigate: ' + url)
return responses.get(self.url)
def reload(self, options: dict = None, **kwargs: Any
) -> Optional[Response]:
"""Reload this page."""
if options is None:
options = dict()
options.update(kwargs)
await self._client.send('Page.reload')
return await self.waitForNavigation(options)
def goForward(self, options: dict = None, **kwargs: Any
) -> Optional[Response]:
"""Go forward history."""
if options is None:
options = dict()
options.update(kwargs)
return await self._go(+1, options)