def test_enqueue_call_with_key(producer, logger):
producer_cls, producer_inst = producer
queue = Queue(hosts='host:7000', topic='foo', timeout=300)
job = queue.enqueue_with_key('bar', success_func, 1, 2, c=[3, 4, 5])
assert isinstance(job, Job)
assert isinstance(job.id, str)
assert isinstance(job.timestamp, int)
assert job.topic == 'foo'
assert job.func == success_func
assert job.args == (1, 2)
assert job.kwargs == {'c': [3, 4, 5]}
assert job.timeout == 300
assert job.key == 'bar'
producer_inst.send.assert_called_with('foo', dill.dumps(job), key='bar')
logger.info.assert_called_once_with('Enqueued: {}'.format(job))
评论列表
文章目录