def test_delete_slot(slot):
with patch.object(psycopg2.ProgrammingError, 'pgcode',
new_callable=PropertyMock,
return_value=psycopg2.errorcodes.UNDEFINED_OBJECT):
pe = psycopg2.ProgrammingError()
slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
slot.delete_slot()
slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
with patch.object(psycopg2.ProgrammingError, 'pgcode',
new_callable=PropertyMock,
return_value=-1):
pe = psycopg2.ProgrammingError()
slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)
with pytest.raises(psycopg2.ProgrammingError) as e_info:
slot.delete_slot()
slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
assert e_info.value.pgcode == -1
slot._repl_cursor.create_replication_slot = Mock(side_effect=Exception)
with pytest.raises(Exception):
slot.delete_slot()
slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
评论列表
文章目录