def iterchars(text):
    # type: (str) -> Sequence[str]
    """Split ``text`` into a list of characters, joining surrogate pairs.

    A lead surrogate (U+D800..U+DBFF) that is directly followed by a trail
    surrogate (U+DC00..U+DFFF) is emitted as one two-code-unit string.  A
    surrogate that is not part of such a pair is dropped.  Every other
    character is emitted on its own.
    """
    pieces = []
    pos = 0
    end = len(text)
    while pos < end:
        unit = text[pos]
        code = ord(unit)
        pos += 1
        if not 0xD800 <= code <= 0xDFFF:
            pieces.append(unit)
            continue
        # ``unit`` is a surrogate: it is only kept as the lead half of a pair.
        if code <= 0xDBFF and pos < end and 0xDC00 <= ord(text[pos]) <= 0xDFFF:
            pieces.append(unit + text[pos])
            # Consume the trail surrogate as well.
            pos += 1
    return pieces
# Example source code using the Python class Sequence()
def mixtohash(self,
              args=(),  # type: Sequence[AnyStr]
              exe=None,  # type: Optional[str]
              depfiles=(),  # type: Sequence[str]
              hashobj=None  # type: Optional[Any]
              ):
    # type: (...) -> Any
    """Mix dependency files, arguments and an executable into a hash object.

    For every entry of ``depfiles`` the result of ``sysfilename`` and of
    ``filesha`` is fed in, followed by a NUL byte.  For every entry of ``args``
    the result of ``sysfilename`` is fed in, followed by a NUL byte.  When
    ``exe`` is given, ``self.digest_for_exe(exe)`` is fed in last.

    A fresh ``HASHFUNC()`` object is used when ``hashobj`` is None; otherwise
    the passed object is updated in place.  The hash object is returned.
    """
    digest = HASHFUNC() if hashobj is None else hashobj
    for dependency in depfiles:
        digest.update(sysfilename(dependency))
        digest.update(filesha(dependency))
        digest.update(b'\x00')
    for argument in args:
        digest.update(sysfilename(argument))
        digest.update(b'\x00')
    if exe is not None:
        digest.update(self.digest_for_exe(exe))
    return digest
def apply(func,  # type: Callable[..., bytes]
          args=(),  # type: Sequence[AnyStr]
          exe=None,  # type: Optional[str]
          depfiles=(),  # type: Sequence[str]
          cache=None  # type: Optional[Cache]
          ):
    """Return func(*args), consulting ``cache`` first when one is given.

    The cache key is the hex digest of ``cache.mixtohash(args, exe=exe,
    depfiles=depfiles)``.  A cached value of None counts as a miss, so the
    result of func must be bytes and never None.  On a miss the freshly
    computed result is stored in the cache before it is returned.
    """
    if cache is None:
        return func(*args)

    key = cache.mixtohash(args, exe=exe, depfiles=depfiles).hexdigest()
    cached = cache.get(key)
    if cached is not None:
        return cached

    result = func(*args)
    if key is not None:
        cache.set(key, result)
    return result
def update_evaluations(formatter,  # type: CodeFormatter
                       evaluations,  # type: List[AttemptResult]
                       finished_styles,  # type: List[AttemptResult]
                       bestdist  # type: Sequence[int]
                       ):
    # type: (...) -> Tuple[bool, bool, Sequence[int]]
    """Pop the top attempt from the ``evaluations`` heap and advance the search.

    Returns a tuple ``(done, nested_round, bestdist)``:

    * If the popped attempt beats ``bestdist`` (or ``bestdist`` is None), it is
      pushed back onto ``evaluations`` and its distance becomes the new
      ``bestdist``.
    * Otherwise the attempt is pushed onto ``finished_styles``.  If
      ``formatter.nested_derivations`` yields nothing for its style, ``done``
      is True.  If it yields styles, ``evaluations`` is replaced in place by
      one seed attempt per nested style, ``bestdist`` is reset to None and
      ``nested_round`` is True.
    """
    candidate = heapq.heappop(evaluations)

    improved = bestdist is None or (distquality(candidate.distance) < distquality(bestdist))
    if improved:
        heapq.heappush(evaluations, candidate)
        return False, False, candidate.distance

    # This style could no longer be improved by adding a single option value.
    heapq.heappush(finished_styles, candidate)
    seeds = formatter.nested_derivations(candidate.formatstyle)
    if not seeds:
        # The style does not unlock more options.
        return True, False, bestdist

    # Restart the optimization from scratch, seeded with the candidate
    # augmented by every nested option.
    worst = (HUGE_DISTANCE,) * 4
    evaluations[:] = [AttemptResult(worst, seed) for seed in seeds]
    return False, True, None
