def get_latest_snapshot_timestamp(self, statuses):
    """Select the latest timestamp of the completed snapshot.

    Args:
        statuses (tuple): The tuple of snapshot statuses to filter on.
            Any value that is not a tuple is replaced by ('SUCCESS',).

    Returns:
        str: The string timestamp of the latest complete snapshot.

    Raises:
        MySQLError: When no rows are found, or when the query raises one
            of the database errors caught below.
    """
    # Build a dynamic parameterized query string for filtering the
    # snapshot statuses: one '%s' placeholder per status, with the status
    # values themselves handed to execute() as query parameters.
    if not isinstance(statuses, tuple):
        statuses = ('SUCCESS',)
    status_params = ','.join(['%s'] * len(statuses))
    filter_clause = SNAPSHOT_STATUS_FILTER_CLAUSE.format(status_params)

    # Stays None if cursor() itself fails, so the cleanup below knows
    # there is nothing to close.
    cursor = None
    try:
        cursor = self.conn.cursor()
        cursor.execute(
            select_data.LATEST_SNAPSHOT_TIMESTAMP + filter_clause, statuses)
        row = cursor.fetchone()
        if row:
            return row[0]
        # Raised inside the try on purpose: the handler below wraps it in
        # MySQLError so callers only have one exception type to catch.
        raise NoResultsError('No snapshot cycle found.')
    except (DataError, IntegrityError, InternalError, NotSupportedError,
            OperationalError, ProgrammingError, NoResultsError) as e:
        raise MySQLError('snapshot_cycles', e)
    finally:
        # Release the cursor on every exit path (return, no rows, error).
        if cursor is not None:
            cursor.close()
评论列表
文章目录