def discardSomeCard(self) -> bool:
    """Discard one card from the hand, preferring the highest known value.

    The hand is walked once while a running choice (initially index 0) is
    tracked. The choice moves to the current position when the running
    choice's ``maybeValue`` is ``None``, or when the current card's
    ``maybeValue`` is known and strictly greater than the running choice's.

    :return: always ``True``.
    """
    best_index: int = 0
    for position, deck_slot in enumerate(self.hand):
        candidate = cast(CardKnowledge, self.game.deck[deck_slot])
        incumbent = cast(CardKnowledge,
                         self.game.deck[self.hand[best_index]])
        # An unknown incumbent is always replaced, even by another unknown
        # card, so a hand of all-unknown cards ends on its last index.
        if incumbent.maybeValue is None or (
                candidate.maybeValue is not None
                and candidate.maybeValue > incumbent.maybeValue):
            best_index = position
    # With an empty hand the loop never runs and index 0 is still discarded.
    self.discard_card(best_index)
    return True
python类cast()的实例源码
def require_bool(value: Optional[Union[bool, str, int]], convert: bool=False, allow_none: bool=False) -> Any:
    """Validate that a value is a boolean, optionally converting it.

    Used when dealing with http input data.

    :param value: the value to validate.
    :param convert: when true, the values None, 0, '0', 'false' and 'False'
        become ``False`` and 1, '1', 'true' and 'True' become ``True``.
    :param allow_none: when true, ``None`` is passed through unchanged.
    :returns: the boolean (or ``None`` when ``allow_none`` permits it).
    :raises InvalidData: if the value is not a bool and conversion is
        disabled, or if it cannot be converted.
    """
    if allow_none and value is None:
        return value
    # Exact type check on purpose: only genuine bool objects pass untouched.
    if type(value) is bool:
        return cast(bool, value)
    if not convert:
        raise InvalidData()
    if value in (None, 0, '0', 'false', 'False'):
        return False
    if value in (1, '1', 'true', 'True'):
        return True
    raise InvalidData('value was %s(%s), expected bool' % (type(value), value))
def require_dict(value: Optional[Dict[Any, Any]], key_type: Any=None, value_type: Any=None,
                 allow_none: bool=False) -> Any:
    """Make sure a value is a Dict[key_type, value_type].

    Used when dealing with http input data.

    The checks compare exact types (``type(x) != expected``), so subclasses
    of ``dict`` or of the requested key/value types are rejected.

    :param value: the value to validate.
    :param key_type: if given, the exact type every key must have.
    :param value_type: if given, the exact type every value must have.
    :param allow_none: when true, ``None`` is passed through unchanged.
    :returns: the validated dict (or ``None`` when ``allow_none`` permits it).
    :raises InvalidData: if the value is not a dict or an entry has the
        wrong key/value type.
    """
    if value is None and allow_none:
        return value
    if type(value) != dict:
        raise InvalidData('value was %s(%s), expected dict' % (type(value), value))
    value = cast(Dict, value)
    if key_type or value_type:
        for k, v in value.items():
            if key_type and type(k) != key_type:
                raise InvalidData('dict key was %s(%s), expected %s' % (type(k), k, key_type))
            if value_type and type(v) != value_type:
                # Report the expected *value* type (previously key_type was
                # interpolated here, producing a misleading message).
                raise InvalidData('dict value was %s(%s), expected %s' % (type(v), v, value_type))
    return value
