def test_create_slot(slot):
    """Exercise ``slot.create_slot()`` against three cursor failure modes.

    ``slot._repl_cursor.create_replication_slot`` is replaced with a ``Mock``
    that raises, and the test checks that:

    * a ``ProgrammingError`` whose ``pgcode`` is ``DUPLICATE_OBJECT`` does
      not propagate out of ``create_slot()``;
    * a ``ProgrammingError`` with another ``pgcode`` (``-1`` here) does
      propagate, still carrying that code;
    * a plain ``Exception`` propagates.

    In every scenario the cursor must have been asked for a logical
    ``'pg2kinesis'`` slot using the ``test_decoding`` output plugin.

    Args:
        slot: fixture exposing ``create_slot()`` and ``_repl_cursor``
            (presumably the slot reader under test -- confirm against the
            fixture definition).
    """

    def assert_slot_requested():
        # The expected cursor call is identical in all three scenarios.
        slot._repl_cursor.create_replication_slot.assert_called_with(
            'pg2kinesis',
            slot_type=psycopg2.extras.REPLICATION_LOGICAL,
            output_plugin=u'test_decoding')

    # Scenario 1: duplicate slot. ``pgcode`` is patched on the class with a
    # PropertyMock so the exception instance built below reports that code.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.DUPLICATE_OBJECT):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)

        # Must return normally; an escaping exception fails the test here.
        slot.create_slot()
        assert_slot_requested()

    # Scenario 2: a ProgrammingError with an unrelated code must propagate.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=-1):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)

        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.create_slot()

        # Kept outside the ``pytest.raises`` block: statements that follow
        # the raising call inside that block would never execute.
        assert_slot_requested()
        # Checked while the patch is still active, since the patched
        # property is what supplies the -1.
        assert e_info.value.pgcode == -1

    # Scenario 3: any other exception type must propagate as well.
    slot._repl_cursor.create_replication_slot = Mock(side_effect=Exception)
    with pytest.raises(Exception):
        slot.create_slot()
    assert_slot_requested()
# Comment list
# Table of contents