def down(self, visited: FrozenSet[Any]=None) -> Set[Any]:
""" Returns the set of reachable nodes by going down on this node """
if visited is None:
visited = frozenset()
if self in visited:
return {self} if self.symbol not in OPERATORS else set()
visited |= {self}
if self.symbol == '|':
return self.left.down(visited) | self.right.down(visited)
elif self.symbol == '.':
return self.left.down(visited)
elif self.symbol == '*' or self.symbol == '?':
return self.left.down(visited) | self.right.up(visited)
elif self.symbol == EPSILON:
return self.right.up(visited)
return {self}
def up(self, visited: FrozenSet[Any]=None) -> Set[Any]:
""" Returns the set of reachable nodes by going up on this node """
if visited is None:
visited = frozenset()
if self.symbol == '|':
        # skip the whole right subtree
node = self.right
while node.symbol == '.' or node.symbol == '|':
node = node.right
return node.right.up(visited)
elif self.symbol == '.':
return self.right.down(visited)
elif self.symbol == '*':
return self.left.down(visited) | self.right.up(visited)
elif self.symbol == '?':
return self.right.up(visited)
else: # self.symbol == END:
return {self}
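
# A minimal sketch of the visited-frozenset idiom used by down()/up() above:
# each recursive call carries an immutable set of already-seen nodes, so the
# local `visited | {self}` never leaks back into sibling branches. The Node
# class below is hypothetical and only illustrates the traversal pattern, not
# the project's actual syntax-tree node.
from typing import FrozenSet, Set


class Node:
    def __init__(self, symbol: str, *children: 'Node') -> None:
        self.symbol = symbol
        self.children = children

    def reachable(self, visited: FrozenSet['Node'] = frozenset()) -> Set['Node']:
        if self in visited:
            return set()
        visited |= {self}  # rebinds to a new frozenset; the caller is unaffected
        result = {self}
        for child in self.children:
            result |= child.reachable(visited)
        return result


leaf = Node('a')
root = Node('|', leaf, Node('.', leaf))
assert leaf in root.reachable()
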
def _are_undistinguishable(
self, state_a: str, state_b: str,
undistinguishable: Set[FrozenSet[str]]) -> bool:
"""
State a and b are distinguishable if they go to distinguishable
states for some input symbol.
"""
for symbol in self._alphabet:
transition_a = \
list(self._transitions.get((state_a, symbol), {""}))[0]
transition_b = \
list(self._transitions.get((state_b, symbol), {""}))[0]
if transition_a != transition_b and \
frozenset((transition_a, transition_b)) not in \
undistinguishable:
return False
return True
def __init__(self, discovered_files: etl.file_sets.TableFileSet) -> None:
# Basic properties to locate files describing the relation
self._fileset = discovered_files
if discovered_files.scheme == "s3":
self.bucket_name = discovered_files.netloc
self.prefix = discovered_files.path
else:
self.bucket_name = None
self.prefix = None
# Note the subtle difference to TableFileSet--here the manifest_file_name is always present since it's computed
self.manifest_file_name = os.path.join(discovered_files.path or "", "data", self.source_path_name + ".manifest")
# Lazy-loading of table design and query statement and any derived information from the table design
self._table_design = None # type: Optional[Dict[str, Any]]
self._query_stmt = None # type: Optional[str]
self._dependencies = None # type: Optional[FrozenSet[str]]
self._is_required = None # type: Union[None, bool]
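
# Quick illustration of the manifest path built above, using hypothetical
# values for the prefix and source_path_name (not taken from the project).
# With a POSIX path separator this yields "schemas/www/data/www.orders.manifest".
import os

manifest = os.path.join("schemas/www", "data", "www.orders" + ".manifest")
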
def merge_equivalent(self) -> None:
""" Merges equivalent states """
if not self.is_deterministic():
raise RuntimeError("Automata is non-deterministic")
# pairs of undistinguishable states
undistinguishable = set() # type: Set[FrozenSet[str]]
    # initially, assume every pair of states within the same class
    # (both final, or both non-final) is undistinguishable
for pair in combinations(self._states - self._final_states, 2):
undistinguishable.add(frozenset(pair))
for pair in combinations(self._final_states, 2):
undistinguishable.add(frozenset(pair))
# find new distinguishable states
while True:
new_distinguishable_found = False
undistinguishable_copy = undistinguishable.copy()
for state_a, state_b in undistinguishable_copy:
if not self._are_undistinguishable(
state_a, state_b, undistinguishable_copy):
undistinguishable.remove(frozenset((state_a, state_b)))
new_distinguishable_found = True
if not new_distinguishable_found:
break
for state_a, state_b in undistinguishable:
self._merge_states(state_a, state_b)
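
# Sketch of the refinement idea behind _are_undistinguishable/merge_equivalent
# above, using frozenset pairs as unordered state pairs. The toy DFA below is
# made up purely for illustration; its states and transitions are not from the
# source project.
from itertools import combinations
from typing import Dict, FrozenSet, Set, Tuple

states = {'A', 'B', 'C'}
finals = {'C'}
alphabet = {'0', '1'}
delta: Dict[Tuple[str, str], str] = {
    ('A', '0'): 'B', ('A', '1'): 'C',
    ('B', '0'): 'A', ('B', '1'): 'C',
    ('C', '0'): 'C', ('C', '1'): 'C',
}

# Start by assuming all same-class pairs are undistinguishable.
undist: Set[FrozenSet[str]] = {frozenset(p) for p in combinations(states - finals, 2)}
undist |= {frozenset(p) for p in combinations(finals, 2)}

changed = True
while changed:
    changed = False
    for pair in list(undist):
        a, b = tuple(pair)
        for sym in alphabet:
            ta, tb = delta[(a, sym)], delta[(b, sym)]
            if ta != tb and frozenset((ta, tb)) not in undist:
                undist.discard(pair)
                changed = True
                break

assert undist == {frozenset({'A', 'B'})}  # A and B are equivalent in this toy DFA
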
def pytest_assertrepr_compare(op: str, left, right) -> Optional[Sequence[str]]:
# set of entities
if op == '==' and isinstance(left, (set, frozenset)) and \
isinstance(right, (set, frozenset)) and \
all(isinstance(v, Entity) for v in left) and \
all(isinstance(v, Entity) for v in right):
def repr_ids(ids: AbstractSet[EntityId]) -> str:
sorted_ids = sorted(
ids,
key=lambda i: (
i[0],
                    # EntityIds usually consist of one letter followed by
                    # digits, so order those numerically; ids not in that
                    # format fall into a second bucket and sort as strings.
(0, int(i[1:])) if i[1:].isdigit() else (1, i[1:])
)
)
return '{' + ', '.join(sorted_ids) + '}'
left = cast(Union[Set[Entity], FrozenSet[Entity]], left)
right = cast(Union[Set[Entity], FrozenSet[Entity]], right)
left_ids = {e.id for e in left}
right_ids = {e.id for e in right}
return [
'{} == {}'.format(repr_ids(left_ids), repr_ids(right_ids)),
'Extra entities in the left set:',
repr_ids(left_ids - right_ids),
'Extra entities in the right set:',
repr_ids(right_ids - left_ids),
]
return None
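
# The function above is the standard pytest hook pytest_assertrepr_compare: put
# it in a conftest.py and pytest will call it for failing `==` assertions and
# display the returned lines instead of its default diff. The Entity below is a
# hypothetical stand-in for the project's Entity class, only meant to show the
# shape of a test that would trigger the hook.
from dataclasses import dataclass


@dataclass(frozen=True)
class Entity:
    id: str


def test_entity_sets_differ():
    left = frozenset({Entity('E1'), Entity('E2')})
    right = frozenset({Entity('E1'), Entity('E3')})
    assert left == right  # on failure, the hook renders the sorted id sets
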
def check_frozenset(a: typing.FrozenSet[int]) -> typing.FrozenSet[int]:
return a
def fields(self) -> FrozenSet[str]:
if self._fields is None:
self._fields = frozenset(field['SystemName'] for field in self.table)
return self._fields
def vocabulary(self) -> FrozenSet[str]:
return self.properties.vocabulary
def _get_vocabulary(self) -> FrozenSet[str]:
source = ''.join(text_map)
source_special = ' '
    target = ''.join(np.arange(0, self._digits).astype(str))
return frozenset(source + source_special + target)
def user_roles(self) -> typing.FrozenSet[str]:
return frozenset(self._user_roles)
def user_roles_indexable(self) -> typing.FrozenSet[str]:
return frozenset(self._user_roles_indexable)
def user_caps(self) -> typing.Mapping[str, typing.FrozenSet[str]]:
return self._user_caps
def variables(self) -> FrozenSet[str]:
"""The names of the variables the constraint depends upon.
Used by matchers to decide when a constraint can be evaluated (which is when all
the dependency variables have been assigned a value). If the set is empty, the constraint will
only be evaluated once the whole match is complete.
"""
return frozenset()
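
# A hedged sketch of what the docstring above describes: a constraint exposes
# the variable names it depends on, so the matcher can defer evaluating it
# until all of them are bound. The class below is a made-up stand-in, not the
# project's actual Constraint API.
from typing import Any, FrozenSet, Mapping


class GreaterThanConstraint:
    def __init__(self, var_a: str, var_b: str) -> None:
        self.var_a = var_a
        self.var_b = var_b

    @property
    def variables(self) -> FrozenSet[str]:
        # Evaluation can run as soon as both variables have values.
        return frozenset({self.var_a, self.var_b})

    def __call__(self, substitution: Mapping[str, Any]) -> bool:
        return substitution[self.var_a] > substitution[self.var_b]


c = GreaterThanConstraint('x', 'y')
assert c.variables == frozenset({'x', 'y'})
assert c({'x': 3, 'y': 1})
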
def dependencies(self) -> FrozenSet[str]:
if self._dependencies is None:
self._dependencies = frozenset(self.table_design.get("depends_on", []))
return self._dependencies
def _possible_investments(self, market_state: MarketState) -> FrozenSet[str]:
'''
Returns a set of all coins that the strategy might invest in, excluding
`self.fiat`.
'''
return market_state.available_coins() - {self.fiat}
def _held_coins(self) -> FrozenSet[str]:
return frozenset(
coin for (coin, balance)
in self.balances.items()
if balance > 0
)
def available_markets(self) -> FrozenSet[str]:
"""Return a frozenset containing all available markets, e.g. 'BTC_ETH'.
"""
return frozenset(
filter(
lambda market: market.startswith(self.fiat),
self.chart_data.keys(),
)
)
def available_coins(self) -> FrozenSet[str]:
markets = self.available_markets() # All of these start with fiat
return frozenset(split_currency_pair(m)[1] for m in markets) | {self.fiat}
def held_coins_with_chart_data(self) -> FrozenSet[str]:
return self._held_coins() & self.available_coins()
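
# Tiny worked example of the frozenset algebra used by the three properties
# above, with made-up market names (a market key is "FIAT_COIN"):
fiat = 'BTC'
chart_keys = {'BTC_ETH', 'BTC_XMR', 'USDT_BTC'}
available_markets = frozenset(m for m in chart_keys if m.startswith(fiat))
available_coins = frozenset(m.split('_')[1] for m in available_markets) | {fiat}
held = frozenset({'ETH', 'DOGE', 'BTC'})
assert available_markets == frozenset({'BTC_ETH', 'BTC_XMR'})
assert held & available_coins == frozenset({'ETH', 'BTC'})
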
def __init__(self,
source_lang: str, target_lang: str,
key: Any=None,
vocabulary: FrozenSet[str]=None,
observations: int=None,
validate: bool=False,
name: str='unamed',
tqdm: bool=True,
**kwargs) -> None:
# save basic properties
self._show_tqdm = tqdm
# compute properties
computed_properties = self._fetch_corpus_properties(
name, key, observations=observations
)
# use computed vocabulary if not provided
if vocabulary is None:
vocabulary = computed_properties.vocabulary
# make properties public
self.properties = CorpusProperties(
vocabulary=vocabulary,
histogram=computed_properties.histogram
)
# validate properties
# http://unicode-search.net/unicode-namesearch.pl
if '^' in vocabulary:
        raise ValueError('an eos char (^) was found in the vocabulary')
if '?' in vocabulary:
raise ValueError('a null char (?) was found in the vocabulary')
if '' in vocabulary:
raise ValueError('an invalid char () was found in the vocabulary')
# create encoding schema
self._setup_encoding()
# set language properties
self.source_lang = source_lang
self.target_lang = target_lang
# validate corpus properties
if validate:
self._validate_corpus_properties(name)
# setup tensorflow pipeline
super().__init__(histogram=self.properties.histogram,
dtype=self._encode_dtype,
name=name,
tqdm=tqdm,
**kwargs)
def word_vector_sum(
documents: Iterable[str],
        vector_dict: Dict[str, np.ndarray],
*,
separator: Optional[FrozenSet] = None,
lowercase: bool = True,
stop_words: Optional[Set] = None
):
"""
Calculate the word vector sum for each document in an iterable.
Args:
documents: An iterable of strings representing text
documents.
vector_dict(dict): A dictionary of words and their
corresponding vectors.
separator(frozenset): A frozenset of Unicode code points
to use as separators. Defaults to
``cypunct.unicode_classes.COMMON_SEPARATORS``.
lowercase(bool): If True, lowercase each document before
processing it.
stop_words(set): A set of words whose vectors should not
be included in the sum.
Returns:
(np.array): Return a numpy array containing a word
vector for each document, where each document's
word vector is the sum of its word vectors.
"""
# Dimension of the vectors we're using.
    dimension = len(next(iter(vector_dict.values())))
# If every word in a document is either a stop word or
# not in the dictionary...
# then the list comprehension inside np.average will return
# []. For that case, we will insert an array of zeros.
is_empty = [np.zeros(dimension)]
if lowercase:
documents = (doc.lower() for doc in documents)
if stop_words:
return np.asarray([
np.average([
vector_dict[x]
for x in cypunct.split(doc, separator)
if x in vector_dict and
x not in stop_words
] or is_empty, axis=0)
for doc in documents
])
return np.asarray([
np.average([
vector_dict[x]
for x in cypunct.split(doc, separator)
if x in vector_dict
] or is_empty, axis=0)
for doc in documents
])
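
# Hedged usage sketch for word_vector_sum above, with a tiny made-up vector
# dictionary. It assumes the cypunct package is installed and that
# cypunct.split(text, separators) splits on the given frozenset of code points,
# as the docstring states. Note that the implementation shown uses np.average,
# so each document actually gets the element-wise mean of its word vectors.
import numpy as np

vectors = {
    'hello': np.array([1.0, 0.0]),
    'world': np.array([0.0, 1.0]),
}
docs = ['Hello world', 'words with no vectors']
result = word_vector_sum(docs, vectors,
                         separator=frozenset(' '),
                         stop_words={'the'})
# One 2-dimensional vector per document; the second document has no known
# words, so it falls back to the zero vector.
assert result.shape == (2, 2)
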
def propose_trades_for_partial_rebalancing(
self,
market_state: MarketState,
coins_to_rebalance: FrozenSet[str],
) -> List[AbstractTrade]:
"""TODO: Trade directly from X to Y without going through fiat.
"""
ideal_fiat_value_per_coin = self._ideal_fiat_value_per_coin(market_state)
est_values = market_state.estimate_values(market_state.balances, self.fiat)
# 1) Fan in to fiat, selling excess value in coins we want to rebalance
trades_to_fiat = []
for sell_coin in sorted(coins_to_rebalance):
if sell_coin == self.fiat:
continue
value = est_values.get(sell_coin, 0)
delta = value - ideal_fiat_value_per_coin
if delta > 0:
trades_to_fiat.append(
AbstractTrade(sell_coin, self.fiat, self.fiat, delta),
)
# 2) Simulate trades and estimate portfolio state afterwards
est_balances_after_trades = simulate_trades(
trades_to_fiat,
market_state,
)
est_values_after_trades = market_state.estimate_values(
est_balances_after_trades,
self.fiat,
)
fiat_after_trades = est_balances_after_trades[self.fiat]
fiat_to_redistribute = fiat_after_trades - ideal_fiat_value_per_coin
if fiat_to_redistribute <= 0:
return trades_to_fiat
# 3) Find coins in which we don't hold enough value
possible_buys = set()
for buy_coin in self._possible_investments(market_state):
value = est_values_after_trades.get(buy_coin, 0)
if ideal_fiat_value_per_coin > value:
possible_buys.add(buy_coin)
    # Guard against no under-allocated coins (avoids a ZeroDivisionError below)
    if not possible_buys:
        return trades_to_fiat
    fiat_to_redistribute_per_coin = fiat_to_redistribute / len(possible_buys)
# 4) Plan trades, fanning back out from fiat to others
trades_from_fiat = []
for buy_coin in sorted(possible_buys):
value = est_values_after_trades.get(buy_coin, 0)
delta = ideal_fiat_value_per_coin - value
if delta > 0:
available_fiat = min(fiat_to_redistribute_per_coin, delta)
trades_from_fiat.append(
AbstractTrade(self.fiat, buy_coin, self.fiat, available_fiat),
)
return trades_to_fiat + trades_from_fiat
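
# Plain-number walkthrough of the fan-in/fan-out steps above, with made-up
# balances; no MarketState or AbstractTrade objects involved, just the
# arithmetic that comments (1)-(4) describe.
fiat = 'BTC'
est_values = {'BTC': 1.0, 'ETH': 4.0, 'XMR': 1.0}             # holding values, in fiat
ideal_per_coin = sum(est_values.values()) / len(est_values)   # 2.0

# 1) Sell the excess above the ideal value back into fiat.
sells = {c: v - ideal_per_coin for c, v in est_values.items()
         if c != fiat and v > ideal_per_coin}                 # {'ETH': 2.0}

# 2)-3) After those sells, fiat holds 1.0 + 2.0 = 3.0; redistribute the surplus
# above its own ideal value across the coins that are still under-allocated.
fiat_after = est_values[fiat] + sum(sells.values())
to_redistribute = fiat_after - ideal_per_coin                 # 1.0
under_allocated = [c for c, v in est_values.items()
                   if c != fiat and c not in sells and v < ideal_per_coin]

# 4) Fan back out from fiat, capped by each coin's shortfall.
per_coin = to_redistribute / len(under_allocated)
buys = {c: min(per_coin, ideal_per_coin - est_values[c]) for c in under_allocated}
assert buys == {'XMR': 1.0}
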