def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float],
            options: dict = None, **kwargs: Any) -> Awaitable:
    """Wait for a timeout, a function, or a selector.

    The behaviour depends on ``selectorOrFunctionOrTimeout``:

    * ``int`` / ``float``: schedule ``asyncio.sleep`` with that value and
      return the resulting future.
    * ``str`` containing ``=>`` or starting (after stripping) with
      ``function``: delegate to ``self.waitForFunction``.
    * any other ``str``: delegate to ``self.waitForSelector``.
    * anything else: return a future whose exception is a ``TypeError``.

    :arg options: Optional settings forwarded to the delegate. The dict
        passed by the caller is not modified.
    :arg kwargs: Extra settings; they override same-named keys of
        ``options``.
    :return: An awaitable for the requested wait.
    """
    # Merge into a fresh dict: updating ``options`` in place would leak
    # ``kwargs`` into a dict the caller may reuse for later calls.
    merged = dict(options) if options is not None else {}
    merged.update(kwargs)

    target = selectorOrFunctionOrTimeout
    if isinstance(target, (int, float)):
        # NOTE(review): the number is handed to asyncio.sleep() unchanged,
        # i.e. it is interpreted as seconds -- confirm the intended unit.
        return asyncio.ensure_future(asyncio.sleep(target))
    if not isinstance(target, str):
        # Report the misuse through the returned future instead of raising.
        failed = asyncio.get_event_loop().create_future()
        failed.set_exception(TypeError(
            'Unsupported target type: ' + str(type(target))
        ))
        return failed
    if '=>' in target or target.strip().startswith('function'):
        return self.waitForFunction(target, merged)
    return self.waitForSelector(target, merged)
# Example source code using Python's Union type (typing.Union).
def __init__(self,
             host: str = '127.0.0.1',
             port: int = 11300,
             encoding: Optional[str] = 'utf-8',
             use: str = DEFAULT_TUBE,
             watch: Union[str, Iterable[str]] = DEFAULT_TUBE) -> None:
    """Open a connection and select the tubes to use and watch.

    :param host: Host passed to ``socket.create_connection``.
    :param port: Port passed to ``socket.create_connection``.
    :param encoding: Stored on the instance as ``encoding``.
    :param use: Tube to ``use``; nothing is sent when it equals
        ``DEFAULT_TUBE``.
    :param watch: A tube name or an iterable of tube names to watch.
        ``DEFAULT_TUBE`` is ignored afterwards unless it was one of the
        requested tubes.
    """
    self._sock = socket.create_connection((host, port))
    self._reader = self._sock.makefile('rb')  # type: BinaryIO
    self.encoding = encoding
    if use != DEFAULT_TUBE:
        self.use(use)
    if isinstance(watch, str):
        if watch != DEFAULT_TUBE:
            self.watch(watch)
            self.ignore(DEFAULT_TUBE)
    else:
        # Detect DEFAULT_TUBE during the single pass over ``watch``. A
        # membership test afterwards would run against an already
        # exhausted iterator when ``watch`` is a generator, and would then
        # always report the default tube as missing.
        default_requested = False
        for tube in watch:
            self.watch(tube)
            if tube == DEFAULT_TUBE:
                default_requested = True
        if not default_requested:
            self.ignore(DEFAULT_TUBE)
def retspec(defs):
    """Build the return type specification for ``defs``.

    A function has a single return annotation, so when the registry lists
    more than one type for ``defs`` they are combined into a ``Union``.

    Returns ``None`` when ``defs`` is falsy or the registry lookup yields
    nothing; otherwise the result of ``strcast`` (or a ``Union`` of such
    results).
    """
    if not defs:
        return None
    if defs in basic_types:
        return strcast(defs, False)

    entries = _registry.getObj(_types[defs])
    if not entries:
        return None
    if len(entries) > 1:
        # Several candidate types: expose them as one Union.
        members = tuple(strcast(entry[1], True) for entry in entries)
        return Union[members]
    return strcast(entries[0][1], False)
