def copy_tree_actions(base: str, include_patterns: t.Union[t.List[str], str] = ["**", "**/.*"],
exclude_patterns: t.List[str] = None) -> t.List[Action]:
"""
Actions for all files and directories in the base directory that match the given patterns.
It's used to copy a whole directory tree.
:param base: base directory
:param include_pattern: patterns that match the paths that should be included
:param exclude_patterns: patterns that match the paths that should be excluded
:return: list of actions
"""
paths = matched_paths(base, include_patterns, exclude_patterns)
files = set() # type: t.Set[str]
dirs = set() # type: t.Set[str]
ret = [] # type: t.List[Action]
for path in paths:
ret.extend(actions_for_dir_path(path, path_acc=dirs))
if os.path.isfile(path) and path not in files:
files.add(path)
ret.append(CopyFile(normalize_path(path)))
return ret
python类Set()的实例源码
def actions_for_dir_path(path: str, path_acc: t.Set[str] = set()) -> t.List[Action]:
"""
Returns a list of actions that is needed to create a folder and it's parent folders.
:param path:
:param path_acc: paths already examined
"""
path = abspath(path)
typecheck_locals(path=FileName(allow_non_existent=False)|DirName(), create=Bool())
assert os.path.exists(path)
if path == "" or path == "~":
return []
path = normalize_path(path)
parts = path.split("/")
ret = []
for i in range(2 if parts[0] == "~" else 1, len(parts) + 1 if os.path.isdir(abspath(path)) else len(parts)):
subpath = "/".join(parts[:i])
subpath_norm = normalize_path(subpath)
if subpath_norm in path_acc:
continue
ret.append(CreateDir(subpath_norm))
path_acc.add(subpath_norm)
return ret
def combine(*messages: t.Tuple[t.Optional['StatMessage']]) -> t.List['StatMessage']:
"""
Combines all message of the same type and with the same parent in the passed list.
Ignores None entries.
:param messages: passed list of messages
:return: new reduced list
"""
msgs = set([msg for msg in messages if msg is not None]) # t.Set['StatMessage']
something_changed = True
while something_changed:
something_changed = False
merged_pair = None # type: t.Tuple['StatMessage', 'StatMessage']
for (msg, msg2) in itertools.product(msgs, msgs):
if msg is not msg2:
if msg.parent.eq_except_property(msg2.parent) and type(msg) == type(msg2):
merged_pair = (msg, msg2)
something_changed = True
break
if something_changed:
msg, msg2 = merged_pair
msgs.remove(msg)
msgs.remove(msg2)
msgs.add(msg + msg2)
return list(msgs)
def test_typing():
from typing import Any, List, Set, Dict, Type, Tuple
assert name_type(Any) == "Any"
assert name_type(List) == "List"
assert name_type(List[Any]) == "List"
assert name_type(List[str]) == "List[str]"
assert name_type(List[int]) == "List[int]"
assert name_type(Set) == "Set"
assert name_type(Set[Any]) == "Set"
assert name_type(Set[List]) == "Set[List]"
assert name_type(Dict) == "Dict"
assert name_type(Dict[Any, Any]) == "Dict"
assert name_type(Dict[str, int]) == "Dict[str, int]"
assert name_type(Type) == "Type"
assert name_type(Type[int]) == "Type[int]"
assert name_type(Type[MagicType]) == "Type[MagicType]"
assert name_type(Tuple) == "Tuple"
assert name_type(Tuple[int]) == "Tuple[int]"
assert name_type(Tuple[int, str, List]) == "Tuple[int, str, List]"
assert name_type(Tuple[int, Ellipsis]) == "Tuple[int, ...]"
assert name_type(Tuple[str, Ellipsis]) == "Tuple[str, ...]"
def name(self):
if self._args is None:
return "Callable"
elif self._args[0] is Ellipsis:
return "Callable[..., %s]" % checker_for_type(self._args[1]).name()
else:
return "Callable[[%s], %s]" % (
", ".join(checker_for_type(z).name() for z in self._args[:-1]),
checker_for_type(self._args[-1]).name())
# ------------------------------------------------------------------------------
#
# Set operations with checkers
# ------------------------------------------------------------------------------
def __init__(self, master_db, lib_name, params, used_names, **kwargs):
# type: (MasterDB, str, Dict[str, Any], Set[str], **kwargs) -> None
self._master_db = master_db
self._lib_name = lib_name
self._used_names = used_names
# set parameters
params_info = self.get_params_info()
default_params = self.get_default_param_values()
self.params = {}
if params_info is None:
# compatibility with old schematics generators
self.params.update(params)
self._prelim_key = self.to_immutable_id((self._get_qualified_name(), params))
self._cell_name = None
self._key = None
else:
self.populate_params(params, params_info, default_params, **kwargs)
# get unique cell name
self._prelim_key = self.compute_unique_key()
self.update_master_info()
self.children = None
self._finalized = False
def deserialize_abstract_type(cls, data):
abstract_type_map = {
typing.Sequence: list,
typing.List: list,
typing.Dict: dict,
typing.Set: set,
typing.AbstractSet: set,
}
cls_origin_type = cls.__origin__
if cls_origin_type is None:
cls_origin_type = cls
iterable_types = {
typing.Sequence, typing.List, typing.Tuple, typing.Set,
typing.AbstractSet, typing.Mapping,
}
if cls_origin_type in iterable_types:
return deserialize_iterable_abstract_type(cls, cls_origin_type, data)
else:
return abstract_type_map[cls_origin_type](data)
def hamiltonian_cycle(g: Graph, start: Vertex) -> List[Vertex]:
path = [start]
current = start
visited: Set[Vertex] = set()
try:
while len(visited) != g.order:
visited.add(current)
(_, nearest) = min(
(g.weight[current, v], v)
for v in g.neighbors(current)
if v not in visited
)
path.append(nearest)
current = nearest
except ValueError as e:
if len(path) == g.order:
return path
raise HamiltonianCycleNotFound('graph has dead ends')
def dfs(g: Graph, current: Vertex, condition: Test,
visited: Set = None) -> Optional[Vertex]:
visited = visited or set()
if current in visited:
return None
visited.add(current)
if condition(current):
return current
for n in g.neighbors(current):
v = dfs(g, n, condition, visited)
if v is not None:
return v
return None
def transitive_closure(
g: Graph,
v: Vertex,
visited: Optional[Set[Vertex]] = None) -> Set[Vertex]:
"""
Returns a set containing all vertices reachable from v
"""
visited = visited or set()
visited.add(v)
for v_neigh in g.neighbors(v):
if v_neigh not in visited:
transitive_closure(g, v_neigh, visited)
return visited
def test_typing_extractor_register(typing_extractor):
def extract_set(extractor, typ):
subtype = Any
if typ.__args__ and typ.__args__[0] is not Any:
subtype = typ.__args__[0]
return {
"type": "array",
"title": "set",
"items": extractor.extract(extractor, subtype)
}
typing_extractor.register(set, extract_set)
assert typing_extractor.extract(typing_extractor, Set[int]) == {
"type": "array",
"title": "set",
"items": {"type": "integer"}
}
def from_type(t):
'''Converts a type `t` to a Typeable
'''
if isinstance(t, Typeable):
return t
if isinstance(t, list):
if len(t) != 1:
if len(t) < 1:
reason = 'Missing type parameter'
else:
reason = 'Too many type parameters, only homogenous lists allowed'
msg = 'Can only use literal list alias with a single type, `{}` is invalid: {}'
raise ValueError(msg.format(repr(t), reason))
t0 = from_type(t[0]).typ
return from_type(typing.List[t0])
elif isinstance(t, set):
if len(t) != 1:
if len(t) < 1:
reason = 'Missing type parameter'
else:
reason = 'Too many type parameters, only homogenous sets allowed'
msg = 'Can only use literal set alias with a single type, `{}` is invalid: {}'
raise ValueError(msg.format(repr(t), reason))
t0 = from_type(next(iter(t))).typ
return from_type(typing.Set[t0])
elif isinstance(t, tuple):
args = tuple([from_type(a).typ for a in t])
return from_type(typing.Tuple[args])
return _from_typing36(t)
def test_setint_convert():
it = typeable.from_type({int})
assert it.typ is typing.Set[int]
assert it.origin.typ is typing.Set
assert it.origin.origin is None
assert it.origin.args == []
assert it.arity == 0
def __init__(self, seed: Union[int, None] = None) -> None:
self._hyperparameters = OrderedDict() # type: OrderedDict[str, Hyperparameter]
self._hyperparameter_idx = dict() # type: Dict[str, int]
self._idx_to_hyperparameter = dict() # type: Dict[int, str]
# Use dictionaries to make sure that we don't accidently add
# additional keys to these mappings (which happened with defaultdict()).
# This once broke auto-sklearn's equal comparison of configuration
# spaces when _children of one instance contained all possible
# hyperparameters as keys and empty dictionaries as values while the
# other instance not containing these.
self._children = OrderedDict() # type: OrderedDict[str, OrderedDict[str, Union[None, AbstractCondition]]]
self._parents = OrderedDict() # type: OrderedDict[str, OrderedDict[str, Union[None, AbstractCondition]]]
# changing this to a normal dict will break sampling because there is
# no guarantee that the parent of a condition was evaluated before
self._conditionals = set() # type: Set[str]
self.forbidden_clauses = [] # type: List['AbstractForbiddenComponent']
self.random = np.random.RandomState(seed)
self._children['__HPOlib_configuration_space_root__'] = OrderedDict()
# caching
self._parent_conditions_of = dict()
self._child_conditions_of = dict()
self._parents_of = dict()
self._children_of = dict()
def __init__(self, nodes: Set[Node], in_node: Node, out_node: Node, edges: Set[Edge]):
"""Control flow graph representation.
:param nodes: set of nodes of the control flow graph
:param in_node: entry node of the control flow graph
:param out_node: exit node of the control flow graph
:param edges: set of edges of the control flow graph
"""
self._nodes = {node.identifier: node for node in nodes}
self._in_node = in_node
self._out_node = out_node
self._edges = {(edge.source, edge.target): edge for edge in edges}
def in_edges(self, node: Node) -> Set[Edge]:
"""Ingoing edges of a given node.
:param node: given node
:return: set of ingoing edges of the node
"""
return {self.edges[(source, target)] for (source, target) in self.edges if target == node}
def predecessors(self, node: Node) -> Set[Node]:
"""Predecessors of a given node.
:param node: given node
:return: set of predecessors of the node
"""
return {edge.source for edge in self.in_edges(node)}
def out_edges(self, node: Node) -> Set[Edge]:
"""Outgoing edges of a given node.
:param node: given node
:return: set of outgoing edges of the node
"""
return {self.edges[(source, target)] for (source, target) in self.edges if source == node}
def successors(self, node: Node) -> Set[Node]:
"""Successors of a given node.
:param node: given node
:return: set of successors of the node
"""
return {edge.target for edge in self.out_edges(node)}
def result(self, result: Set[Expression]):
self._result = result