def test_redis_delay_task_decorator_invalid_function(event_loop, redis_instance):
    """Check that a queued message naming an unknown task is reported in the log.

    A msgpack-encoded task message whose ``function`` is ``"non_exist"`` is
    pushed onto the manager's task list, the manager is run briefly, and the
    messages emitted on the ``"aiotasks"`` logger are inspected.

    Args:
        event_loop: Event loop used to build the manager and drive ``run()``.
        redis_instance: Value passed to ``build_manager`` as ``dsn``.
    """
    import logging

    logger = logging.getLogger("aiotasks")

    class CustomLogger(logging.StreamHandler):
        """Handler that stores the raw ``msg`` of every record it receives."""

        def __init__(self):
            super().__init__()
            self.content = []

        def emit(self, record):
            # Store the unformatted message so the assertion below can do an
            # exact membership check.
            self.content.append(record.msg)

    custom = CustomLogger()
    logger.addHandler(custom)

    try:
        manager = build_manager(dsn=redis_instance, loop=event_loop)

        async def run():
            # Send an invalid task name
            task_id = uuid.uuid4().hex
            payload = msgpack.packb(dict(task_id=task_id,
                                         function="non_exist",
                                         args=(),
                                         kwargs={}))

            await manager._redis_poller.lpush(manager.task_list_name, payload)

            manager.run()

            await manager.wait(timeout=0.2,
                               exit_on_finish=False,
                               wait_timeout=0.1)

        event_loop.run_until_complete(run())
        manager.stop()

        assert "No local task with name 'non_exist'" in custom.content
    finally:
        # The "aiotasks" logger is shared process-wide; always detach the
        # capturing handler so it does not leak into other tests, even when
        # this test fails or raises.
        logger.removeHandler(custom)
评论列表
文章目录