def type_anno_func(func, defs, is_result=False):
    """Attach type annotations looked up for ``defs`` to ``func``.

    When ``defs`` is falsy ``func`` is returned untouched. Otherwise the
    registry entries for ``defs`` are used either as the ``"return"``
    annotation (``is_result`` true) or as one annotation per entry, keyed
    by ``name_to_py(name)``.

    Returns ``func`` itself, with ``__annotations__`` updated in place.
    """
    if not defs:
        return func

    entries = _registry.getObj(_types[defs])
    if not is_result:
        annotations = {name_to_py(name): rtype for name, rtype in entries}
    elif not entries:
        annotations = {"return": None}
    elif len(entries) > 1:
        # More than one possible type: annotate with their Union.
        annotations = {"return": Union[tuple(entry[1] for entry in entries)]}
    else:
        annotations = {"return": entries[0][1]}

    func.__annotations__.update(annotations)
    return func
def calc_mean_lp_scores(log_prob_scores: List[float],
                        lengths: List[int]) -> List[Union[None, float]]:
    r"""Divide each log-probability score by the matching length.

    .. math::

        \frac{%
            \log P_\text{model}\left(\xi\right)
        }{%
            \text{length}\left(\xi\right)
        }

    Scores and lengths are paired with ``zip``. A pair yields ``None`` when
    the score is ``None`` or the length equals ``0``.

    >>> '{:.3f}'.format(calc_mean_lp_scores([-14.7579], [4])[0])
    '-3.689'
    """
    return [
        None if (score is None or length == 0)
        else float(score) / float(length)
        for score, length in zip(log_prob_scores, lengths)
    ]
def calc_norm_lp_div_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""Negated ratio of each log-probability to its unigram score.

    .. math::

        -\frac{%
            \log P_\text{model}\left(\xi\right)
        }{%
            \log P_\text{unigram}\left(\xi\right)
        }

    Scores are paired with ``zip``. A pair yields ``None`` when the
    log-probability is ``None`` or ``numpy.isclose`` reports the unigram
    score as equal to ``0.0``.

    >>> '{:.3f}'.format(calc_norm_lp_div_scores([-14.7579], [-35.6325])[0])
    '-0.414'
    """
    def normalise(log_prob, unigram_score):
        # The None check comes first so isclose() never sees a missing score.
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05):
            return None
        return (-1.0) * float(log_prob) / float(unigram_score)

    return [
        normalise(log_prob, unigram_score)
        for log_prob, unigram_score in zip(log_prob_scores, unigram_scores)
    ]
def calc_norm_lp_sub_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""Difference between each log-probability and its unigram score.

    .. math::

        \log P_\text{model}\left(\xi\right)
            - \log P_\text{unigram}\left(\xi\right)

    Scores are paired with ``zip``. A pair yields ``None`` when the
    log-probability is ``None`` or ``numpy.isclose`` reports the unigram
    score as equal to ``0.0``.

    >>> '{:.3f}'.format(calc_norm_lp_sub_scores([-14.7579], [-35.6325])[0])
    '20.875'
    """
    def subtract(log_prob, unigram_score):
        # The None check comes first so isclose() never sees a missing score.
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05):
            return None
        return float(log_prob) - float(unigram_score)

    return [
        subtract(log_prob, unigram_score)
        for log_prob, unigram_score in zip(log_prob_scores, unigram_scores)
    ]
