def post_migrate_mpathnode(model):
    """Add the ltree/parent consistency CHECK constraint to ``model``'s table.

    Returns without doing anything unless ``model`` has a field named
    ``ltree`` that is an ``LTreeField``.  Otherwise runs an
    ``ALTER TABLE ... ADD CONSTRAINT`` statement that adds a constraint named
    ``<db_table>__check_ltree``: rows with a ``parent_id`` must have an
    ``ltree`` matching the lquery ``<parent_id>.*{1}``, and rows without one
    must match ``*{1}``.
    """
    # Note: model *isn't* a subclass of MPathNode, because django migrations are Weird,
    # so an issubclass(model, MPathNode) check cannot be used here.
    # Hence the following workaround: detect the model by its 'ltree' field.
    try:
        ltree_field = model._meta.get_field('ltree')
    except FieldDoesNotExist:
        return
    if not isinstance(ltree_field, LTreeField):
        return

    # The context manager closes the cursor even when the ALTER TABLE fails.
    with connection.cursor() as cur:
        # quote_ident needs the raw DB-API connection; opening the cursor first
        # presumably guarantees it has been established -- TODO confirm.
        names = {
            "table": quote_ident(model._meta.db_table, connection.connection),
            "check_constraint": quote_ident('%s__check_ltree' % model._meta.db_table, connection.connection),
        }
        # Check that the ltree is always consistent with being a child of _parent
        cur.execute('''
ALTER TABLE %(table)s ADD CONSTRAINT %(check_constraint)s CHECK (
(parent_id IS NOT NULL AND ltree ~ (parent_id::text || '.*{1}')::lquery)
OR (parent_id IS NULL AND ltree ~ '*{1}'::lquery)
)
''' % names)
# Python quote_ident() usage examples (python类quote_ident()的实例源码)
def test_identifier(self):
    """Check quote_ident output for a hyphenated name and a name containing a double quote."""
    from psycopg2.extensions import quote_ident

    # (raw identifier, expected quoted form)
    cases = (
        ('blah-blah', '"blah-blah"'),
        ('quote"inside', '"quote""inside"'),
    )
    for raw, expected in cases:
        self.assertEqual(quote_ident(raw, self.conn), expected)
def test_unicode_ident(self):
    """Check quote_ident output for an identifier made of a non-ASCII character."""
    from psycopg2.extensions import quote_ident

    snowman = "\u2603"
    quoted = '"' + snowman + '"'
    actual = quote_ident(snowman, self.conn)
    # On Python 2 the result is compared against the UTF-8 encoded form.
    expected = quoted.encode('utf8') if sys.version_info[0] < 3 else quoted
    self.assertEqual(actual, expected)
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
    """Create streaming replication slot.

    Builds a ``CREATE_REPLICATION_SLOT`` command and passes it to
    ``self.execute``.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used instead.

    Raises ``psycopg2.ProgrammingError`` when a logical slot is requested
    without ``output_plugin``, when a physical slot is requested with one,
    or when the replication type is neither logical nor physical.
    """
    prefix = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

    kind = self.connection.replication_type if slot_type is None else slot_type

    if kind == REPLICATION_LOGICAL:
        if output_plugin is None:
            raise psycopg2.ProgrammingError(
                "output plugin name is required to create "
                "logical replication slot")
        suffix = "LOGICAL %s" % quote_ident(output_plugin, self)
    elif kind == REPLICATION_PHYSICAL:
        if output_plugin is not None:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin name when creating "
                "physical replication slot")
        suffix = "PHYSICAL"
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(kind))

    self.execute(prefix + suffix)
