def test_async_replication(self):
    """Exercise logical replication over an asynchronous connection.

    Creates a replication slot with the ``test_decoding`` output plugin,
    starts replication on it and checks that:

    * ``cur.consume_stream()`` raises ``psycopg2.ProgrammingError``, since
      it cannot be used in asynchronous mode;
    * messages can instead be pulled with ``cur.read_message()``, waiting
      on the cursor with ``select()`` whenever no message is returned,
      until ``consume`` raises ``StopReplication``.

    The test returns without asserting anything when ``repl_connect()``
    returns ``None``.
    """
    # ``async`` is a reserved word from Python 3.7 on, so ``async=1`` is a
    # SyntaxError there; psycopg2 accepts ``async_`` as an alias.
    # NOTE(review): assumes repl_connect() forwards its keyword arguments
    # to psycopg2.connect() -- confirm against the helper.
    conn = self.repl_connect(
        connection_factory=LogicalReplicationConnection, async_=1)
    if conn is None:
        return

    cur = conn.cursor()

    # Each command issued on the cursor is followed by self.wait(cur)
    # before the next one is sent.
    self.create_replication_slot(cur, output_plugin='test_decoding')
    self.wait(cur)

    cur.start_replication(self.slot)
    self.wait(cur)

    self.make_replication_events()

    # Stored on self so the nested consume() can increment it.
    self.msg_count = 0

    def consume(msg):
        # just check the methods: the formatted string is discarded
        "%s: %s" % (cur.io_timestamp, repr(msg))

        self.msg_count += 1
        if self.msg_count > 3:
            # Fourth message: send a final feedback and stop the stream.
            cur.send_feedback(reply=True)
            raise StopReplication()

        cur.send_feedback(flush_lsn=msg.data_start)

    # cannot be used in asynchronous mode
    self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)

    def process_stream():
        from select import select
        while True:
            msg = cur.read_message()
            if msg:
                consume(msg)
            else:
                # Nothing returned: block until the cursor is readable.
                select([cur], [], [])

    self.assertRaises(StopReplication, process_stream)
# Example source code for the Python class LogicalReplicationConnection()
def test_async_replication(self):
    """Exercise logical replication over an asynchronous connection.

    Creates a replication slot with the ``test_decoding`` output plugin,
    starts replication on it and checks that:

    * ``cur.consume_stream()`` raises ``psycopg2.ProgrammingError``, since
      it cannot be used in asynchronous mode;
    * messages can instead be pulled with ``cur.read_message()``, waiting
      on the cursor with ``select()`` whenever no message is returned,
      until ``consume`` raises ``StopReplication``.

    The test returns without asserting anything when ``repl_connect()``
    returns ``None``.
    """
    # ``async`` is a reserved word from Python 3.7 on, so ``async=1`` is a
    # SyntaxError there; psycopg2 accepts ``async_`` as an alias.
    # NOTE(review): assumes repl_connect() forwards its keyword arguments
    # to psycopg2.connect() -- confirm against the helper.
    conn = self.repl_connect(
        connection_factory=LogicalReplicationConnection, async_=1)
    if conn is None:
        return

    cur = conn.cursor()

    # Each command issued on the cursor is followed by self.wait(cur)
    # before the next one is sent.
    self.create_replication_slot(cur, output_plugin='test_decoding')
    self.wait(cur)

    cur.start_replication(self.slot)
    self.wait(cur)

    self.make_replication_events()

    # Stored on self so the nested consume() can increment it.
    self.msg_count = 0

    def consume(msg):
        # just check the methods: the formatted string is discarded
        "%s: %s" % (cur.io_timestamp, repr(msg))

        self.msg_count += 1
        if self.msg_count > 3:
            # Fourth message: send a final feedback and stop the stream.
            cur.send_feedback(reply=True)
            raise StopReplication()

        cur.send_feedback(flush_lsn=msg.data_start)

    # cannot be used in asynchronous mode
    self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)

    def process_stream():
        from select import select
        while True:
            msg = cur.read_message()
            if msg:
                consume(msg)
            else:
                # Nothing returned: block until the cursor is readable.
                select([cur], [], [])

    self.assertRaises(StopReplication, process_stream)
