def test_worker_alarm(manager):
    """Check that SIGALRM is delivered only when a task outlives ``task_timeout``.

    A recording SIGALRM handler is installed, a ``foo`` task that simply
    sleeps is registered on ``manager``, and two tasks are run through a
    ``Worker`` created with ``task_timeout=1``:

    * a 0.1 s task, after which the handler must not have been called;
    * a 1.1 s task, after which the handler must have been called.

    The SIGALRM handler that was active before the test is restored on
    exit so the recording handler does not leak into other tests.
    """
    called = []

    def handler(signum, frame):
        # Record the delivery; the assertions below only check truthiness.
        called.append(True)

    # signal.signal() returns the previously installed handler; keep it so
    # the process-wide signal state can be put back afterwards.
    previous = signal.signal(signal.SIGALRM, handler)
    try:
        @manager.task
        def foo(sleep):
            time.sleep(sleep)

        w = Worker(manager, task_timeout=1)

        # Shorter than the timeout: no alarm expected.
        w.process_one(make_task('foo', args=(0.1,)))
        assert not called

        # Longer than the timeout: the alarm must have fired.
        w.process_one(make_task('foo', args=(1.1,)))
        assert called
    finally:
        # Cancel any alarm still pending (e.g. after a failure above) before
        # swapping handlers, so it cannot hit the restored handler.
        signal.alarm(0)
        # previous is None when the old handler was not installed from
        # Python; signal.signal() rejects None, so leave it alone then.
        if previous is not None:
            signal.signal(signal.SIGALRM, previous)
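# 评论列表 (comment list) -- web-page navigation residue, not part of the test
# 文章目录 (article contents) -- web-page navigation residue, not part of the test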