def __init__(self, client: Session, ignoreHTTPSErrors: Any,
             options: dict = None, **kwargs: Any) -> None:
    """Make new navigator watcher.

    :arg client: Session stored on the watcher.
    :arg ignoreHTTPSErrors: Stored on the watcher unchanged.
    :arg options: Optional settings. Keys read here, with their defaults:
        ``timeout`` (3000), ``networkIdleTimeout`` (1000),
        ``networkIdleInflight`` (2) and ``waitUntil`` (``'load'``).
        The dict passed by the caller is not modified.
    :arg kwargs: Extra settings; they override same-named keys of
        ``options``.
    :raises ValueError: If ``waitUntil`` is neither ``'load'`` nor
        ``'networkidle'``.
    """
    # Merge into a fresh dict so the caller's ``options`` is left untouched.
    settings = dict(options) if options is not None else {}
    settings.update(kwargs)

    self._client = client
    self._ignoreHTTPSErrors = ignoreHTTPSErrors
    self._timeout = settings.get('timeout', 3000)
    self._idleTime = settings.get('networkIdleTimeout', 1000)
    self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None
    self._idleInflight = settings.get('networkIdleInflight', 2)
    self._waitUntil = settings.get('waitUntil', 'load')
    if self._waitUntil not in ('load', 'networkidle'):
        raise ValueError(
            f'Unknown value for options.waitUntil: {self._waitUntil}')
def convertPrintParameterToInches(parameter: Union[None, int, float, str]
                                  ) -> Optional[float]:
    """Convert print parameter to inches.

    ``None`` is passed through. Numbers are treated as pixels. Strings are
    parsed as a number followed by an optional two-character unit looked up
    in ``unitToPixels``; without a known unit the whole string is parsed as
    pixels. The pixel value is divided by 96 to obtain inches.

    :raises ValueError: If the numeric part of a string cannot be parsed.
    :raises TypeError: If ``parameter`` is of any other type.
    """
    if parameter is None:
        return None
    if isinstance(parameter, (int, float)):
        return parameter / 96
    if not isinstance(parameter, str):
        raise TypeError('page.pdf() Cannot handle parameter type: ' +
                        str(type(parameter)))

    suffix = parameter[-2:].lower()
    if suffix in unitToPixels:
        unit, number = suffix, parameter[:-2]
    else:
        # No recognised unit suffix: the whole text is a pixel count.
        unit, number = 'px', parameter
    try:
        value = float(number)
    except ValueError:
        raise ValueError('Failed to parse parameter value: ' + parameter)
    return value * unitToPixels[unit] / 96
def get_parents_of(self, name: Union[str, Hyperparameter]) -> List[Hyperparameter]:
    """Collect the parents of a hyperparameter.

    Parameters
    ----------
    name : str or Hyperparameter
        Either the hyperparameter's name or the hyperparameter object; it
        is forwarded to ``get_parent_conditions_of``.

    Returns
    -------
    list
        The parents reported by every parent condition, concatenated in
        condition order.
    """
    return [
        parent
        for condition in self.get_parent_conditions_of(name)
        for parent in condition.get_parents()
    ]  # type: List[Hyperparameter]
def content(self) -> Union[bytes, bytearray, None]:
    '''
    Return the stored content converted according to ``content_type``.

    ``application/octet-stream`` and ``multipart/form-data`` give back the
    stored value as-is, ``text/plain`` is decoded as UTF-8, and
    ``application/json`` is decoded and parsed into ``OrderedDict`` objects.

    Private codes should NOT use this as it incurs duplicate
    encoding/decoding.

    Raises ``ValueError`` when no content has been set.
    '''
    raw = self._content
    if raw is None:
        raise ValueError('content is not set.')

    kind = self.content_type
    if kind in ('application/octet-stream', 'multipart/form-data'):
        return raw
    if kind == 'text/plain':
        return raw.decode('utf-8')
    if kind == 'application/json':
        return json.loads(raw.decode('utf-8'),
                          object_pairs_hook=OrderedDict)
    raise RuntimeError('should not reach here')  # pragma: no cover
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]:
    """Map ``<xml name><microphone suffix>`` ids to labels read from XML files.

    Only files ending in ``.xml`` whose extension-less name matches
    ``self.id_filter_regex`` are considered. For each of them, an entry is
    produced per microphone suffix for which a sibling
    ``<name><suffix>.wav`` file exists; the value comes from
    ``self._extract_label_from_xml`` applied to the XML file.
    """
    microphone_suffixes = (
        "_Yamaha",
        "_Kinect-Beam",
        "_Kinect-RAW",
        "_Realtek",
        "_Samson",
        "_Microsoft-Kinect-Raw",
    )

    xml_files = [
        candidate for candidate in files
        if candidate.name.endswith(".xml")
        and self.id_filter_regex.match(name_without_extension(candidate))
    ]

    labels = OrderedDict()
    for xml_file in xml_files:
        base_name = name_without_extension(xml_file)
        for suffix in microphone_suffixes:
            example_id = base_name + suffix
            # Keep only recordings that actually exist next to the XML file.
            if (Path(xml_file.parent) / (example_id + ".wav")).exists():
                labels[example_id] = self._extract_label_from_xml(xml_file)
    return labels