def to_text(cls, records: Sequence['TransactionSpecification']) -> str:
    """Get a text string from a sequence of specification records.

    Records are ordered by ``(line_number, column_number)``; records sharing
    a position keep their input order.  The ``text`` of each record is
    concatenated, and a newline is appended after every record whose column
    equals ``cls._MAX_COLUMNS``.

    Raises:
        ValueError: If more than ``cls._MAX_RECORDS`` records are given.
    """
    if len(records) > cls._MAX_RECORDS:
        raise ValueError(
            'Max {} specification records allowed, got {}'
            .format(cls._MAX_RECORDS, len(records)))

    # Sort on the position only.  Putting the record itself into the sort
    # tuple would compare whole records whenever two positions are equal,
    # which raises TypeError for records that do not define an ordering.
    ordered = sorted(
        records, key=lambda record: (record.line_number, record.column_number))

    parts = []
    for record in ordered:
        parts.append(record.text)
        if record.column_number == cls._MAX_COLUMNS:
            parts.append('\n')
    return ''.join(parts)
def from_dataset(cls,
                 dataset,
                 min_count: int = 1,
                 max_vocab_size: Union[int, Dict[str, int]] = None,
                 non_padded_namespaces: Sequence[str] = DEFAULT_NON_PADDED_NAMESPACES,
                 pretrained_files: Optional[Dict[str, str]] = None,
                 only_include_pretrained_words: bool = False) -> 'Vocabulary':
    """
    Build a vocabulary from the items counted in a :class:`.Dataset`.

    Each instance in ``dataset.instances`` adds its counts to a nested
    namespace -> token -> count mapping through ``count_vocab_items``.  That
    mapping is passed as ``counter`` to :func:`__init__`, together with all
    other parameters unchanged; see that method for what they mean.
    """
    logger.info("Fitting token dictionary from dataset.")

    counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for item in tqdm.tqdm(dataset.instances):
        item.count_vocab_items(counts)

    init_kwargs = dict(
        counter=counts,
        min_count=min_count,
        max_vocab_size=max_vocab_size,
        non_padded_namespaces=non_padded_namespaces,
        pretrained_files=pretrained_files,
        only_include_pretrained_words=only_include_pretrained_words,
    )
    return Vocabulary(**init_kwargs)
def parse(self, source: Sequence[Input]) -> Result[Output]:
    """Abstract method for completely parsing a source.

    ``parse`` exists on every parser for convenience, but it is really a
    function of the context: the context picks the ``Reader`` to use and
    handles whitespace that the parsers themselves do not handle.  The
    method is pulled from the context when the parser is initialized.

    Args:
        source: What will be parsed.

    Returns:
        A ``Success`` carrying the value from ``Continue`` if the parser
        matched and consumed the entire source.  A ``Failure`` carrying the
        error message if the parser failed to match.  A ``Failure`` with a
        message saying so if the parser matched but the source was not
        completely consumed.
    """
    raise NotImplementedError()
def lit(literal: Sequence[Input], *literals: Sequence[Input]) -> Parser:
    """Match a literal sequence.

    In the ``TextParsers`` context, this matches the literal string
    provided. In the ``GeneralParsers`` context, this matches a sequence of
    input.

    If multiple literals are provided, they are treated as alternatives. e.g.
    ``lit('+', '-')`` is the same as ``lit('+') | lit('-')``.

    Args:
        literal: A literal to match
        *literals: Alternative literals to match

    Returns:
        A ``LiteralParser`` in the ``GeneralParsers`` context, a
        ``LiteralStringParser`` in the ``TextParsers`` context, and an
        ``AlternativeParser`` if multiple arguments are provided.
    """
    # NOTE: the annotation on ``*literals`` describes each extra argument,
    # so it is ``Sequence[Input]`` (one literal), not a sequence of literals.
    if literals:
        return AlternativeParser(options.handle_literal(literal), *map(options.handle_literal, literals))
    return options.handle_literal(literal)
