def select_group_ids(self, resource_name, timestamp):
    """Fetch the group ids recorded in a snapshot table.

    Args:
        resource_name (str): Resource name; here it is only used to label
            the MySQLError raised on failure.
        timestamp (str): Snapshot timestamp, formatted as
            YYYYMMDDTHHMMSSZ, substituted into the GROUP_IDS query.

    Returns:
        list: The 'group_id' value of every row returned by the query.

    Raises:
        MySQLError: When an error has occurred while executing the query.
    """
    try:
        query = select_data.GROUP_IDS.format(timestamp)
        # A DictCursor is requested so rows can be read by column name.
        dict_cursor = self.conn.cursor(cursorclass=cursors.DictCursor)
        dict_cursor.execute(query)
        group_ids = []
        for record in dict_cursor.fetchall():
            group_ids.append(record['group_id'])
        return group_ids
    except (DataError, IntegrityError, InternalError, NotSupportedError,
            OperationalError, ProgrammingError) as err:
        raise MySQLError(resource_name, err)
# Example source code for the Python class NotSupportedError()
def execute_sql_with_fetch(self, resource_name, sql, values):
    """Run a SQL statement and return everything it fetched.

    Args:
        resource_name (str): Resource name; here it is only used to label
            the MySQLError raised on failure.
        sql (str): The SQL statement to execute.
        values (tuple): Values for the placeholders in ``sql``.

    Returns:
        list: The result of ``fetchall()`` on a DictCursor, i.e. the rows
            of the query result (presumably one dict per row).

    Raises:
        MySQLError: When an error has occurred while executing the query.
    """
    try:
        dict_cursor = self.conn.cursor(cursorclass=cursors.DictCursor)
        dict_cursor.execute(sql, values)
        fetched_rows = dict_cursor.fetchall()
    except (DataError, IntegrityError, InternalError, NotSupportedError,
            OperationalError, ProgrammingError) as err:
        raise MySQLError(resource_name, err)
    return fetched_rows
def execute_sql_with_commit(self, resource_name, sql, values):
    """Run a SQL statement and commit the connection afterwards.

    Args:
        resource_name (str): Resource name; here it is only used to label
            the MySQLError raised on failure.
        sql (str): The SQL statement to execute.
        values (tuple): Values for the placeholders in ``sql``.

    Raises:
        MySQLError: When an error has occurred while executing the query
            or committing it.
    """
    connection = self.conn
    try:
        db_cursor = connection.cursor()
        db_cursor.execute(sql, values)
        connection.commit()
    except (DataError, IntegrityError, InternalError, NotSupportedError,
            OperationalError, ProgrammingError) as err:
        raise MySQLError(resource_name, err)
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def get_buckets_acls(self, resource_name, timestamp):
    """Build bucket ACL objects from a bucket ACLs snapshot table.

    Args:
        resource_name (str): Resource name, passed to the fetch helper and
            used to label logged errors.
        timestamp (str): Snapshot timestamp, formatted as
            YYYYMMDDTHHMMSSZ, substituted into the BUCKET_ACLS query.

    Returns:
        dict: BucketAccessControls objects keyed by their 0-based position
            in the query result. When one of the listed database errors
            is caught, it is logged and whatever was collected so far
            (possibly nothing) is returned.
    """
    acls_by_index = {}
    try:
        query = select_data.BUCKET_ACLS.format(timestamp)
        fetched = self.execute_sql_with_fetch(resource_name, query, None)
        for index, record in enumerate(fetched):
            acls_by_index[index] = bkt_acls.BucketAccessControls(
                bucket=record['bucket'],
                entity=record['entity'],
                email=record['email'],
                domain=record['domain'],
                role=record['role'],
                project_number=record['project_number'])
    except (DataError, IntegrityError, InternalError, NotSupportedError,
            OperationalError, ProgrammingError) as err:
        # Errors are logged rather than raised.
        LOGGER.error(errors.MySQLError(resource_name, err))
    return acls_by_index