def test_async_replication(self):
    """Exercise logical replication over an asynchronous connection.

    Creates a replication slot with the ``test_decoding`` output plugin,
    starts replication on it and checks that:

    * ``cur.consume_stream()`` raises ``psycopg2.ProgrammingError``, since
      it cannot be used in asynchronous mode;
    * messages can instead be pulled with ``cur.read_message()``, waiting
      on the cursor with ``select()`` whenever no message is returned,
      until ``consume`` raises ``StopReplication``.

    The test returns without asserting anything when ``repl_connect()``
    returns ``None``.
    """
    # ``async`` is a reserved word from Python 3.7 on, so ``async=1`` is a
    # SyntaxError there; psycopg2 accepts ``async_`` as an alias.
    # NOTE(review): assumes repl_connect() forwards its keyword arguments
    # to psycopg2.connect() -- confirm against the helper.
    conn = self.repl_connect(
        connection_factory=LogicalReplicationConnection, async_=1)
    if conn is None:
        return

    cur = conn.cursor()

    # Each command issued on the cursor is followed by self.wait(cur)
    # before the next one is sent.
    self.create_replication_slot(cur, output_plugin='test_decoding')
    self.wait(cur)

    cur.start_replication(self.slot)
    self.wait(cur)

    self.make_replication_events()

    # Stored on self so the nested consume() can increment it.
    self.msg_count = 0

    def consume(msg):
        # just check the methods: the formatted string is discarded
        "%s: %s" % (cur.io_timestamp, repr(msg))

        self.msg_count += 1
        if self.msg_count > 3:
            # Fourth message: send a final feedback and stop the stream.
            cur.send_feedback(reply=True)
            raise StopReplication()

        cur.send_feedback(flush_lsn=msg.data_start)

    # cannot be used in asynchronous mode
    self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)

    def process_stream():
        from select import select
        while True:
            msg = cur.read_message()
            if msg:
                consume(msg)
            else:
                # Nothing returned: block until the cursor is readable.
                select([cur], [], [])

    self.assertRaises(StopReplication, process_stream)
def test_async_replication(self):
    """Exercise logical replication over an asynchronous connection.

    Creates a replication slot with the ``test_decoding`` output plugin,
    starts replication on it and checks that:

    * ``cur.consume_stream()`` raises ``psycopg2.ProgrammingError``, since
      it cannot be used in asynchronous mode;
    * messages can instead be pulled with ``cur.read_message()``, waiting
      on the cursor with ``select()`` whenever no message is returned,
      until ``consume`` raises ``StopReplication``.

    The test returns without asserting anything when ``repl_connect()``
    returns ``None``.
    """
    # ``async`` is a reserved word from Python 3.7 on, so ``async=1`` is a
    # SyntaxError there; psycopg2 accepts ``async_`` as an alias.
    # NOTE(review): assumes repl_connect() forwards its keyword arguments
    # to psycopg2.connect() -- confirm against the helper.
    conn = self.repl_connect(
        connection_factory=LogicalReplicationConnection, async_=1)
    if conn is None:
        return

    cur = conn.cursor()

    # Each command issued on the cursor is followed by self.wait(cur)
    # before the next one is sent.
    self.create_replication_slot(cur, output_plugin='test_decoding')
    self.wait(cur)

    cur.start_replication(self.slot)
    self.wait(cur)

    self.make_replication_events()

    # Stored on self so the nested consume() can increment it.
    self.msg_count = 0

    def consume(msg):
        # just check the methods: the formatted string is discarded
        "%s: %s" % (cur.io_timestamp, repr(msg))

        self.msg_count += 1
        if self.msg_count > 3:
            # Fourth message: send a final feedback and stop the stream.
            cur.send_feedback(reply=True)
            raise StopReplication()

        cur.send_feedback(flush_lsn=msg.data_start)

    # cannot be used in asynchronous mode
    self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)

    def process_stream():
        from select import select
        while True:
            msg = cur.read_message()
            if msg:
                consume(msg)
            else:
                # Nothing returned: block until the cursor is readable.
                select([cur], [], [])

    self.assertRaises(StopReplication, process_stream)
