test_stream.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:pg2kinesis 作者: handshake 项目源码 文件源码
def test__init__():
    """Check how ``StreamWriter('blah')`` reacts to ``create_stream`` failures.

    Two scenarios are exercised against a mocked ``boto3.client``:

    * ``create_stream`` raises ``ClientError`` with code
      ``ResourceInUseException``: construction succeeds and the
      ``stream_exists`` waiter is requested from the client.
    * ``create_stream`` raises ``ClientError`` with any other code: the
      error propagates out of the constructor and the waiter is never
      requested.
    """
    mock_client = Mock()
    with patch.object(boto3, 'client', return_value=mock_client):
        # Scenario 1: the "already in use" error code must be tolerated.
        error_response = {'Error': {'Code': 'ResourceInUseException'}}
        mock_client.create_stream = Mock(side_effect=ClientError(error_response, 'create_stream'))

        StreamWriter('blah')
        assert mock_client.create_stream.called
        assert call.get_waiter('stream_exists') in mock_client.method_calls, "We handled stream existence"

        # Scenario 2: any other error code must surface to the caller.
        # Clear the recorded calls first, then install the new failure so the
        # setup does not rely on reset_mock() leaving side effects intact.
        mock_client.reset_mock()
        error_response = {'Error': {'Code': 'Something else'}}
        mock_client.create_stream = Mock(side_effect=ClientError(error_response, 'create_stream'))

        with pytest.raises(ClientError):
            StreamWriter('blah')

        # These checks must live outside the pytest.raises block: statements
        # placed after the raising call inside that block are never executed.
        assert mock_client.create_stream.called
        assert call.get_waiter('stream_exists') not in mock_client.method_calls, "never reached"
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号