def get_conditions(self) -> List[AbstractCondition]:
conditions = []
added_conditions = set() # type: Set[str]
        # Iterate over the hyperparameters in their defined order so the
        # returned conditions come out deterministically.
        for source_node in self.get_hyperparameters():
            # self._children maps each parent name to a dict of
            # child name -> condition.
            # TODO sort the edges by the order of their source_node in the
            # hyperparameter list!
for target_node in self._children[source_node.name]:
if target_node not in added_conditions:
condition = self._children[source_node.name][target_node]
conditions.append(condition)
added_conditions.add(target_node)
return conditions
def create_articles(pmids: Set[str], login: object, write: bool = True) -> Dict[str, str]:
"""
    Given a set of PMIDs, make an article item for each.
    :param pmids: set of PMIDs
:param login: wdi_core login instance
:param write: actually perform write
:return: map pmid -> wdid
"""
pmid_map = dict()
for pmid in pmids:
p = wdi_helpers.PubmedItem(pmid)
if write:
try:
pmid_wdid = p.get_or_create(login)
except Exception as e:
print("Error creating article pmid: {}, error: {}".format(pmid, e))
continue
pmid_map[pmid] = pmid_wdid
else:
pmid_map[pmid] = 'Q1'
return pmid_map
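# A minimal usage sketch for create_articles, assuming it is imported; with
# write=False no Wikidata edits are made (the login is never used) and every
# pmid maps to the placeholder 'Q1'.
pmids = {"12345678", "23456789"}
dry_run_map = create_articles(pmids, login=None, write=False)
# dry_run_map == {"12345678": "Q1", "23456789": "Q1"}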
async def get_contact_dict_for_active_monitor(dbcon: DBConnection, monitor_id: int) -> Dict[str, set]:
"""Get all contact addresses/numbers for a specific active monitor.
    Return: Dict[str, Set[str]] with keys 'email' and 'phone'.
"""
ret = {
'email': set(),
'phone': set(),
} # type: Dict[str, set]
contacts = await get_all_contacts_for_active_monitor(dbcon, monitor_id)
for contact in contacts:
if contact.email:
ret['email'].add(contact.email)
if contact.phone:
ret['phone'].add(contact.phone)
return ret
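# A self-contained sketch of the collection step above, with a namedtuple
# standing in for the contact rows (Contact and the values are hypothetical).
from collections import namedtuple

Contact = namedtuple("Contact", ["email", "phone"])
contacts = [Contact("ops@example.com", None), Contact(None, "+15551234")]
ret = {"email": set(), "phone": set()}
for contact in contacts:
    if contact.email:
        ret["email"].add(contact.email)
    if contact.phone:
        ret["phone"].add(contact.phone)
# ret == {'email': {'ops@example.com'}, 'phone': {'+15551234'}}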
def select_channel(
self,
versions: typing.Set[CustomVersion],
update_channel: str = channel.STABLE
) -> typing.Union[CustomVersion, None]:
"""
        Selects the latest version whose channel is equal to or higher than "update_channel".
Args:
versions: versions to select from
update_channel: member of :class:`Channel`
Returns: latest version or None
"""
        LOGGER.debug(f'selecting latest version amongst {len(versions)}; active channel: {update_channel}')
options = list(self.filter_channel(versions, update_channel))
if options:
latest = max(options)
return latest
LOGGER.debug('no version passed the test')
return None
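# The same select-latest pattern in isolation, assuming versions are mutually
# comparable (plain tuples stand in for CustomVersion here).
versions = {(0, 9, 9), (1, 0, 2), (1, 1, 0)}
options = [v for v in versions if v >= (1, 0, 0)]  # stand-in for filter_channel
latest = max(options) if options else None
# latest == (1, 1, 0)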
def analyze_must_defined_regs(
blocks: List[BasicBlock],
cfg: CFG,
initial_defined: Set[Register],
num_regs: int) -> AnalysisResult[Register]:
"""Calculate always defined registers at each CFG location.
A register is defined if it has a value along all paths from the initial location.
"""
return run_analysis(blocks=blocks,
cfg=cfg,
gen_and_kill=MustDefinedVisitor(),
initial=initial_defined,
backward=False,
kind=MUST_ANALYSIS,
                        universe={Register(r) for r in range(num_regs)})
def add_listener(self, keys: Union[Key, Set[Key]], callback: ListenerCallback) -> None:
'''
        Attach ``callback`` to one or more keys. If more than one key is
        provided, the callback reads all messages from a single queue, so they
        are guaranteed to arrive in the order they were published.
:param keys: One key, or a set of keys.
'''
keys, sub_id = self._get_listener_subscription(keys, callback)
for key in keys:
self.subscribe(key, sub_id)
async def consumer() -> None:
key, msg = await self.consume(sub_id)
callback(key, msg)
        loop = aiopubsub.loop.Loop(consumer, delay=None)
loop.start()
self._listeners[sub_id] = Listener(loop, keys)
def read_all_stop_words() -> Set[str]:
# Data source: https://wenku.baidu.com/view/7ca26338376baf1ffc4fad6a.html
with open("data/chinese_stop_words.txt", mode="r", encoding="utf-8") as local_file:
text_lines = local_file.readlines()
        text_lines = [x.replace("\n", "") for x in text_lines]
with open("data/chinese_stop_symbols.txt", mode="r", encoding="utf-8") as local_file:
symbol_lines = local_file.readlines()
        symbol_lines = [x.replace("\n", "") for x in symbol_lines]
public_stop_words = get_stop_words("zh")
stop_words: Set[str] = set()
stop_words = stop_words.union(text_lines)
stop_words = stop_words.union(symbol_lines)
stop_words = stop_words.union(public_stop_words)
return stop_words
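# The union pattern above reduced to its core (the word lists are made up);
# set().union accepts any number of iterables at once.
text_lines = ["的", "了"]
symbol_lines = ["，", "。"]
public_stop_words = ["的", "是"]
stop_words = set().union(text_lines, symbol_lines, public_stop_words)
# stop_words == {"的", "了", "，", "。", "是"}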
def _get_blocks(self, blocks: Union[List[int], Set[int]]):
""" Fetch multiple blocks from steemd at once.
Warning:
This method does not ensure that all blocks are returned, or that the results are ordered.
You will probably want to use `steemd.get_blocks()` instead.
Args:
blocks (list): A list, or a set of block numbers.
Returns:
A generator with results.
"""
results = self.exec_multi_with_futures('get_block', blocks, max_workers=10)
return ({**x, 'block_num': int(x['block_id'][:8], base=16)} for x in results if x)
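# How the generator above derives block_num: the first 4 bytes of a block id
# encode the block height as a big-endian hex integer (the id below is made up).
block_id = "004c4b40" + "ab" * 16  # hypothetical 20-byte block id in hex
block_num = int(block_id[:8], base=16)
# block_num == 5000000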
def get_majority_vote_in_set_for_event(hashgraph, s: Set[str], x: Event) -> Tuple[Fame, int]:
    """
    Returns the majority vote and the winning amount of stake that a set of witnesses has for another event.
    :param hashgraph: hashgraph holding the event lookup table and the known members
    :param s: set of event ids of the voting witnesses
    :param x: the event being voted on
    :return: Tuple containing the majority vote (Fame) and the total stake of the majority vote (int)
    """
stake_for = 0
stake_against = 0
for event_id in s:
event = hashgraph.lookup_table[event_id]
if x.id in event.votes and event.votes[x.id]:
stake_for += hashgraph.known_members[event.verify_key].stake
else:
stake_against += hashgraph.known_members[event.verify_key].stake
    if stake_for >= stake_against:
        return Fame.TRUE, stake_for
    return Fame.FALSE, stake_against
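# A minimal, self-contained sketch of the stake-weighted tally above, with
# plain dicts standing in for the hashgraph lookups (all values hypothetical).
votes = {"w1": True, "w2": True, "w3": False}  # witness event id -> vote on x
stakes = {"w1": 10, "w2": 5, "w3": 8}          # witness event id -> member stake
stake_for = sum(stakes[w] for w, vote in votes.items() if vote)
stake_against = sum(stakes[w] for w, vote in votes.items() if not vote)
majority = stake_for >= stake_against
winning_stake = stake_for if majority else stake_against
# majority is True, winning_stake == 15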
def superbubble_nodes(g: AssemblyGraph, source: Node,
sink: Node) -> Set[Node]:
"""Find all nodes inside a superbubble."""
queue = deque([source])
visited = {source, sink}
while queue:
current = queue.popleft()
for neighbour in g.neighbors_iter(current):
if neighbour not in visited:
queue.append(neighbour)
visited.add(neighbour)
return visited
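# The same breadth-first traversal with a plain adjacency dict instead of an
# AssemblyGraph (the bubble below is a made-up example); seeding visited with
# the sink keeps the search from leaving the superbubble.
from collections import deque

adjacency = {"src": ["a", "b"], "a": ["snk"], "b": ["snk"], "snk": []}
queue, visited = deque(["src"]), {"src", "snk"}
while queue:
    current = queue.popleft()
    for neighbour in adjacency[current]:
        if neighbour not in visited:
            queue.append(neighbour)
            visited.add(neighbour)
# visited == {"src", "a", "b", "snk"}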
    def __init__(self, ploidy: int, copy_from: 'HaplotypeSet' = None):
self.ploidy = ploidy
# Nodes spelling each haplotype
self.haplotypes = [] # type: List[List[Node]]
# Also keep a set of reads used for each haplotype, useful for
# relative likelihood calculation
self.read_sets = [] # type: List[Set[OrientedRead]]
if isinstance(copy_from, HaplotypeSet):
for i in range(ploidy):
self.haplotypes.append(deque(copy_from.haplotypes[i]))
self.read_sets.append(set(copy_from.read_sets[i]))
else:
for i in range(ploidy):
self.haplotypes.append(deque())
self.read_sets.append(set())
self.log_rl = float('-inf')
self.from_large_bubble = False
    def extend(self, extensions: List[Tuple[Node, ...]],
               ext_read_sets: List[Set[OrientedRead]]) -> 'HaplotypeSet':
"""Extend the haplotype set with a new set of paths."""
# Make a copy of itself for a new set
new_set = HaplotypeSet(self.ploidy, copy_from=self)
for hap_num, (extension, read_set) in enumerate(
zip(extensions, ext_read_sets)):
haplotype_nodes = new_set.haplotypes[hap_num]
# Add the nodes of the extension to each haplotype
            # The last node of this haplotype (likely a bubble exit) may also
            # be the bubble entrance and thus the start node of this
            # extension; avoid adding it twice.
if (len(haplotype_nodes) > 0 and
haplotype_nodes[-1] == extension[0]):
haplotype_nodes.extend(extension[1:])
else:
haplotype_nodes.extend(extension)
new_set.read_sets[hap_num].update(read_set)
return new_set
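# The splice rule above in isolation: when an extension starts with the
# haplotype's current last node, that node is not duplicated (node names
# are made up).
from collections import deque

haplotype = deque(["n1", "n2"])
extension = ("n2", "n3")
if haplotype and haplotype[-1] == extension[0]:
    haplotype.extend(extension[1:])
else:
    haplotype.extend(extension)
# haplotype == deque(["n1", "n2", "n3"])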
def _update_grammar_text(self) -> None:
"""
"B", {"aB", "bC", "a"} turns into
"B -> aB | bC | a"
"""
        def transform_production(non_terminal: str, productions: Set[str]) -> str:
return "{} -> {}".format(
non_terminal, " | ".join(sorted(productions)))
initial_symbol = self._grammar.initial_symbol()
productions = self._grammar.productions()
text = ""
if initial_symbol in productions:
text = transform_production(
initial_symbol, productions[initial_symbol]) + "\n"
for non_terminal in sorted(set(productions.keys()) - {initial_symbol}):
text += transform_production(
non_terminal, productions[non_terminal]) + "\n"
self.grammarText.setPlainText(text)
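# What transform_production yields for the docstring's example; sorted()
# puts the alternatives in lexicographic order.
line = "{} -> {}".format("B", " | ".join(sorted({"aB", "bC", "a"})))
# line == "B -> a | aB | bC"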
    def down(self, visited: FrozenSet[Any] = None) -> Set[Any]:
""" Returns the set of reachable nodes by going down on this node """
if visited is None:
visited = frozenset()
if self in visited:
return {self} if self.symbol not in OPERATORS else set()
visited |= {self}
if self.symbol == '|':
return self.left.down(visited) | self.right.down(visited)
elif self.symbol == '.':
return self.left.down(visited)
elif self.symbol == '*' or self.symbol == '?':
return self.left.down(visited) | self.right.up(visited)
elif self.symbol == EPSILON:
return self.right.up(visited)
return {self}
    def up(self, visited: FrozenSet[Any] = None) -> Set[Any]:
""" Returns the set of reachable nodes by going up on this node """
if visited is None:
visited = frozenset()
if self.symbol == '|':
            # skip the whole right subtree
node = self.right
while node.symbol == '.' or node.symbol == '|':
node = node.right
return node.right.up(visited)
elif self.symbol == '.':
return self.right.down(visited)
elif self.symbol == '*':
return self.left.down(visited) | self.right.up(visited)
elif self.symbol == '?':
return self.right.up(visited)
else: # self.symbol == END:
return {self}
def remove_state(self, state: str) -> None:
""" Removes a state """
        # the initial state may not be removed
if state != self._initial_state:
self._states.discard(state)
self._final_states.discard(state)
for symbol in self._alphabet:
# remove useless transitions that come from the removed state
if (state, symbol) in self._transitions:
del self._transitions[state, symbol]
            empty_transitions = set()  # type: Set[Tuple[str, str]]
            for transition, next_states in self._transitions.items():
                # remove transitions that go to the removed state
                next_states.discard(state)
                if not next_states:
                    empty_transitions.add(transition)
for transition in empty_transitions:
del self._transitions[transition]
def _are_undistinguishable(
self, state_a: str, state_b: str,
undistinguishable: Set[FrozenSet[str]]) -> bool:
"""
        States a and b are distinguishable if, for some input symbol, they
        go to distinguishable states; otherwise they are undistinguishable.
"""
for symbol in self._alphabet:
transition_a = \
list(self._transitions.get((state_a, symbol), {""}))[0]
transition_b = \
list(self._transitions.get((state_b, symbol), {""}))[0]
if transition_a != transition_b and \
frozenset((transition_a, transition_b)) not in \
undistinguishable:
return False
return True
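# Why the pairs are stored as frozensets: the distinguishability check is
# symmetric, so one unordered entry covers both orders.
assert frozenset(("q0", "q1")) == frozenset(("q1", "q0"))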
def _determinize_state(self, states_set: Set[str]) -> None:
"""
        For a given set of states, check whether the corresponding composite
        state already exists in the FA. If it does not, add it, insert its
        transitions, and recurse on the newly reachable state sets.
"""
name = "".join(sorted(states_set))
if name and name not in self._states:
self.add_state(name)
if states_set.intersection(self._final_states):
self._final_states.add(name)
for symbol in self._alphabet:
reachable = self._find_reachable(states_set, symbol)
if reachable:
self._transitions[name, symbol] = reachable
self._determinize_state(reachable)
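# How the composite state names are built during determinization: sorting
# makes the name independent of the set's iteration order.
name = "".join(sorted({"B", "A", "C"}))
# name == "ABC"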
def _has_recursion(self, to_visit: str, visited: Set[str]) -> bool:
"""
        Checks whether the automaton has recursive states, using a
        depth-first search approach.
"""
if to_visit in visited:
return True
visited.add(to_visit)
reachable = set() # type: Set[str]
        # Find the states reachable through all symbols
for symbol in self._alphabet:
reachable.update(self._find_reachable({to_visit}, symbol))
for state in reachable:
if self._has_recursion(state, copy.deepcopy(visited)):
return True
return False
def from_regular_grammar(grammar) -> 'NFA':
""" Converts RegularGrammar to NFA """
initial_symbol = grammar.initial_symbol()
productions = grammar.productions()
states = set(productions.keys()) | {"X"}
alphabet = set() # type: Set[str]
transitions = {} # type: Dict[Tuple[str, str], Set[str]]
initial_state = initial_symbol
final_states = set("X") | \
({initial_symbol} if "&" in productions[initial_symbol] else set())
for non_terminal, prods in productions.items():
for production in prods:
if production == "&":
continue
new_transition = "X" if len(production) == 1 else production[1]
transitions.setdefault(
(non_terminal, production[0]), set()).add(new_transition)
alphabet.add(production[0])
return NFA(states, alphabet, transitions, initial_state, final_states)
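# A tiny worked example of the conversion loop above (the grammar S -> aS | b
# is made up): "aS" keeps the machine in S on input "a", while the
# terminal-only production "b" moves it to the accepting state "X".
productions = {"S": {"aS", "b"}}
transitions = {}
for non_terminal, prods in productions.items():
    for production in prods:
        target = "X" if len(production) == 1 else production[1]
        transitions.setdefault((non_terminal, production[0]), set()).add(target)
# transitions == {("S", "a"): {"S"}, ("S", "b"): {"X"}}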
def get_remaining_shared_breaks_this_week(group_members: Set[User]) -> List[Break]:
"""
    Finds this week's remaining common breaks between a group of users.
"""
# So, the Mypy type checker treats `List` as invariant, meaning we
# can't give a `List[B]` to a function that expects a `List[A]` if
# B is a subclass of A.
# So we have to cast it in to the function...
# FIXME: Get rid of these casts when Van Rossum figures out how to write a
# proper type system
breaks = cast(List[Event_], get_shared_breaks(group_members))
now = datetime.now(BRISBANE_TIME_ZONE)
### ... and out.
return cast(List[Break], get_this_weeks_events(now, breaks))
# FIXME: Make 'request_status' an enum: https://docs.python.org/3/library/enum.html
def search_show_ids_by_names(self, *names, exact=False) -> Set[Show]:
shows = set()
for name in names:
debug("Searching shows by name: {}".format(name))
if exact:
self.q.execute("SELECT show, name FROM ShowNames WHERE name = ?", (name,))
else:
self.q.execute("SELECT show, name FROM ShowNames WHERE name = ? COLLATE alphanum", (name,))
matched = self.q.fetchall()
for match in matched:
debug(" Found match: {} | {}".format(match[0], match[1]))
shows.add(match[0])
return shows
# Helper methods
## Conversions
def init_by_vocabulary(self, lemma_counter: Counter, lemma_to_word_forms: Dict[str, Set[WordForm]],
lemma_case: Dict[str, LemmaCase]):
"""
        Initialize the vocabulary from a lemmatized corpus.
        :param lemma_counter: Counter over lemmas.
        :param lemma_to_word_forms: mapping from a lemma to the set of all its possible word forms.
        :param lemma_case: mapping from a lemma to the capitalization type shared by all its forms.
"""
for i, (lemma, _) in enumerate(tqdm(lemma_counter.most_common(), desc="Init vocabulary")):
for word_form in lemma_to_word_forms[lemma]:
word_form.set_case(lemma_case[word_form.lemma])
self.word_forms.append(word_form)
self.word_form_indices[word_form] = len(self.word_forms) - 1
assert self.word_forms[self.word_form_indices[word_form]] == word_form
                self.lemma_indices[word_form] = i + 1  # index 0 is reserved.
assert self.lemma_indices[SEQ_END_WF] == 1
def __init__(self, language: str="ru", mode: Mode=Mode.GRAPHEMES, raw_dict_path=None, trie_path=None,
zalyzniak_dict=ZALYZNYAK_DICT, cmu_dict=CMU_DICT) -> None:
self.data = pygtrie.Trie() # type: Dict[str, Set[Stress]]
self.raw_dict_path = raw_dict_path
self.trie_path = trie_path
if language == "ru" and mode == self.Mode.GRAPHEMES:
self.__init_defaults(RU_GRAPHEME_STRESS_PATH, RU_GRAPHEME_STRESS_TRIE_PATH)
if not os.path.exists(self.raw_dict_path):
from rupo.dict.zaliznyak import ZalyzniakDict
ZalyzniakDict.convert_to_accent_only(zalyzniak_dict, self.raw_dict_path)
elif mode == self.Mode.PHONEMES and language == "en":
self.__init_defaults(EN_PHONEME_STRESS_PATH, EN_PHONEME_STRESS_TRIE_PATH)
if not os.path.exists(self.raw_dict_path):
CMUDict.convert_to_phoneme_stress(cmu_dict, self.raw_dict_path)
else:
assert False
if not os.path.isfile(self.raw_dict_path):
raise FileNotFoundError("Dictionary raw file not found.")
if os.path.isfile(self.trie_path):
self.load(self.trie_path)
else:
self.create(self.raw_dict_path, self.trie_path)
def _find_default_mounts() -> Set[str]:
global config_paths
basepath = os.getcwd()
miniparser = configargparse.ArgumentParser()
miniparser.add_argument(*args_for_setting_config_path, dest="config", action="append",
default=[])
args, _ = miniparser.parse_known_args()
# type: ignore, because mypy doesn't parse add_argument above correctly
if not args.config:
args.config = default_config_files
paths = set()
paths.add(basepath)
for cfg in args.config:
if os.path.isfile(cfg):
paths.add(os.path.abspath(os.path.dirname(cfg)))
config_paths.add(os.path.abspath(os.path.dirname(cfg)))
return paths
def __init__(self):
"""
        Initializes the semantic analyser with an empty program AST, an empty
        classes map and an empty class inheritance graph.
        :return: None
"""
super(PyCoolSemanticAnalyser, self).__init__()
# Initialize the internal program ast instance.
self._program_ast = None
# Classes Map: maps each class name (key: String) to its class instance (value: AST.Class).
# Dict[AnyStr, AST.Class]
self._classes_map = dict()
# Class Inheritance Graph: maps a parent class (key: String) to a unique collection of its
# children classes (value: set).
# Dict[AnyStr, Set]
self._inheritance_graph = defaultdict(set)
# #########################################################################
# PUBLIC #
# #########################################################################
def _get_commands(dist # type: setuptools.dist.Distribution
):
# type: (...) -> typing.Dict[str, typing.Set[str]]
"""Find all commands belonging to the given distribution.
Args:
dist: The Distribution to search for docopt-compatible docstrings that
can be used to generate command entry points.
Returns:
A dictionary containing a mapping of primary commands to sets of
subcommands.
"""
py_files = (f for f in setuptools.findall()
if os.path.splitext(f)[1].lower() == '.py')
pkg_files = (f for f in py_files if _get_package_name(f) in dist.packages)
commands = {} # type: typing.Dict[str, typing.Set[str]]
for file_name in pkg_files:
with open(file_name) as py_file:
module = typing.cast(ast.Module, ast.parse(py_file.read()))
module_name = _get_module_name(file_name)
_append_commands(commands, module_name, _get_module_commands(module))
_append_commands(commands, module_name, _get_class_commands(module))
_append_commands(commands, module_name, _get_function_commands(module))
return commands
def _append_commands(dct, # type: typing.Dict[str, typing.Set[str]]
module_name, # type: str
                     commands  # type: typing.Iterable[_EntryPoint]
):
# type: (...) -> None
"""Append entry point strings representing the given Command objects.
Args:
dct: The dictionary to append with entry point strings. Each key will
be a primary command with a value containing a list of entry point
strings representing a Command.
module_name: The name of the module in which the command object
resides.
commands: A list of Command objects to convert to entry point strings.
"""
for command in commands:
entry_point = '{command}{subcommand} = {module}{callable}'.format(
command=command.command,
subcommand=(':{}'.format(command.subcommand)
if command.subcommand else ''),
module=module_name,
callable=(':{}'.format(command.callable)
if command.callable else ''),
)
dct.setdefault(command.command, set()).add(entry_point)
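# What the format above yields for a hypothetical command: a setuptools
# entry-point string mapping "mycli init" to mypkg.cli:main.
entry_point = '{command}{subcommand} = {module}{callable}'.format(
    command='mycli', subcommand=':init', module='mypkg.cli', callable=':main')
# entry_point == "mycli:init = mypkg.cli:main"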
def set_grade(self, new_grade: float, user: User) -> None:
"""Set the grade to the new grade.
.. note:: This also passes back the grade to LTI if this is necessary
(see :py:func:`passback_grade`).
:param new_grade: The new grade to set
:param user: The user setting the new grade.
:returns: Nothing
"""
self._grade = new_grade
passback = self.assignment.should_passback
grade = self.grade
history = GradeHistory(
is_rubric=self._grade is None and grade is not None,
grade=-1 if grade is None else grade,
passed_back=False,
work=self,
user=user
)
db.session.add(history)
db.session.flush()
if passback:
psef.tasks.passback_grades([self.id])
def _add_defining_attribute(self, coll: Collection, group: int, rels: Set[RF2Files.Relationship]) -> None:
if group == 0:
for rel in rels:
restr = existential_restriction(self, as_uri(rel.typeId), as_uri(rel.destinationId))
if rel.typeId in self._context.NEVER_GROUPED:
coll.append(restr)
else:
coll.append(role_group(self, restr))
else:
if len(rels) > 1:
                # A group whose target is an intersection of subjects + inner restrictions
                target, inner_coll = intersection(self)
                for rel in rels:
                    inner_coll.append(existential_restriction(
                        self, as_uri(rel.typeId), as_uri(rel.destinationId)))
                coll.append(role_group(self, target))
else:
                rel = next(iter(rels))
coll.append(existential_restriction(self, as_uri(rel.typeId), as_uri(rel.destinationId)))