def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float],
            options: dict = None, **kwargs: Any) -> Awaitable:
    """Wait until `selectorOrFunctionOrTimeout`.

    The kind of wait is chosen from the argument:

    * ``int``/``float``: a future wrapping ``asyncio.sleep`` of that value.
    * ``str`` containing ``=>`` or starting (after stripping) with
      ``function``: delegated to ``self.waitForFunction``.
    * any other ``str``: delegated to ``self.waitForSelector``.
    * anything else: a future that already holds a ``TypeError``.

    :arg options: optional settings forwarded to the delegate. They are
        merged with ``kwargs`` into a new dict; the caller's dict is left
        untouched.
    :return: an awaitable for the selected wait.
    """
    # Merge into a fresh dict so the caller's ``options`` is never mutated.
    merged = dict(options) if options is not None else {}
    merged.update(kwargs)

    target = selectorOrFunctionOrTimeout
    if isinstance(target, (int, float)):
        return asyncio.ensure_future(asyncio.sleep(target))

    if not isinstance(target, str):
        # Report the error through the returned awaitable instead of raising
        # synchronously, so every code path returns something awaitable.
        failed = asyncio.get_event_loop().create_future()
        failed.set_exception(TypeError(
            'Unsupported target type: ' + str(type(target))
        ))
        return failed

    if '=>' in target or target.strip().startswith('function'):
        return self.waitForFunction(target, merged)
    return self.waitForSelector(target, merged)
# Example source code for Python's Any() class
def _send_receive(self, nummsgs: int, outformat: str = 'json',
                  dataupdate: Optional[Dict[AnyStr, Any]] = None,
                  restart_data: bool = True) -> List[Response]:
    """Buffer ``nummsgs`` messages, run them through a processor, load results.

    :param nummsgs: number of messages passed to ``self._add_to_buffer``.
    :param outformat: format name used to pick the processor and to load
        the results.
    :param dataupdate: optional mapping merged into ``self.data`` before the
        messages are buffered.
    :param restart_data: when true, ``self._restart_data(outformat)`` is
        called first.
    :return: whatever ``self._loadResults(outformat)`` returns.
    """
    if restart_data:
        self._restart_data(outformat)
    if dataupdate:
        self.data.update(dataupdate)

    self._add_to_buffer(nummsgs, outformat)
    # Rewind the send buffer to position 0 before handing it to the processor.
    self.sendbuffer.seek(0)

    buffer_overrides = {
        'custom_outbuffer': self.recvbuffer,
        'custom_inbuffer': self.sendbuffer,
    }
    request_processor, _ = get_processor_instance(outformat, **buffer_overrides)
    request_processor.process_requests(self.sendbuffer)
    return self._loadResults(outformat)
def get_processor_instance(format_: str, custom_inbuffer: InBuffer=None,
                           custom_outbuffer: OutBuffer=None) -> Tuple[Any, Any]:
    """
    Get a processor instance. The class and buffers will be selected based on the
    python_driver.ProcessorConfigs dictionary. The input and output buffers can
    be overridden using the custom_inbuffer and custom_outbuffer parameters. This
    is mainly useful for unittesting.

    :param format_: key looked up in ``ProcessorConfigs``.
    :param custom_inbuffer: replacement for the configured input buffer.
    :param custom_outbuffer: replacement for the configured output buffer.
    :return: a ``(processor_instance, inbuffer)`` tuple.
    :raises RequestInstantiationException: if no configuration exists for
        ``format_``.
    """
    conf = ProcessorConfigs.get(format_)
    if not conf:
        raise RequestInstantiationException('No RequestProcessor found for format %s' % format_)

    # Test against None explicitly: a supplied buffer object that happens to be
    # falsy (e.g. one defining __len__ and currently empty) must still win over
    # the configured default.
    inbuffer = conf['inbuffer'] if custom_inbuffer is None else custom_inbuffer
    outbuffer = conf['outbuffer'] if custom_outbuffer is None else custom_outbuffer

    instance = conf['class'](outbuffer)  # type: ignore
    return instance, inbuffer
