test_sender.py 文件源码

python
阅读 17 收藏 0 点赞 0 评论 0

项目:iris 作者: linkedin 项目源码 文件源码
def test_handle_api_request_v0_send(mocker):
    from iris.bin.sender import message_send_enqueue
    from iris.sender.rpc import handle_api_request, send_funcs
    from iris.sender.shared import per_mode_send_queues

    send_funcs['message_send_enqueue'] = message_send_enqueue

    send_queue = per_mode_send_queues.setdefault('email', gevent.queue.Queue())

    # support expanding target
    mocker.patch('iris.sender.cache.targets_for_role', lambda role, target: [target])
    mocker.patch('iris.bin.sender.db')
    mocker.patch('iris.metrics.stats')
    mocker.patch('iris.bin.sender.set_target_contact').return_value = True

    mock_address = mocker.MagicMock()
    mock_socket = mocker.MagicMock()
    mock_socket.recv.return_value = msgpack.packb({
        'endpoint': 'v0/send',
        'data': fake_notification,
    })

    while send_queue.qsize() > 0:
        send_queue.get()

    handle_api_request(mock_socket, mock_address)

    assert send_queue.qsize() == 1
    m = send_queue.get()
    assert m['subject'] == '[%s] %s' % (fake_notification['application'],
                                        fake_notification['subject'])
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号