def drop_replication_slot(self, slot_name):
    """Drop streaming replication slot.

    Passes a ``DROP_REPLICATION_SLOT`` command for the quoted ``slot_name``
    to ``self.execute``.
    """
    self.execute("DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self))
def test_identifier(self):
    """Check quote_ident output for a hyphenated name and a name containing a double quote."""
    from psycopg2.extensions import quote_ident

    # (raw identifier, expected quoted form)
    cases = (
        ('blah-blah', '"blah-blah"'),
        ('quote"inside', '"quote""inside"'),
    )
    for raw, expected in cases:
        self.assertEqual(quote_ident(raw, self.conn), expected)
def test_unicode_ident(self):
    """Check quote_ident output for an identifier made of a non-ASCII character."""
    from psycopg2.extensions import quote_ident

    snowman = u"\u2603"
    quoted = '"' + snowman + '"'
    actual = quote_ident(snowman, self.conn)
    # On Python 2 the result is compared against the UTF-8 encoded form.
    expected = quoted.encode('utf8') if sys.version_info[0] < 3 else quoted
    self.assertEqual(actual, expected)
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
    """Create streaming replication slot.

    Builds a ``CREATE_REPLICATION_SLOT`` command and passes it to
    ``self.execute``.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used instead.

    Raises ``psycopg2.ProgrammingError`` when a logical slot is requested
    without ``output_plugin``, when a physical slot is requested with one,
    or when the replication type is neither logical nor physical.
    """
    prefix = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

    kind = self.connection.replication_type if slot_type is None else slot_type

    if kind == REPLICATION_LOGICAL:
        if output_plugin is None:
            raise psycopg2.ProgrammingError(
                "output plugin name is required to create "
                "logical replication slot")
        suffix = "LOGICAL %s" % quote_ident(output_plugin, self)
    elif kind == REPLICATION_PHYSICAL:
        if output_plugin is not None:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin name when creating "
                "physical replication slot")
        suffix = "PHYSICAL"
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(kind))

    self.execute(prefix + suffix)
def drop_replication_slot(self, slot_name):
    """Drop streaming replication slot.

    Passes a ``DROP_REPLICATION_SLOT`` command for the quoted ``slot_name``
    to ``self.execute``.
    """
    self.execute("DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self))
def test_identifier(self):
    """Check quote_ident output for a hyphenated name and a name containing a double quote."""
    from psycopg2.extensions import quote_ident

    # (raw identifier, expected quoted form)
    cases = (
        ('blah-blah', '"blah-blah"'),
        ('quote"inside', '"quote""inside"'),
    )
    for raw, expected in cases:
        self.assertEqual(quote_ident(raw, self.conn), expected)
def test_unicode_ident(self):
    """Check quote_ident output for an identifier made of a non-ASCII character."""
    from psycopg2.extensions import quote_ident

    snowman = "\u2603"
    quoted = '"' + snowman + '"'
    actual = quote_ident(snowman, self.conn)
    # On Python 2 the result is compared against the UTF-8 encoded form.
    expected = quoted.encode('utf8') if sys.version_info[0] < 3 else quoted
    self.assertEqual(actual, expected)
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
    """Create streaming replication slot.

    Builds a ``CREATE_REPLICATION_SLOT`` command and passes it to
    ``self.execute``.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used instead.

    Raises ``psycopg2.ProgrammingError`` when a logical slot is requested
    without ``output_plugin``, when a physical slot is requested with one,
    or when the replication type is neither logical nor physical.
    """
    prefix = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

    kind = self.connection.replication_type if slot_type is None else slot_type

    if kind == REPLICATION_LOGICAL:
        if output_plugin is None:
            raise psycopg2.ProgrammingError(
                "output plugin name is required to create "
                "logical replication slot")
        suffix = "LOGICAL %s" % quote_ident(output_plugin, self)
    elif kind == REPLICATION_PHYSICAL:
        if output_plugin is not None:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin name when creating "
                "physical replication slot")
        suffix = "PHYSICAL"
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(kind))

    self.execute(prefix + suffix)
def drop_replication_slot(self, slot_name):
    """Drop streaming replication slot.

    Passes a ``DROP_REPLICATION_SLOT`` command for the quoted ``slot_name``
    to ``self.execute``.
    """
    self.execute("DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self))
def test_identifier(self):
    """Check quote_ident output for a hyphenated name and a name containing a double quote."""
    from psycopg2.extensions import quote_ident

    # (raw identifier, expected quoted form)
    cases = (
        ('blah-blah', '"blah-blah"'),
        ('quote"inside', '"quote""inside"'),
    )
    for raw, expected in cases:
        self.assertEqual(quote_ident(raw, self.conn), expected)
def test_unicode_ident(self):
    """Check quote_ident output for an identifier made of a non-ASCII character."""
    from psycopg2.extensions import quote_ident

    snowman = u"\u2603"
    quoted = '"' + snowman + '"'
    actual = quote_ident(snowman, self.conn)
    # On Python 2 the result is compared against the UTF-8 encoded form.
    expected = quoted.encode('utf8') if sys.version_info[0] < 3 else quoted
    self.assertEqual(actual, expected)
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
    """Create streaming replication slot.

    Builds a ``CREATE_REPLICATION_SLOT`` command and passes it to
    ``self.execute``.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used instead.

    Raises ``psycopg2.ProgrammingError`` when a logical slot is requested
    without ``output_plugin``, when a physical slot is requested with one,
    or when the replication type is neither logical nor physical.
    """
    prefix = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

    kind = self.connection.replication_type if slot_type is None else slot_type

    if kind == REPLICATION_LOGICAL:
        if output_plugin is None:
            raise psycopg2.ProgrammingError(
                "output plugin name is required to create "
                "logical replication slot")
        suffix = "LOGICAL %s" % quote_ident(output_plugin, self)
    elif kind == REPLICATION_PHYSICAL:
        if output_plugin is not None:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin name when creating "
                "physical replication slot")
        suffix = "PHYSICAL"
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(kind))

    self.execute(prefix + suffix)