async def _get_monitor_metadata(self, dbcon: DBConnection) -> Optional[Dict[int, Dict[str, str]]]:
    """Collect active-monitor metadata for the current request.

    Must be a coroutine because it awaits the metadata queries; ``await``
    inside a plain ``def`` is a syntax error.

    Returns ``None`` unless the ``include_metadata`` request parameter
    converts to a true value. Otherwise the metadata models are selected by
    the first query key present among ``id``, ``meta_key`` and
    ``monitor_group_id``; with none of them, the metadata for the whole
    ``'active_monitor'`` object type is fetched.

    :param dbcon: database connection handed to the metadata queries.
    :returns: a mapping of object id to ``{key: value}`` metadata, or
        ``None`` when metadata was not requested.
    """
    include_metadata = require_bool(
        get_request_param(self.request, 'include_metadata', error_if_missing=False),
        convert=True) or False
    if not include_metadata:
        return None

    query = self.request.rel_url.query
    if 'id' in query:
        metadata_models = await metadata.get_metadata_for_object(
            dbcon, 'active_monitor', require_int(cast(str, get_request_param(self.request, 'id'))))
    elif 'meta_key' in query:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        metadata_models = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'active_monitor', 'active_monitors')
    elif 'monitor_group_id' in query:
        metadata_models = await monitor_group.get_active_monitor_metadata_for_monitor_group(
            dbcon, require_int(cast(str, get_request_param(self.request, 'monitor_group_id'))))
    else:
        metadata_models = await metadata.get_metadata_for_object_type(dbcon, 'active_monitor')

    # Group the flat list of metadata rows by the object they belong to.
    metadata_dict = {}  # type: Dict[int, Dict[str, str]]
    for metadata_model in metadata_models:
        per_object = metadata_dict.setdefault(metadata_model.object_id, {})
        per_object[metadata_model.key] = metadata_model.value
    return metadata_dict
def _sort_dataset_by_padding(dataset: Dataset,
                             sorting_keys: List[Tuple[str, str]],  # pylint: disable=invalid-sequence-index
                             padding_noise: float = 0.0) -> Dataset:
    """
    Build a new ``Dataset`` whose ``Instances`` are ordered by padding length.

    ``sorting_keys`` is a list of ``(field_name, padding_key)`` tuples; the
    corresponding padding lengths form the sort key, compared in the order the
    keys are given. When ``padding_noise`` is positive, every field's lengths
    are first passed through ``add_noise_to_dict_values``.
    """
    keyed_instances = []
    for instance in dataset.instances:
        lengths = cast(Dict[str, Dict[str, float]], instance.get_padding_lengths())
        if padding_noise > 0.0:
            lengths = {field_name: add_noise_to_dict_values(field_lengths, padding_noise)
                       for field_name, field_lengths in lengths.items()}
        sort_key = [lengths[field_name][padding_key]
                    for field_name, padding_key in sorting_keys]
        keyed_instances.append((sort_key, instance))
    # Sort on the length vector only, so instances themselves are never compared.
    keyed_instances.sort(key=lambda pair: pair[0])
    return Dataset([instance for _, instance in keyed_instances])
def get_remaining_shared_breaks_this_week(group_members: Set[User]) -> List[Break]:
    """
    Return this week's remaining breaks that a group of users have in common.

    The shared breaks come from ``get_shared_breaks`` and are narrowed by
    ``get_this_weeks_events`` using the current time in ``BRISBANE_TIME_ZONE``.
    """
    shared = get_shared_breaks(group_members)
    current_time = datetime.now(BRISBANE_TIME_ZONE)
    # Mypy treats `List` as invariant: a `List[B]` is not accepted where a
    # `List[A]` is expected, even if B is a subclass of A. The value is
    # therefore cast on the way in to the helper ...
    # FIXME: drop these casts once the helpers can be typed without them.
    remaining = get_this_weeks_events(current_time, cast(List[Event_], shared))
    # ... and cast back on the way out.
    return cast(List[Break], remaining)
# FIXME: Make 'request_status' an enum: https://docs.python.org/3/library/enum.html
def create_file_in_config_folder(self, filename: str, mode: int=None) -> TextIO:
    """
    Open a new UTF-8 text file for writing inside the generated config folder.

    :param filename: the name of the file in the generated config folder
    :param mode: pass an ``int`` here if you want to modify the files mode (will be umasked);
                 a falsy value (``None`` or ``0``) leaves the mode untouched
    :return: an open file descriptor (``TextIO``) object that the *caller must call `.close()` on*
    :raises InvalidArgumentException: if ``filename`` itself resolves to an existing file
    """
    if os.path.isfile(filename):
        raise InvalidArgumentException("Call create_file_in_config_folder with a filename, not a path")

    self.ensure_config_folder()
    target = os.path.join(self.configfolder, filename)
    handle = cast(TextIO, io.open(target, mode="wt", encoding="utf-8"))
    if mode:
        os.chmod(target, get_umasked_mode(mode))
    return handle