async def start(self, options: dict = None, **kwargs: Any) -> None:
    """Start tracing by sending ``Tracing.start`` to the client.

    Option keys read here:

    * ``path``: stored on ``self._path`` (defaults to ``''``).
    * ``screenshots``: when truthy, the screenshot category is added to the
      list of trace categories.

    ``options`` and ``kwargs`` are merged into a new dict; the caller's dict
    is not modified.
    """
    settings = dict(options) if options is not None else {}
    settings.update(kwargs)

    categoriesArray = [
        '-*', 'devtools.timeline', 'v8.execute',
        'disabled-by-default-devtools.timeline',
        'disabled-by-default-devtools.timeline.frame', 'toplevel',
        'blink.console', 'blink.user_timing', 'latencyInfo',
        'disabled-by-default-devtools.timeline.stack',
        'disabled-by-default-v8.cpu_profiler',
    ]
    # Check the value, not mere presence of the key, so that an explicit
    # ``screenshots=False`` really disables screenshot capture.
    if settings.get('screenshots'):
        categoriesArray.append('disabled-by-default-devtools.screenshot')

    self._path = settings.get('path', '')
    self._recording = True
    await self._client.send('Tracing.start', {
        'transferMode': 'ReturnAsStream',
        'categories': ','.join(categoriesArray),
    })
def __init__(self, client: Session, ignoreHTTPSErrors: Any,
             options: dict = None, **kwargs: Any) -> None:
    """Make new navigator watcher.

    Option keys read here (with their defaults): ``timeout`` (3000),
    ``networkIdleTimeout`` (1000), ``networkIdleInflight`` (2) and
    ``waitUntil`` (``'load'``).

    ``options`` and ``kwargs`` are merged into a new dict; the caller's dict
    is not modified.

    :raises ValueError: if ``waitUntil`` is neither ``'load'`` nor
        ``'networkidle'``.
    """
    settings = dict(options) if options is not None else {}
    settings.update(kwargs)

    self._client = client
    self._ignoreHTTPSErrors = ignoreHTTPSErrors
    self._timeout = settings.get('timeout', 3000)
    self._idleTime = settings.get('networkIdleTimeout', 1000)
    self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None
    self._idleInflight = settings.get('networkIdleInflight', 2)
    self._waitUntil = settings.get('waitUntil', 'load')
    if self._waitUntil not in ('load', 'networkidle'):
        raise ValueError(
            f'Unknown value for options.waitUntil: {self._waitUntil}')
async def waitForNavigation(self, options: dict = None, **kwargs: Any
                            ) -> Optional[Response]:
    """Wait until navigation completes.

    A ``NavigatorWatcher`` is created with the merged options and awaited.
    While waiting, every response event from ``self._networkManager`` is
    recorded by URL.

    ``options`` and ``kwargs`` are merged into a new dict; the caller's dict
    is not modified.

    :return: the recorded response whose URL equals ``self.url``, or ``None``
        if no such response was seen.
    """
    settings = dict(options) if options is not None else {}
    settings.update(kwargs)

    watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                               settings)
    responses: Dict[str, Response] = dict()
    listener = helper.addEventListener(
        self._networkManager,
        NetworkManager.Events.Response,
        lambda response: responses.__setitem__(response.url, response)
    )
    try:
        await watcher.waitForNavigation()
    finally:
        # Always detach the listener, even when the wait raises, so it does
        # not stay registered on the network manager.
        helper.removeEventListeners([listener])
    return responses.get(self.url)