def with_default(self, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> "QueryValidator":
    """Attach a default value to the currently selected optional key.

    The expected type is ``supplies_type`` when given (truthy), otherwise
    ``type(value)``; it is applied through ``self.as_``. The default-value
    node is then stored as the grandchild of the resulting validator's
    current node.

    Raises ``QueryValidatorStructureError`` when no key is selected (or the
    selected key already has a child) and when the selected key is required.
    """
    current = self._current
    if current is None or current.child is not None:
        raise QueryValidatorStructureError("No key is selected! Try using \"can_have\" before \"with_default\".")
    if current.required:
        raise QueryValidatorStructureError("Can't assign a default value to a required key! Try using \"can_have\" instead of \"have\".")

    expected_type = supplies_type if supplies_type else type(value)
    default_node = _DefaultValueNode(current.key, value, supplies_type)

    validator = self.as_(expected_type)
    validator._current.child.child = default_node
    return validator
def _ecies_gen_ephemeral_key(
        recp_pubkey: Union[bytes, elliptic_curve.ec_element]
) -> Tuple[bytes, Tuple[bytes, bytes]]:
    """
    Generate an ephemeral private key and encrypt it for `recp_pubkey`.

    A symmetric key is encapsulated for the recipient, a fresh ephemeral
    private key is generated, and that private key is encrypted with the
    symmetric key.

    :param recp_pubkey: Recipient's pubkey

    :return: Tuple of the eph_privkey, and a tuple of the encrypted symmetric
             key, and encrypted ephemeral privkey
    """
    symmetric_key, encapsulated_key = API.ecies_encapsulate(recp_pubkey)
    ephemeral_privkey = API.ecies_gen_priv()
    encrypted_privkey = API.symm_encrypt(symmetric_key, ephemeral_privkey)
    return ephemeral_privkey, (encapsulated_key, encrypted_privkey)
def ecdsa_priv2pub(
        privkey: bytes,
        to_bytes: bool = True
) -> Union[bytes, Tuple[int]]:
    """
    Derive the public component of an ECDSA private key.

    :param privkey: Private key as an int or bytestring
    :param to_bytes: Serialize to bytes or not?

    :return: With `to_bytes`, the pubkey bytes prefixed by
             SIG_KEYPAIR_BYTE and PUB_KEY_BYTE; otherwise the value
             returned by `privtopub`
    """
    public_key = privtopub(privkey)
    if not to_bytes:
        return public_key
    return SIG_KEYPAIR_BYTE + PUB_KEY_BYTE + ecdsa_pub2bytes(public_key)
def ecdsa_verify(
        v: int,
        r: int,
        s: int,
        msghash: bytes,
        pubkey: Union[bytes, Tuple[int, int]]
) -> bool:
    """
    Takes a v, r, s, a pubkey, and a hash of a message to verify via ECDSA.

    The public key is recovered from the signature and compared with
    `pubkey`.

    :param v: V of sig
    :param r: R of sig
    :param s: S of sig
    :param bytes msghash: The hashed message to verify
    :param pubkey: Pubkey to validate signature for; bytes are decoded with
                   `ecdsa_bytes2pub` first

    :rtype: Boolean
    :return: Is the signature valid or not?
    """
    # isinstance instead of an exact type comparison, so that subclasses of
    # bytes are decoded as well rather than compared in their raw form.
    if isinstance(pubkey, bytes):
        pubkey = ecdsa_bytes2pub(pubkey)

    recovered_pubkey = ecdsa_raw_recover(msghash, (v, r, s))
    # TODO: Should this equality test be done better?
    return recovered_pubkey == pubkey
def ecies_priv2pub(
        privkey: Union[bytes, elliptic_curve.ec_element],
        to_bytes: bool = True
) -> Union[bytes, elliptic_curve.ec_element]:
    """
    Takes a private key (secret bytes or an elliptic_curve.ec_element) and
    derives the Public key from it.

    :param privkey: The Private key to derive the public key from
    :param to_bytes: Return the byte serialization of the pubkey?

    :return: The Public component of the Private key provided. With
             `to_bytes`, this is the serialized pubkey without its first
             byte.
    """
    # isinstance instead of an exact type comparison, so that subclasses of
    # bytes are converted as well.
    if isinstance(privkey, bytes):
        privkey = priv_bytes2ec(privkey)

    pubkey = PRE.priv2pub(privkey)
    if to_bytes:
        return elliptic_curve.serialize(pubkey)[1:]
    return pubkey
def ecies_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        privkey_b: Union[bytes, elliptic_curve.ec_element],
        to_bytes: bool = True
) -> Union[bytes, umbral.RekeyFrag]:
    """
    Generates a re-encryption key from privkey_a to privkey_b.

    :param privkey_a: Private key to re-encrypt from
    :param privkey_b: Private key to re-encrypt to
    :param to_bytes: Format result as bytes?

    :return: Re-encryption key. With `to_bytes`, this is the serialized
             `key` attribute of the rekey without its first byte.
    """
    # isinstance instead of an exact type comparison, so that subclasses of
    # bytes are converted as well.
    if isinstance(privkey_a, bytes):
        privkey_a = priv_bytes2ec(privkey_a)
    if isinstance(privkey_b, bytes):
        privkey_b = priv_bytes2ec(privkey_b)

    rk = PRE.rekey(privkey_a, privkey_b)
    if to_bytes:
        return elliptic_curve.serialize(rk.key)[1:]
    return rk