def rep1sep(parser: Union[Parser, Sequence[Input]], separator: Union[Parser, Sequence[Input]]) \
        -> RepeatedOnceSeparatedParser:
    """Match a parser one or more times separated by another parser.

    Matches repetitions of ``parser`` with ``separator`` between them.  With
    at least one match, the result is a list of the values matched by
    ``parser``; the values matched by ``separator`` are thrown away.  With no
    match of ``parser`` at all, it fails.

    A ``str`` given for either argument is wrapped with ``lit`` first.

    Args:
        parser: Parser or literal
        separator: Parser or literal
    """
    item = lit(parser) if isinstance(parser, str) else parser
    delimiter = lit(separator) if isinstance(separator, str) else separator
    return RepeatedOnceSeparatedParser(item, delimiter)
def repsep(parser: Union[Parser, Sequence[Input]], separator: Union[Parser, Sequence[Input]]) \
        -> RepeatedSeparatedParser:
    """Match a parser zero or more times separated by another parser.

    Matches repetitions of ``parser`` with ``separator`` between them.  The
    result is a list holding the value of each ``parser`` match; the values
    matched by ``separator`` are thrown away.  With no matches the list is
    empty.

    A ``str`` given for either argument is wrapped with ``lit`` first.

    Args:
        parser: Parser or literal
        separator: Parser or literal
    """
    item = lit(parser) if isinstance(parser, str) else parser
    delimiter = lit(separator) if isinstance(separator, str) else separator
    return RepeatedSeparatedParser(item, delimiter)
def ensure_unique_string(preferred_string: str, current_strings:
                         Union[Sequence[str], KeysView[str]]) -> str:
    """Return a string that does not occur in current_strings.

    The preferred string is returned as-is when it is free.  Otherwise the
    suffixes _2, _3, .. are tried in order and the first free one wins.
    """
    taken = set(current_strings)
    if preferred_string not in taken:
        return preferred_string

    suffix = 2
    while True:
        candidate = "{}_{}".format(preferred_string, suffix)
        if candidate not in taken:
            return candidate
        suffix += 1
# Taken from: http://stackoverflow.com/a/11735897
def printCommand(arg1: "typing.Union[str, typing.Sequence[typing.Any]]", *remainingArgs, outputFile=None,
                 colour=AnsiColour.yellow, cwd=None, env=None, sep=" ", printVerboseOnly=False, **kwargs):
    """Print a shell-quoted rendering of a command, unless output is suppressed.

    Nothing is printed when ``_cheriConfig`` is unset or quiet, or when
    ``printVerboseOnly`` is set and the config is not verbose.

    :param arg1: the program name as a string, or the whole command as a
                 sequence (``remainingArgs`` is then replaced by its tail)
    :param remainingArgs: the arguments following ``arg1``
    :param outputFile: if set, ``> outputFile`` is appended to the output
    :param colour: colour passed to ``coloured``
    :param cwd: if set, the output is prefixed with ``cd <cwd> &&``
    :param env: if set, the entries kept by ``__filterEnv`` are shown as an
                ``env K=V ...`` prefix
    :param sep: separator passed to ``coloured``
    :param printVerboseOnly: only print when the config is verbose
    :param kwargs: forwarded to ``print``
    """
    if not _cheriConfig or (_cheriConfig.quiet or (printVerboseOnly and not _cheriConfig.verbose)):
        return
    # Also allow passing a single string.  isinstance() rather than an exact
    # type check, so that a str subclass is not mistaken for a sequence and
    # split into its characters.
    if not isinstance(arg1, str):
        allArgs = arg1
        arg1 = allArgs[0]
        remainingArgs = allArgs[1:]
    newArgs = ("cd", shlex.quote(str(cwd)), "&&") if cwd else tuple()
    if env:
        # only print the changed environment entries
        filteredEnv = __filterEnv(env)
        if filteredEnv:
            newArgs += ("env",) + tuple(map(shlex.quote, (k + "=" + str(v) for k, v in filteredEnv.items())))
    # comma in tuple is required otherwise it creates a tuple of string chars
    newArgs += (shlex.quote(str(arg1)),) + tuple(map(shlex.quote, map(str, remainingArgs)))
    if outputFile:
        newArgs += (">", str(outputFile))
    print(coloured(colour, newArgs, sep=sep), flush=True, **kwargs)