def validate_args(self, args: configargparse.Namespace) -> None:
    """Validate the Aptly store arguments.

    Runs the shared Aptly argument validation, checks that the selected
    version action is supported by the ``"debian"`` version parser, warns
    when ``-distribution`` appears in the Aptly publish options and, when the
    Aptly wrapper is requested, records the path of the vault wrapper script
    in ``self.aptly_wrapper_cmd``.

    :param args: the parsed command-line arguments.
    :raises ErrorMessage: if the version action is unsupported, or the vault
        wrapper is missing or not executable.
    """
    _aptly_args.validate_shared_args(args)

    from gopythongo.versioners import get_version_parsers
    debian_parser = cast(DebianVersionParser, get_version_parsers()["debian"])  # type: DebianVersionParser
    if args.version_action not in debian_parser.supported_actions:
        raise ErrorMessage("Version Action is set to '%s', but you chose the Aptly Store which relies on Debian "
                           "version strings. Unfortunately the Debian Versioner does not support the '%s' action. "
                           "It only supports: %s." %
                           (highlight(args.version_action), highlight(args.version_action),
                            highlight(", ".join(debian_parser.supported_actions))))

    if "-distribution" in args.aptly_publish_opts:
        print_warning("You are using %s in your Aptly Store options. You should use the %s GoPythonGo argument "
                      "instead, since using -distribution in the aptly command line is invalid when GoPythonGo "
                      "tries to update a published repo." %
                      (highlight("-distribution"), highlight("--aptly-distribution")))

    if not args.use_aptly_wrapper:
        return

    wrapper_path = create_script_path(the_context.gopythongo_path, "vaultwrapper")
    wrapper_usable = os.path.exists(wrapper_path) and os.access(wrapper_path, os.X_OK)
    if not wrapper_usable:
        raise ErrorMessage("%s can either not be found or is not executable. The vault wrapper seems to "
                           "be unavailable." % wrapper_path)
    self.aptly_wrapper_cmd = wrapper_path
def _get_commands(dist  # type: setuptools.dist.Distribution
                  ):
    # type: (...) -> typing.Dict[str, typing.Set[str]]
    """Collect the commands defined by the given distribution.

    Every ``.py`` file reported by ``setuptools.findall()`` whose package name
    is listed in ``dist.packages`` is parsed, and the commands found at
    module, class and function level are merged into the result.

    Args:
        dist: The Distribution to search for docopt-compatible docstrings that
            can be used to generate command entry points.

    Returns:
        A dictionary containing a mapping of primary commands to sets of
        subcommands.
    """
    commands = {}  # type: typing.Dict[str, typing.Set[str]]
    for path in setuptools.findall():
        if os.path.splitext(path)[1].lower() != '.py':
            continue
        if _get_package_name(path) not in dist.packages:
            continue
        with open(path) as source_file:
            tree = typing.cast(ast.Module, ast.parse(source_file.read()))
        module_name = _get_module_name(path)
        for collect in (_get_module_commands,
                        _get_class_commands,
                        _get_function_commands):
            _append_commands(commands, module_name, collect(tree))
    return commands