def ecies_split_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        privkey_b: Union[bytes, elliptic_curve.ec_element],
        min_shares: int,
        total_shares: int
) -> List[umbral.RekeyFrag]:
    """
    Performs a split-key re-encryption key generation where a minimum
    number of shares `min_shares` are required to reproduce a rekey.
    Will split a rekey into `total_shares`.

    :param privkey_a: Privkey to re-encrypt from
    :param privkey_b: Privkey to re-encrypt to
    :param min_shares: Minimum shares needed to reproduce rekey
    :param total_shares: Total shares to generate from split-rekey gen

    :return: A list of fragments to distribute; each umbral fragment is
             wrapped in a `KFrag`
    """
    # isinstance instead of an exact type comparison, so that subclasses of
    # bytes are converted as well.
    if isinstance(privkey_a, bytes):
        privkey_a = priv_bytes2ec(privkey_a)
    if isinstance(privkey_b, bytes):
        privkey_b = priv_bytes2ec(privkey_b)

    umbral_rekeys = PRE.split_rekey(privkey_a, privkey_b,
                                    min_shares, total_shares)
    return [KFrag(umbral_kfrag=u) for u in umbral_rekeys]
def ecies_ephemeral_split_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        pubkey_b: Union[bytes, elliptic_curve.ec_element],
        min_shares: int,
        total_shares: int
) -> Tuple[List[umbral.RekeyFrag], Tuple[bytes, bytes]]:
    """
    Performs a split-key re-encryption key generation where a minimum
    number of shares `min_shares` are required to reproduce a rekey.
    Will split a rekey into `total_shares`.

    An ephemeral key is generated for the recipient `pubkey_b`, and the
    rekey is split from `privkey_a` to that ephemeral private key.

    :param privkey_a: Privkey to re-encrypt from
    :param pubkey_b: Public key to re-encrypt for (w/ ephemeral key)
    :param min_shares: Minimum shares needed to reproduce a rekey
    :param total_shares: Total shares to generate from split-rekey gen

    :return: A tuple containing a list of rekey frags, and a `PFrag` built
             from the encrypted ephemeral key data
             (enc_symm_key, enc_eph_privkey)
    """
    ephemeral_privkey, encrypted_parts = _internal._ecies_gen_ephemeral_key(pubkey_b)
    encrypted_key, encrypted_message = encrypted_parts

    kfrags = ecies_split_rekey(privkey_a, ephemeral_privkey,
                               min_shares, total_shares)
    pfrag = PFrag(
        ephemeral_data_as_bytes=None,
        encrypted_key=encrypted_key,
        encrypted_message=encrypted_message,
    )
    return kfrags, pfrag