def getInterpreter(cmdline: "typing.Sequence[str]") -> "typing.Optional[typing.List[str]]":
    """
    Look up the interpreter named on the shebang line of a command's executable.

    :param cmdline: The command to check; ``cmdline[0]`` is the executable
    :return: The interpreter command parsed from the ``#!`` line, or None if the
             executable cannot be located or its first line is not a shebang
    """
    executable = Path(cmdline[0])
    # NOTE(review): this looks like leftover debug output -- confirm whether it can go.
    print(executable, os.access(str(executable), os.X_OK), cmdline)
    if not executable.exists():
        # Not an existing path as given, so fall back to a lookup by name.
        # shutil.which() returns None when nothing is found; passing that to
        # Path() would raise an unhelpful TypeError.
        resolved = shutil.which(str(executable))
        if resolved is None:
            statusUpdate(executable, "was not found, cannot look for shebang.")
            return None
        executable = Path(resolved)
    statusUpdate(executable, "is not executable, looking for shebang:", end=" ")
    with executable.open("r", encoding="utf-8") as f:
        firstLine = f.readline()
    if firstLine.startswith("#!"):
        interpreter = shlex.split(firstLine[2:])
        statusUpdate("Will run", executable, "using", interpreter)
        return interpreter
    statusUpdate("No shebang found.")
    return None
def multimask_images(images: Iterable[SpatialImage],
                     masks: Sequence[np.ndarray], image_type: type = None
                     ) -> Iterable[Sequence[np.ndarray]]:
    """Mask images with multiple masks.

    Parameters
    ----------
    images:
        Images to mask.
    masks:
        Masks to apply.
    image_type:
        Type to cast images to.

    Yields
    ------
    Sequence[np.ndarray]
        One list per image, holding the result of ``mask_image`` for each
        mask, in the order of ``masks``.
    """
    for current in images:
        masked = []
        for mask in masks:
            masked.append(mask_image(current, mask, image_type))
        yield masked
def __init__(self, pattern, keys=None):
    # type: (Union[Text, regex._pattern_type, re._pattern_type], Optional[Sequence[Text]]) -> None
    """
    :param pattern:
        Regex used to split incoming string values.

        A pattern that is already compiled (by ``regex`` or ``re``) is
        used as-is; anything else is compiled with ``regex.UNICODE``.

        IMPORTANT: If you specify your own compiled regex, be sure
        to add the ``UNICODE`` flag for Unicode support!

    :param keys:
        If set, the resulting list will be converted into an
        OrderedDict, using the specified keys.

        IMPORTANT: If ``keys`` is set, the split value's length
        must be less than or equal to ``len(keys)``.
    """
    super(Split, self).__init__()

    # Take the compiled-pattern classes from real compiled patterns rather
    # than from the private ``_pattern_type`` attributes:
    # ``re._pattern_type`` no longer exists on Python 3.7+.
    compiled_types = (type(regex.compile('')), type(re.compile('')))

    self.regex = (
        pattern
        if isinstance(pattern, compiled_types)
        else regex.compile(pattern, regex.UNICODE)
    )
    self.keys = keys
def sorted_dict(value):
    # type: (Mapping) -> Any
    """
    Recursively rebuilds a value with all mapping keys in sorted order,
    to avoid leaking information about the backend's handling of
    unordered dicts.

    Mappings become an OrderedDict with sorted keys, non-string sequences
    become a list, and both are processed recursively.  Any other value is
    returned unchanged.
    """
    if isinstance(value, Mapping):
        ordered = OrderedDict()
        for key in sorted(iterkeys(value)):
            ordered[key] = sorted_dict(value[key])
        return ordered

    if isinstance(value, Sequence) and not isinstance(value, string_types):
        return [sorted_dict(item) for item in value]

    return value