async def screenshot(self, options: dict = None, **kwargs: Any) -> bytes:
    """Take a screenshot via ``self._screenshotTask``.

    The image type is resolved in this order:

    1. if ``path`` is given, it is guessed from the file name's mime type
       (``image/png`` or ``image/jpeg`` only);
    2. an explicit ``type`` option overrides the guess;
    3. otherwise ``'png'`` is used.

    ``options`` and ``kwargs`` are merged into a new dict; the caller's dict
    is not modified.

    :raises PageError: if ``path`` is given and its mime type is neither PNG
        nor JPEG.
    """
    settings = dict(options) if options is not None else {}
    settings.update(kwargs)

    screenshotType = None
    if 'path' in settings:
        mimeType, _ = mimetypes.guess_type(settings['path'])
        if mimeType == 'image/png':
            screenshotType = 'png'
        elif mimeType == 'image/jpeg':
            screenshotType = 'jpeg'
        else:
            raise PageError('Unsupported screenshot '
                            f'mime type: {mimeType}')
    if 'type' in settings:
        screenshotType = settings['type']
    if not screenshotType:
        screenshotType = 'png'
    return await self._screenshotTask(screenshotType, settings)
def __init__(self, options: Dict[str, Any] = None, **kwargs: Any) -> None:
    """Make new launcher.

    Builds ``self.cmd``: the executable followed by the chrome arguments.

    * Headless flags are appended unless the ``headless`` option is present
      and falsy.
    * The executable is taken from the ``executablePath`` option when given;
      otherwise chromium is downloaded if ``check_chromium()`` reports it
      missing, and the bundled executable path is used.

    ``options`` and ``kwargs`` are merged into a new dict stored on
    ``self.options``; the caller's dict is not modified.
    """
    self.options = dict(options) if options is not None else {}
    self.options.update(kwargs)
    # Work on a private copy: aliasing the module-level DEFAULT_ARGS would let
    # any in-place change to ``self.chrome_args`` leak into other launchers.
    self.chrome_args = list(DEFAULT_ARGS)
    self._tmp_user_data_dir: Optional[str] = None
    self._parse_args()

    # Headless is the default; only an explicit falsy value turns it off.
    if self.options.get('headless', True):
        self.chrome_args = self.chrome_args + [
            '--headless',
            '--disable-gpu',
            '--hide-scrollbars',
            '--mute-audio',
        ]

    if 'executablePath' in self.options:
        self.exec = self.options['executablePath']
    else:
        if not check_chromium():
            download_chromium()
        self.exec = str(chromium_excutable())
    self.cmd = [self.exec] + self.chrome_args
async def move(self, x: int, y: int, options: dict = None, **kwargs: Any
               ) -> None:
    """Move the cursor to ``(x, y)``, sending ``mouseMoved`` events.

    The stored position is updated immediately. Then ``steps`` events
    (option ``steps``, default 1) are dispatched at evenly spaced, rounded
    points between the previous position and the new one; the last event is
    at the target itself.

    ``options`` and ``kwargs`` are merged into a new dict; the caller's dict
    is not modified.
    """
    settings = dict(options) if options is not None else {}
    settings.update(kwargs)

    fromX = self._x
    fromY = self._y
    self._x = x
    self._y = y
    steps = settings.get('steps', 1)
    for i in range(1, steps + 1):
        fraction = i / steps
        await self._client.send('Input.dispatchMouseEvent', {
            'type': 'mouseMoved',
            'button': self._button,
            'x': round(fromX + (self._x - fromX) * fraction),
            'y': round(fromY + (self._y - fromY) * fraction),
            'modifiers': self._keyboard._modifiers,
        })
def contains_all_options(optiongroup, parentstyle, matchvalues=False):
    # type: (Dict[Any, Any], Style, bool) -> bool
    """Tell whether every option of optiongroup also occurs in parentstyle.

    Nested dict values are checked recursively against the matching
    sub-options of parentstyle. When matchvalues is True, non-dict values
    must additionally have exactly the same type and compare equal.
    """
    for name, expected in optiongroup.items():
        if name not in parentstyle:
            return False

        if isinstance(expected, dict):
            nested_parent = parentstyle[name]
            nested_ok = contains_all_options(expected, nested_parent,
                                             matchvalues=matchvalues)
            if not nested_ok:
                return False
            continue

        if not matchvalues:
            continue

        actual = parentstyle.get(name)
        # Exact type comparison keeps e.g. True and 1 apart even though they
        # compare equal.
        if type(actual) != type(expected) or actual != expected:
            return False
    return True
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    """Diff the cached content of filename against content2.

    Extra keyword arguments are forwarded to ``difflib.unified_diff``; unless
    the caller supplies ``n``, zero context lines are used.

    Returns an ``(exit_code, diff)`` tuple. On success exit_code is ``OK`` and
    diff is the unified diff iterable. If anything goes wrong while reading
    or decoding the inputs, exit_code is ``ERROR`` and diff is empty.
    """
    diff = ()  # type: Iterable[str]
    exit_code = ERROR
    kw = kwargs.copy()
    if 'n' not in kwargs:
        # zero context lines
        kw['n'] = 0
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
        exit_code = OK
    except Exception:
        # Best effort: ordinary failures are reported through the ERROR exit
        # code with an empty diff. Unlike a ``return`` inside ``finally``,
        # this lets KeyboardInterrupt and SystemExit propagate.
        pass
    return exit_code, diff