def filter_all_or_404(model: t.Type[Y], *criteria: t.Any) -> t.Sequence[Y]:
    """Fetch every object of the given model that matches the criteria.

    Delegates to ``_filter_or_404`` with its second argument set to ``True``
    (presumably the "return all matches" switch -- verify against that
    helper).

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The model class to query.
    :param criteria: The criteria to filter with.
    :returns: The requested objects.
    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    matches = _filter_or_404(model, True, criteria)
    return t.cast(t.Sequence[Y], matches)
def filter_single_or_404(model: t.Type[Y], *criteria: t.Any) -> Y:
    """Fetch a single object of the given model by filtering, or raise.

    Delegates to ``_filter_or_404`` with its second argument set to ``False``
    (presumably the "return one match" switch -- verify against that helper).

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The model class to query.
    :param criteria: The criteria to filter with.
    :returns: The requested object.
    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    match = _filter_or_404(model, False, criteria)
    return t.cast(Y, match)
async def gen_update_pent_dynamic(
    context: PentContext,
    obj_id: UUID,
    pent_cls_name: str,
    data_cls_name: str,
    payload_cls_name: str,
    data: PentMutationData
) -> PentMutationPayload:
    """Update a pent whose classes are resolved by name at call time.

    Declared ``async`` because the body awaits ``update_pent``; ``await``
    inside a plain ``def`` is a syntax error.

    :param context: used to resolve class names via ``cls_from_name`` and
        passed on to ``update_pent``.
    :param obj_id: id of the object to update.
    :param pent_cls_name: name of the pent class to update.
    :param data_cls_name: name of the class ``data`` must be an instance of.
    :param payload_cls_name: name of the class wrapping the updated pent.
    :param data: the mutation data to apply.
    :returns: an instance of the payload class built from the updated pent.
    """
    data_cls = context.cls_from_name(data_cls_name)
    check.isinst(data, data_cls)

    pent_cls = context.cls_from_name(pent_cls_name)
    payload_cls = context.cls_from_name(payload_cls_name)

    pent = await update_pent(context, pent_cls, obj_id, data)
    return cast(PentMutationPayload, payload_cls(pent))
async def gen_operation(self, graphql_text: str, operation: str, *args: GraphQLArg) -> dict:
    """Build and execute a GraphQL operation with variable declarations.

    Declared ``async`` because the body awaits ``exec_in_mem_graphql``;
    ``await`` inside a plain ``def`` is a syntax error.

    :param graphql_text: the selection text placed between the braces.
    :param operation: the text placed before the variable list.
    :param args: the variables; each one is unpacked as
        ``(name, arg_type, value)`` and also read via ``.name`` / ``.value``.
    :returns: ``result.data`` of the executed query.
    """
    arg_list = ', '.join(
        "${name}: {arg_type}".format(name=name, arg_type=arg_type)
        for name, arg_type, _value in args
    )
    full_query = (
        '{operation} ({arg_list}) '.format(arg_list=arg_list, operation=operation) + '{' +
        graphql_text + '}'
    )
    arg_dict = {arg.name: arg.value for arg in args}

    result = await (
        exec_in_mem_graphql(
            self.graphql_schema, self.context, full_query, self.root_value, arg_dict
        )
    )
    if result.errors:
        _process_error(result)
    return cast(dict, result.data)