def ecies_reencrypt(
        rekey: Union[bytes, umbral.RekeyFrag],
        enc_key: Union[bytes, umbral.EncryptedKey],
) -> umbral.EncryptedKey:
    """
    Re-encrypts the key provided.

    :param rekey: Re-encryption key to use; bytes are wrapped in an
                  `umbral.RekeyFrag`
    :param enc_key: Encrypted key to re-encrypt; bytes are wrapped in an
                    `umbral.EncryptedKey`

    :return: The re-encrypted key, wrapped in a `CFrag`
    """
    # isinstance instead of an exact type comparison, so that subclasses of
    # bytes are converted as well.
    if isinstance(rekey, bytes):
        rekey = umbral.RekeyFrag(None, priv_bytes2ec(rekey))
    if isinstance(enc_key, bytes):
        enc_key = umbral.EncryptedKey(priv_bytes2ec(enc_key), None)

    reencrypted_data = PRE.reencrypt(rekey, enc_key)
    return CFrag(reencrypted_data=reencrypted_data)
def add_key(self,
            keypair: Union[keypairs.EncryptingKeypair,
                           keypairs.SigningKeypair],
            store_pub: bool = True) -> bytes:
    """
    Gets a fingerprint of the key and adds it to the keystore.

    :param keypair: Keypair whose public or private key is stored
    :param store_pub: Store (and fingerprint) the public key when true,
                      otherwise the private key

    :return: Fingerprint, in bytes, of the added key
    """
    if store_pub:
        fingerprint = self._get_fingerprint(keypair.pubkey)
        serialized_key = keypair.serialize_pubkey()
    else:
        fingerprint = self._get_fingerprint(keypair.privkey)
        serialized_key = keypair.serialize_privkey()

    # Persist the serialized key as a new Key row.
    record = Key(serialized_key)
    self.session.add(record)
    self.session.commit()
    return fingerprint
def _generate_form_data(**kwargs: Union[Any, BotoFairu]) -> aiohttp.FormData:
    """Build an ``aiohttp.FormData`` with one field per keyword argument.

    ``BotoFairu`` values are added with their content, content type,
    filename and transfer encoding. Lists and dicts are JSON-encoded;
    ints, floats, strings and bools are added as ``str(value)``.

    Raises ``TypeError`` for any other value type.
    """
    form = aiohttp.FormData()
    for name, value in kwargs.items():
        if isinstance(value, BotoFairu):
            form.add_field(
                name, value._content,
                content_type=value._content_type,
                filename=value._filename,
                content_transfer_encoding=value._content_transfer_encoding)
            continue

        if isinstance(value, (list, dict)):
            text = json.dumps(value)
        elif isinstance(value, (int, float, str, bool)):
            text = str(value)
        else:
            raise TypeError("Unknown Type: {}".format(type(value)))
        form.add_field(name, text)
    return form
