def dehead_filetree(tree: ExtractFileTree) -> ExtractFileTree:
"""Remove the head of the given filetree while preserving the old head
name.
    So a tree ``{1: [{2: [{3: [{4: [f1, f2]}]}]}]}`` will be converted to
    ``{1: [f1, f2]}``.
:param dict tree: The file tree as generated by :py:func:`extract`.
:returns: The same tree but deheaded as described.
:rtype: dict
"""
assert len(tree) == 1
head_node = list(tree.keys())[0]
head = tree[head_node]
while (
isinstance(head, t.MutableSequence) and len(head) == 1 and
isinstance(head[0], t.MutableMapping) and len(head[0]) == 1
):
head = list(head[0].values())[0]
tree[head_node] = head
return tree
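# Hypothetical usage sketch of ``dehead_filetree``; the string leaves here
# stand in for whatever file objects :py:func:`extract` actually produces.
example_tree = {'top': [{'a': [{'b': ['f1', 'f2']}]}]}
print(dehead_filetree(example_tree))  # {'top': ['f1', 'f2']}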
def get_grade_history(
    submission_id: int
) -> JSONResponse[t.Sequence[models.GradeHistory]]:
"""Get the grade history for the given submission.
.. :quickref: Submission; Get the grade history for the given submission.
    :returns: A list of :class:`.models.GradeHistory` objects serialized to
        JSON for the given submission.
:raises PermissionException: If the current user has no permission to see
the grade history. (INCORRECT_PERMISSION)
"""
work = helpers.get_or_404(models.Work, submission_id)
auth.ensure_permission('can_see_grade_history', work.assignment.course_id)
hist: t.MutableSequence[models.GradeHistory]
hist = db.session.query(
models.GradeHistory
).filter_by(work_id=work.id).order_by(
models.GradeHistory.changed_at.desc(), # type: ignore
).all()
return jsonify(hist)
def address_from_digest(digest):
# type: (Digest) -> Address
"""
Generates an address from a private key digest.
"""
address_trits = [0] * (Address.LEN * TRITS_PER_TRYTE) # type: MutableSequence[int]
sponge = Kerl()
sponge.absorb(digest.as_trits())
sponge.squeeze(address_trits)
return Address.from_trits(
trits = address_trits,
key_index = digest.key_index,
security_level = digest.security_level,
)
def __produce_commands(self, install):
# type: (After4xCollectionInstaller) -> Sequence[str]
args = [] # type: MutableSequence[str]
mem = install.puppetserver_jvm_memory()
jvmargs = install.puppetserver_jvm_args()
if mem.is_set():
if mem.heap_minimum() is not None:
args.append('-Xms{xms}'.format(xms=mem.heap_minimum()))
if mem.heap_maximum() is not None:
args.append('-Xmx{xmx}'.format(xmx=mem.heap_maximum()))
java_version = Facter.get(JavaVersion)
metaspace_arg = self.__get_metaspace_arg(java_version)
if java_version.has_permgen_space() and mem.metaspace_maximum() is None:
args.append(metaspace_arg.format(mspace='256m'))
elif mem.metaspace_maximum() is not None:
args.append(metaspace_arg.format(mspace=mem.metaspace_maximum()))
if jvmargs.are_set():
args.extend(jvmargs)
args_as_str = ' '.join(args)
collector = self._collector()
mapping = dict(jvmargs=args_as_str)
collector.collect_from_template('Configuring PuppetServer JVM Args',
'puppetserver-jvmargs.pp',
mapping,
format=ScriptFormat.PUPPET)
return collector.lines()
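# Illustrative only (hypothetical values): for an install reporting a 512m
# minimum heap and a 2g maximum heap, with no explicit metaspace limit, on
# a permgen-era JVM whose template from ``__get_metaspace_arg`` is assumed
# to be '-XX:MaxPermSize={mspace}', the joined result would be roughly:
#
#     args_as_str == '-Xms512m -Xmx2g -XX:MaxPermSize=256m'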
def segments(self):
    # type: () -> MutableSequence[Union[int, str]]
return list(self.__get_segments())
def _to_jsonschema(type_):
if isinstance(type_, marshmallow.Schema):
return _jsonschema.dump_schema(type_)
elif type_ in six.integer_types:
return {'type': 'number', 'format': 'integer'}
elif type_ == float:
return {'type': 'number', 'format': 'float'}
elif type_ == decimal.Decimal:
return {'type': 'string', 'format': 'decimal'}
elif type_ == uuid.UUID:
return {'type': 'string', 'format': 'uuid'}
elif type_ == datetime.datetime:
return {'type': 'string', 'format': 'date-time'}
elif type_ == datetime.date:
return {'type': 'string', 'format': 'date'}
elif type_ == datetime.time:
return {'type': 'string', 'format': 'time'}
elif type_ == dict:
return {'type': 'object'}
elif type_ == six.text_type or type_ == six.binary_type:
return {'type': 'string'}
elif type_ is None:
return {'type': 'null'}
elif type_ == list:
return {'type': 'array'}
elif type_ == bool:
return {'type': 'boolean'}
    # ``issubclass`` rejects parameterized generics such as
    # ``typing.MutableSequence[T]``, and the concrete item type lives in
    # ``__args__`` rather than ``__parameters__``, so inspect the
    # generic's origin instead.
    elif getattr(type_, '__origin__', None) in (
            list, collections.abc.MutableSequence,
            typing.List, typing.MutableSequence):
        items_type = type_.__args__[0]
if issubclass(items_type, marshmallow.Schema):
items_type = items_type()
return {
'type': 'array',
'items': _to_jsonschema(items_type),
}
else:
raise ValueError('unsupported return type: %s' % type_)
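# A few hypothetical spot checks of the mapping above (the
# marshmallow.Schema case is omitted for brevity):
print(_to_jsonschema(float))             # {'type': 'number', 'format': 'float'}
print(_to_jsonschema(uuid.UUID))         # {'type': 'string', 'format': 'uuid'}
print(_to_jsonschema(typing.List[int]))  # {'type': 'array', 'items':
                                         #  {'type': 'number', 'format': 'integer'}}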
def __init__(self, random_jobs: MutableSequence[JobInterface]):
self._jobs = {job.id: job for job in random_jobs}
def squeeze(self, trits):
# type: (MutableSequence[int]) -> None
"""
Squeeze trits from the sponge.
:param trits:
Sequence that the squeezed trits will be copied to.
Note: this object will be modified!
"""
#
# Squeeze is kind of like the opposite of absorb; it copies trits
# from internal state to the ``trits`` parameter, one hash at a
# time, and transforming internal state in between hashes.
#
# However, only the first hash of the state is "public", so we
# can simplify the implementation somewhat.
#
# Ensure that ``trits`` can hold at least one hash worth of trits.
trits.extend([0] * max(0, HASH_LENGTH - len(trits)))
# Copy exactly one hash.
trits[0:HASH_LENGTH] = self._state[0:HASH_LENGTH]
# One hash worth of trits copied; now transform.
self._transform()
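# Minimal usage sketch (assumptions: ``sponge`` is an instance of this
# class that has already absorbed its input, and HASH_LENGTH == 243 as in
# the IOTA reference sponge). An undersized buffer is padded up front:
#
#     out = []            # too small; extended to HASH_LENGTH first
#     sponge.squeeze(out)
#     assert len(out) == 243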
def __next__(self):
# type: () -> TryteString
"""
Returns the next signature fragment.
"""
key_trytes = next(self._key_chunks) # type: TryteString
self._iteration += 1
    # If the key is longer than the normalized hash, wrap back around to
    # the start of the hash.
    normalized_chunk = (
        self._normalized_hash[self._iteration % len(self._normalized_hash)]
    )
signature_fragment = key_trytes.as_trits()
# Build the signature, one hash at a time.
for i in range(key_trytes.count_chunks(Hash.LEN)):
hash_start = i * HASH_LENGTH
hash_end = hash_start + HASH_LENGTH
buffer = signature_fragment[hash_start:hash_end] # type: MutableSequence[int]
for _ in range(13 - normalized_chunk[i]):
self._sponge.reset()
self._sponge.absorb(buffer)
self._sponge.squeeze(buffer)
signature_fragment[hash_start:hash_end] = buffer
return TryteString.from_trits(signature_fragment)
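# Worked note on the ``13 - normalized_chunk[i]`` loop above (assumption:
# normalized hash values lie in [-13, 13], as in the IOTA signing scheme),
# so each hash-sized chunk of the key is re-hashed 0 to 26 times:
for example_value in (-13, 0, 13):
    print(13 - example_value)  # 26, then 13, then 0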
def __init__(
    self,
    template_map: typing.Optional[
        typing.MutableSequence[typing.Tuple[str, int]]
    ] = None,
) -> None:
    self.template_map = list() if template_map is None else template_map
def we_can_has_sequence(p, q, r, s, t, u):
"""
:type p: typing.Sequence[int]
:type q: typing.Sequence[B]
:type r: typing.Sequence[int]
:type s: typing.Sequence["int"]
:type t: typing.MutableSequence[dict]
:type u: typing.List[float]
"""
#? ["count"]
p.c
#? int()
p[1]
#? ["count"]
q.c
#? B()
q[1]
#? ["count"]
r.c
#? int()
r[1]
#? ["count"]
s.c
#? int()
s[1]
#? []
s.a
#? ["append"]
t.a
#? dict()
t[1]
#? ["append"]
u.a
#? float()
u[1]
def __init__(self, feedstate_dict: Optional[Mapping[str, Any]] = None) -> None:
if feedstate_dict is not None:
LOG.debug("Successfully loaded feed state dict.")
self.feed = feedstate_dict.get("feed", {})
self.entries = feedstate_dict.get("entries", [])
self.entries_state_dict = feedstate_dict.get("entries_state_dict", {})
self.queue = collections.deque(feedstate_dict.get("queue", []))
# Store the most recent SUMMARY_LIMIT items we've downloaded.
temp_list = feedstate_dict.get("summary_queue", [])
self.summary_queue: MutableSequence[Dict[str, Any]] = collections.deque(
[],
SUMMARY_LIMIT,
)
# When we load from the cache file, mark all of the items in the summary queue as not
# being from the current session.
for elem in temp_list:
elem["is_this_session"] = False
self.summary_queue.append(elem)
last_modified = feedstate_dict.get("last_modified", None)
self.store_last_modified(last_modified)
self.etag = feedstate_dict.get("etag", None)
self.latest_entry_number = feedstate_dict.get("latest_entry_number", None)
else:
LOG.debug("Did not successfully load feed state dict.")
LOG.debug("Creating blank dict.")
self.feed = {}
self.entries = []
self.entries_state_dict = {}
self.queue = collections.deque([])
self.summary_queue = collections.deque([], SUMMARY_LIMIT)
self.last_modified: Any = None
        self.etag: Optional[str] = None
self.latest_entry_number = None
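# The bounded summary queue above relies on plain ``collections.deque``
# semantics: given a maxlen (a hypothetical SUMMARY_LIMIT of 3 here), the
# oldest entries are silently dropped once the deque is full.
demo_queue = collections.deque([], 3)
for n in range(5):
    demo_queue.append(n)
print(list(demo_queue))  # [2, 3, 4]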
def absorb(self, trits, offset=0, length=None):
# type: (MutableSequence[int], int, Optional[int]) -> None
"""
Absorb trits into the sponge from a buffer.
:param trits:
Buffer that contains the trits to absorb.
:param offset:
Starting offset in ``trits``.
:param length:
Number of trits to absorb. Defaults to ``len(trits)``.
"""
# Pad input if necessary, so that it can be divided evenly into
# hashes.
# Note that this operation creates a COPY of ``trits``; the
# incoming buffer is not modified!
pad = ((len(trits) % TRIT_HASH_LENGTH) or TRIT_HASH_LENGTH)
    trits = trits + [0] * (TRIT_HASH_LENGTH - pad)  # copies; see note above
if length is None:
length = len(trits)
if length < 1:
raise with_context(
exc = ValueError('Invalid length passed to ``absorb``.'),
context = {
'trits': trits,
'offset': offset,
'length': length,
},
)
while offset < length:
stop = min(offset + TRIT_HASH_LENGTH, length)
# If we're copying over a full chunk, zero last trit
if stop - offset == TRIT_HASH_LENGTH:
trits[stop - 1] = 0
signed_nums = conv.convertToBytes(trits[offset:stop])
# Convert signed bytes into their equivalent unsigned representation
# In order to use Python's built-in bytes type
unsigned_bytes = bytearray(conv.convert_sign(b) for b in signed_nums)
self.k.update(unsigned_bytes)
offset += TRIT_HASH_LENGTH
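# Worked padding example (assumption: TRIT_HASH_LENGTH == 243, the Kerl
# hash size): a 300-trit input needs 186 filler trits to reach the next
# multiple of the hash length.
demo_len = 300
pad = (demo_len % 243) or 243      # 57
print(demo_len + (243 - pad))      # 486 == 2 * 243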
def squeeze(self, trits, offset=0, length=None):
# type: (MutableSequence[int], int, Optional[int]) -> None
"""
Squeeze trits from the sponge into a buffer.
:param trits:
Buffer that will hold the squeezed trits.
IMPORTANT: If ``trits`` is too small, it will be extended!
:param offset:
Starting offset in ``trits``.
:param length:
Number of trits to squeeze from the sponge.
If not specified, defaults to :py:data:`TRIT_HASH_LENGTH` (i.e.,
by default, we will try to squeeze exactly 1 hash).
"""
# Pad input if necessary, so that it can be divided evenly into
# hashes.
pad = ((len(trits) % TRIT_HASH_LENGTH) or TRIT_HASH_LENGTH)
trits += [0] * (TRIT_HASH_LENGTH - pad)
if length is None:
# By default, we will try to squeeze one hash.
# Note that this is different than ``absorb``.
length = len(trits) or TRIT_HASH_LENGTH
if length < 1:
raise with_context(
exc = ValueError('Invalid length passed to ``squeeze``.'),
context = {
'trits': trits,
'offset': offset,
'length': length,
},
)
while offset < length:
unsigned_hash = self.k.digest()
if PY2:
unsigned_hash = map(ord, unsigned_hash) # type: ignore
signed_hash = [conv.convert_sign(b) for b in unsigned_hash]
trits_from_hash = conv.convertToTrits(signed_hash)
trits_from_hash[TRIT_HASH_LENGTH - 1] = 0
        stop = min(TRIT_HASH_LENGTH, length - offset)
        trits[offset:offset + stop] = trits_from_hash[0:stop]
flipped_bytes = bytearray(conv.convert_sign(~b) for b in unsigned_hash)
# Reset internal state before feeding back in
self.reset()
self.k.update(flipped_bytes)
offset += TRIT_HASH_LENGTH
def get_digest(self):
# type: () -> Digest
"""
Generates the digest used to do the actual signing.
Signing keys can have variable length and tend to be quite long,
which makes them not-well-suited for use in crypto algorithms.
The digest is essentially the result of running the signing key
through a PBKDF, yielding a constant-length hash that can be used
for crypto.
"""
hashes_per_fragment = FRAGMENT_LENGTH // Hash.LEN
key_fragments = self.iter_chunks(FRAGMENT_LENGTH)
# The digest will contain one hash per key fragment.
    digest = [0] * (HASH_LENGTH * len(key_fragments))
# Iterate over each fragment in the key.
for (i, fragment) in enumerate(key_fragments): # type: Tuple[int, TryteString]
fragment_trits = fragment.as_trits()
key_fragment = [0] * FRAGMENT_LENGTH
hash_trits = []
# Within each fragment, iterate over one hash at a time.
for j in range(hashes_per_fragment):
hash_start = j * HASH_LENGTH
hash_end = hash_start + HASH_LENGTH
hash_trits = fragment_trits[hash_start:hash_end] # type: MutableSequence[int]
            # Run each chunk through the sponge 26 times; 26 is the length
            # of the hash chain used by the IOTA signing scheme.
            for _ in range(26):
sponge = Kerl()
sponge.absorb(hash_trits)
sponge.squeeze(hash_trits)
key_fragment[hash_start:hash_end] = hash_trits
#
# After processing all of the hashes in the fragment, generate a
# final hash and append it to the digest.
#
# Note that we will do this once per fragment in the key, so the
# longer the key is, the longer the digest will be.
#
sponge = Kerl()
sponge.absorb(key_fragment)
sponge.squeeze(hash_trits)
fragment_start = i * FRAGMENT_LENGTH
fragment_end = fragment_start + FRAGMENT_LENGTH
digest[fragment_start:fragment_end] = hash_trits
return Digest(TryteString.from_trits(digest), self.key_index)
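# Back-of-the-envelope check of the sizes above (assumption: the usual
# IOTA constants, FRAGMENT_LENGTH == 2187 trytes, Hash.LEN == 81 trytes,
# HASH_LENGTH == 243 trits): each fragment spans 27 hashes, and each
# fragment contributes exactly one 243-trit hash to the final digest.
print(2187 // 81)  # 27 hashes per fragment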