test_manager.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:dsq 作者: baverman 项目源码 文件源码
def test_worker_alarm(manager):
    called = []
    def handler(signal, frame):
        called.append(True)
    signal.signal(signal.SIGALRM, handler)

    @manager.task
    def foo(sleep):
        time.sleep(sleep)

    w = Worker(manager, task_timeout=1)
    w.process_one(make_task('foo', args=(0.1,)))
    assert not called

    w.process_one(make_task('foo', args=(1.1,)))
    assert called
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号