def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
timeline=0, options=None, decode=False):
"""Start replication stream."""
command = "START_REPLICATION "
if slot_type is None:
slot_type = self.connection.replication_type
if slot_type == REPLICATION_LOGICAL:
if slot_name:
command += "SLOT %s " % quote_ident(slot_name, self)
else:
raise psycopg2.ProgrammingError(
"slot name is required for logical replication")
command += "LOGICAL "
elif slot_type == REPLICATION_PHYSICAL:
if slot_name:
command += "SLOT %s " % quote_ident(slot_name, self)
# don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
else:
raise psycopg2.ProgrammingError(
"unrecognized replication type: %s" % repr(slot_type))
if type(start_lsn) is str:
lsn = start_lsn.split('/')
lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
else:
lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
start_lsn & 0xFFFFFFFF)
command += lsn
if timeline != 0:
if slot_type == REPLICATION_LOGICAL:
raise psycopg2.ProgrammingError(
"cannot specify timeline for logical replication")
command += " TIMELINE %d" % timeline
if options:
if slot_type == REPLICATION_PHYSICAL:
raise psycopg2.ProgrammingError(
"cannot specify output plugin options for physical replication")
command += " ("
for k, v in options.iteritems():
if not command.endswith('('):
command += ", "
command += "%s %s" % (quote_ident(k, self), _A(str(v)))
command += ")"
self.start_replication_expert(command, decode=decode)
# allows replication cursors to be used in select.select() directly
评论列表
文章目录