def test_do():
@attr.s
class EDo:
arg = attr.ib()
@do
def do_func(a, b):
done_a = yield Effect(EDo(a))
done_b = yield Effect(EDo(b))
return [done_a, done_b]
effect = do_func(1, 2)
assert isinstance(effect, Effect)
assert isinstance(effect.intent, ChainedIntent)
dispatcher = TypeDispatcher({
EDo: lambda intent: 'done: %s' % intent.arg
})
ret = sync_perform(dispatcher, effect)
assert ret == ['done: 1', 'done: 2']
python类ib()的实例源码
def test_chained_intent(self):
@attr.s
class ENumToString:
num = attr.ib()
def collect_intent_results():
intent_results = []
for i in range(5):
res = yield Effect(ENumToString(i))
intent_results.append(res)
return ''.join(intent_results)
effect = Effect(ChainedIntent(collect_intent_results()))
dispatcher = TypeDispatcher({
ENumToString: lambda intent: str(intent.num)
})
ret = sync_perform(dispatcher, effect)
assert ret == '01234'
def test_chained_intent(self):
@attr.s
class ENumToString:
num = attr.ib()
def collect_intent_results():
intent_results = []
for i in range(5):
res = yield Effect(ENumToString(i))
intent_results.append(res)
return ''.join(intent_results)
effect = Effect(ChainedIntent(collect_intent_results()))
dispatcher = TypeDispatcher({
ENumToString: lambda intent: str(intent.num)
})
ret = await asyncio_perform(dispatcher, effect)
assert ret == '01234'
def test_optional_field_roundtrip(converter, cl_and_vals):
"""
Classes with optional fields can be unstructured and structured.
"""
cl, vals = cl_and_vals
@attr.s
class C(object):
a = attr.ib(type=Optional[cl])
inst = C(a=cl(*vals))
assert inst == converter.structure(converter.unstructure(inst), C)
inst = C(a=None)
unstructured = converter.unstructure(inst)
assert inst == converter.structure(unstructured, C)
def __init_subclass__(cls, class_id, method_id, response_to=None,
followed_by_content=False, closing=False, **kwargs):
super().__init_subclass__(**kwargs)
cls.struct = make_struct(cls)
cls.closing = closing
cls.followed_by_content = followed_by_content
if followed_by_content:
cls.content = attr.ib(default=None)
cls.responses = []
cls.response_to = response_to
if response_to is not None:
response_to.responses.append(cls)
cls.class_id = attr.ib(default=class_id, init=False)
cls.method_id = attr.ib(default=method_id, init=False)
cls.channel_id = attr.ib(default=None, init=False)
cls.BY_ID[(class_id, method_id)] = cls
def attr_struct(cls=None, vals_to_attrs=False, defaults=..., **kws):
if not cls:
return ft.partial( attr_struct,
vals_to_attrs=vals_to_attrs, defaults=defaults, **kws )
try:
keys = cls.keys
del cls.keys
except AttributeError: keys = list()
else:
attr_kws = dict()
if defaults is not ...: attr_kws['default'] = defaults
if isinstance(keys, str): keys = keys.split()
for k in keys: setattr(cls, k, attr.ib(**attr_kws))
if vals_to_attrs:
for k, v in vars(cls).items():
if k.startswith('_') or k in keys or callable(v): continue
setattr(cls, k, attr.ib(v))
kws.setdefault('hash', not hasattr(cls, '__hash__'))
kws.setdefault('slots', True)
return attr.s(cls, **kws)
def test_secret_str_censors(self):
"""
_SecretStr censors it's __repr__ if its called from another __repr__.
"""
s = _SecretStr("abc")
@attr.s
class Cfg(object):
s = attr.ib()
assert "Cfg(s=<SECRET>)" == repr(Cfg(s))
def test_tolerates_attribs(self):
"""
Classes are allowed to have plain attr.ibs for e.g.
__attrs_post_init__.
"""
@environ.config
class Cfg(object):
e = environ.var()
x = attr.ib(default=42)
cfg = environ.to_config(Cfg, environ={
"APP_E": "e",
})
assert Cfg("e", 42) == cfg
def var(default=RAISE, convert=None, name=None, validator=None):
return attr.ib(
default=default,
metadata={CNF_KEY: _ConfigEntry(name, default, None)},
convert=convert,
validator=validator,
)
def group(cls):
return attr.ib(
default=None,
metadata={CNF_KEY: _ConfigEntry(None, None, cls, True)}
)
def secret(self, default=RAISE, convert=None, name=None, section=None):
if section is None:
section = self.section
return attr.ib(
default=default,
metadata={
CNF_KEY: _ConfigEntry(name, default, None, self._get),
CNF_INI_SECRET_KEY: _INIConfig(section),
},
convert=convert,
)
def test_AsyncResource_defaults():
@attr.s
class MyAR(tabc.AsyncResource):
record = attr.ib(default=attr.Factory(list))
async def aclose(self):
self.record.append("ac")
async with MyAR() as myar:
assert isinstance(myar, MyAR)
assert myar.record == []
assert myar.record == ["ac"]
def test_base(self):
@attr.s
class EDoSomething:
arg = attr.ib()
dispatcher = TypeDispatcher({
EDoSomething: lambda intent: 'bar'
})
effect = Effect(EDoSomething('foo'))
ret = sync_perform(dispatcher, effect)
assert ret == 'bar'
def test_base(self):
@attr.s
class EDoSomething:
arg = attr.ib()
dispatcher = TypeDispatcher({
EDoSomething: lambda intent: 'bar'
})
effect = Effect(EDoSomething('foo'))
ret = await asyncio_perform(dispatcher, effect)
assert ret == 'bar'
def test_edge_errors():
"""Edge input cases cause errors."""
@attr.s
class A(object):
pass
with pytest.raises(ValueError):
# Can't generate for only one class.
create_uniq_field_dis_func(A)
@attr.s
class B(object):
pass
with pytest.raises(ValueError):
# No fields on either class.
create_uniq_field_dis_func(A, B)
@attr.s
class C(object):
a = attr.ib()
@attr.s
class D(object):
a = attr.ib()
with pytest.raises(ValueError):
# No unique fields on either class.
create_uniq_field_dis_func(C, D)
def test_union_field_roundtrip(converter, cl_and_vals_a, cl_and_vals_b, strat):
"""
Classes with union fields can be unstructured and structured.
"""
converter.unstruct_strat = strat
cl_a, vals_a = cl_and_vals_a
cl_b, vals_b = cl_and_vals_b
a_field_names = {a.name for a in fields(cl_a)}
b_field_names = {a.name for a in fields(cl_b)}
assume(a_field_names)
assume(b_field_names)
common_names = a_field_names & b_field_names
assume(len(a_field_names) > len(common_names))
@attr.s
class C(object):
a = attr.ib(type=Union[cl_a, cl_b])
inst = C(a=cl_a(*vals_a))
if strat is UnstructureStrategy.AS_DICT:
assert inst == converter.structure(converter.unstructure(inst), C)
else:
# Our disambiguation functions only support dictionaries for now.
with pytest.raises(ValueError):
converter.structure(converter.unstructure(inst), C)
def handler(obj, _):
return converter.structure(obj, cl_a)
converter._union_registry[Union[cl_a, cl_b]] = handler
assert inst == converter.structure(converter.unstructure(inst), C)
del converter._union_registry[Union[cl_a, cl_b]]
def bare_typed_attrs(draw, defaults=None):
"""
Generate a tuple of an attribute and a strategy that yields values
appropriate for that attribute.
"""
default = attr.NOTHING
if defaults is True or (defaults is None and draw(booleans())):
default = None
return ((attr.ib(type=Any, default=default), just(None)))
def int_typed_attrs(draw, defaults=None):
"""
Generate a tuple of an attribute and a strategy that yields ints for that
attribute.
"""
default = attr.NOTHING
if defaults is True or (defaults is None and draw(booleans())):
default = draw(integers())
return ((attr.ib(type=int, default=default), integers()))
def float_typed_attrs(draw, defaults=None):
"""
Generate a tuple of an attribute and a strategy that yields floats for that
attribute.
"""
default = attr.NOTHING
if defaults is True or (defaults is None and draw(booleans())):
default = draw(floats())
return ((attr.ib(type=float, default=default), floats()))
def dict_typed_attrs(draw, defaults=None):
"""
Generate a tuple of an attribute and a strategy that yields dictionaries
for that attribute. The dictionaries map strings to integers.
"""
default = attr.NOTHING
val_strat = dictionaries(keys=text(), values=integers())
if defaults is True or (defaults is None and draw(booleans())):
default = draw(val_strat)
return ((attr.ib(type=Dict[unicode, int], default=default), val_strat))
def just_class(tup):
# tup: Tuple[List[Tuple[_CountingAttr, Strategy]],
# Tuple[Type, Sequence[Any]]]
nested_cl = tup[1][0]
nested_cl_args = tup[1][1]
default = attr.Factory(lambda: nested_cl(*nested_cl_args))
combined_attrs = list(tup[0])
combined_attrs.append((attr.ib(type=nested_cl, default=default),
just(nested_cl(*nested_cl_args))))
return _create_hyp_class(combined_attrs)
def list_of_class(tup):
nested_cl = tup[1][0]
nested_cl_args = tup[1][1]
default = attr.Factory(lambda: [nested_cl(*nested_cl_args)])
combined_attrs = list(tup[0])
combined_attrs.append((attr.ib(type=List[nested_cl], default=default),
just([nested_cl(*nested_cl_args)])))
return _create_hyp_class(combined_attrs)
def dict_of_class(tup):
nested_cl = tup[1][0]
nested_cl_args = tup[1][1]
default = attr.Factory(lambda: {"cls": nested_cl(*nested_cl_args)})
combined_attrs = list(tup[0])
combined_attrs.append((attr.ib(type=Dict[str, nested_cl], default=default),
just({'cls': nested_cl(*nested_cl_args)})))
return _create_hyp_class(combined_attrs)
def list_of_class(tup):
nested_cl = tup[1][0]
default = attr.Factory(lambda: [nested_cl()])
combined_attrs = list(tup[0])
combined_attrs.append((attr.ib(default=default),
st.just([nested_cl()])))
return _create_hyp_class(combined_attrs)
def dict_of_class(tup):
nested_cl = tup[1][0]
default = attr.Factory(lambda: {"cls": nested_cl()})
combined_attrs = list(tup[0])
combined_attrs.append((attr.ib(default=default),
st.just({'cls': nested_cl()})))
return _create_hyp_class(combined_attrs)
def bare_attrs(draw, defaults=None):
"""
Generate a tuple of an attribute and a strategy that yields values
appropriate for that attribute.
"""
default = NOTHING
if defaults is True or (defaults is None and draw(st.booleans())):
default = None
return ((attr.ib(default=default), st.just(None)))
def int_attrs(draw, defaults=None):
"""
Generate a tuple of an attribute and a strategy that yields ints for that
attribute.
"""
default = NOTHING
if defaults is True or (defaults is None and draw(st.booleans())):
default = draw(st.integers())
return ((attr.ib(default=default), st.integers()))
def str_attrs(draw, defaults=None):
"""
Generate a tuple of an attribute and a strategy that yields strs for that
attribute.
"""
default = NOTHING
if defaults is True or (defaults is None and draw(st.booleans())):
default = draw(st.text())
return ((attr.ib(default=default), st.text()))
def dict_attrs(draw, defaults=None):
"""
Generate a tuple of an attribute and a strategy that yields dictionaries
for that attribute. The dictionaries map strings to integers.
"""
default = NOTHING
val_strat = st.dictionaries(keys=st.text(), values=st.integers())
if defaults is True or (defaults is None and draw(st.booleans())):
default_val = draw(val_strat)
default = attr.Factory(lambda: default_val)
return ((attr.ib(default=default), val_strat))
def generate_fake_bugspots(hotspot_files):
@attr.s
class BugspotsResult:
filename = attr.ib(convert=str)
class FakeBugspots:
def __init__(self, *args, **kwargs):
pass
def get_hotspots(self):
return [BugspotsResult(file) for file in hotspot_files]
return FakeBugspots