def largest_dimensions_for_aspect_ratio(dimensions_list: List[Dimensions], desired_aspect_ratio: float,
                                        default: Any = _sentinel) -> Union[Dimensions, Any]:
    """
    Crop every entry to the desired aspect ratio and pick the largest result.

    Parameters
    ----------
    dimensions_list
        Candidate dimensions.
    desired_aspect_ratio
        Aspect ratio passed to ``crop_dimensions_to_aspect_ratio``.
    default
        Returned when ``dimensions_list`` is empty, if supplied.

    Returns
    -------
    The cropped dimensions with the greatest ``resolution``; on ties the
    earliest entry wins.

    Raises
    ------
    ValueError
        If ``dimensions_list`` is empty and no ``default`` was supplied.
    """
    if not dimensions_list:
        if default is _sentinel:
            raise ValueError(f"{dimensions_list} must not be empty.")
        return default

    cropped = (
        crop_dimensions_to_aspect_ratio(dimensions, desired_aspect_ratio)
        for dimensions in dimensions_list
    )
    # max() keeps the first of equally large candidates.
    return max(cropped, key=lambda candidate: candidate.resolution)
def __init__(self, sources: Opt[Union[List[Union[Source, 'VideoSourceList']], str]] = None, **kwargs):
    """
    Parameters
    ----------
    sources
        A list of sources.
        Accepts arbitrarily nested video files, file globs, directories, VideoSources, and VideoSourceLists.
        A string is treated as a directory (when it is one) or as a file glob,
        and also determines ``name``.
    kwargs
        Forwarded to the parent initializer.
    """
    # The type was previously written as the parameter's default value
    # (``sources=Opt[...]``), so omitting the argument passed a typing
    # object on as if it were a list of sources.
    self.name = None
    if sources is None:
        # NOTE(review): an omitted argument is treated as "no sources";
        # assumes _fill_in_sources accepts an empty list -- confirm.
        sources = []
    if isinstance(sources, str):
        self.name = paths.filename_from_path(sources)
        # Build list of sources from directory or file glob
        if os.path.isdir(sources):
            sources = self._sources_from_files(util.files_from_directory(sources))
        else:
            sources = self._sources_from_files(globber.glob(sources))
    else:
        # Convert any source files to VideoSources, and any lists, file globs or directories to VideoSourceLists
        sources = self._fill_in_sources(sources)
    super().__init__(sources, **kwargs)
def speed_multiply(self, speed: Union[float, Fraction],
                   offset: Opt[int] = None):
    """
    Speeds up or slows down events by grouping them together or splitting them up.
    For slowdowns, event type group boundaries and isolated events are preserved.

    Parameters
    ----------
    speed
        Factor to speedup or slowdown by.
        Must be of the form x (speedup) or 1/x (slowdown), where x is a natural number.
        Otherwise, 0 to remove all events.
        Floats are converted to the nearest simple fraction first.
    offset
        Offsets the grouping of events for slowdowns.
        Takes a max offset of x - 1 for a slowdown of 1/x, where x is a natural number
    """
    if speed == 0:
        self.clear()
        return
    if not (speed > 1 or speed < 1):
        # A speed of exactly 1 (or an unordered value such as NaN) changes
        # nothing.
        return

    # Floats have no numerator/denominator attributes, so 0.5 or 2.0 used
    # to fail with AttributeError. limit_denominator() also recovers 1/3
    # from its inexact float representation.
    if isinstance(speed, float):
        ratio = Fraction(speed).limit_denominator()
    else:
        ratio = speed

    if speed > 1:
        self._split(ratio.numerator)
    else:
        self._merge(ratio.denominator, offset)