# ---------------------------------------------------------------------
# Spare the user from specifying a formatter by finding a suitable one.
def set_value(self, key, value):
    # type: (str, Any) -> None
    """Store ``value`` under ``key`` in the configuration.

    When a parser to modify is available, the key is split into section and
    option name and written to that parser (creating the section if needed).
    The in-memory configuration for ``self.load_only`` is updated in every
    case, and the file/parser pair is then marked as modified.
    """
    self._ensure_have_load_only()

    filename, cfg_parser = self._get_parser_to_modify()

    if cfg_parser is not None:
        section_name, option_name = _disassemble_key(key)

        # Keep the parser in sync with the in-memory configuration.
        if not cfg_parser.has_section(section_name):
            cfg_parser.add_section(section_name)
        cfg_parser.set(section_name, option_name, value)

    self._config[self.load_only][key] = value
    self._mark_as_modified(filename, cfg_parser)
def received(self, type: str, resp: Dict[str, Any]) -> None:
    """Route an incoming message to its handler based on ``type``.

    * ``'message'``: ``self.message_received`` with ``resp['text']``.
    * ``'notify'``: ``self.handle_notify`` with the whole ``resp``.
    * ``'action'``: ``self.decide_action`` with ``resp['can_clue']`` and
      ``resp['can_discard']``.
    * ``'advanced'`` / ``'connected'``: ignored.
    * ``'init'``: raises ``Exception``.
    * anything else: printed, then raises ``Exception``.
    """
    if type == 'message':
        self.message_received(resp['text'])
        return
    if type == 'notify':
        self.handle_notify(resp)
        return
    if type == 'action':
        self.decide_action(resp['can_clue'], resp['can_discard'])
        return
    # 'advanced': Fast UI Animations
    # 'connected': Players connected to the game
    if type in ('advanced', 'connected'):
        return

    # Only unknown types are echoed before failing; 'init' fails silently.
    if type != 'init':
        print(type, resp)
    raise Exception()