def __call__(self, parser: configargparse.ArgumentParser, namespace: configargparse.Namespace,
             values: Sequence[str], option_string: str=None) -> None:
    """Run the initializer named by ``values[0]`` and exit the parser.

    ``values`` holds the initializer name and, optionally, a config folder
    that overrides the initializer's own.  Raises ``ErrorMessage`` when more
    than two values are given, when the name is not in ``_initializers``, or
    when the config folder already exists.  Otherwise the configuration is
    built and ``parser.exit(0)`` is called.
    """
    if len(values) > 2:
        raise ErrorMessage("%s takes 1 or 2 arguments, not more." % highlight("--init"))

    name = values[0]
    if name not in _initializers:
        raise ErrorMessage("Unknown initializer \"%s\". Acceptable values are: %s" %
                           (highlight(name), highlight(", ".join(_initializers.keys()))))

    chosen = _initializers[name]
    if len(values) > 1:
        # A second value overrides the initializer's config folder.
        chosen.configfolder = values[1]

    if os.path.exists(chosen.configfolder):
        raise ErrorMessage("%s already exists. If you want to overwrite it, remove it first." %
                           chosen.configfolder)

    chosen.build_config()
    parser.exit(0)
def generate_future_versions(self, artifact_names: Sequence[str], base_version: VersionContainer,
                             action: str,
                             args: configargparse.Namespace) -> Union[Dict[str, VersionContainer], None]:
    """
    Map artifacts that do not exist yet to the versions they will be given.

    Receives a list of unique artifact identifiers (e.g. package names) which *will be created by a Packer
    during the build later*, and returns a dict from identifier to version for the Packer to use during that
    build, or ``None`` if the store can't generate future versions. The store should use ``action`` to
    generate the version strings for all artifacts.

    This base implementation always raises ``NotImplementedError``.

    :param artifact_names: a list of artifact identifiers
    :param base_version: the base version from which to generate future versions
    :param action: the version action selected by the user to generate future versions
    :param args: command-line parameters
    :return: a mapping of artifact identifiers to version information
    """
    raise NotImplementedError("Each subclass of BaseStore MUST implement generate_future_versions")
def __missing__(self, key: t.Union[t.Tuple[int, type], t.Tuple[int, type, t.Sequence[int]]]):
    """Validate ``key``, create the value for it, store it and return it.

    ``key`` must be a tuple of 2 or 3 items: a positive int, a type, and
    optionally a tuple with exactly as many entries as the first item, each
    entry being ``Ellipsis`` or a positive int.  Wrong types raise
    ``TypeError``; wrong lengths or non-positive numbers raise ``ValueError``.

    The value comes from ``create_typed_numpy_ndarray(*key)``.
    """
    if not isinstance(key, tuple):
        raise TypeError('key={} of bad type {} was given'.format(repr(key), type(key)))
    if not 2 <= len(key) <= 3:
        raise ValueError('{}'.format(key))

    count = key[0]
    if not isinstance(count, int):
        raise TypeError()
    if count <= 0:
        raise ValueError()
    if not isinstance(key[1], type):
        raise TypeError()

    if len(key) == 3:
        entries = key[2]
        if not isinstance(entries, tuple):
            raise TypeError()
        if len(entries) != count:
            raise ValueError()
        # Two separate passes: any badly typed entry is reported before any
        # out-of-range entry.
        if not all(entry is Ellipsis or isinstance(entry, int) for entry in entries):
            raise TypeError()
        if not all(entry is Ellipsis or entry > 0 for entry in entries):
            raise ValueError()

    created = create_typed_numpy_ndarray(*key)
    self[key] = created
    return created
def partition_version_classifiers(
        classifiers: t.Sequence[str], version_prefix: str = 'Programming Language :: Python :: ',
        only_suffix: str = ' :: Only') -> t.Tuple[t.List[str], t.List[str]]:
    """Find version number classifiers in given list and partition them into 2 groups.

    ``version_prefix`` is removed from each classifier.  What remains is
    parsed as dot-separated integers into a tuple; classifiers that do not
    parse are skipped.  Classifiers ending with ``only_suffix`` go into the
    second list, all others into the first.
    """
    minimum, exclusive = [], []
    for entry in classifiers:
        remainder = entry.replace(version_prefix, '')
        if remainder.endswith(only_suffix):
            remainder = remainder.replace(only_suffix, '')
            bucket = exclusive
        else:
            bucket = minimum
        try:
            parsed = tuple(int(part) for part in remainder.split('.'))
        except ValueError:
            # Not a version number classifier.
            continue
        bucket.append(parsed)
    return minimum, exclusive