def drop_replication_slot(self, slot_name):
    """Drop streaming replication slot.

    Passes a ``DROP_REPLICATION_SLOT`` command for the quoted ``slot_name``
    to ``self.execute``.
    """
    self.execute("DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self))
def test_identifier(self):
    """Check quote_ident output for a hyphenated name and a name containing a double quote."""
    from psycopg2.extensions import quote_ident

    # (raw identifier, expected quoted form)
    cases = (
        ('blah-blah', '"blah-blah"'),
        ('quote"inside', '"quote""inside"'),
    )
    for raw, expected in cases:
        self.assertEqual(quote_ident(raw, self.conn), expected)
def test_unicode_ident(self):
    """Check quote_ident output for an identifier made of a non-ASCII character."""
    from psycopg2.extensions import quote_ident

    snowman = u"\u2603"
    quoted = '"' + snowman + '"'
    actual = quote_ident(snowman, self.conn)
    # On Python 2 the result is compared against the UTF-8 encoded form.
    expected = quoted.encode('utf8') if sys.version_info[0] < 3 else quoted
    self.assertEqual(actual, expected)
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
    """Create streaming replication slot.

    Builds a ``CREATE_REPLICATION_SLOT`` command and passes it to
    ``self.execute``.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used instead.

    Raises ``psycopg2.ProgrammingError`` when a logical slot is requested
    without ``output_plugin``, when a physical slot is requested with one,
    or when the replication type is neither logical nor physical.
    """
    prefix = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

    kind = self.connection.replication_type if slot_type is None else slot_type

    if kind == REPLICATION_LOGICAL:
        if output_plugin is None:
            raise psycopg2.ProgrammingError(
                "output plugin name is required to create "
                "logical replication slot")
        suffix = "LOGICAL %s" % quote_ident(output_plugin, self)
    elif kind == REPLICATION_PHYSICAL:
        if output_plugin is not None:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin name when creating "
                "physical replication slot")
        suffix = "PHYSICAL"
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(kind))

    self.execute(prefix + suffix)
def drop_replication_slot(self, slot_name):
    """Drop streaming replication slot.

    Passes a ``DROP_REPLICATION_SLOT`` command for the quoted ``slot_name``
    to ``self.execute``.
    """
    self.execute("DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self))
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                      timeline=0, options=None, decode=False):
    """Start replication stream.

    Builds a ``START_REPLICATION`` command from the arguments and passes it,
    together with ``decode``, to ``self.start_replication_expert``.

    ``slot_name`` is required for logical replication and optional for
    physical replication.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used.  ``start_lsn`` is either a
    string of two hexadecimal numbers separated by ``/`` or an integer whose
    upper and lower 32 bits supply those two numbers.  A non-zero
    ``timeline`` is appended as ``TIMELINE <n>``.  ``options`` is a mapping
    of output plugin option names to values.

    Raises ``psycopg2.ProgrammingError`` when the replication type is not
    recognized, when logical replication is requested without a slot name or
    with a timeline, or when options are given for physical replication.
    """
    command = "START_REPLICATION "

    if slot_type is None:
        slot_type = self.connection.replication_type

    if slot_type == REPLICATION_LOGICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        else:
            raise psycopg2.ProgrammingError(
                "slot name is required for logical replication")
        command += "LOGICAL "
    elif slot_type == REPLICATION_PHYSICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(slot_type))

    # isinstance (not an exact type() comparison) so str subclasses are also
    # parsed as "XXX/XXX" text rather than falling into the integer branch.
    if isinstance(start_lsn, str):
        lsn = start_lsn.split('/')
        lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
    else:
        lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                           start_lsn & 0xFFFFFFFF)
    command += lsn

    if timeline != 0:
        if slot_type == REPLICATION_LOGICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify timeline for logical replication")
        command += " TIMELINE %d" % timeline

    if options:
        if slot_type == REPLICATION_PHYSICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin options for physical replication")
        # Join the rendered options explicitly instead of inspecting the tail
        # of the command string to decide whether a separator is needed.
        rendered = ", ".join(
            "%s %s" % (quote_ident(k, self), _A(str(v)))
            for k, v in options.items())
        command += " (" + rendered + ")"

    self.start_replication_expert(command, decode=decode)
