def test_simple_descriptor_trigger(self):
class Foo(ModuleBase):
def __init__(self, s):
super().__init__()
self.bar = None
self.s = s
trigger = DescriptorClassTrigger(TriggerBase)
@activity(trigger)
async def activity(self):
self.bar = "qwertyuiop"
s.release()
s = Semaphore(0)
foo = Foo(s)
asyncio.run_coroutine_threadsafe(foo.trigger.trigger(), self.loop)
self.assertTrue(s.acquire(timeout=0.1))
self.assertEqual(foo.bar, "qwertyuiop")
评论列表
文章目录