unicode_exporter.py 文件源码
项目:mazes-for-programmers-python-src
作者: Kartones
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def render(self, grid: Grid, **kwargs: Any) -> None:
    """Print the grid as a maze drawn with box-drawing characters.

    Each cell is three columns wide: the top border and every closed south
    wall use three horizontal-wall characters. An open south boundary is
    therefore rendered as three spaces as well, so that junctions stay
    aligned from row to row.

    :param grid: the grid to draw.
    :param kwargs: accepted but not used by this method.
    """
    horizontal_wall = "\u2501"
    vertical_wall = "\u2503"
    wall_segment = horizontal_wall * 3
    # Same width as wall_segment; a single space here would shift the rest
    # of the line two columns to the left.
    open_segment = " " * 3

    pieces = [self.JUNCTIONS[12]]
    for x in range(grid.columns - 1):
        pieces.append(wall_segment + self.get_topmost_junction(cast(Cell, grid.cell_at(row=0, column=x))))
    pieces.append(wall_segment + self.JUNCTIONS[10] + "\n")

    for row in grid.each_row():
        top = [vertical_wall]
        bottom = [self.get_leftmost_junction(row[0])]
        for cell in row:
            # NOTE(review): assumes contents_of() yields a three-character
            # string matching the cell width -- confirm against Grid.
            body = grid.contents_of(cell)
            east_boundary = " " if cell.linked_to(cell.east) else vertical_wall
            top.append(body + east_boundary)
            south_boundary = open_segment if cell.linked_to(cell.south) else wall_segment
            bottom.append(south_boundary + self.get_south_east_junction(cell))
        pieces.append("".join(top) + "\n")
        pieces.append("".join(bottom) + "\n")

    print("".join(pieces))
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
    """Build the feed dictionary for one dataset.

    ``self.train_mode`` is always fed with ``train``. When the dataset
    provides the ``self.data_id`` series, the sentences are converted with
    ``self.vocabulary.sentences_to_tensor`` and the ``.T`` of the two returned
    arrays is fed to ``self.train_targets`` and ``self.train_weights``.
    """
    series = cast(Iterable[List[str]],
                  dataset.get_series(self.data_id, allow_none=True))
    fd = {self.train_mode: train}  # type: FeedDict
    if series is None:
        return fd

    targets, weights = self.vocabulary.sentences_to_tensor(
        list(series), pad_to_max_len=False, train_mode=train)
    fd[self.train_targets] = targets.T
    fd[self.train_weights] = weights.T
    return fd
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
    """Build the feed dictionary for one dataset.

    When the dataset provides the ``self.data_id`` series, the sentences are
    converted with ``self.vocabulary.sentences_to_tensor`` (limited by
    ``self.max_output_len``) and the first element of the resulting tensor is
    fed to ``self.gt_inputs[0]``. ``self.train_mode`` is always fed with
    ``train``.
    """
    fd = {}  # type: FeedDict
    series = cast(Iterable[List[str]],
                  dataset.get_series(self.data_id, allow_none=True))
    if series is not None:
        tensors, _ = self.vocabulary.sentences_to_tensor(
            list(series), self.max_output_len)
        # pylint: disable=unsubscriptable-object
        fd[self.gt_inputs[0]] = tensors[0]
        # pylint: enable=unsubscriptable-object
    fd[self.train_mode] = train
    return fd
def _get_series_outputs(series_config: SeriesConfig) -> Dict[str, str]:
    """Extract series output paths from the dataset keyword argument specs.

    A key is treated as an output specification when it matches the
    ``SERIES_OUTPUT`` pattern; the series name is the pattern's first group.
    The output file for a series named 'xxx' is specified by the parameter
    's_xxx_out'.

    Arguments:
        series_config: A dictionary containing the dataset keyword argument
            specs.

    Returns:
        A dictionary which maps series names to the paths for their output
        files.

    Raises:
        ValueError: If the value given for an output path is not a string.
    """
    outputs = {}
    for key, value in series_config.items():
        matcher = SERIES_OUTPUT.match(key)
        if not matcher:
            continue
        name = matcher.group(1)
        if isinstance(value, str):
            outputs[name] = cast(str, value)
            continue
        raise ValueError(
            "Output path for '{}' series must be a string, was {}.".
            format(name, type(value)))
    return outputs
def _preprocessed_datasets(
        dataset: Dataset,
        series_config: SeriesConfig) -> None:
    """Apply dataset-level preprocessing.

    Every key of ``series_config`` matching ``PREPROCESSED_SERIES`` names a
    preprocessor; the series name is the pattern's first group. For a
    ``Dataset`` the preprocessor is run immediately and its result is added
    as a new series; for a ``LazyDataset`` it is only registered in
    ``preprocess_series``.
    """
    # Collect the matching keys first, before any preprocessor is run.
    matched = []
    for key in series_config.keys():
        match = PREPROCESSED_SERIES.match(key)
        if match:
            matched.append((key, match))

    for key, match in matched:
        name = match.group(1)
        preprocessor = cast(DatasetPreprocess, series_config[key])

        # NOTE(review): if LazyDataset subclasses Dataset, the first branch
        # always wins and the elif is unreachable -- confirm.
        if isinstance(dataset, Dataset):
            dataset.add_series(name, list(preprocessor(dataset)))
        elif isinstance(dataset, LazyDataset):
            dataset.preprocess_series[name] = (None, preprocessor)