# allows replication cursors to be used in select.select() directly
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                      timeline=0, options=None, decode=False):
    """Start replication stream.

    Builds a ``START_REPLICATION`` command from the arguments and passes it,
    together with ``decode``, to ``self.start_replication_expert``.

    ``slot_name`` is required for logical replication and optional for
    physical replication.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used.  ``start_lsn`` is either a
    string of two hexadecimal numbers separated by ``/`` or an integer whose
    upper and lower 32 bits supply those two numbers.  A non-zero
    ``timeline`` is appended as ``TIMELINE <n>``.  ``options`` is a mapping
    of output plugin option names to values.

    Raises ``psycopg2.ProgrammingError`` when the replication type is not
    recognized, when logical replication is requested without a slot name or
    with a timeline, or when options are given for physical replication.
    """
    command = "START_REPLICATION "

    if slot_type is None:
        slot_type = self.connection.replication_type

    if slot_type == REPLICATION_LOGICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        else:
            raise psycopg2.ProgrammingError(
                "slot name is required for logical replication")
        command += "LOGICAL "
    elif slot_type == REPLICATION_PHYSICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(slot_type))

    # isinstance (not an exact type() comparison) so str subclasses are also
    # parsed as "XXX/XXX" text rather than falling into the integer branch.
    if isinstance(start_lsn, str):
        lsn = start_lsn.split('/')
        lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
    else:
        lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                           start_lsn & 0xFFFFFFFF)
    command += lsn

    if timeline != 0:
        if slot_type == REPLICATION_LOGICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify timeline for logical replication")
        command += " TIMELINE %d" % timeline

    if options:
        if slot_type == REPLICATION_PHYSICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin options for physical replication")
        # items() exists on both Python 2 and 3 dicts; iteritems() is
        # Python 2 only.  The options are joined explicitly instead of
        # inspecting the tail of the command string for a separator.
        rendered = ", ".join(
            "%s %s" % (quote_ident(k, self), _A(str(v)))
            for k, v in options.items())
        command += " (" + rendered + ")"

    self.start_replication_expert(command, decode=decode)
# allows replication cursors to be used in select.select() directly
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                      timeline=0, options=None, decode=False):
    """Start replication stream.

    Builds a ``START_REPLICATION`` command from the arguments and passes it,
    together with ``decode``, to ``self.start_replication_expert``.

    ``slot_name`` is required for logical replication and optional for
    physical replication.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used.  ``start_lsn`` is either a
    string of two hexadecimal numbers separated by ``/`` or an integer whose
    upper and lower 32 bits supply those two numbers.  A non-zero
    ``timeline`` is appended as ``TIMELINE <n>``.  ``options`` is a mapping
    of output plugin option names to values.

    Raises ``psycopg2.ProgrammingError`` when the replication type is not
    recognized, when logical replication is requested without a slot name or
    with a timeline, or when options are given for physical replication.
    """
    command = "START_REPLICATION "

    if slot_type is None:
        slot_type = self.connection.replication_type

    if slot_type == REPLICATION_LOGICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        else:
            raise psycopg2.ProgrammingError(
                "slot name is required for logical replication")
        command += "LOGICAL "
    elif slot_type == REPLICATION_PHYSICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(slot_type))

    # isinstance (not an exact type() comparison) so str subclasses are also
    # parsed as "XXX/XXX" text rather than falling into the integer branch.
    if isinstance(start_lsn, str):
        lsn = start_lsn.split('/')
        lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
    else:
        lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                           start_lsn & 0xFFFFFFFF)
    command += lsn

    if timeline != 0:
        if slot_type == REPLICATION_LOGICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify timeline for logical replication")
        command += " TIMELINE %d" % timeline

    if options:
        if slot_type == REPLICATION_PHYSICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin options for physical replication")
        # Join the rendered options explicitly instead of inspecting the tail
        # of the command string to decide whether a separator is needed.
        rendered = ", ".join(
            "%s %s" % (quote_ident(k, self), _A(str(v)))
            for k, v in options.items())
        command += " (" + rendered + ")"

    self.start_replication_expert(command, decode=decode)
