def __init__(self,username,password,database):
""" Login to database and open cursor """
connect_string = username+'/'+password+'@'+database
try:
self.con = cx_Oracle.connect(connect_string)
except cx_Oracle.DatabaseError as e:
print("Error logging in: "+str(e.args[0]))
print("Username: "+username)
print("Password: "+password)
print("Database: "+database)
sys.exit(-1)
self.cur = self.con.cursor()
self.column_names=[]
python类connect()的实例源码
def get_new_connection(self, conn_params):
return Database.connect(self._connect_string(), **conn_params)
def get_new_connection(self, conn_params):
conn_string = convert_unicode(self._connect_string())
return Database.connect(conn_string, **conn_params)
def get_new_connection(self, conn_params):
conn_string = convert_unicode(self._connect_string())
return Database.connect(conn_string, **conn_params)
def get_new_connection(self, conn_params):
return Database.connect(self._connect_string(), **conn_params)
def getdbname(cursor):
'''db = cx_Oracle.connect(username+'/'+password+'@'+ ipaddress+':'+'/'+tnsname)
cursor = db.cursor()'''
cursor.execute('select name from v$database')
row=cursor.fetchone()
dbname=row[0]
return dbname
def metricCollector(self):
c=None
conn=None
try:
import cx_Oracle
except Exception as e:
self.data['status'] = 0
self.data['msg'] = str(e)
return self.data
try:
dsnStr = cx_Oracle.makedsn(self.host, self.port, self.sid)
conn = cx_Oracle.connect(user=self.username, password=self.password, dsn=dsnStr, mode=cx_Oracle.SYSDBA)
c = conn.cursor()
c.execute('select dff.percentused,dff.tablespace_name, dtf.status from sys.dba_tablespaces dtf , (select tablespace_name,percentused from (select round((d.sizeMb-round(sum(f.bytes))/1048576)/d.maxMb*100) percentused, f.tablespace_name from dba_free_space f, (select tablespace_name, sum(MAXBYTES)/1048576 maxMb, sum(bytes)/1048576 sizeMb from dba_data_files group by tablespace_name) d where f.tablespace_name (+)=d.tablespace_name group by f.tablespace_name, d.sizeMb, d.maxMb order by percentused desc)) dff where dff.tablespace_name (+)=dtf.tablespace_name order by dff.percentused')
for row in c:
usage, name ,status= row
if name in TABLESPACE_NAME:
self.data[name+'_usage'] = usage
self.data[name+'_status'] = status
except Exception as e:
self.data['status'] = 0
self.data['msg'] = str(e)
finally:
if c!= None : c.close()
if conn != None : conn.close()
return self.data
def get_order_ids_from_db(self):
import cx_Oracle
con = cx_Oracle.connect(Config().get_oracle_connect())
cur = con.cursor()
cur.execute("select order_id from m_order t where t.userid='{0}'".format(self.userid))
order_ids = []
for i in cur.fetchall():
order_ids.append(i[0])
return order_ids
def get_not_pay_order_from_db(self):
import cx_Oracle
con = cx_Oracle.connect(Config().get_oracle_connect())
cur = con.cursor()
cur.execute("select order_id from m_order t where t.userid='{0}' and t.status='0'".format(self.userid))
return cur.fetchone()[0]
def get_not_prepare_order_from_db(self):
import cx_Oracle
con = cx_Oracle.connect(Config().get_oracle_connect())
cur = con.cursor()
cur.execute("select order_id from m_order t where t.userid='{0}' and t.status='2'".format(self.userid))
return cur.fetchone()[0]
def get_not_send_order_from_db(self):
import cx_Oracle
con = cx_Oracle.connect(Config().get_oracle_connect())
cur = con.cursor()
cur.execute("select order_id from m_order t where t.userid='{0}' and t.status='3'".format(self.userid))
return cur.fetchone()[0]
def get_not_receive_order_from_db(self):
import cx_Oracle
con = cx_Oracle.connect(Config().get_oracle_connect())
cur = con.cursor()
cur.execute("select order_id from m_order t where t.userid='{0}' and t.status='4'".format(self.userid))
return cur.fetchone()[0]