def __call__(self, query, env=None):
    """
    Execute the SQL query.
    Automatically creates tables mentioned in the query from dataframes before executing.
    :param query: SQL query string, which can reference pandas dataframes as SQL tables.
    :param env: Variables environment - a dict mapping table names to pandas dataframes.
    If not specified, the local and global variables of the caller are used.
    :return: Pandas dataframe with the result of the SQL query.
    """
    if env is None:
        env = get_outer_frame_variables()
    with self.conn as conn:
        for table_name in extract_table_names(query):
            if table_name not in env:
                # don't raise an error: the table may already exist in the database
                continue
            if self.persist and table_name in self.loaded_tables:
                # table was already loaded by this instance; don't load it again
                continue
            self.loaded_tables.add(table_name)
            write_table(env[table_name], table_name, conn)
        try:
            result = read_sql(query, conn)
        except DatabaseError as ex:
            raise PandaSQLException(ex)
        except ResourceClosedError:
            # the query returns no result set
            result = None
    return result
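A minimal usage sketch for the query callable above, assuming it is the __call__ method of a PandaSQL-style class as in the pandasql package; the class name, constructor arguments, and the sample dataframe are illustrative assumptions, not part of the original snippet.

import pandas as pd
from pandasql import PandaSQL  # assumed origin of the __call__ method above

df = pd.DataFrame({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})

pdsql = PandaSQL(persist=True)  # assumed constructor; persist reuses already-loaded tables
# The query references the dataframe by name; env maps table names to dataframes.
result = pdsql("SELECT x, y FROM df WHERE x > 1", env={'df': df})
print(result)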
Python DatabaseError() example source code
def _ora_drop_ignore(conn, dbname):
    try:
        conn.execute("drop user %s cascade" % dbname)
        log.info("Reaped db: %s", dbname)
        return True
    except exc.DatabaseError as err:
        log.warning("couldn't drop db: %s", err)
        return False
def _ora_drop_ignore(conn, dbname):
    try:
        conn.execute("drop user %s cascade" % dbname)
        log.info("Reaped db: %s" % dbname)
        return True
    except exc.DatabaseError as err:
        log.warn("couldn't drop db: %s" % err)
        return False
def commit_session(response):
    """
    Try to commit the db session in the case
    of a successful request with status_code
    under 400.
    """
    if response.status_code >= 400:
        return response
    try:
        db_session.commit()
    except DatabaseError:
        db_session.rollback()
    return response
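The commit_session signature matches a Flask after_request handler; below is a minimal wiring sketch under that assumption, with an in-memory engine and a scoped db_session standing in for the application's real setup.

from flask import Flask
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine('sqlite:///:memory:')
db_session = scoped_session(sessionmaker(bind=engine))

app = Flask(__name__)
app.after_request(commit_session)  # commit (or roll back) after each successful response

@app.teardown_appcontext
def remove_session(exception=None):
    db_session.remove()  # drop the scoped session at the end of each request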
def _flush(self):
    try:
        db_session.flush()
    except DatabaseError:
        db_session.rollback()
def cockroach_transaction(f):
    def run_transaction(caller):
        while True:
            with MONITOR_COCKROACHDB.observe_transaction(caller):
                try:
                    return f()
                except DatabaseError as e:
                    if not isinstance(e.orig, psycopg2.OperationalError) and \
                            not e.orig.pgcode == psycopg2.errorcodes.SERIALIZATION_FAILURE:
                        raise
            MONITOR_COCKROACHDB.cockroach_retry_count.labels(caller).inc()
    return run_transaction
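A short usage sketch for the retry wrapper above; the wrapped callable and the 'create_user' label are hypothetical, and MONITOR_COCKROACHDB comes from the surrounding module.

def _create_user():
    # ... perform the transactional work against the database session here ...
    pass

run = cockroach_transaction(_create_user)
run('create_user')  # retried while the failure looks like a CockroachDB transaction conflict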
def drop(self):
    """
    Remove version control from a database.
    """
    if SQLA_07:
        try:
            self.table.drop()
        except sa_exceptions.DatabaseError:
            raise exceptions.DatabaseNotControlledError(str(self.table))
    else:
        try:
            self.table.drop()
        except sa_exceptions.SQLError:
            raise exceptions.DatabaseNotControlledError(str(self.table))
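This drop method reads like part of sqlalchemy-migrate's ControlledSchema; the sketch below shows how removing version control from a database might look under that assumption (the class name, constructor arguments, and bookkeeping table name are assumptions, not confirmed by the snippet).

from migrate.versioning.schema import ControlledSchema  # assumed class
from migrate import exceptions

schema = ControlledSchema(engine, repository)  # assumed constructor: engine + repository path
try:
    schema.drop()  # drops the version-tracking table (typically migrate_version)
except exceptions.DatabaseNotControlledError:
    pass  # database was never placed under version control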
def __fill_player(data, api, account_id):
    LOGGER.info('Find account id: %d', account_id)
    # Important: don't pass multiple account ids via steamids, because the returned
    # players are not in the same order and the response carries no account id.
    try:
        players = api.get_player_summaries(steamids=account_id)
    except (APIError, APITimeoutError) as error:
        LOGGER.error('Failed to retrieve account id: %d; recording it with minimum info for now, error: %s',
                     account_id, str(error))
        # Temporarily create a blank account, assuming it will be synced again on the
        # next fill_database_detail() invocation.
        players = {
            'players': [
                {
                    'steamid': account_id,
                    'profileurl': 'N/A'
                }
            ]
        }
    if players is None:
        LOGGER.info('Not found account id: %d', account_id)
        return False
    for player in players['players']:
        steam_id = player['steamid']
        real_name = player.get('realname', None)
        persona_name = player.get('personaname', None)
        avatar = player.get('avatarfull', None)
        profile_url = player.get('profileurl', None)
        data_player = data.get_player(account_id=account_id)
        try:
            if data_player:
                data_player.steam_id = steam_id
                data_player.real_name = real_name
                data_player.persona_name = persona_name
                data_player.avatar = avatar
                data_player.profile_url = profile_url
                data.update_player(player=data_player)
                LOGGER.info('Updated account id: %d', account_id)
            else:
                data.add_player(account_id=account_id,
                                steam_id=steam_id,
                                profile_url=profile_url,
                                real_name=real_name,
                                persona_name=persona_name,
                                avatar=avatar)
                LOGGER.info('Created account id: %d', account_id)
            return True
        except DatabaseError as error:  # Temporarily ignore unsupported data, especially unicode issues.
            LOGGER.error('Failed to process account id: %d, error: %s', account_id, str(error))
            Database.session.rollback()
    return False
def __bulk_add_replicas(rse_id, files, account, session=None):
    """
    Bulk add new file replicas.
    :param rse_id: the RSE id.
    :param files: the list of files.
    :param account: The account owner.
    :param session: The database session in use.
    :returns: the number of new replicas and their total size in bytes.
    """
    nbfiles, bytes = 0, 0
    # Check for the replicas already available
    condition = or_()
    for f in files:
        condition.append(and_(models.RSEFileAssociation.scope == f['scope'], models.RSEFileAssociation.name == f['name'], models.RSEFileAssociation.rse_id == rse_id))
    query = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.rse_id).\
        with_hint(models.RSEFileAssociation, text="INDEX(REPLICAS REPLICAS_PK)", dialect_name='oracle').\
        filter(condition)
    available_replicas = [dict([(column, getattr(row, column)) for column in row._fields]) for row in query]
    new_replicas = []
    for file in files:
        found = False
        for available_replica in available_replicas:
            if file['scope'] == available_replica['scope'] and file['name'] == available_replica['name'] and rse_id == available_replica['rse_id']:
                found = True
                break
        if not found:
            nbfiles += 1
            bytes += file['bytes']
            new_replicas.append({'rse_id': rse_id, 'scope': file['scope'],
                                 'name': file['name'], 'bytes': file['bytes'],
                                 'path': file.get('path'),
                                 'state': ReplicaState.from_string(file.get('state', 'A')),
                                 'md5': file.get('md5'), 'adler32': file.get('adler32'),
                                 'lock_cnt': file.get('lock_cnt', 0),
                                 'tombstone': file.get('tombstone')})
            # new_replica = models.RSEFileAssociation(rse_id=rse_id, scope=file['scope'], name=file['name'], bytes=file['bytes'],
            #                                         path=file.get('path'), state=ReplicaState.from_string(file.get('state', 'A')),
            #                                         md5=file.get('md5'), adler32=file.get('adler32'), lock_cnt=file.get('lock_cnt', 0),
            #                                         tombstone=file.get('tombstone'))
            # new_replica.save(session=session, flush=False)
    try:
        new_replicas and session.bulk_insert_mappings(models.RSEFileAssociation,
                                                      new_replicas)
        session.flush()
        return nbfiles, bytes
    except IntegrityError as error:
        if match('.*IntegrityError.*ORA-00001: unique constraint .*REPLICAS_PK.*violated.*', error.args[0]) \
                or match('.*IntegrityError.*1062.*Duplicate entry.*', error.args[0]) \
                or error.args[0] == '(IntegrityError) columns rse_id, scope, name are not unique' \
                or match('.*IntegrityError.*duplicate key value violates unique constraint.*', error.args[0]):
            raise exception.Duplicate("File replica already exists!")
        raise exception.RucioException(error.args)
    except DatabaseError as error:
        raise exception.RucioException(error.args)
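A hedged sketch of how a caller might handle the exceptions raised above: the IntegrityError pattern matching converts backend-specific unique-constraint violations into exception.Duplicate, which an idempotent caller can swallow; rse_id, files, account, and session are placeholders here, not real Rucio data.

try:
    nbfiles, total_bytes = __bulk_add_replicas(rse_id=rse_id, files=files,
                                               account=account, session=session)
except exception.Duplicate:
    pass  # at least one replica already exists; safe to ignore for idempotent ingestion
except exception.RucioException:
    session.rollback()  # any other database failure: undo and propagate
    raise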