def __init__(self, groups: Opt[Union[List[EventList], List[List[TIME_FORMAT]]]] = None, *,
             selected: List[EventList] = None):
    """
    Parameters
    ----------
    groups
        Event groups; entries that are not already EventLists are
        converted with ``EventList(entry)``. The list passed in is not
        modified.
    selected
        A subset of groups to track
    """
    if groups is not None:
        # Build a new list instead of assigning into ``groups``: writing the
        # converted EventLists back would silently rewrite the caller's
        # list (and fail outright for immutable sequences).
        groups = [
            group if isinstance(group, EventList) else EventList(group)
            for group in groups
        ]
    super().__init__(groups)
    self._selected_groups = selected if selected is not None else []
async def send_email(loop: asyncio.AbstractEventLoop, mail_from: str, mail_to: Union[Iterable, str],
                     subject: str, body: str, server: str='localhost') -> None:
    """Send an email to one or more recipients.

    Only supports plain text emails with a single message body.
    No attachments etc.

    A separate message is sent to each recipient over one SMTP connection
    to ``server`` on port 25. SMTP errors are logged, not raised.
    """
    # The body awaits, so this must be a coroutine function; as a plain
    # ``def`` it does not compile.
    # isinstance also wraps str subclasses, which would otherwise be
    # iterated character by character.
    if isinstance(mail_to, str):
        mail_to = [mail_to]
    smtp = aiosmtplib.SMTP(hostname=server, port=25, loop=loop)
    try:
        await smtp.connect()
        for rcpt in mail_to:
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = mail_from
            msg['To'] = rcpt
            await smtp.send_message(msg)
        await smtp.quit()
    except aiosmtplib.errors.SMTPException as e:
        log.msg('Error sending smtp notification: %s' % (str(e)), 'NOTIFICATIONS')
def require_bool(value: Optional[Union[bool, str, int]], convert: bool=False, allow_none: bool=False) -> Any:
    """Make sure a value is a boolean.

    Used when dealing with http input data.

    ``None`` is returned unchanged when ``allow_none`` is set. Booleans are
    returned as they are. Other values raise ``InvalidData`` unless
    ``convert`` is set, in which case ``None``, ``0``, ``'0'``, ``'false'``
    and ``'False'`` become ``False``; ``1``, ``'1'``, ``'true'`` and
    ``'True'`` become ``True``; anything else raises ``InvalidData``.
    """
    if allow_none and value is None:
        return value
    if isinstance(value, bool):
        return cast(bool, value)
    if not convert:
        raise InvalidData()

    if value in (None, 0, '0', 'false', 'False'):
        return False
    if value in (1, '1', 'true', 'True'):
        return True
    raise InvalidData('value was %s(%s), expected bool' % (type(value), value))
def __init__(self, id: int, args: Dict[str, str], monitor_def: ActiveMonitorDef, state: str, state_ts: float,
             msg: str, alert_id: Union[int, None], checks_enabled: bool,
             alerts_enabled: bool, manager: ActiveMonitorManager) -> None:
    """Store the monitor's configuration and initialise its runtime state.

    ``state_ts`` falls back to the current time when a falsy value is
    given. After the attributes are set, a ``CREATE_ACTIVE_MONITOR`` event
    is reported and the ``num_monitors`` statistic is incremented.
    """
    # Values supplied by the caller.
    self.id = id
    self.args = args
    self.monitor_def = monitor_def
    self.manager = manager
    self.state = state
    self.msg = msg
    self.alert_id = alert_id
    self.checks_enabled = checks_enabled
    self.alerts_enabled = alerts_enabled

    # Check bookkeeping.
    self.last_check_state = None  # type: Optional[str]
    self.consecutive_checks = 0
    self.last_check = time.time()
    self.state_ts = state_ts or time.time()

    # Runtime flags and scheduling.
    self.monitoring = False
    self.deleted = False
    self._pending_reset = False
    self.scheduled_job = None  # type: Optional[asyncio.Handle]
    self.scheduled_job_ts = 0.0

    event.running('CREATE_ACTIVE_MONITOR', monitor=self)
    stats.inc('num_monitors', 'ACT_MON')