def __init__(self,
             output_series: str,
             encoder: Stateful,
             used_session: int = 0) -> None:
    """Initialize the representation runner.

    Args:
        output_series: Name of the output series with vectors.
        encoder: Used encoder.
        used_session: Id of the TensorFlow session used in case of model
            ensembles.

    Raises:
        TypeError: If the encoder is not an instance of ``ModelPart``.
    """
    check_argument_types()

    if isinstance(encoder, ModelPart):
        model_part = cast(ModelPart, encoder)
    else:
        raise TypeError("The encoder of the representation runner has to "
                        "be an instance of 'ModelPart'")

    BaseRunner.__init__(self, output_series, model_part)

    self._used_session = used_session  # type: int
    self._encoded = encoder.output  # type: tf.Tensor
# pylint: disable=unused-argument
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
    """Build the feed dictionary for one dataset.

    ``self.train_mode`` is always fed with ``train``. When the dataset
    provides the ``self.data_id`` series, the sentences are converted with
    ``self.vocabulary.sentences_to_tensor`` and the ``.T`` of the two returned
    arrays is fed to ``self.train_targets`` and ``self.train_weights``.
    """
    series = cast(Iterable[List[str]],
                  dataset.get_series(self.data_id, allow_none=True))
    fd = {self.train_mode: train}  # type: FeedDict
    if series is None:
        return fd

    targets, weights = self.vocabulary.sentences_to_tensor(
        list(series), pad_to_max_len=False, train_mode=train)
    fd[self.train_targets] = targets.T
    fd[self.train_weights] = weights.T
    return fd
def _get_series_outputs(series_config: SeriesConfig) -> Dict[str, str]:
    """Extract series output paths from the dataset keyword argument specs.

    A key is treated as an output specification when it matches the
    ``SERIES_OUTPUT`` pattern; the series name is the pattern's first group.
    The output file for a series named 'xxx' is specified by the parameter
    's_xxx_out'.

    Arguments:
        series_config: A dictionary containing the dataset keyword argument
            specs.

    Returns:
        A dictionary which maps series names to the paths for their output
        files.

    Raises:
        ValueError: If the value given for an output path is not a string.
    """
    outputs = {}
    for key, value in series_config.items():
        matcher = SERIES_OUTPUT.match(key)
        if not matcher:
            continue
        name = matcher.group(1)
        if isinstance(value, str):
            outputs[name] = cast(str, value)
            continue
        raise ValueError(
            "Output path for '{}' series must be a string, was {}.".
            format(name, type(value)))
    return outputs
def _preprocessed_datasets(
        dataset: Dataset,
        series_config: SeriesConfig) -> None:
    """Apply dataset-level preprocessing.

    Every key of ``series_config`` matching ``PREPROCESSED_SERIES`` names a
    preprocessor; the series name is the pattern's first group. For a
    ``Dataset`` the preprocessor is run immediately and its result is added
    as a new series; for a ``LazyDataset`` it is only registered in
    ``preprocess_series``.
    """
    # Collect the matching keys first, before any preprocessor is run.
    matched = []
    for key in series_config.keys():
        match = PREPROCESSED_SERIES.match(key)
        if match:
            matched.append((key, match))

    for key, match in matched:
        name = match.group(1)
        preprocessor = cast(DatasetPreprocess, series_config[key])

        # NOTE(review): if LazyDataset subclasses Dataset, the first branch
        # always wins and the elif is unreachable -- confirm.
        if isinstance(dataset, Dataset):
            dataset.add_series(name, list(preprocessor(dataset)))
        elif isinstance(dataset, LazyDataset):
            dataset.preprocess_series[name] = (None, preprocessor)
