test_sender.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:iris 作者: linkedin 项目源码 文件源码
def test_fetch_and_prepare_message(mocker):
    mocker.patch('iris.bin.sender.message_send_enqueue')
    from iris.bin.sender import (
        fetch_and_prepare_message, message_queue, per_mode_send_queues
    )

    init_queue_with_item(message_queue, {'message_id': 1234, 'plan_id': None})
    fetch_and_prepare_message()
    assert message_queue.qsize() == 0

    send_queue = per_mode_send_queues.setdefault('email', gevent.queue.Queue())

    init_queue_with_item(send_queue, {'message_id': 1234, 'plan_id': None})

    assert message_queue.qsize() == 0
    assert send_queue.qsize() == 1
    m = send_queue.get()
    assert m['message_id'] == 1234
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号