def split_path(path: str) -> t.Tuple[t.Sequence[str], bool]:
    """Split a path into a list of its parts.

    This function splits a forward slash separated path into a sequence of
    its non-empty parts. If the given path ends with a '/' it is reported as
    ending with a directory, otherwise the last part is a file; this
    information is returned as the last item of the returned tuple.

    The given path may contain multiple consecutive forward slashes, these are
    interpreted as a single slash. A leading forward slash is also optional.
    An empty path yields ``([], False)``.

    :param path: The forward slash separated path to split.
    :returns: A tuple where the first item is the split path and the second
        item is a boolean indicating if the last item of the given path was a
        directory.
    """
    # ``endswith`` instead of ``path[-1]``, which raises IndexError for ''.
    is_dir = path.endswith('/')
    patharr = [item for item in path.split('/') if item]
    return patharr, is_dir
def _filter_or_404(model: t.Type[Y], get_all: bool,
                   criteria: t.Tuple) -> t.Union[Y, t.Sequence[Y]]:
    """Filter ``model`` by ``criteria`` or raise when nothing is found.

    :param model: The object to get.
    :param get_all: Get all objects if ``True`` else get a single one.
    :param criteria: The criteria to filter with.
    :returns: The result of ``query.all()`` when ``get_all`` is set, else the
        result of ``query.one_or_none()``.
    :raises APIException: If the result is falsy, i.e. nothing matched the
        criteria. (OBJECT_ID_NOT_FOUND)
    """
    crit_str = ' AND '.join(map(str, criteria))
    query = model.query.filter(*criteria)  # type: ignore

    if get_all:
        found = query.all()
    else:
        found = query.one_or_none()

    if found:
        return found

    raise psef.errors.APIException(
        f'The requested {model.__name__.lower()} was not found',
        f'There is no "{model.__name__}" when filtering with {crit_str}',
        psef.errors.APICodes.OBJECT_ID_NOT_FOUND, 404
    )