def __init__(self,
             output_series: str,
             encoder: Stateful,
             used_session: int = 0) -> None:
    """Initialize the representation runner.

    Args:
        output_series: Name of the output series with vectors.
        encoder: Used encoder.
        used_session: Id of the TensorFlow session used in case of model
            ensembles.

    Raises:
        TypeError: If the encoder is not an instance of ``ModelPart``.
    """
    check_argument_types()

    if isinstance(encoder, ModelPart):
        model_part = cast(ModelPart, encoder)
    else:
        raise TypeError("The encoder of the representation runner has to "
                        "be an instance of 'ModelPart'")

    BaseRunner.__init__(self, output_series, model_part)

    self._used_session = used_session  # type: int
    self._encoded = encoder.output  # type: tf.Tensor
# pylint: disable=unused-argument
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
    """Build the feed dictionary for one dataset.

    When the dataset provides the ``self.data_id`` series, the sentences are
    converted with ``self.vocabulary.sentences_to_tensor`` (limited by
    ``self.max_output_len``) and the first element of the resulting tensor is
    fed to ``self.gt_inputs[0]``. ``self.train_mode`` is always fed with
    ``train``.
    """
    fd = {}  # type: FeedDict
    series = cast(Iterable[List[str]],
                  dataset.get_series(self.data_id, allow_none=True))
    if series is not None:
        tensors, _ = self.vocabulary.sentences_to_tensor(
            list(series), self.max_output_len)
        # pylint: disable=unsubscriptable-object
        fd[self.gt_inputs[0]] = tensors[0]
        # pylint: enable=unsubscriptable-object
    fd[self.train_mode] = train
    return fd
def _get_series_outputs(series_config: SeriesConfig) -> Dict[str, str]:
    """Extract series output paths from the dataset keyword argument specs.

    A key is treated as an output specification when it matches the
    ``SERIES_OUTPUT`` pattern; the series name is the pattern's first group.
    The output file for a series named 'xxx' is specified by the parameter
    's_xxx_out'.

    Arguments:
        series_config: A dictionary containing the dataset keyword argument
            specs.

    Returns:
        A dictionary which maps series names to the paths for their output
        files.

    Raises:
        ValueError: If the value given for an output path is not a string.
    """
    outputs = {}
    for key, value in series_config.items():
        matcher = SERIES_OUTPUT.match(key)
        if not matcher:
            continue
        name = matcher.group(1)
        if isinstance(value, str):
            outputs[name] = cast(str, value)
            continue
        raise ValueError(
            "Output path for '{}' series must be a string, was {}.".
            format(name, type(value)))
    return outputs
def __init__(self,
             output_series: str,
             encoder: Stateful,
             used_session: int = 0) -> None:
    """Initialize the representation runner.

    Args:
        output_series: Name of the output series with vectors.
        encoder: Used encoder.
        used_session: Id of the TensorFlow session used in case of model
            ensembles.

    Raises:
        TypeError: If the encoder is not an instance of ``ModelPart``.
    """
    check_argument_types()

    if isinstance(encoder, ModelPart):
        model_part = cast(ModelPart, encoder)
    else:
        raise TypeError("The encoder of the representation runner has to "
                        "be an instance of 'ModelPart'")

    BaseRunner.__init__(self, output_series, model_part)

    self._used_session = used_session  # type: int
    self._encoded = encoder.output  # type: tf.Tensor
