def _fetchall(self):
if any(x[1]==cx_Oracle.CLOB for x in self.cursor.description):
return [tuple([(c.read() if type(c) == cx_Oracle.LOB else c) \
for c in r]) for r in self.cursor]
else:
return self.cursor.fetchall()
python类LOB的实例源码
def lastrowid(self,table):
sequence_name = table._sequence_name
self.execute('SELECT %s.currval FROM dual;' % sequence_name)
return long(self.cursor.fetchone()[0])
#def parse_value(self, value, field_type, blob_decode=True):
# if blob_decode and isinstance(value, cx_Oracle.LOB):
# try:
# value = value.read()
# except self.driver.ProgrammingError:
# # After a subsequent fetch the LOB value is not valid anymore
# pass
# return BaseAdapter.parse_value(self, value, field_type, blob_decode)
def __cover_clob_to_str(self, datas):
for i in range(len(datas)):
temp_data = []
for data in datas[i]:
if isinstance(data, cx_Oracle.LOB) or isinstance(data, datetime.datetime):
data = str(data)
temp_data.append(data)
datas[i] = temp_data
return datas
def lastrowid(self, table):
sequence_name = table._sequence_name
self.execute('SELECT %s.currval FROM dual;' % sequence_name)
return long(self.cursor.fetchone()[0])
# def parse_value(self, value, field_type, blob_decode=True):
# if blob_decode and isinstance(value, cx_Oracle.LOB):
# try:
# value = value.read()
# except self.driver.ProgrammingError:
# # After a subsequent fetch the LOB value is not valid anymore
# pass
# return BaseAdapter.parse_value(self, value, field_type, blob_decode)
def query(self, _sql):
"""Execute the SQL statement, get a dataset"""
if self.connection is None:
raise Exception("Error: dal.query() is called without a database connection. Query:\n" + _sql)
print("Info: dal.query() at "+ str(self) + "/" + str(self.connection) + " running the following SQL:\n" + _sql)
# py-postgres doesn't use the DB-API, as it doesn't work well-
if self.db_type == DB_POSTGRESQL:
_ps = self.connection.prepare(_sql)
_res = _ps()
if _ps.column_names is not None:
self.field_names = _ps.column_names
self.field_types = []
for _curr_type in _ps.column_types:
self.field_types.append(python_type_to_sql_type(_curr_type))
else:
cur = self.connection.cursor()
cur.execute(_sql)
self.field_names, self.field_types = parse_description(cur.description, self.db_type)
_res = cur.fetchall()
# Untuple. TODO: This might need to be optimised, perhaps by working with the same array.
_results = []
for _row in _res:
_results.append(list(_row))
if self.db_type == DB_ORACLE:
import cx_Oracle
if cx_Oracle.BLOB in self.field_types:
# Loop results and read all blob data
for _curr_row_idx, _curr_row in enumerate(_results):
for _curr_col_idx, _curr_col in enumerate(_curr_row):
if isinstance(_curr_col, cx_Oracle.LOB):
_results[_curr_row_idx][_curr_col_idx] = _curr_col.read()
return _results
def DBGetCursorData(self, sql, id_th):
result = []
tryconn = 0
while tryconn<2:
try:
connection = self.Voracle_connect.acquire()
cursor = connection.cursor()
if id_th >= 0:
self.WinThUpdate(id_th, self.TH_SQL_RUNNING)
cursor.execute(sql)
cursor.arraysize = 1500
if id_th >= 0:
self.WinThUpdate(id_th, self.TH_ROW_FETCHING)
for row in cursor: # fetch rows
newrow=[]
for i in range(0,len(row)):
if type(row[i]) is cx_Oracle.LOB:
newrow.append(row[i].read() ) # read lob locator
else:
newrow.append(row[i])
result.append(list(newrow))
cursor.close()
self.Voracle_connect.release(connection)
break
except Exception as err:
result=[]
if id_th >= 0:
self.WinThUpdate(id_th,self.TH_REQUEST_DB_CONN)
self.lockdbconn.acquire()
if self.DBpingconn(self.Voracle_connect) == 0: # global connection handle status broken
if id_th >= 0:
self.WinThUpdate(id_th,self.TH_DB_RECONNECTING)
oconn = self.DBreconnect()
if oconn < 0: #error on reconnecting
tryconn = 2 # exit try
result=-1
else:
tryconn += 1
else:
tryconn = 2 #exit try
result=-1
self.lockdbconn.release()
return(result)
def _fetchall(self):
if any(x[1] == cx_Oracle.CLOB for x in self.cursor.description):
return [tuple([(c.read() if type(c) == cx_Oracle.LOB else c)
for c in r]) for r in self.cursor]
else:
return self.cursor.fetchall()