import importlib
import typing


def import_all_modules(
    module_name: str,
    path: typing.Optional[typing.Sequence[str]] = None,
) -> typing.AbstractSet[str]:
    """Import all modules under the given root module. This can be useful
    when populating a revision script in Alembic with the ``--autogenerate``
    option: since Alembic can only detect changes to or creation of objects
    that are imported at runtime, importing all modules lets every entity
    be tracked properly in the migration script.

    .. code-block:: python

       >>> from ormeasy.common import import_all_modules
       >>> import_all_modules('ormeasy')

    :param str module_name: The name of the module to import.
    :param list[str] or None path: The path to find the root module.

    """
    modules = get_all_modules(module_name, path)
    for name in modules:  # avoid shadowing the module_name parameter
        importlib.import_module(name)
    return modules
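
A minimal sketch of how this might be wired into an Alembic ``env.py`` so that
``--autogenerate`` sees every model. The call site is an assumption, not part
of ormeasy itself; ``'ormeasy'`` mirrors the docstring example:

from ormeasy.common import import_all_modules

# Hypothetical env.py snippet: import every module up front so Alembic's
# autogenerate can see all mapped classes before diffing the metadata.
import_all_modules('ormeasy')
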
import collections.abc
import typing


def validate_type(data, type_):
    instance_check = False
    abstract_types = {typing.AbstractSet, typing.Sequence, typing.Mapping}
    if hasattr(type_, '__origin__') and type_.__origin__ in abstract_types:
        param_type = get_abstract_param_types(type_)
        # Map abstract typing generics to their runtime ABCs.  (The old
        # collections.Set/Sequence/Mapping aliases were removed in
        # Python 3.10; collections.abc is the correct home.)
        imp_types = {
            typing.AbstractSet: collections.abc.Set,
            typing.Sequence: collections.abc.Sequence,
            typing.Mapping: collections.abc.Mapping,
        }
        instance_check = isinstance(data, imp_types[type_.__origin__]) and \
            all(isinstance(item, param_type[0]) for item in data)
    else:
        try:
            instance_check = isinstance(data, type_)
        except TypeError:
            if is_union_type(type_):
                instance_check = any(
                    isinstance(data, t) for t in get_union_types(type_)
                )
            else:
                raise ValueError('{!r} cannot be validated.'.format(type_))
    return instance_check
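
Assuming the helpers above (get_abstract_param_types and friends) are
importable, usage looks roughly like this. Note the dispatch relies on
pre-3.7 typing semantics, where typing.Sequence[int].__origin__ is
typing.Sequence itself:

assert validate_type([1, 2, 3], typing.Sequence[int])
assert validate_type('abc', str)                      # plain isinstance path
assert not validate_type([1, 'x'], typing.Sequence[int])
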


def deserialize_abstract_type(cls, data):
    abstract_type_map = {
        typing.Sequence: list,
        typing.List: list,
        typing.Dict: dict,
        typing.Set: set,
        typing.AbstractSet: set,
    }
    # Plain classes have no __origin__ attribute at all, and on older
    # typing an unparameterized generic has __origin__ set to None.
    cls_origin_type = getattr(cls, '__origin__', None)
    if cls_origin_type is None:
        cls_origin_type = cls
    iterable_types = {
        typing.Sequence, typing.List, typing.Tuple, typing.Set,
        typing.AbstractSet, typing.Mapping,
    }
    if cls_origin_type in iterable_types:
        return deserialize_iterable_abstract_type(cls, cls_origin_type, data)
    else:
        return abstract_type_map[cls_origin_type](data)
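
A rough usage sketch, assuming the pre-3.7 typing behavior this code targets
and that deserialize_iterable_abstract_type (defined below) plus a
deserialize_meta that passes primitives through are available:

deserialize_abstract_type(typing.List[int], [1, 2, 3])         # -> [1, 2, 3]
deserialize_abstract_type(typing.AbstractSet[int], [1, 2, 2])  # -> {1, 2}
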
def provides(self) -> AbstractSet[Type]:
    return self._sources.keys()


def accepts(self) -> AbstractSet[Type]:
    return self._sinks.keys()


def test_validate_abstract_set():
    assert validate_type({1, 2, 3}, typing.AbstractSet[int])
    assert validate_type(frozenset([1, 2, 3]), typing.AbstractSet[int])


def is_support_abstract_type(t):
    """FIXME: 3.5 only"""
    if hasattr(t, '__origin__') and t.__origin__:
        data_type = t.__origin__
    else:
        data_type = t
    abstract_types = {
        typing.Sequence,
        typing.List,
        typing.Set,
        typing.AbstractSet,
        typing.Mapping,
        typing.Dict,
    }
    return any(type_ is data_type for type_ in abstract_types)
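
For illustration, under the 3.5-era typing that the FIXME notes (where a
parameterized generic's __origin__ is the typing generic itself), the calls
would behave as follows; this is an assumption about that typing version,
not a tested guarantee:

assert is_support_abstract_type(typing.List[int])  # parameterized generic
assert is_support_abstract_type(typing.Mapping)    # bare abstract type
assert not is_support_abstract_type(int)           # concrete type
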
async def usernames(self) -> AbstractSet[str]:
    """Return an iterable with all of the contributors' usernames."""
    pull_request = self.request['pull_request']
    # Start with the author of the pull request.
    logins = {pull_request['user']['login']}
    # For each commit, get the author and committer.
    async for commit in self._gh.getiter(pull_request['commits_url']):
        author = commit['author']
        # When the author is missing there seems to typically be a
        # matching commit that **does** specify the author. (issue #56)
        if author is not None:
            author_login = author['login']
            if commit['commit']['author']['email'].lower() == GITHUB_EMAIL:
                self.server.log("Ignoring GitHub-managed username: "
                                + author_login)
            else:
                logins.add(author_login)
        committer = commit['committer']
        if committer is not None:
            committer_login = committer['login']
            if commit['commit']['committer']['email'].lower() == GITHUB_EMAIL:
                self.server.log("Ignoring GitHub-managed username: "
                                + committer_login)
            else:
                logins.add(committer_login)
    return frozenset(logins)
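
The distinction the loop relies on: in GitHub's commits API the top-level
author/committer keys hold GitHub account objects (possibly null), while
commit.author/commit.committer carry the raw git metadata, including the
e-mail compared against GITHUB_EMAIL. A trimmed illustration of that shape,
with made-up values:

commit = {
    'author': {'login': 'octocat'},       # GitHub account, or None
    'committer': {'login': 'web-flow'},   # GitHub account, or None
    'commit': {
        'author': {'email': 'octocat@example.com'},    # git metadata
        'committer': {'email': 'noreply@github.com'},  # git metadata
    },
}
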
def trusted_users(self) -> AbstractSet[str]:
    """Return the set of trusted users.

    Trusted users will not be checked for CLA.
    """
    return frozenset()


def usernames(self) -> AbstractSet[str]:
    """Return an iterable of all the contributors' usernames."""
    return frozenset()  # pragma: no cover


def check(self, client: aiohttp.ClientSession,
          usernames: AbstractSet[str]) -> Status:
    """Check if all of the specified usernames have signed the CLA."""
    # While it would technically share more specific information if a
    # mapping of {username: Status} was returned, the vast majority of
    # cases will be for a single user and thus not worth the added
    # complexity to need to worry about it.
    return Status.username_not_found  # pragma: no cover


def check_variable_bindings(self, bound_variables: AbstractSet[Variable]) -> None:
    if self._variable not in bound_variables:
        raise UndefinedNameError(
            'Undefined variable {0!s} is being accessed'.format(self._variable))


def pytest_assertrepr_compare(op: str, left, right) -> Optional[Sequence[str]]:
    # set of entities
    if op == '==' and isinstance(left, (set, frozenset)) and \
       isinstance(right, (set, frozenset)) and \
       all(isinstance(v, Entity) for v in left) and \
       all(isinstance(v, Entity) for v in right):
        def repr_ids(ids: AbstractSet[EntityId]) -> str:
            sorted_ids = sorted(
                ids,
                key=lambda i: (
                    i[0],
                    # Since EntityIds usually consist of one letter followed
                    # by digits, order them numerically.  If it's not in
                    # that format they should be sorted in the other bucket.
                    (0, int(i[1:])) if i[1:].isdigit() else (1, i[1:])
                )
            )
            return '{' + ', '.join(sorted_ids) + '}'
        left = cast(Union[Set[Entity], FrozenSet[Entity]], left)
        right = cast(Union[Set[Entity], FrozenSet[Entity]], right)
        left_ids = {e.id for e in left}
        right_ids = {e.id for e in right}
        return [
            '{} == {}'.format(repr_ids(left_ids), repr_ids(right_ids)),
            'Extra entities in the left set:',
            repr_ids(left_ids - right_ids),
            'Extra entities in the right set:',
            repr_ids(right_ids - left_ids),
        ]
    return None
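
The letter-then-number ordering that the sort key produces, shown on made-up
ids (the exact EntityId format is an assumption here):

ids = ['e10', 'e2', 'e1', 'ex']
sorted(ids, key=lambda i: (i[0],
                           (0, int(i[1:])) if i[1:].isdigit() else (1, i[1:])))
# -> ['e1', 'e2', 'e10', 'ex']  (numeric ids first, in numeric order)
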
def iterators(ps, qs, rs, ts):
    """
    :type ps: typing.Iterable[int]
    :type qs: typing.Iterator[str]
    :type rs: typing.Sequence["ForwardReference"]
    :type ts: typing.AbstractSet["float"]
    """
    for p in ps:
        #? int()
        p
    #?
    next(ps)
    a, b = ps
    #? int()
    a
    ##? int() --- TODO fix support for tuple assignment
    # https://github.com/davidhalter/jedi/pull/663#issuecomment-172317854
    # test below is just to make sure that in case it gets fixed by accident
    # these tests will be fixed as well the way they should be
    #?
    b
    for q in qs:
        #? str()
        q
    #? str()
    next(qs)
    for r in rs:
        #? ForwardReference()
        r
    #?
    next(rs)
    for t in ts:
        #? float()
        t


def sets(p, q):
    """
    :type p: typing.AbstractSet[int]
    :type q: typing.MutableSet[float]
    """
    #? []
    p.a
    #? ["add"]
    q.a


import importlib.machinery
import os.path
import pkgutil


def get_all_modules(
    module_name: str,
    path: typing.Optional[typing.Sequence[str]] = None,
) -> typing.AbstractSet[str]:
    """Find all module names from the given ``module_name``.

    :param str module_name: The name of the root module to search from.
    :param list[str] or None path: The path to find the root module.
    :return: The set of module names.

    .. code-block:: python

       >>> get_all_modules('ormeasy')
       {'ormeasy.alembic', 'ormeasy.common', 'ormeasy.sqlalchemy'}
       >>> get_all_modules('ormeasy.common')
       {'ormeasy.common'}

    """
    root_mod, *_ = module_name.split('.')
    module_spec = importlib.machinery.PathFinder.find_spec(root_mod, path)
    if not module_spec:
        if path:
            raise ValueError(
                '{!s} does not exist or is not a python module in {!s}'.format(
                    root_mod, path
                )
            )
        raise ValueError(
            '{!s} does not exist or is not a python module'.format(root_mod)
        )
    module_name_with_dot = root_mod + '.'
    if module_spec.submodule_search_locations:
        module_names = {
            name
            for _, name, _ in pkgutil.walk_packages(
                module_spec.submodule_search_locations,
                prefix=module_name_with_dot
            )
        }
    else:
        module_names = {
            name
            for _, name, _ in pkgutil.walk_packages(
                [os.path.dirname(module_spec.origin)]
            )
            if name.startswith(module_name_with_dot) or name == module_name
        }
    return {m for m in module_names if m.startswith(module_name)}
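
As a quick sanity check, the function works on any importable package, not
only ormeasy; for example, against the stdlib (the exact result depends on
the Python version):

get_all_modules('json')
# -> {'json.decoder', 'json.encoder', 'json.scanner', 'json.tool'}
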
import collections.abc


def deserialize_iterable_abstract_type(cls, cls_origin_type, data):
    abstract_type_map = {
        typing.Sequence: list,
        typing.List: list,
        typing.Set: set,
        typing.AbstractSet: set,
        typing.Mapping: Map,  # Map: immutable mapping from the same package
    }
    deserialized_data = data
    cls_primitive_type = abstract_type_map[cls_origin_type]
    # Whereas on Python/typing < 3.5.2 type parameters are stored in the
    # __parameters__ attribute, on Python/typing >= 3.5.2 __parameters__
    # is gone and __args__ comes instead.
    type_params = (cls.__args__
                   if hasattr(cls, '__args__')
                   else cls.__parameters__)
    if len(type_params) == 1:
        elem_type, = type_params
        if isinstance(elem_type, typing.TypeVar):
            deserialized_data = cls_primitive_type(data)
        else:
            deserialized_data = cls_primitive_type(
                deserialize_meta(elem_type, d) for d in data
            )
    elif len(type_params) == 2:
        # Key-value
        key_type, value_type = type_params
        assert not (isinstance(key_type, typing.TypeVar) or
                    isinstance(value_type, typing.TypeVar))
        if not isinstance(data, collections.abc.Sequence):
            raise ValueError('map must be an array of item objects e.g. '
                             '[{"key": ..., "value": ...}, ...]')

        def parse_pair(pair):
            if not isinstance(pair, collections.abc.Mapping):
                raise ValueError('map item must be a JSON object')
            try:
                key = pair['key']
                value = pair['value']
            except KeyError:
                raise ValueError('map item must consist of "key" and "value" '
                                 'fields e.g. {"key": ..., "value": ...}')
            return (
                deserialize_meta(key_type, key),
                deserialize_meta(value_type, value),
            )
        deserialized_data = cls_primitive_type(map(parse_pair, data))
    return deserialized_data
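
The two-parameter branch expects a map to arrive as an array of key/value
objects rather than as a JSON object, so a typing.Mapping[str, int] payload
would look like the following. This is a hedged illustration; Map and
deserialize_meta are helpers from the surrounding module:

data = [
    {'key': 'a', 'value': 1},
    {'key': 'b', 'value': 2},
]
# deserialize_iterable_abstract_type(typing.Mapping[str, int],
#                                    typing.Mapping, data)
# -> Map({'a': 1, 'b': 2})
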