def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
python类get_current_revision()的实例源码
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
if self.as_sql:
return self._start_from_rev
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
self._version.create(self.connection, checkfirst=True)
return self.connection.scalar(self._version.select())
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
if self.as_sql:
return self._start_from_rev
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
self._version.create(self.connection, checkfirst=True)
return self.connection.scalar(self._version.select())
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
def get_current_revision(self):
"""Return the current revision, usually that which is present
in the ``alembic_version`` table in the database.
This method intends to be used only for a migration stream that
does not contain unmerged branches in the target database;
if there are multiple branches present, an exception is raised.
The :meth:`.MigrationContext.get_current_heads` should be preferred
over this method going forward in order to be compatible with
branch migration support.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned instead, if any.
"""
heads = self.get_current_heads()
if len(heads) == 0:
return None
elif len(heads) > 1:
raise util.CommandError(
"Version table '%s' has more than one head present; "
"please use get_current_heads()" % self.version_table)
else:
return heads[0]
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev == 'base':
start_from_rev = None
elif start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev == 'base':
start_from_rev = None
elif start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev == 'base':
start_from_rev = None
elif start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev == 'base':
start_from_rev = None
elif start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev == 'base':
start_from_rev = None
elif start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def get_current_heads(self):
"""Return a tuple of the current 'head versions' that are represented
in the target database.
For a migration stream without branches, this will be a single
value, synonymous with that of
:meth:`.MigrationContext.get_current_revision`. However when multiple
unmerged branches exist within the target database, the returned tuple
will contain a value for each head.
If this :class:`.MigrationContext` was configured in "offline"
mode, that is with ``as_sql=True``, the ``starting_rev``
parameter is returned in a one-length tuple.
If no version table is present, or if there are no revisions
present, an empty tuple is returned.
.. versionadded:: 0.7.0
"""
if self.as_sql:
start_from_rev = self._start_from_rev
if start_from_rev is not None and self.script:
start_from_rev = \
self.script.get_revision(start_from_rev).revision
return util.to_tuple(start_from_rev, default=())
else:
if self._start_from_rev:
raise util.CommandError(
"Can't specify current_rev to context "
"when using a database connection")
if not self._has_version_table():
return ()
return tuple(
row[0] for row in self.connection.execute(self._version.select())
)
def run_migrations(self, **kw):
"""Run the migration scripts established for this :class:`.MigrationContext`,
if any.
The commands in :mod:`alembic.command` will set up a function
that is ultimately passed to the :class:`.MigrationContext`
as the ``fn`` argument. This function represents the "work"
that will be done when :meth:`.MigrationContext.run_migrations`
is called, typically from within the ``env.py`` script of the
migration environment. The "work function" then provides an iterable
of version callables and other version information which
in the case of the ``upgrade`` or ``downgrade`` commands are the
list of version scripts to invoke. Other commands yield nothing,
in the case that a command wants to run some other operation
against the database such as the ``current`` or ``stamp`` commands.
:param \**kw: keyword arguments here will be passed to each
migration callable, that is the ``upgrade()`` or ``downgrade()``
method within revision scripts.
"""
current_rev = rev = False
self.impl.start_migrations()
for change, prev_rev, rev, doc in self._migrations_fn(
self.get_current_revision(),
self):
if current_rev is False:
current_rev = prev_rev
if self.as_sql and not current_rev:
self._version.create(self.connection)
if doc:
log.info("Running %s %s -> %s, %s", change.__name__, prev_rev,
rev, doc)
else:
log.info("Running %s %s -> %s", change.__name__, prev_rev, rev)
if self.as_sql:
self.impl.static_output(
"-- Running %s %s -> %s" %
(change.__name__, prev_rev, rev)
)
change(**kw)
if not self.impl.transactional_ddl:
self._update_current_rev(prev_rev, rev)
prev_rev = rev
if rev is not False:
if self.impl.transactional_ddl:
self._update_current_rev(current_rev, rev)
if self.as_sql and not rev:
self._version.drop(self.connection)
def run_migrations(self, **kw):
"""Run the migration scripts established for this :class:`.MigrationContext`,
if any.
The commands in :mod:`alembic.command` will set up a function
that is ultimately passed to the :class:`.MigrationContext`
as the ``fn`` argument. This function represents the "work"
that will be done when :meth:`.MigrationContext.run_migrations`
is called, typically from within the ``env.py`` script of the
migration environment. The "work function" then provides an iterable
of version callables and other version information which
in the case of the ``upgrade`` or ``downgrade`` commands are the
list of version scripts to invoke. Other commands yield nothing,
in the case that a command wants to run some other operation
against the database such as the ``current`` or ``stamp`` commands.
:param \**kw: keyword arguments here will be passed to each
migration callable, that is the ``upgrade()`` or ``downgrade()``
method within revision scripts.
"""
current_rev = rev = False
self.impl.start_migrations()
for change, prev_rev, rev, doc in self._migrations_fn(
self.get_current_revision(),
self):
if current_rev is False:
current_rev = prev_rev
if self.as_sql and not current_rev:
self._version.create(self.connection)
if doc:
log.info("Running %s %s -> %s, %s", change.__name__, prev_rev,
rev, doc)
else:
log.info("Running %s %s -> %s", change.__name__, prev_rev, rev)
if self.as_sql:
self.impl.static_output(
"-- Running %s %s -> %s" %
(change.__name__, prev_rev, rev)
)
change(**kw)
if not self.impl.transactional_ddl:
self._update_current_rev(prev_rev, rev)
prev_rev = rev
if rev is not False:
if self.impl.transactional_ddl:
self._update_current_rev(current_rev, rev)
if self.as_sql and not rev:
self._version.drop(self.connection)