def get_diff_with_color(expected: str, ans: str) -> Tuple[str, str]:
    # Character-level diff: Differ treats each string as a sequence of chars.
    d = difflib.Differ()
    diff = d.compare(expected, ans)
    expected_with_mistake = ""
    ans_with_mistake = ""
    for e in diff:
        if e.startswith("+"):
            ans_with_mistake += colored(e[-1], "red")
        elif e.startswith("-"):
            expected_with_mistake += colored(e[-1], "green")
        else:
            expected_with_mistake += e[-1]
            ans_with_mistake += e[-1]
    return expected_with_mistake, ans_with_mistake
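# A minimal usage sketch for the helper above (hedged: the snippet does not
# show its imports, which are presumably stdlib difflib plus termcolor's
# colored()).
def _demo_get_diff_with_color():
    expected_colored, ans_colored = get_diff_with_color("kitten", "sitting")
    print(expected_colored)  # characters missing from the answer shown in green
    print(ans_colored)       # extra/incorrect answer characters shown in red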
def get_processor_instance(format_: str, custom_inbuffer: InBuffer = None,
                           custom_outbuffer: OutBuffer = None) -> Tuple[Any, Any]:
    """
    Get a processor instance. The class and buffers will be selected based on
    the python_driver.ProcessorConfigs dictionary. The input and output buffers
    can be overridden using the custom_inbuffer and custom_outbuffer parameters.
    This is mainly useful for unit testing.
    """
    conf = ProcessorConfigs.get(format_)
    if not conf:
        raise RequestInstantiationException('No RequestProcessor found for format %s' % format_)
    inbuffer = custom_inbuffer if custom_inbuffer else conf['inbuffer']
    outbuffer = custom_outbuffer if custom_outbuffer else conf['outbuffer']
    instance = conf['class'](outbuffer)  # type: ignore
    return instance, inbuffer
def __init__(
        self,
        description: Optional[str] = None,
        pre_hooks: Optional[Union[List, Tuple]] = None,
        post_hooks: Optional[Union[List, Tuple]] = None
):
    self.result = None
    self.total = None
    self.success = None
    self.errors = None
    self.params = None
    self.output = None
    self.pagination = None
    self.limit = None
    self.offset = None
    self.app = None
    self.settings = None
    self.description = description
    self.pre_hooks = pre_hooks
    self.post_hooks = post_hooks
    self.meta = {}
def _load_word_freq(self, threshold: int) -> Tuple[Dict[str, int], int]:
    n_total_words = 0
    word_freq = {}
    with open(self.rnnlm_model_path, mode='r') as f:
        for line in f:
            n_total_words += 1
            word, freq = line.split(' ')
            freq = int(freq)
            if freq > threshold:
                word_freq[word] = freq
            else:
                word_freq['<unk/>'] = word_freq.get('<unk/>', 0) + 1
    return (word_freq, n_total_words)
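# A standalone sketch of the thresholding logic above, run against an
# in-memory "word count" list instead of self.rnnlm_model_path (the file
# format, one space-separated word/frequency pair per line, is inferred
# from line.split(' ')):
def _demo_word_freq_threshold(threshold: int = 1):
    lines = ["the 42\n", "cat 3\n", "zyzzyva 1\n"]
    word_freq, n_total_words = {}, 0
    for line in lines:
        n_total_words += 1
        word, freq = line.split(' ')
        freq = int(freq)
        if freq > threshold:
            word_freq[word] = freq
        else:
            word_freq['<unk/>'] = word_freq.get('<unk/>', 0) + 1
    return word_freq, n_total_words  # ({'the': 42, 'cat': 3, '<unk/>': 1}, 3)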
def extra_penalty(self, style, complexity):
    # type: (Style, int) -> Tuple[int, int]
    """Trying longer and longer column limits
    without getting better results should be penalized to speed
    up the search.
    """
    standards = {'ColumnLimit': 80,
                 'MaxEmptyLinesToKeep': 2, }
    penalty = 0
    for optionname, value in standards.items():
        fvalue = style.get(optionname, value)
        if fvalue is not None and fvalue > value:
            penalty += fvalue - value
    if style.get('BreakBeforeBraces') == 'Custom':
        # Rate a commonly known brace breaking style
        # better than an equally performing custom style.
        penalty += 1
        # We would prefer an equally performing style even if we had to
        # add another 12 options.
        complexity += 12
    return complexity, penalty
def mget(self, keys):
    # type: (List[str]) -> List[Optional[bytes]]
    if not keys:
        return []
    cached = []
    uncached = []  # type: List[Tuple[int, Optional[bytes]]]
    contentkeys = super(DedupKeyValueStore, self).mget(keys)
    for idx, contentkey in enumerate(contentkeys):
        if contentkey is None:
            uncached.append((idx, None))
        else:
            sha = binary_type(contentkey)
            cached.append((idx, unistr(sha)))
    if not cached:
        return [None for _, contentkey in uncached]
    indices, existing_keys = zip(*cached)
    existing_values = self.kvstore.mget(existing_keys)
    idx_value_pairs = sorted(uncached + list(zip(indices, existing_values)))
    return list([value for _, value in idx_value_pairs])
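# A standalone sketch of the index-merge trick mget() relies on: hits and
# misses are carried as (position, value) pairs, then stitched back into
# request order with a single sort.
def _demo_index_merge():
    uncached = [(1, None), (3, None)]          # request positions that missed
    fetched = list(zip((0, 2), (b'a', b'b')))  # positions that hit, with values
    merged = sorted(uncached + fetched)
    return [value for _, value in merged]      # [b'a', None, b'b', None]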
def split_reffiles(references, filenames):
    # type: (bool, List[str]) -> Tuple[List[str], List[str]]
    """Splits [file1, reffile1, file2, reffile2] into [file1, file2], [reffile1, reffile2]
    when references is True.
    When references is False returns the pair (filenames, filenames).
    """
    if not references:
        return filenames, filenames
    assert len(filenames) % 2 == 0
    files = []
    refs = []
    for filename, reffilename in grouper(2, filenames):
        files.append(filename)
        refs.append(reffilename)
    return files, refs
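# split_reffiles() depends on a grouper() helper that is not shown; the
# classic itertools recipe (n-tuples drawn from a single iterator) is
# assumed here.
def _demo_split_reffiles():
    from itertools import zip_longest
    filenames = ['file1', 'reffile1', 'file2', 'reffile2']
    pairs = zip_longest(*[iter(filenames)] * 2)  # ('file1', 'reffile1'), ...
    files, refs = zip(*pairs)
    return list(files), list(refs)  # ['file1', 'file2'], ['reffile1', 'reffile2']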
def update_evaluations(formatter,        # type: CodeFormatter
                       evaluations,      # type: List[AttemptResult]
                       finished_styles,  # type: List[AttemptResult]
                       bestdist          # type: Sequence[int]
                       ):
    # type: (...) -> Tuple[bool, bool, Sequence[int]]
    attemptresult = heapq.heappop(evaluations)
    nested_round = False
    if bestdist is None or (distquality(attemptresult.distance) < distquality(bestdist)):
        bestdist = attemptresult.distance
        heapq.heappush(evaluations, attemptresult)
    else:
        # We found a style that could no longer be improved by adding a single option value.
        heapq.heappush(finished_styles, attemptresult)
        nested_styles = formatter.nested_derivations(attemptresult.formatstyle)
        if not nested_styles:
            # This formatstyle does not unlock more options.
            return True, nested_round, bestdist
        # Restart the optimization from scratch with the attemptresult augmented with
        # every nested option as seed styles.
        bestdist = None
        ndist = (HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE)
        evaluations[:] = [AttemptResult(ndist, s) for s in nested_styles]
        nested_round = True
    return False, nested_round, bestdist
def avg_linelength_diffs(diffargs):
    # type: (List[Tuple[str, bytes]]) -> Iterable[int]
    """Returns the nudged absolute line length differences.
    """
    for filename1, content2 in diffargs:
        linelen1 = get_num_lines(filename1)
        filelen1 = len(get_cached_file(filename1))
        avg1 = 0.0
        if linelen1 > 0:
            avg1 = float(filelen1) / linelen1
        linelen2 = count_content_lines(content2)
        filelen2 = len(content2)
        avg2 = 0.0
        if linelen2 > 0:
            avg2 = float(filelen2) / linelen2
        yield int(abs(10000.0 * (avg1 - avg2)))
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """This function prints a unified diff of the contents of
    filename and the standard input, when used from the command line
    as follows:
        echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt
    We get this result:
        ---
        +++
        @@ -1 +1 @@
        -123
        +456
    """
    use_stdin = content2 is None
    if content2 is None:
        # Read binary input stream
        stdin = rawstream(sys.stdin)
        econtent2 = bytestr(stdin.read())
    else:
        econtent2 = content2
    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if use_stdin:
        write('\n'.join(diff))
    return exit_code, diff
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    diff = ()  # type: Iterable[str]
    exit_code = ERROR
    kw = kwargs.copy()
    if 'n' not in kwargs:
        # zero context lines
        kw['n'] = 0
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
        exit_code = OK
    finally:
        # Note: returning from finally swallows any exception, so failures
        # surface only as the ERROR exit code.
        return exit_code, diff
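# A minimal reproduction of the docstring example above, calling stdlib
# difflib directly with the same zero-context setting (n=0, lineterm=''):
def _demo_unified_diff():
    import difflib
    diff = difflib.unified_diff(['123\n'], ['456\n'], lineterm='', n=0)
    print('\n'.join(diff))
    # ---
    # +++
    # @@ -1 +1 @@
    # -123
    # +456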
def bin_stats(predictions: tf.Tensor, labels: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """
    Calculate f1, precision and recall from binary classification expected and predicted values.

    :param predictions: 2-d tensor (batch, predictions) of predicted 0/1 classes
    :param labels: 2-d tensor (batch, labels) of expected 0/1 classes
    :return: a tuple of batched (f1, precision and recall) values
    """
    predictions = tf.cast(predictions, tf.int32)
    labels = tf.cast(labels, tf.int32)
    true_positives = tf.reduce_sum((predictions * labels), axis=1)
    false_positives = tf.reduce_sum(tf.cast(tf.greater(predictions, labels), tf.int32), axis=1)
    false_negatives = tf.reduce_sum(tf.cast(tf.greater(labels, predictions), tf.int32), axis=1)
    recall = true_positives / (true_positives + false_negatives)
    precision = true_positives / (true_positives + false_positives)
    f1_score = 2 / (1 / precision + 1 / recall)
    return f1_score, precision, recall
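# A hedged usage sketch for bin_stats (assumes TensorFlow is importable; the
# example values are hypothetical):
def _demo_bin_stats():
    import tensorflow as tf
    predictions = tf.constant([[1, 0, 1, 0]])
    labels = tf.constant([[1, 1, 0, 0]])
    # Per row: TP=1, FP=1, FN=1, so precision = recall = f1 = 0.5.
    # Note: a row with no positives at all would divide by zero above.
    return bin_stats(predictions, labels)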
def _check_edges(self, edges: List[Tuple[str, str]]) -> None:
    for parent_node, child_node in edges:
        # check if both nodes are already inserted into the graph
        if child_node not in self._hyperparameters:
            raise ValueError("Child hyperparameter '%s' not in configuration "
                             "space." % child_node)
        if parent_node not in self._hyperparameters:
            raise ValueError("Parent hyperparameter '%s' not in configuration "
                             "space." % parent_node)
    # TODO: recursively check everything which is inside the conditions,
    # this means we have to recursively traverse the condition
    tmp_dag = self._create_tmp_dag()
    for parent_node, child_node in edges:
        tmp_dag.add_edge(parent_node, child_node)
    if not ConfigSpace.nx.is_directed_acyclic_graph(tmp_dag):
        cycles = list(ConfigSpace.nx.simple_cycles(tmp_dag))  # type: List[List[str]]
        for cycle in cycles:
            cycle.sort()
        cycles.sort()
        raise ValueError("Hyperparameter configuration contains a "
                         "cycle %s" % str(cycles))
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any], training_share: float = .9) -> Callable[
        [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
    def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
        examples_by_directory = group(examples, key=key_from_example)
        # list() so random.sample gets a sequence (required on Python 3.11+)
        directories = list(examples_by_directory.keys())
        # split must be the same every time:
        random.seed(42)
        keys = set(random.sample(directories, int(training_share * len(directories))))
        training_examples = [example for example in examples if key_from_example(example) in keys]
        test_examples = [example for example in examples if key_from_example(example) not in keys]
        return training_examples, test_examples
    return split
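# A standalone sketch of the reproducible-split idea above: seeding the RNG
# before sampling fixes the train/test partition across runs (toy keys, no
# LabeledExample dependency).
def _demo_deterministic_split(training_share: float = .9):
    import random
    directories = ['dir%d' % i for i in range(10)]
    random.seed(42)
    training = set(random.sample(directories, int(training_share * len(directories))))
    test = [d for d in directories if d not in training]
    return training, test  # identical on every call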
def _transform(self, source_type: Type[S], target_type: Type[T]) -> Tuple[Callable[[S], T], int]:
    try:
        LOGGER.info("Searching type graph for shortest path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
        path = dijkstra_path(self._type_graph, source=source_type, target=target_type, weight="cost")
        LOGGER.info("Found a path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
    except (KeyError, NetworkXNoPath):
        raise NoConversionError("Pipeline can't convert \"{source_type}\" to \"{target_type}\"".format(source_type=source_type, target_type=target_type))
    LOGGER.info("Building transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
    chain = []
    cost = 0
    for source, target in _pairwise(path):
        transformer = self._type_graph.adj[source][target][_TRANSFORMER]
        chain.append((transformer, target))
        cost += transformer.cost
    LOGGER.info("Built transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
    if not chain:
        return _identity, 0
    return partial(_transform, transformer_chain=chain), cost
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
    best = None
    best_cost = _MAX_TRANSFORM_COST
    to_type = None
    for target_type in target_types:
        try:
            transform, cost = self._transform(source_type, target_type)
            if cost < best_cost:
                best = transform
                best_cost = cost
                to_type = target_type
        except NoConversionError:
            pass
    if best is None:
        raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types))
    return best, to_type, best_cost
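# A standalone sketch of the shortest-conversion-path idea above, using a
# tiny hypothetical type graph and networkx.dijkstra_path with the same
# weight="cost" convention:
def _demo_cheapest_conversion():
    import networkx as nx
    graph = nx.DiGraph()
    graph.add_edge(str, bytes, cost=1)
    graph.add_edge(bytes, int, cost=2)
    graph.add_edge(str, int, cost=10)
    # The two-hop chain (total cost 3) beats the direct edge (cost 10).
    return nx.dijkstra_path(graph, source=str, target=int, weight="cost")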
def _split_text_to_lines_and_columns(
        cls, text) -> Iterable[Tuple[int, int, str]]:
    lines = text.splitlines()
    if len(lines) > cls._MAX_LINES:
        raise ValueError(
            'Max {} specification lines allowed, got {}'
            .format(cls._MAX_LINES, len(lines)))
    for line_number, line_text in enumerate(lines, 1):
        if len(line_text) > cls._MAX_LINE_LENGTH:
            raise ValueError(
                'Specification lines must be max {} chars long, '
                'got {}: {!r}'
                .format(cls._MAX_LINE_LENGTH, len(line_text), line_text))
        yield (line_number, 1, '{:40}'.format(line_text[0:40]))
        yield (line_number, 2, '{:40}'.format(line_text[40:80]))
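# A quick illustration of the two-column split above: each line is cut into
# two fixed 40-character fields, padded with spaces as needed.
def _demo_two_column_split():
    line_text = 'x' * 50
    col1 = '{:40}'.format(line_text[0:40])   # 40 x's
    col2 = '{:40}'.format(line_text[40:80])  # 10 x's padded to width 40
    assert len(col1) == len(col2) == 40
    return col1, col2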
def _encrypt_key(
        self,
        key: bytes,
        pubkey: bytes = None
) -> Tuple[bytes, bytes]:
    """
    Encrypts the `key` provided for the provided `pubkey` using the ECIES
    scheme. If no `pubkey` is provided, it uses `self.pub_key`.

    :param key: Key to encrypt
    :param pubkey: Public Key to encrypt the `key` for

    :return: (encrypted key, encapsulated ECIES key)
    """
    pubkey = pubkey or self.pub_key
    symm_key, enc_symm_key = API.ecies_encapsulate(pubkey)
    enc_key = API.symm_encrypt(symm_key, key)
    return (enc_key, enc_symm_key)
def gen_path_keys(
        self,
        path: bytes
) -> List[Tuple[bytes, bytes]]:
    """
    Generates and returns the path keys derived from `path`.

    :param path: Path to derive key(s) from

    :return: List of (private, public) path keys
    """
    subpaths = self._split_path(path)
    keys = []
    for subpath in subpaths:
        path_priv, path_pub = self._derive_path_key(subpath)
        keys.append((path_priv, path_pub))
    return keys
def encrypt(
        self,
        data: bytes,
        pubkey: bytes = None
) -> Tuple[bytes, bytes]:
    """
    Encrypts data with public key encryption.

    :param data: Data to encrypt
    :param pubkey: Public key to encrypt for

    :return: (encrypted data, serialized encrypted key)
    """
    pubkey = pubkey or self.pub_key
    key, enc_key = API.ecies_encapsulate(pubkey)
    enc_data = API.symm_encrypt(key, data)
    return (enc_data, API.elliptic_curve.serialize(enc_key.ekey))
def decrypt(
        self,
        enc_data: Tuple[bytes, bytes],
        privkey: bytes = None
) -> bytes:
    """
    Decrypts data using ECIES PKE. If no `privkey` is provided, it uses
    `self.priv_key`.

    :param enc_data: Tuple: (encrypted data, ECIES encapsulated key)
    :param privkey: Private key to decapsulate with

    :return: Decrypted data
    """
    privkey = privkey or self.priv_key
    ciphertext, enc_key = enc_data
    enc_key = API.elliptic_curve.deserialize(API.PRE.ecgroup, enc_key)
    enc_key = API.umbral.EncryptedKey(ekey=enc_key, re_id=None)
    dec_key = API.ecies_decapsulate(privkey, enc_key)
    return API.symm_decrypt(dec_key, ciphertext)
def _ecies_gen_ephemeral_key(
        recp_pubkey: Union[bytes, elliptic_curve.ec_element]
) -> Tuple[bytes, Tuple[bytes, bytes]]:
    """
    Generates and encrypts an ephemeral key for the `recp_pubkey`.

    :param recp_pubkey: Recipient's pubkey

    :return: Tuple of the eph_privkey, and a tuple of the encrypted symmetric
             key, and encrypted ephemeral privkey
    """
    symm_key, enc_symm_key = API.ecies_encapsulate(recp_pubkey)
    eph_privkey = API.ecies_gen_priv()
    enc_eph_privkey = API.symm_encrypt(symm_key, eph_privkey)
    return (eph_privkey, (enc_symm_key, enc_eph_privkey))
def ecdsa_priv2pub(
        privkey: bytes,
        to_bytes: bool = True
) -> Union[bytes, Tuple[int]]:
    """
    Returns the public component of an ECDSA private key.

    :param privkey: Private key as an int or bytestring
    :param to_bytes: Serialize to bytes or not?

    :return: Byte encoded or Tuple[int] ECDSA pubkey
    """
    pubkey = privtopub(privkey)
    if to_bytes:
        return SIG_KEYPAIR_BYTE + PUB_KEY_BYTE + ecdsa_pub2bytes(pubkey)
    return pubkey
def ecdsa_verify(
        v: int,
        r: int,
        s: int,
        msghash: bytes,
        pubkey: Union[bytes, Tuple[int, int]]
) -> bool:
    """
    Takes a v, r, s, a pubkey, and a hash of a message to verify via ECDSA.

    :param v: V of sig
    :param r: R of sig
    :param s: S of sig
    :param bytes msghash: The hashed message to verify
    :param bytes pubkey: Pubkey to validate signature for

    :rtype: Boolean
    :return: Is the signature valid or not?
    """
    if isinstance(pubkey, bytes):
        pubkey = ecdsa_bytes2pub(pubkey)
    verify_sig = ecdsa_raw_recover(msghash, (v, r, s))
    # TODO: Should this equality test be done better?
    return verify_sig == pubkey
def ecies_ephemeral_split_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        pubkey_b: Union[bytes, elliptic_curve.ec_element],
        min_shares: int,
        total_shares: int
) -> Tuple[List[umbral.RekeyFrag], Tuple[bytes, bytes]]:
    """
    Performs a split-key re-encryption key generation where a minimum
    number of shares `min_shares` are required to reproduce a rekey.
    Will split a rekey into `total_shares`.
    This also generates an ephemeral keypair for the recipient as `pubkey_b`.

    :param privkey_a: Privkey to re-encrypt from
    :param pubkey_b: Public key to re-encrypt for (w/ ephemeral key)
    :param min_shares: Minimum shares needed to reproduce a rekey
    :param total_shares: Total shares to generate from split-rekey gen

    :return: A tuple containing a list of rekey frags, and a tuple of the
             encrypted ephemeral key data (enc_symm_key, enc_eph_privkey)
    """
    eph_privkey, (encrypted_key, encrypted_message) = _internal._ecies_gen_ephemeral_key(pubkey_b)
    kfrags = ecies_split_rekey(privkey_a, eph_privkey, min_shares, total_shares)
    pfrag = PFrag(ephemeral_data_as_bytes=None, encrypted_key=encrypted_key, encrypted_message=encrypted_message)
    return (kfrags, pfrag)
def encrypt(self,
            data: bytes,
            pubkey: bytes = None) -> Tuple[bytes, bytes]:
    """
    :data: The data to encrypt. If derived per-subpath, it's a
           symmetric key to use for block ciphers.
    :pubkey: Optional public key to encrypt for. If not given, encrypt
             for ours.

    :returns: (ekey, edata) where ekey is needed for the recipient to
              reconstruct a DH secret, edata is data encrypted with this
              DH secret. The output should be treated as a monolithic
              ciphertext outside of this class.
    """
    if pubkey is None:
        pubkey = self._pub_key
    else:
        pubkey = ec.deserialize(self.pre.ecgroup, pubkey)
    key, ekey = self.pre.encapsulate(pubkey)
    cipher = SecretBox(key)
    return ((ec.serialize(ekey.ekey), None),
            cipher.encrypt(data))
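# A minimal round-trip sketch of the encapsulate-then-encrypt pattern used
# above, with a random PyNaCl SecretBox key standing in for the
# project-specific self.pre.encapsulate():
def _demo_secretbox_roundtrip():
    import nacl.utils
    from nacl.secret import SecretBox
    key = nacl.utils.random(SecretBox.KEY_SIZE)  # stand-in for the DH secret
    cipher = SecretBox(key)
    edata = cipher.encrypt(b'some data')
    assert cipher.decrypt(edata) == b'some data'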
def crop_scale(self, dimensions: Tuple[int, int]) -> 'Segment':
    """
    Returns
    -------
    A new Segment, cropped and/or scaled as necessary to reach specified dimensions
    """
    segment = self.copy()
    dimensions = Dimensions(*dimensions)
    if segment.aspect_ratio != dimensions.aspect_ratio:
        # Crop segment to match aspect ratio
        segment = segment.crop_to_aspect_ratio(dimensions.aspect_ratio)
    if segment.dimensions != dimensions:
        # Resize segment to reach final dimensions
        segment = segment.resize(dimensions)
    return segment
def store_message_in_file(self, message: Message) -> Tuple[str, str]:
    """
    Stores a message in a json file.
    The filename of the file will be the current time.
    Also generates a response file location into which the executable may
    write a response.

    :param message: The message to save
    :return: The location of the stored message json file,
             the location of the response file
    """
    json_data = message.to_dict()
    while True:  # Make sure that the file does not exist yet
        message_file = os.path.join(self.message_dir, str(time.time()))
        if not os.path.isfile(message_file + ".json"):
            with open(message_file + ".json", 'w') as json_file:
                json.dump(json_data, json_file)
            return message_file + ".json", message_file + "-response.json"
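# A standalone sketch of the timestamped message/response naming scheme
# above, using a temporary directory instead of self.message_dir:
def _demo_store_json():
    import json, os, tempfile, time
    message_dir = tempfile.mkdtemp()
    message_file = os.path.join(message_dir, str(time.time()))
    with open(message_file + ".json", 'w') as json_file:
        json.dump({"body": "hello"}, json_file)
    return message_file + ".json", message_file + "-response.json"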
# noinspection PyMethodMayBeStatic
async def run_plugin(executable: str, args: List[str], timeout: int) -> Tuple[str, List[str]]:
    run_args = [executable] + args
    try:
        proc = await asyncio.create_subprocess_exec(
            *run_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    except FileNotFoundError:
        raise NagiosError('executable not found')
    # communicate() returns (stdout, stderr), not stdin
    stdout_data, stderr_data = await proc.communicate()
    std_data = stdout_data + stderr_data
    await proc.wait()
    if proc.returncode not in [STATUS_OK, STATUS_WARNING, STATUS_CRITICAL]:
        raise MonitorFailedError(std_data)
    text, perf = parse_plugin_output(std_data)
    if proc.returncode not in [STATUS_OK, STATUS_WARNING]:
        raise MonitorFailedError(text)
    return text, perf
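# A self-contained sketch of the asyncio subprocess pattern above (POSIX
# /bin/echo assumed as the demo executable):
async def _demo_run_echo():
    import asyncio
    proc = await asyncio.create_subprocess_exec(
        '/bin/echo', 'OK', stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout_data, stderr_data = await proc.communicate()
    return proc.returncode, stdout_data + stderr_data
# e.g. asyncio.run(_demo_run_echo()) -> (0, b'OK\n')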
def callAllMethods(obj: object) -> List[Tuple[str, Any]]:
    results = []  # type: List[Tuple[str, Any]]
    for method in dir(obj):
        if method == '__hash__':
            continue
        if callable(getattr(obj, method)):
            try:
                res = getattr(obj, method)()
                if isinstance(res, (bool, int)):
                    results.append((method, res))
                if isinstance(res, str):
                    # Ignore anything with 0x in it since memory addresses change
                    if '0x' not in res:
                        results.append((method, res))
            except Exception:
                if '0x' not in method:
                    results.append(('except', method))
    return results