def filter_all_or_404(model: t.Type[Y], *criteria: t.Any) -> t.Sequence[Y]:
    """Get every object of the given model that matches the given criteria.

    This delegates to ``_filter_or_404`` with ``get_all`` set to ``True``.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested objects.
    :raises APIException: If no object matching the criteria could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    matches = _filter_or_404(model, True, criteria)
    return t.cast(t.Sequence[Y], matches)
def get_all_permissions(self) -> t.Mapping[str, bool]:
    """Get all course :class:`permissions` for this course role.

    Every permission with ``course_permission=True`` is listed.  A permission
    whose name is in ``self._permissions`` gets the negation of its default
    value; every other permission gets its default value.

    :returns: A name boolean mapping where the name is the name of the
              permission and the value indicates if this user has this
              permission.
    """
    course_perms: t.Sequence[Permission] = (
        Permission.query.
        filter_by(  # type: ignore
            course_permission=True
        ).all()
    )
    return {
        perm.name: (
            not perm.default_value
            if perm.name in self._permissions else perm.default_value
        )
        for perm in course_perms
    }
def get_all_permissions(self) -> t.Mapping[str, bool]:
    """Get all non-course permissions (:class:`Permission`) for this role.

    Every permission with ``course_permission=False`` is listed.  A
    permission whose name is in ``self._permissions`` gets the negation of
    its default value; every other permission gets its default value.

    :returns: A name boolean mapping where the name is the name of the
              permission and the value indicates if this user has this
              permission.
    """
    global_perms: t.Sequence[Permission] = (
        Permission.query.
        filter_by(  # type: ignore
            course_permission=False
        ).all()
    )
    return {
        perm.name: (
            not perm.default_value
            if perm.name in self._permissions else perm.default_value
        )
        for perm in global_perms
    }
def get_all_permissions(self, course_id: t.Union['Course', int] = None
                        ) -> t.Mapping[str, bool]:
    """Get all global permissions (:class:`Permission`) of this user or all
    course permissions of the user in a specific :class:`Course`.

    Without a course the permissions of ``self.role`` are returned.  For a
    course found in ``self.courses`` the permissions of that entry are
    returned.  For any other course every course permission maps to
    ``False``.

    :param course_id: The course or course id
    :returns: A name boolean mapping where the name is the name of the
              permission and the value indicates if this user has this
              permission.
    """
    if isinstance(course_id, Course):
        course_id = course_id.id

    if course_id is None:
        return self.role.get_all_permissions()

    if course_id in self.courses:
        return self.courses[course_id].get_all_permissions()

    # The user is not in this course, so every course permission is denied.
    course_perms: t.Sequence[Permission]
    course_perms = Permission.query.filter_by(  # type: ignore
        course_permission=True).all()
    return dict.fromkeys((perm.name for perm in course_perms), False)
def get_grade_history(submission_id: int
                      ) -> JSONResponse[t.Sequence[models.GradeHistory]]:
    """Get the grade history for the given submission.

    .. :quickref: Submission; Get the grade history for the given submission.

    :returns: A list of :class:`.models.GradeHistory` objects for the given
              submission serialized to json, ordered by ``changed_at``
              descending.
    :raises PermissionException: If the current user has no permission to see
                                 the grade history. (INCORRECT_PERMISSION)
    """
    work = helpers.get_or_404(models.Work, submission_id)
    auth.ensure_permission('can_see_grade_history', work.assignment.course_id)

    query = db.session.query(models.GradeHistory).filter_by(work_id=work.id)
    newest_first = models.GradeHistory.changed_at.desc()  # type: ignore
    history: t.MutableSequence[models.GradeHistory] = query.order_by(
        newest_first,
    ).all()
    return jsonify(history)
def combine_notebooks(notebook_files: Sequence[Path]) -> NotebookNode:
    """Concatenate the cells of several notebooks into one new notebook.

    Each file is read with ``nbformat`` (``as_version=4``).  Its first cell is
    passed through ``add_sec_label`` together with the file's stem; the
    remaining cells are appended unchanged.  The combined notebook takes a
    copy of the metadata of the first notebook read while its own metadata is
    still empty.

    :param notebook_files: paths of the notebooks, in the order to combine
    :return: the combined notebook
    :raises NoHeader: if ``add_sec_label`` raises ``NoHeader`` for a
        notebook's first cell; the message names the file
    """
    combined_nb = new_notebook()
    count = 0
    for filename in notebook_files:
        count += 1
        log.debug('Adding notebook: %s', filename)
        nbname = filename.stem
        nb = nbformat.read(str(filename), as_version=4)
        try:
            combined_nb.cells.extend(add_sec_label(nb.cells[0], nbname))
        except NoHeader:
            # ``filename`` is a Path: it must be converted explicitly, as
            # ``str + Path`` raises TypeError and would hide the real error.
            raise NoHeader("Failed to find header in " + str(filename))
        combined_nb.cells.extend(nb.cells[1:])
        if not combined_nb.metadata:
            combined_nb.metadata = nb.metadata.copy()
    log.info('Combined %d files', count)
    return combined_nb
def main(args: Sequence[str] = None) -> None:
    """Run the tool: parse arguments, back up the target, transform, write.

    ``args`` defaults to ``sys.argv[1:]``.  The target file is copied to
    ``<target>.orig`` before the transformation is applied to the parsed
    document's root and the document is written.  Any ``Exception`` is
    printed with its traceback and turned into ``SystemExit(2)``.
    """
    nfo('Starting')
    try:
        argv = sys.argv[1:] if args is None else args
        args = parse_args(argv)
        setup_logging(args.verbose)
        dbg(f'Invoked with args: {args}')

        transformation = get_transformation(args.transformation)
        document = parse_file(args)

        backup_path = args.target + '.orig'
        copy_file(args.target, backup_path)
        dbg("Saved document backup with suffix '.orig'")

        dbg('Applying transformation.')
        new_root = transformation(document.getroot())
        document._setroot(new_root)
        write_file(document, args)
    except Exception:
        print_exc()
        raise SystemExit(2)
def create_masters_in_db(self, lib_name, content_list, debug=False):
    # type: (str, Sequence[Any], bool) -> None
    """Create the masters in the design database.

    The work is handed to ``instantiate_schematic`` of the project stored in
    ``self._prj``, using ``self.lib_path`` as the library path.

    Parameters
    ----------
    lib_name : str
        library to create the designs in.
    content_list : Sequence[Any]
        a list of the master contents.  Must be created in this order.
    debug : bool
        True to print debug messages.  Not used by this implementation.

    Raises
    ------
    ValueError
        if ``self._prj`` is None.
    """
    project = self._prj
    if project is None:
        raise ValueError('BagProject is not defined.')

    project.instantiate_schematic(lib_name, content_list, lib_path=self.lib_path)