def get_bigquery_acls(self, resource_name, timestamp):
    """Build BigQuery ACL objects from a BigQuery ACLs snapshot table.

    Args:
        resource_name (str): Resource name, passed to the fetch helper and
            used to label logged errors.
        timestamp (str): Snapshot timestamp, formatted as
            YYYYMMDDTHHMMSSZ, substituted into the BIGQUERY_ACLS query.

    Returns:
        dict: BigqueryAccessControls objects keyed by their 0-based
            position in the query result. Empty when the query fails with
            one of the listed database errors (the error is logged).
    """
    bigquery_acls = {}
    # Default to no rows so that a caught-and-logged query error yields an
    # empty result instead of an UnboundLocalError in the loop below.
    rows = []
    try:
        bigquery_acls_sql = select_data.BIGQUERY_ACLS.format(timestamp)
        rows = self.execute_sql_with_fetch(resource_name,
                                           bigquery_acls_sql,
                                           None)
    except (DataError, IntegrityError, InternalError, NotSupportedError,
            OperationalError, ProgrammingError) as e:
        # Errors are logged rather than raised.
        LOGGER.error(errors.MySQLError(resource_name, e))

    for cnt, row in enumerate(rows):
        bigquery_acls[cnt] = bq_acls.BigqueryAccessControls(
            dataset_id=row['dataset_id'],
            special_group=row['access_special_group'],
            user_email=row['access_user_by_email'],
            domain=row['access_domain'],
            role=row['role'],
            group_email=row['access_group_by_email'],
            project_id=row['project_id'])
    return bigquery_acls
def _get_cloudsql_instance_acl_map(self, resource_name, timestamp):
    """Group CloudSQL ACL network values by instance.

    Args:
        resource_name (str): Resource name, passed to the fetch helper and
            used to label logged errors.
        timestamp (str): Snapshot timestamp, formatted as
            YYYYMMDDTHHMMSSZ, substituted into the CLOUDSQL_ACLS query.

    Returns:
        dict: Maps ``hash("<project_number>,<instance_name>")`` to the
            list of distinct 'value' entries seen for that instance, in
            first-seen order. When one of the listed database errors is
            caught, it is logged and whatever was collected so far is
            returned.
    """
    networks_by_instance = {}
    try:
        query = select_data.CLOUDSQL_ACLS.format(timestamp)
        fetched = self.execute_sql_with_fetch(resource_name, query, None)
        for record in fetched:
            project_number = record['project_number']
            instance_name = record['instance_name']
            network = record['value']
            instance_key = hash(str(project_number) + ',' + instance_name)
            # Create the instance's list on first sight, then add each
            # network only once.
            networks = networks_by_instance.setdefault(instance_key, [])
            if network not in networks:
                networks.append(network)
    except (DataError, IntegrityError, InternalError, NotSupportedError,
            OperationalError, ProgrammingError) as err:
        # Errors are logged rather than raised.
        LOGGER.error(errors.MySQLError(resource_name, err))
    return networks_by_instance
def load_data(self, resource_name, timestamp, data):
    """Write data to a CSV file and load that file into a snapshot table.

    Args:
        resource_name (str): Resource name; selects the CSV layout and the
            load statement, and labels the MySQLError raised on failure.
        timestamp (str): Snapshot timestamp, formatted as
            YYYYMMDDTHHMMSSZ, used to build the snapshot table name.
        data (iterable): An iterable or a list of data to be uploaded.

    Raises:
        MySQLError: When an error has occurred while executing the query.
    """
    with csv_writer.write_csv(resource_name, data) as csv_file:
        try:
            table_name = self._create_snapshot_table_name(
                resource_name, timestamp)
            statement = load_data_sql_provider.provide_load_data_sql(
                resource_name, csv_file.name, table_name)
            LOGGER.debug('SQL: %s', statement)
            db_cursor = self.conn.cursor()
            db_cursor.execute(statement)
            self.conn.commit()
            # TODO: Return the snapshot table name so that it can be
            # tracked in the main snapshot table.
        except (DataError, IntegrityError, InternalError,
                NotSupportedError, OperationalError,
                ProgrammingError) as err:
            raise MySQLError(resource_name, err)