def __getitem__(self, item: str) -> Any:
    """Return the value stored for hyperparameter ``item``.

    If ``self._query_values`` is set, or the value is already cached, the
    cached entry is returned (``None`` when absent). Otherwise the value is
    derived from the vector entry of the hyperparameter, cached and returned.

    :raises KeyError: if ``item`` is unknown to the configuration space, or
        its vector entry is not finite.
    """
    cached_values = self._values
    if self._query_values or item in cached_values:
        return cached_values.get(item)

    space = self.configuration_space
    hyperparameter = space._hyperparameters[item]
    vector_entry = self._vector[space._hyperparameter_idx[item]]
    if not np.isfinite(vector_entry):
        raise KeyError()

    value = hyperparameter._transform(vector_entry)
    # Truncate the representation of the float to be of constant
    # length for a python version
    if isinstance(hyperparameter, FloatHyperparameter):
        value = float(repr(value))

    # TODO make everything faster, then it'll be possible to init all values
    # at the same time and use an OrderedDict instead of only a dict here to
    # support iterating that dict in the same order as the actual order of
    # hyperparameters
    cached_values[item] = value
    return cached_values[item]
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any], training_share: float = .9) -> Callable[
    [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
    """Build a splitter that assigns whole groups to training or test.

    Examples are grouped by ``key_from_example``. A reproducible random
    sample of ``int(training_share * number_of_groups)`` group keys forms the
    training set; every example whose key was not sampled goes to the test
    set.

    :param key_from_example: maps an example to its group key.
    :param training_share: fraction of the groups used for training.
    :return: a function mapping a list of examples to
        ``(training_examples, test_examples)``.
    """
    def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
        examples_by_key = group(examples, key=key_from_example)
        # random.sample needs a real sequence: passing a dict keys view
        # raises TypeError on Python 3.11+.
        group_keys = list(examples_by_key.keys())

        # split must be the same every time:
        random.seed(42)
        training_keys = set(random.sample(group_keys, int(training_share * len(group_keys))))

        training_examples = []
        test_examples = []
        for example in examples:
            if key_from_example(example) in training_keys:
                training_examples.append(example)
            else:
                test_examples.append(example)
        return training_examples, test_examples

    return split
def __init__(self, variables: List[VariableIdentifier], lattices: Dict[Type, Type[Lattice]],
             arguments: Dict[Type, Dict[str, Any]] = None):
    """Create a mapping Var -> L from each variable in Var to the corresponding element in L.

    :param variables: list of program variables
    :param lattices: dictionary from variable types to the corresponding lattice types
    :param arguments: dictionary from variable types to arguments of the corresponding lattices;
        when omitted, every lattice is constructed without arguments
    :raises ValueError: if a lookup by variable type fails with ``KeyError``
        while building the store
    """
    super().__init__()
    if arguments is None:
        # Build the default per instance: a default created in the signature
        # would be one shared dict, filled by every lookup and visible to all
        # instances.
        arguments = defaultdict(dict)
    self._variables = variables
    self._lattices = lattices
    self._arguments = arguments
    try:
        self._store = {v: lattices[type(v.typ)](**arguments[type(v.typ)]) for v in variables}
    except KeyError as key:
        error = f"Missing lattice for variable type {repr(key.args[0])}!"
        raise ValueError(error)
def hashex(method,  # type: HashMethod
           key,  # type: KeyType
           **options  # type: typing.Any
           ):
    # type: (...) -> int
    """Hash ``key`` with the algorithm selected by ``method``.

    Text keys are encoded as UTF-8 first. If the lower-cased method name is
    one of ``hashlib.algorithms_guaranteed``, the hex digest is returned as
    an integer (``options`` are not used on that path). Otherwise the
    murmur3 or siphash helper is called and receives ``options``.

    :raises ValueError: if ``method`` matches none of the supported
        algorithms (previously this silently returned ``None``).
    """
    if isinstance(key, six.text_type):
        key = key.encode('utf-8')

    algorithm = method.name.lower()
    if algorithm in hashlib.algorithms_guaranteed:
        return int(hashlib.new(algorithm, key).hexdigest(), 16)
    if method == HashMethod.MMH3_32:
        return hash_murmur3(key, size=32, **options)
    if method == HashMethod.MMH3_64:
        return hash_murmur3(key, size=64, **options)
    if method == HashMethod.MMH3_128:
        return hash_murmur3(key, size=128, **options)
    if method == HashMethod.SIPHASH:
        return hash_siphash(key, **options)

    # Fail loudly rather than fall off the end and hand back None.
    raise ValueError('Unsupported hash method: {!r}'.format(method))
def dispatch(
    method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]
) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
    """Wrap ``method`` so calls are dispatched on the requested type.

    The returned wrapper looks up the implementation registered for ``type``
    (falling back to ``method``) and calls it with ``(self, query,
    context=context)``. A ``TypeError`` raised by that call is replaced by
    ``DataSource.unsupported(type)``.

    The wrapper exposes ``register(type)`` for adding implementations and
    ``_provides``, the set of types registered so far.
    """
    dispatcher = singledispatch(method)
    provides = set()

    def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any:
        handler = dispatcher.dispatch(type)
        try:
            result = handler(self, query, context=context)
        except TypeError:
            raise DataSource.unsupported(type)
        return result

    def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        provides.add(type)
        return dispatcher.register(type)

    wrapper.register = register
    wrapper._provides = provides
    update_wrapper(wrapper, method)
    return wrapper
def dispatch(
    method: Callable[[Any, Type[T], Any, PipelineContext], None]
) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
    """Wrap ``method`` so calls are dispatched on the accepted type.

    The returned wrapper looks up the implementation registered for ``type``
    (falling back to ``method``) and calls it with ``(self, items,
    context=context)``. A ``TypeError`` raised by that call is replaced by
    ``DataSink.unsupported(type)``.

    The wrapper exposes ``register(type)`` for adding implementations and
    ``_accepts``, the set of types registered so far.
    """
    dispatcher = singledispatch(method)
    accepts = set()

    def wrapper(self: Any, type: Type[T], items: Any, context: PipelineContext = None) -> None:
        handler = dispatcher.dispatch(type)
        try:
            result = handler(self, items, context=context)
        except TypeError:
            raise DataSink.unsupported(type)
        return result

    def register(type: Type[T]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
        accepts.add(type)
        return dispatcher.register(type)

    wrapper.register = register
    wrapper._accepts = accepts
    update_wrapper(wrapper, method)
    return wrapper
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
    """Pick the cheapest transform from ``source_type`` to any target type.

    Each candidate is tried via ``self._transform``; candidates raising
    ``NoConversionError`` are skipped. Only costs strictly below
    ``_MAX_TRANSFORM_COST`` are considered, and the first candidate wins a
    tie.

    :return: ``(transform, chosen_target_type, cost)``.
    :raises NoConversionError: if no candidate qualifies.
    """
    best = None
    to_type = None
    best_cost = _MAX_TRANSFORM_COST

    for candidate in target_types:
        try:
            transform, cost = self._transform(source_type, candidate)
        except NoConversionError:
            continue
        if cost < best_cost:
            best, best_cost, to_type = transform, cost, candidate

    if best is None:
        raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types))
    return best, to_type, best_cost
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
    """Evaluate all children, requiring an all-or-nothing result.

    :return: ``True`` when every child evaluated truthy, or when every
        falsifiable child evaluated falsy.
    :raises BoundKeyExistenceError: when neither of those conditions holds.
    """
    # This is a little weird. We have to handle the case of can_have("x").and_("y") here.
    # Since the only type of Node that can return False rather than just raising a
    # QueryValidationError is a KeyNode that isn't required, or an OrNode whose children
    # are all KeyNodes that aren't required, we can't short circuit on a False value. If
    # we have can_have("x").and_("y"), and only some are True, we want raise a
    # QueryValidationError. If all are True or all are False, everything's fine.
    all_true = True
    all_possible_false = True
    for child in self.children:
        # Every child is evaluated, even after one result is already falsy.
        is_true = child.evaluate(query, context)
        all_true = all_true and is_true
        if child.falsifiable:
            all_possible_false = all_possible_false and not is_true

    if all_possible_false or all_true:
        return True
    raise BoundKeyExistenceError("Query must have all or none of the elements joined with \"and\" in a \"can_have\" statement!")
