test_queue.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:kq 作者: joowani 项目源码 文件源码
def test_enqueue_call_with_key(producer, logger):
    producer_cls, producer_inst = producer

    queue = Queue(hosts='host:7000', topic='foo', timeout=300)
    job = queue.enqueue_with_key('bar', success_func, 1, 2, c=[3, 4, 5])

    assert isinstance(job, Job)
    assert isinstance(job.id, str)
    assert isinstance(job.timestamp, int)
    assert job.topic == 'foo'
    assert job.func == success_func
    assert job.args == (1, 2)
    assert job.kwargs == {'c': [3, 4, 5]}
    assert job.timeout == 300
    assert job.key == 'bar'

    producer_inst.send.assert_called_with('foo', dill.dumps(job), key='bar')
    logger.info.assert_called_once_with('Enqueued: {}'.format(job))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号