def test_start_and_recover_from_error(self):
    """Check that replication can be started again after a failed start.

    Replication is first started on the slot with an option that is
    expected to be rejected; the resulting ``psycopg2.DataError`` is
    asserted when the stream is consumed.  The same cursor is then asked
    to start replication on the slot without options, which must not
    raise.
    """
    connection = self.repl_connect(
        connection_factory=LogicalReplicationConnection)
    if connection is None:
        # repl_connect gave us no connection; nothing to exercise.
        return

    cursor = connection.cursor()
    self.create_replication_slot(cursor, output_plugin='test_decoding')

    # First attempt: start with options that should be refused.
    bad_options = {'invalid_param': 'value'}
    cursor.start_replication(slot_name=self.slot, options=bad_options)

    def ignore_message(msg):
        pass

    # The error from the server only shows up once we try to read the
    # data, so the failure is asserted on consume_stream rather than on
    # start_replication.
    self.assertRaises(
        psycopg2.DataError, cursor.consume_stream, ignore_message)

    # Second attempt: the correct command on the same cursor.
    cursor.start_replication(slot_name=self.slot)
test_replication.py 文件源码
python
阅读 38
收藏 0
点赞 0
评论 0
评论列表
文章目录