def test_async_replication(self):
    """Exercise logical replication over an asynchronous connection.

    Creates a replication slot with the ``test_decoding`` output plugin,
    starts replication on it and checks that:

    * ``cur.consume_stream()`` raises ``psycopg2.ProgrammingError``, since
      it cannot be used in asynchronous mode;
    * messages can instead be pulled with ``cur.read_message()``, waiting
      on the cursor with ``select()`` whenever no message is returned,
      until ``consume`` raises ``StopReplication``.

    The test returns without asserting anything when ``repl_connect()``
    returns ``None``.
    """
    # ``async`` is a reserved word from Python 3.7 on, so ``async=1`` is a
    # SyntaxError there; psycopg2 accepts ``async_`` as an alias.
    # NOTE(review): assumes repl_connect() forwards its keyword arguments
    # to psycopg2.connect() -- confirm against the helper.
    conn = self.repl_connect(
        connection_factory=LogicalReplicationConnection, async_=1)
    if conn is None:
        return

    cur = conn.cursor()

    # Each command issued on the cursor is followed by self.wait(cur)
    # before the next one is sent.
    self.create_replication_slot(cur, output_plugin='test_decoding')
    self.wait(cur)

    cur.start_replication(self.slot)
    self.wait(cur)

    self.make_replication_events()

    # Stored on self so the nested consume() can increment it.
    self.msg_count = 0

    def consume(msg):
        # just check the methods: the formatted string is discarded
        "%s: %s" % (cur.io_timestamp, repr(msg))

        self.msg_count += 1
        if self.msg_count > 3:
            # Fourth message: send a final feedback and stop the stream.
            cur.send_feedback(reply=True)
            raise StopReplication()

        cur.send_feedback(flush_lsn=msg.data_start)

    # cannot be used in asynchronous mode
    self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)

    def process_stream():
        from select import select
        while True:
            msg = cur.read_message()
            if msg:
                consume(msg)
            else:
                # Nothing returned: block until the cursor is readable.
                select([cur], [], [])

    self.assertRaises(StopReplication, process_stream)
def create_connection(self, async_=True):
    """Open a logical replication connection to ``self.dsn``.

    Logs the target DSN, connects through ``psycopg2.connect()`` using
    ``LogicalReplicationConnection`` as the connection factory, then calls
    ``wait_select()`` on the new connection before handing it back.

    :param async_: forwarded to ``psycopg2.connect()`` as ``async_``;
        defaults to ``True``.
    :return: the connection object returned by ``psycopg2.connect()``.
    """
    logger.info('connecting to source database at "%s"', self.dsn)
    connection = psycopg2.connect(
        self.dsn,
        connection_factory=LogicalReplicationConnection,
        async_=async_,
    )
    # NOTE(review): presumably blocks until the connection is ready to be
    # used -- confirm against wait_select's definition.
    wait_select(connection)
    return connection
def make_repl_conn(self, **kwargs):
    """Return a fresh asynchronous replication connection to ``self.dsn``.

    The connection is opened with ``async_=True`` and
    ``LogicalReplicationConnection`` as the connection factory; any extra
    keyword arguments are passed straight to ``psycopg2.connect()``.
    ``wait_select()`` is called on it, and it is then appended to
    ``self._conns`` (the original documentation states that it will be
    closed on teardown()).
    """
    connect_options = dict(
        connection_factory=LogicalReplicationConnection, async_=True)
    repl_conn = psycopg2.connect(self.dsn, **connect_options, **kwargs)
    wait_select(repl_conn)
    self._conns.append(repl_conn)
    return repl_conn