def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (Database.DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
python类DatabaseError()的实例源码
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
def test_cleanup_on_badconn_close(self):
# ticket #148
conn = self.conn
cur = conn.cursor()
try:
cur.execute("select pg_terminate_backend(pg_backend_pid())")
except psycopg2.OperationalError as e:
if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN:
raise
except psycopg2.DatabaseError as e:
# curiously when disconnected in green mode we get a DatabaseError
# without pgcode.
if e.pgcode is not None:
raise
self.assertEqual(conn.closed, 2)
conn.close()
self.assertEqual(conn.closed, 1)
def setUp(self):
ConnectingTestCase.setUp(self)
curs = self.conn.cursor()
# Drop table if it already exists
try:
curs.execute("DROP TABLE table1")
self.conn.commit()
except psycopg2.DatabaseError:
self.conn.rollback()
try:
curs.execute("DROP TABLE table2")
self.conn.commit()
except psycopg2.DatabaseError:
self.conn.rollback()
# Create sample data
curs.execute("""
CREATE TABLE table1 (
id int PRIMARY KEY,
name text)
""")
curs.execute("INSERT INTO table1 VALUES (1, 'hello')")
curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)")
self.conn.commit()
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (Database.DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
def setUp(self):
ConnectingTestCase.setUp(self)
curs = self.conn.cursor()
# Drop table if it already exists
try:
curs.execute("DROP TABLE table1")
self.conn.commit()
except psycopg2.DatabaseError:
self.conn.rollback()
try:
curs.execute("DROP TABLE table2")
self.conn.commit()
except psycopg2.DatabaseError:
self.conn.rollback()
# Create sample data
curs.execute("""
CREATE TABLE table1 (
id int PRIMARY KEY,
name text)
""")
curs.execute("INSERT INTO table1 VALUES (1, 'hello')")
curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)")
self.conn.commit()
def _nodb_connection(self):
nodb_connection = super(DatabaseWrapper, self)._nodb_connection
try:
nodb_connection.ensure_connection()
except (DatabaseError, WrappedDatabaseError):
warnings.warn(
"Normally Django will use a connection to the 'postgres' database "
"to avoid running initialization queries against the production "
"database when it's not needed (for example, when running tests). "
"Django was unable to create a connection to the 'postgres' database "
"and will use the default database instead.",
RuntimeWarning
)
settings_dict = self.settings_dict.copy()
settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
nodb_connection = self.__class__(
self.settings_dict.copy(),
alias=self.alias,
allow_thread_sharing=False)
return nodb_connection
def num_connections(self):
"""Returns the number of existing connections to the database. If
there are >1 connections, a new Ravel base implementation cannot be
loaded into the database.
returns: the number of existing connections to the database"""
try:
self.cursor.execute("SELECT * FROM pg_stat_activity WHERE "
"datname='{0}'".format(self.name))
# ignore cursor connection
return len(self.cursor.fetchall()) - 1
except psycopg2.DatabaseError, e:
logger.warning("error loading schema: %s", self.fmt_errmsg(e))
return 0
def load_schema(self, script):
"""Load the specified schema into the database"
script: path to a SQL script"""
try:
s = open(script, "r").read()
logger.debug("loaded schema %s", script)
self.cursor.execute(s)
except psycopg2.DatabaseError, e:
logger.warning("error loading schema: %s", self.fmt_errmsg(e))
def add_extensions(self):
"""If not already added, add extensions required by Ravel (plpythonu,
postgis, pgrouting)"""
try:
self.cursor.execute("SELECT 1 FROM pg_catalog.pg_namespace n JOIN " +
"pg_catalog.pg_proc p ON pronamespace = n.oid " +
"WHERE proname = 'pgr_dijkstra';")
fetch = self.cursor.fetchall()
if fetch == []:
self.cursor.execute("CREATE EXTENSION IF NOT EXISTS plpythonu;")
self.cursor.execute("CREATE EXTENSION IF NOT EXISTS postgis;")
self.cursor.execute("CREATE EXTENSION IF NOT EXISTS pgrouting;")
self.cursor.execute("CREATE EXTENSION plsh;")
logger.debug("created extensions")
except psycopg2.DatabaseError, e:
logger.warning("error loading extensions: %s", self.fmt_errmsg(e))
def truncate(self):
"""Clean the database of any state Ravel components, except for
topology tables. This rolls back the database to the state after
the topology is first loaded"""
try:
tables = ["cf", "clock", "p_spv", "spatial_ref_sys", "spv_tb_del",
"spv_tb_ins", "rm", "rm_delta", "urm"]
self.cursor.execute("truncate %s;" % ", ".join(tables))
logger.debug("truncated tables")
self.cursor.execute("INSERT INTO clock values (0);")
except psycopg2.DatabaseError, e:
logger.warning("error truncating databases: %s", self.fmt_errmsg(e))
def test__send_keep_alive(slot):
with patch('threading.Timer') as mock_timer:
# No matter what we schedule keep alive thread. Sometimes we log an error
db_error = psycopg2.DatabaseError()
db_error.message = 'Log ME!'
assert_stuff_about_keep_alive(slot, mock_timer, db_error, True)
db_error = psycopg2.DatabaseError()
db_error.message = 'no COPY in progress\n'
assert_stuff_about_keep_alive(slot, mock_timer, db_error, False)
error = Exception()
error.message = 'no COPY in progress\n'
assert_stuff_about_keep_alive(slot, mock_timer, error, True)
# Happy Path
assert_stuff_about_keep_alive(slot, mock_timer, None, False)
def test_cleanup_on_badconn_close(self):
# ticket #148
conn = self.conn
cur = conn.cursor()
try:
cur.execute("select pg_terminate_backend(pg_backend_pid())")
except psycopg2.OperationalError as e:
if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN:
raise
except psycopg2.DatabaseError as e:
# curiously when disconnected in green mode we get a DatabaseError
# without pgcode.
if e.pgcode is not None:
raise
self.assertEqual(conn.closed, 2)
conn.close()
self.assertEqual(conn.closed, 1)
def open():
try:
conn = psycopg2.connect(
'dbname={db_name} user={user_name} password={password}'.format(db_name=get_db_name(),
user_name=get_user_name(),
password=get_password()))
return conn
except (Exception, psycopg2.DatabaseError) as error:
logger.error("Wasn't able to connect", error)
raise
def get_dataset_output_from_database():
rows = [];
try:
conn = get_connection()
cur = conn.cursor()
cur.execute('SELECT lead_status FROM dataset')
rows = cur.fetchall()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
return np.array(rows)
def get_connection():
try:
conn = psycopg2.connect(host=os.getenv('DATABASE_HOST', 'intellead-classification-postgresql'), database=os.getenv('DATABASE_NAME', 'postgres'),
user=os.getenv('DATABASE_USER', 'postgres'), password=os.getenv('DATABASE_PASSWORD', 'postgres'))
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
return conn
def users_total(self):
'''Query number of users'''
try:
con = psycopg2.connect(self.config['RDS_URI'])
cur = con.cursor()
cur.execute("SELECT COUNT(*) FROM users")
con.commit()
result = cur.fetchone()
print("The number of users: %s" %result[0])
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if con is not None:
con.close()
def users_last_day(self):
'''Query number of users for the last day'''
try:
con = psycopg2.connect(self.config['RDS_URI'])
cur = con.cursor()
cur.execute("SELECT COUNT(*) FROM users WHERE join_date > (NOW() - INTERVAL '24 hours')")
con.commit()
result = cur.fetchone()
print("The number of users registered in last day: %s" %result[0])
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if con is not None:
con.close()