# pylint: disable=unused-argument
def _handle_haves(self, message_id: MessageType, payload: memoryview):
    """Record which pieces the peer announces via "have" / "bitfield".

    For ``MessageType.have`` the payload is unpacked as one network-order
    unsigned 32-bit piece index. For ``MessageType.bitfield`` the payload
    length is checked against ``ceil(piece_count / 8)`` bytes and every set
    bit below ``piece_count`` marks the peer as owner of that piece. Any
    other message id is ignored.

    :raises ValueError: if a bit at or beyond ``piece_count`` is set in a
        bitfield message (raised after the valid bits were processed).
    """
    if message_id == MessageType.have:
        index, = struct.unpack('!I', cast(bytes, payload))
        self._mark_as_owner(index)
    elif message_id == MessageType.bitfield:
        piece_count = self._download_info.piece_count
        expected_len = int(ceil(piece_count / 8))
        PeerTCPClient._check_payload_len(message_id, payload, expected_len)

        bits = bitarray(endian='big')
        bits.frombytes(payload.tobytes())
        for piece_index in range(piece_count):
            if bits[piece_index]:
                self._mark_as_owner(piece_index)
        # The bits padding the last byte carry no piece and must stay clear.
        if any(bits[spare] for spare in range(piece_count, len(bits))):
            raise ValueError('Spare bits in "bitfield" message must be zero')
# if self._download_info.complete and self.is_seed():
# raise SeedError('A seed is disconnected because a download is complete')
async def _handle_requests(self, message_id: MessageType, payload: memoryview):
    """Handle a peer's "request" or "cancel" message.

    Declared ``async`` because the body awaits ``_send_block`` and
    ``drain``; ``await`` inside a plain ``def`` is a syntax error.

    The payload is unpacked as three network-order unsigned 32-bit integers
    (piece index, begin offset, length) and range-checked via
    ``_check_position_range`` for both message kinds.

    :raises ValueError: if a request asks for more than
        ``PeerTCPClient.MAX_REQUEST_LENGTH`` bytes.
    """
    piece_index, begin, length = struct.unpack('!3I', cast(bytes, payload))
    request = BlockRequest(piece_index, begin, length)
    self._check_position_range(request)

    if message_id == MessageType.request:
        if length > PeerTCPClient.MAX_REQUEST_LENGTH:
            raise ValueError('Requested {} bytes, but the current policy allows to accept requests '
                             'of not more than {} bytes'.format(length, PeerTCPClient.MAX_REQUEST_LENGTH))
        if (self._am_choking or not self._peer_interested or
                not self._download_info.pieces[piece_index].downloaded):
            # If peer isn't interested but requesting, their peer_interested flag wasn't considered
            # when selecting who to unchoke, so we may be not ready to upload to them.
            # If requested piece is not downloaded yet, we shouldn't disconnect because our piece_downloaded flag
            # could be removed because of file corruption.
            return

        await self._send_block(request)
        await self.drain()
    elif message_id == MessageType.cancel:
        # Now we answer to a request immediately or reject and forget it,
        # so there's no need to handle cancel messages
        pass
def add(self, labels: LabelsType, value: NumericValueType) -> None:
    ''' Increase the counter stored for ``labels`` by ``value``.

    A label set without a stored value starts from 0.

    :raises: ValueError if the value is negative. Counters can only
             increase.
    '''
    increment = cast(Union[float, int], value)  # typing check, no runtime behaviour.
    if increment < 0:
        raise ValueError("Counters can't decrease")

    try:
        # cast() is a typing aid only, no runtime behaviour.
        previous = cast(Union[float, int], self.get_value(labels))
    except KeyError:
        previous = 0

    self.set_value(labels, previous + increment)
def add(self, labels: LabelsType, value: NumericValueType) -> None:
    ''' Record a single observation in the summary stored for ``labels``.

    A label set without a stored estimator gets a new
    ``quantile.Estimator`` built from ``self.invariants``.

    :raises: TypeError if the value's type is not exactly ``int`` or
             ``float``.
    '''
    observation = cast(Union[float, int], value)  # typing check, no runtime behaviour.
    if type(observation) not in (float, int):
        raise TypeError("Summary only works with digits (int, float)")

    try:
        estimator = self.get_value(labels)
    except KeyError:
        # Initialize quantile estimator
        estimator = quantile.Estimator(*self.invariants)
        self.set_value(labels, estimator)

    estimator.observe(float(observation))  # type: ignore
# https://prometheus.io/docs/instrumenting/writing_clientlibs/#summary
# A summary MUST have the ``observe`` methods