test_slot.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:pg2kinesis 作者: handshake 项目源码 文件源码
def test_create_slot(slot):

    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.DUPLICATE_OBJECT):
        pe = psycopg2.ProgrammingError()


        slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)
        slot.create_slot()
        slot._repl_cursor.create_replication_slot.assert_called_with('pg2kinesis',
                                                                     slot_type=psycopg2.extras.REPLICATION_LOGICAL,
                                                                     output_plugin=u'test_decoding')
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                          new_callable=PropertyMock,
                          return_value=-1):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)

        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.create_slot()
            slot._repl_cursor.create_replication_slot.assert_called_with('pg2kinesis',
                                                                         slot_type=psycopg2.extras.REPLICATION_LOGICAL,
                                                                         output_plugin=u'test_decoding')
        assert e_info.value.pgcode == -1

        slot._repl_cursor.create_replication_slot = Mock(side_effect=Exception)
    with pytest.raises(Exception):
        slot.create_slot()
        slot._repl_cursor.create_replication_slot.assert_called_with('pg2kinesis',
                                                                         slot_type=psycopg2.extras.REPLICATION_LOGICAL,
                                                                         output_plugin=u'test_decoding')
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号