def connect(self):
    """Open the shared PostgreSQL connection if it does not exist yet.

    Host, port, user, password and database name are each read from the
    ``DATABASE_*`` environment variables, falling back to the matching
    value in ``settings``. If the connection cannot be opened, the error
    is reported (in a dialog too when ``rapi.utils.check_x11()`` is true)
    and the process exits with status 1.
    """
    if Database.database is None:
        env = os.environ
        handle = QtSql.QSqlDatabase("QPSQL")
        Database.database = handle

        handle.setHostName(env.get("DATABASE_HOST", settings.DB_HOST))
        # The port is converted with int() whichever source it came from.
        handle.setPort(int(env.get("DATABASE_PORT", settings.DB_PORT)))
        handle.setUserName(env.get("DATABASE_USER", settings.USERNAME))
        handle.setPassword(env.get("DATABASE_PASSWORD", settings.PASSWORD))
        handle.setDatabaseName(env.get("DATABASE_NAME", settings.DBNAME))

        if handle.open():
            return

        if rapi.utils.check_x11():
            # An application object has to exist before a window can be
            # shown; it is kept on the instance.
            import gui.utils
            self.tmp = QtWidgets.QApplication(sys.argv)
            gui.utils.error("Error", "Can't join database")
        print("Can't join database")
        sys.exit(1)
# Example source code for the Python class DB_HOST
def migrate_tables():
    """Upgrade the database schema by running pending migration scripts.

    After an interactive confirmation, files named exactly ``v<N>.sql``
    in the ``migration`` directory under ``SQL_DIR`` are collected when
    ``N`` is greater than the version stored in ``model.Version`` and not
    greater than ``db_version``. They are executed in ascending version
    order over a dedicated autocommit psycopg2 connection, and the stored
    version is updated and committed after each script.
    """
    import psycopg2

    print('This will migrate database to latest schema, you are advised to backup database before running this command')
    if not prompt_bool('Do you want to continue?'):
        return

    mdir = os.path.join(SQL_DIR, 'migration')
    version_obj = model.Version.query.one_or_none()
    if not version_obj:
        # No version row yet: start from version 0.
        version_obj = model.Version(version=0, version_id=1)
        db.session.add(version_obj)
    old_version = version_obj.version
    if old_version == db_version:
        print('DB is at correct version %d' % old_version)

    scripts = []
    for entry in os.listdir(mdir):
        # fullmatch so that stray files such as "v3.sql.bak" or "v3.sql~"
        # are not executed as migrations.
        m = re.fullmatch(r'v(\d+)\.sql', entry)
        if not m:
            continue
        version = int(m.group(1))
        if old_version < version <= db_version:
            scripts.append((version, os.path.join(mdir, entry)))
    scripts.sort()

    connection = psycopg2.connect(database=settings.DB_NAME,
                                  user=settings.DB_USER,
                                  password=settings.DB_PASSWORD,
                                  host=settings.DB_HOST,
                                  port=settings.DB_PORT)
    connection.autocommit = True
    try:
        c = connection.cursor()
        for v, fname in scripts:
            # 'utf-8-sig' drops a leading BOM if the file has one; the
            # context manager closes the file handle promptly.
            with open(fname, 'rt', encoding='utf-8-sig') as script_file:
                sql = script_file.read()
            print('Upgrading database to version %d' % v)
            c.execute(sql)
            # Record progress after every script so that a later failure
            # does not lose the versions already applied.
            version_obj.version = v
            db.session.commit()
    finally:
        connection.close()
def _datebase_cursor(self):
    """Open a new MySQLdb connection and create a cursor on it.

    Returns:
        A ``(connection, cursor)`` tuple. Nothing here closes either
        object, so that is left to the caller.
    """
    connection = MySQLdb.connect(host=DB_HOST, user=DB_USER, passwd=DB_PASS)
    return connection, connection.cursor()
def __init__(self):
    """Create the ``adbapi`` connection pool used by this object.

    The pool is stored on ``self.dbpool`` and uses the ``MySQLdb``
    driver with connection parameters taken from ``settings``.
    """
    connection_options = {
        'host': settings.DB_HOST,
        'db': settings.DB,
        # NOTE(review): the user name is read from settings.DB_NAME;
        # confirm that this is the intended setting.
        'user': settings.DB_NAME,
        'passwd': settings.DB_PASSWD,
        # Rows are returned using MySQLdb's DictCursor class.
        'cursorclass': MySQLdb.cursors.DictCursor,
        'charset': 'utf8',
        'use_unicode': False,
    }
    self.dbpool = adbapi.ConnectionPool(dbapiName='MySQLdb',
                                        **connection_options)
def __init__(self):
    """Set up ``self.dbpool``, an ``adbapi`` pool of MySQLdb connections.

    All connection parameters are read from ``settings``.
    """
    # NOTE(review): ``user`` is populated from settings.DB_NAME; verify
    # that this is the intended setting.
    pool_kwargs = dict(
        host=settings.DB_HOST,
        db=settings.DB,
        user=settings.DB_NAME,
        passwd=settings.DB_PASSWD,
        cursorclass=MySQLdb.cursors.DictCursor,
        charset='utf8',
        use_unicode=False,
    )
    self.dbpool = adbapi.ConnectionPool(dbapiName='MySQLdb', **pool_kwargs)
def mysql_process(cnx):
    """
    Entry point for our application.

    Collects the operations of every table reported by
    ``utils.get_all_tables``, runs them in descending ``priority`` order
    with foreign key checks switched off, commits unless
    ``settings.GLOBAL_DRY_RUN`` is set, and closes the connection.

    :param cnx: connection wrapper providing ``execute(sql, dry_run=...)``,
        ``close()`` and a ``_connection`` attribute with ``commit()``.

    Tables whose lookup raises ``OperationError`` are logged and skipped.
    Any other exception propagates, but ``cnx`` is still closed.
    """
    logger.info("Setting up connection to {}@{}".format(settings.DB_DATABASE, settings.DB_HOST))
    dry_run = settings.GLOBAL_DRY_RUN

    try:
        all_ops = []
        for table_name in utils.get_all_tables(cnx):
            try:
                all_ops.extend(utils.get_operation_for_table(cnx, table_name))
            except OperationError as e:
                # The escape codes colour the message red on a terminal.
                logger.error("A Table lacks operations: \033[91m{}\033[00m".format(e.msg))

        # Sort and execute, highest priority first.
        logger.info("Executing Operations")
        cnx.execute("SET FOREIGN_KEY_CHECKS=0", dry_run=dry_run)
        all_ops.sort(key=operator.attrgetter('priority'), reverse=True)
        for op in all_ops:
            logger.info("{} => result: {}".format(op, op()))
        if not dry_run:
            cnx._connection.commit()
        cnx.execute("SET FOREIGN_KEY_CHECKS=1", dry_run=dry_run)
    finally:
        # Always release the connection, even when an operation fails.
        logger.info("Closing connection to DB")
        cnx.close()