def dispatch(
    method: Callable[[Any, Type[T], F, PipelineContext], T]
) -> Callable[[Any, Type[T], F, PipelineContext], T]:
    """Wrap ``method`` so calls are dispatched on a (source, target) type pair.

    The returned wrapper dispatches on ``TypePair[value.__class__,
    target_type]`` and calls the chosen implementation with ``(self, value,
    context=context)``. A ``TypeError`` raised by that call is replaced by
    ``DataTransformer.unsupported(target_type, value)``.

    The wrapper exposes ``register(from_type, to_type)`` for adding
    implementations and ``_transforms``, a dict mapping each registered
    source type to the set of its target types.
    """
    dispatcher = singledispatch(method)
    transforms = {}

    def wrapper(self: Any, target_type: Type[T], value: F, context: PipelineContext = None) -> T:
        handler = dispatcher.dispatch(TypePair[value.__class__, target_type])
        try:
            result = handler(self, value, context=context)
        except TypeError:
            raise DataTransformer.unsupported(target_type, value)
        return result

    def register(from_type: Type[F], to_type: Type[T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
        # Create the target set on first use of a source type.
        transforms.setdefault(from_type, set()).add(to_type)
        return dispatcher.register(TypePair[from_type, to_type])

    wrapper.register = register
    wrapper._transforms = transforms
    update_wrapper(wrapper, method)
    return wrapper
def execute(self, query: str, *, infer: bool = True, multi: bool = False) -> Any:
    """Run a Graql query against the knowledge base.

    :param query: the Graql query string to execute against the knowledge base
    :param infer: enable inference
    :param multi: treat this request as a list of queries
    :return: the decoded JSON body of a successful response
    :raises: GraknError, requests.exceptions.ConnectionError
    """
    response = self._post(query, infer=infer, multi=multi)
    if not response.ok:
        # The failure reason is carried in the body under 'exception'.
        raise GraknError(response.json()['exception'])
    return response.json()
def _generate_form_data(**kwargs: Union[Any, BotoFairu]) -> aiohttp.FormData:
    """Build an ``aiohttp.FormData`` with one field per keyword argument.

    * ``BotoFairu`` values are added with their content, content type,
      filename and transfer encoding.
    * lists and dicts are serialized with ``json.dumps``.
    * ints, floats, strings and bools are added as ``str(value)``.

    :raises TypeError: for a value of any other type.
    """
    form_data = aiohttp.FormData()
    for field_name, field_value in kwargs.items():
        if isinstance(field_value, BotoFairu):
            form_data.add_field(
                field_name, field_value._content,
                content_type=field_value._content_type,
                filename=field_value._filename,
                content_transfer_encoding=field_value._content_transfer_encoding)
            continue

        if isinstance(field_value, (list, dict)):
            payload = json.dumps(field_value)
        elif isinstance(field_value, (int, float, str, bool)):
            payload = str(field_value)
        else:
            raise TypeError("Unknown Type: {}".format(type(field_value)))
        form_data.add_field(field_name, payload)
    return form_data
def largest_dimensions_for_aspect_ratio(dimensions_list: List[Dimensions], desired_aspect_ratio: float,
                                        default: Any = _sentinel) -> Union[Dimensions, Any]:
    """
    Crop every entry to the desired aspect ratio and pick the biggest result.

    Returns
    -------
    The cropped dimensions with the highest resolution (the earliest one on
    a tie), or `default` when the list is empty and a default was supplied.

    Raises
    ------
    ValueError
        If the list is empty and no default was supplied.
    """
    if not dimensions_list:
        if default is _sentinel:
            raise ValueError(f"{dimensions_list} must not be empty.")
        return default

    remaining = iter(dimensions_list)
    largest_dimensions = crop_dimensions_to_aspect_ratio(next(remaining), desired_aspect_ratio)
    for dimensions in remaining:
        cropped = crop_dimensions_to_aspect_ratio(dimensions, desired_aspect_ratio)
        if cropped.resolution > largest_dimensions.resolution:
            largest_dimensions = cropped
    return largest_dimensions
def create(cls, texts: List[Any], name: str, *, locations: Opt[List[float]] = None,
           durations: Opt[List[float]] = None, default: bool = False) -> 'SubtitleTrack':
    """Build a subtitle track from texts and their timing.

    Start and end times are derived from ``locations`` when given (and
    non-empty), otherwise from ``durations``. Each text is converted with
    ``str`` and paired with its start and end time; pairing stops at the
    shortest of the three sequences.

    :raises ex.ParameterError: if neither ``locations`` nor ``durations`` is
        provided.
    """
    if locations:
        start_times, end_times = loc_util.start_end_locations_from_locations(locations)
    elif durations:
        start_times, end_times = loc_util.start_end_locations_from_intervals(durations)
    else:
        raise ex.ParameterError(f"Must provide either locations or durations for the subtitle track {name}")

    subtitles = [
        Subtitle(str(text), start_time, end_time)
        for text, start_time, end_time in zip(texts, start_times, end_times)
    ]
    return cls(subtitles, name, default=default)
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse slack notification settings from ``config``.

    Returns ``None`` when the webhook URL or the message template is missing.
    Otherwise returns the settings dict with the message template, and the
    optional duration/url templates when non-empty, compiled into
    ``jinja2.Template`` objects.
    """
    settings = {
        'webhook-url': config.get('slack-webhook-url'),
        'tmpl-msg': config.get('slack-tmpl-msg'),
        'tmpl-duration': config.get('slack-tmpl-duration', fallback=''),
        'tmpl-url': config.get('slack-tmpl-url', fallback='')
    }  # type: Any

    if not (settings['webhook-url'] and settings['tmpl-msg']):
        log.debug('Slack settings missing, no slack notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid slack notification settings found', 'NOTIFICATIONS')
    settings['tmpl-msg'] = jinja2.Template(settings['tmpl-msg'])
    # The remaining templates are optional; compile only those provided.
    for optional_key in ('tmpl-duration', 'tmpl-url'):
        if settings[optional_key]:
            settings[optional_key] = jinja2.Template(settings[optional_key])
    return settings
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse clicksend sms settings.

    Should only be called from sms.parse_settings.

    Returns ``None`` when any of username, api key, sender or template is
    missing. Otherwise returns the settings dict with the template compiled
    into a ``jinja2.Template``.
    """
    ret = {
        'provider': 'clicksend',
        'username': config.get('sms-clicksend-username'),
        'api-key': config.get('sms-clicksend-api-key'),
        'sender': config.get('sms-clicksend-sender'),
        'tmpl': config.get('sms-tmpl'),
    }  # type: Any

    # Every one of these must be present. The values are looked up in ``ret``;
    # testing a bare list literal such as ['sender'] is always truthy and
    # would never detect a missing setting.
    required_keys = ('username', 'api-key', 'sender', 'tmpl')
    if not all(ret[key] for key in required_keys):
        log.msg('SMS settings missing, no sms notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid SMS notification settings found', 'NOTIFICATIONS')
    ret['tmpl'] = jinja2.Template(ret['tmpl'])
    return ret
def ctrlc_signal_handler(sgn: int, frame: Any) -> None:
    """Exit the process with status 0; both arguments are ignored.

    The ``(sgn, frame)`` signature matches what ``signal.signal`` handlers
    receive -- presumably this is installed for Ctrl-C; confirm at the
    registration site.
    """
    success_status = 0
    sys.exit(success_status)
def __call__(self, input: Any, *styles, reset: bool=True, apply: bool=True):
    """
    Styles text with ANSI styles and returns the new string.

    By default the styling is cleared at the end of the string, this can be
    prevented with ``reset=False``.

    Examples::

        print(sformat('Hello World!', sformat.green))
        print(sformat('ATTENTION!', sformat.bg_magenta))
        print(sformat('Some things', sformat.reverse, sformat.bold))

    :param input: the object to style with ansi codes.
    :param *styles: zero or more styles to apply to the text, should be either style instances, raw ints
        or strings matching style names.
    :param reset: if False the ansi reset code is not appended to the end of the string
    :param apply: if False no ansi codes are applied
    :raises ValueError: if a style name is not found in ``self.styles``
    """
    text = str(input)
    if not apply:
        return text

    codes = []
    for s in styles:
        # raw ints are allowed
        if not isinstance(s, self.__class__) and not isinstance(s, int):
            try:
                s = self.styles[s]
            except KeyError:
                raise ValueError('invalid style "{}"'.format(s))
        # A plain int has no ``.value`` attribute, so it is used as the code
        # itself; style instances contribute their ``.value``.
        codes.append(str(getattr(s, 'value', s)))

    if codes:
        r = _ansi_template.format(';'.join(codes)) + text
    else:
        r = text

    if reset:
        r += _ansi_template.format(self.reset)
    return r