# allows replication cursors to be used in select.select() directly
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                      timeline=0, options=None, decode=False):
    """Start replication stream.

    Builds a ``START_REPLICATION`` command from the arguments and passes it,
    together with ``decode``, to ``self.start_replication_expert``.

    ``slot_name`` is required for logical replication and optional for
    physical replication.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used.  ``start_lsn`` is either a
    string of two hexadecimal numbers separated by ``/`` or an integer whose
    upper and lower 32 bits supply those two numbers.  A non-zero
    ``timeline`` is appended as ``TIMELINE <n>``.  ``options`` is a mapping
    of output plugin option names to values.

    Raises ``psycopg2.ProgrammingError`` when the replication type is not
    recognized, when logical replication is requested without a slot name or
    with a timeline, or when options are given for physical replication.
    """
    command = "START_REPLICATION "

    if slot_type is None:
        slot_type = self.connection.replication_type

    if slot_type == REPLICATION_LOGICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        else:
            raise psycopg2.ProgrammingError(
                "slot name is required for logical replication")
        command += "LOGICAL "
    elif slot_type == REPLICATION_PHYSICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(slot_type))

    # isinstance (not an exact type() comparison) so str subclasses are also
    # parsed as "XXX/XXX" text rather than falling into the integer branch.
    if isinstance(start_lsn, str):
        lsn = start_lsn.split('/')
        lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
    else:
        lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                           start_lsn & 0xFFFFFFFF)
    command += lsn

    if timeline != 0:
        if slot_type == REPLICATION_LOGICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify timeline for logical replication")
        command += " TIMELINE %d" % timeline

    if options:
        if slot_type == REPLICATION_PHYSICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin options for physical replication")
        # items() exists on both Python 2 and 3 dicts; iteritems() is
        # Python 2 only.  The options are joined explicitly instead of
        # inspecting the tail of the command string for a separator.
        rendered = ", ".join(
            "%s %s" % (quote_ident(k, self), _A(str(v)))
            for k, v in options.items())
        command += " (" + rendered + ")"

    self.start_replication_expert(command, decode=decode)
# allows replication cursors to be used in select.select() directly
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                      timeline=0, options=None, decode=False):
    """Start replication stream.

    Builds a ``START_REPLICATION`` command from the arguments and passes it,
    together with ``decode``, to ``self.start_replication_expert``.

    ``slot_name`` is required for logical replication and optional for
    physical replication.  When ``slot_type`` is None,
    ``self.connection.replication_type`` is used.  ``start_lsn`` is either a
    string of two hexadecimal numbers separated by ``/`` or an integer whose
    upper and lower 32 bits supply those two numbers.  A non-zero
    ``timeline`` is appended as ``TIMELINE <n>``.  ``options`` is a mapping
    of output plugin option names to values.

    Raises ``psycopg2.ProgrammingError`` when the replication type is not
    recognized, when logical replication is requested without a slot name or
    with a timeline, or when options are given for physical replication.
    """
    command = "START_REPLICATION "

    if slot_type is None:
        slot_type = self.connection.replication_type

    if slot_type == REPLICATION_LOGICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        else:
            raise psycopg2.ProgrammingError(
                "slot name is required for logical replication")
        command += "LOGICAL "
    elif slot_type == REPLICATION_PHYSICAL:
        if slot_name:
            command += "SLOT %s " % quote_ident(slot_name, self)
        # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
    else:
        raise psycopg2.ProgrammingError(
            "unrecognized replication type: %s" % repr(slot_type))

    # isinstance (not an exact type() comparison) so str subclasses are also
    # parsed as "XXX/XXX" text rather than falling into the integer branch.
    if isinstance(start_lsn, str):
        lsn = start_lsn.split('/')
        lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
    else:
        lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                           start_lsn & 0xFFFFFFFF)
    command += lsn

    if timeline != 0:
        if slot_type == REPLICATION_LOGICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify timeline for logical replication")
        command += " TIMELINE %d" % timeline

    if options:
        if slot_type == REPLICATION_PHYSICAL:
            raise psycopg2.ProgrammingError(
                "cannot specify output plugin options for physical replication")
        # items() exists on both Python 2 and 3 dicts; iteritems() is
        # Python 2 only.  The options are joined explicitly instead of
        # inspecting the tail of the command string for a separator.
        rendered = ", ".join(
            "%s %s" % (quote_ident(k, self), _A(str(v)))
            for k, v in options.items())
        command += " (" + rendered + ")"

    self.start_replication_expert(command, decode=decode)
# allows replication cursors to be used in select.select() directly