def get_latest_snapshot_timestamp(self, statuses):
    """Look up the latest snapshot timestamp for the given statuses.

    Args:
        statuses (tuple): Snapshot statuses to filter on. Any value that
            is not a tuple is replaced by ``('SUCCESS',)``.

    Returns:
        str: The first column of the first row returned by the query,
            i.e. the latest matching snapshot timestamp.

    Raises:
        MySQLError: When the query fails or when no rows are found.
    """
    if not isinstance(statuses, tuple):
        statuses = ('SUCCESS',)

    # One '%s' placeholder per status keeps the filter parameterized.
    placeholders = ','.join('%s' for _ in statuses)
    status_filter = SNAPSHOT_STATUS_FILTER_CLAUSE.format(placeholders)
    try:
        db_cursor = self.conn.cursor()
        query = select_data.LATEST_SNAPSHOT_TIMESTAMP + status_filter
        db_cursor.execute(query, statuses)
        latest = db_cursor.fetchone()
        if not latest:
            # Raised inside the try so it is wrapped in MySQLError below.
            raise NoResultsError('No snapshot cycle found.')
        return latest[0]
    except (DataError, IntegrityError, InternalError, NotSupportedError,
            OperationalError, ProgrammingError, NoResultsError) as err:
        raise MySQLError('snapshot_cycles', err)
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def _rollback(self):
    """Delegate to BaseDatabaseWrapper._rollback, ignoring NotSupportedError.

    Any other exception raised by the base implementation propagates.
    """
    try:
        BaseDatabaseWrapper._rollback(self)
    except Database.NotSupportedError:
        # Deliberate best effort: presumably the backend cannot roll back
        # (e.g. no transaction support) -- TODO confirm.
        return
def get_cloudsql_acls(self, resource_name, timestamp):
    """Build CloudSQL ACL objects for the instances in a snapshot table.

    Args:
        resource_name (str): Resource name, passed to the fetch helper and
            used to label logged errors.
        timestamp (str): Snapshot timestamp, formatted as
            YYYYMMDDTHHMMSSZ, substituted into the CLOUDSQL_INSTANCES
            query and passed on to the ACL-map helper.

    Returns:
        dict: CloudSqlAccessControl objects keyed by their 0-based
            position in the instances query result. When one of the
            listed database errors is caught, it is logged and whatever
            was collected so far (possibly nothing) is returned.
    """
    acls_by_index = {}
    try:
        instances_query = select_data.CLOUDSQL_INSTANCES.format(timestamp)
        instances = self.execute_sql_with_fetch(resource_name,
                                                instances_query,
                                                None)
        networks_map = self._get_cloudsql_instance_acl_map(resource_name,
                                                           timestamp)
        for index, record in enumerate(instances):
            project_number = record['project_number']
            instance_name = record['name']
            ssl_enabled = record['settings_ip_configuration_require_ssl']
            # Look up the networks recorded for this instance.
            authorized_networks = self._get_networks_for_instance(
                networks_map, project_number, instance_name)
            acls_by_index[index] = csql_acls.CloudSqlAccessControl(
                instance_name=instance_name,
                authorized_networks=authorized_networks,
                ssl_enabled=ssl_enabled,
                project_number=project_number)
    except (DataError, IntegrityError, InternalError, NotSupportedError,
            OperationalError, ProgrammingError) as err:
        # Errors are logged rather than raised.
        LOGGER.error(errors.MySQLError(resource_name, err))
    return acls_by_index