def test__init__():
mock_client = Mock()
with patch.object(boto3, 'client', return_value=mock_client):
error_response = {'Error': {'Code': 'ResourceInUseException'}}
mock_client.create_stream = Mock(side_effect=ClientError(error_response, 'create_stream'))
StreamWriter('blah')
assert mock_client.create_stream.called
assert call.get_waiter('stream_exists') in mock_client.method_calls, "We handled stream existence"
error_response = {'Error': {'Code': 'Something else'}}
mock_client.create_stream = Mock(side_effect=ClientError(error_response, 'create_stream'))
mock_client.reset_mock()
with pytest.raises(ClientError):
StreamWriter('blah')
assert mock_client.create_stream.called
assert call.get_waiter('stream_exists') not in mock_client.method_calls, "never reached"
评论列表
文章目录