def _setup_environment(environ):
# Cygwin requires some special voodoo to set the environment variables
# properly so that Oracle will see them.
if platform.system().upper().startswith('CYGWIN'):
try:
import ctypes
except ImportError as e:
from django.core.exceptions import ImproperlyConfigured
raise ImproperlyConfigured("Error loading ctypes: %s; "
"the Oracle backend requires ctypes to "
"operate correctly under Cygwin." % e)
kernel32 = ctypes.CDLL('kernel32')
for name, value in environ:
kernel32.SetEnvironmentVariableA(name, value)
else:
os.environ.update(environ)
python类Error()的实例源码
def is_usable(self):
try:
self.connection.ping()
except Database.Error:
return False
else:
return True
def Open(self, p_autocommit=True):
try:
self.v_con = sqlite3.connect(self.v_service, self.v_timeout)
#self.v_con.row_factory = sqlite3.Row
self.v_cur = self.v_con.cursor()
if self.v_foreignkeys:
self.v_cur.execute('PRAGMA foreign_keys = ON')
self.v_start = True
except sqlite3.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def Query(self, p_sql, p_alltypesstr=False):
try:
v_keep = None
if self.v_con is None:
self.Open()
v_keep = False
else:
v_keep = True
self.v_cur.execute(p_sql)
v_table = DataTable()
if self.v_cur.description:
for c in self.v_cur.description:
v_table.Columns.append(c[0])
v_row = self.v_cur.fetchone()
while v_row is not None:
if p_alltypesstr:
v_rowtmp = list(v_row)
for j in range(0, len(v_table.Columns)):
if v_rowtmp[j] != None:
v_rowtmp[j] = str(v_rowtmp[j])
else:
v_rowtmp[j] = ''
v_row = tuple(v_rowtmp)
v_table.Rows.append(OrderedDict(zip(v_table.Columns, v_row)))
v_row = self.v_cur.fetchone()
return v_table
except Spartacus.Database.Exception as exc:
raise exc
except sqlite3.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
finally:
if not v_keep:
self.Close()
def Close(self):
try:
if self.v_con:
self.v_con.commit()
if self.v_cur:
self.v_cur.close()
self.v_cur = None
self.v_con.close()
self.v_con = None
except sqlite3.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def GetFields(self, p_sql):
try:
v_keep = None
if self.v_con is None:
self.Open()
v_keep = False
else:
v_keep = True
v_fields = []
self.v_cur.execute('select * from ( ' + p_sql + ' ) t limit 1')
r = self.v_cur.fetchone()
if r != None:
k = 0
for c in self.v_cur.description:
v_fields.append(DataField(c[0], p_type=type(r[k]), p_dbtype=type(r[k])))
k = k + 1
else:
k = 0
for c in self.v_cur.description:
v_fields.append(DataField(c[0], p_type=type(None), p_dbtype=type(None)))
k = k + 1
return v_fields
except Spartacus.Database.Exception as exc:
raise exc
except sqlite3.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
finally:
if not v_keep:
self.Close()
def Open(self, p_autocommit=True):
try:
self.v_con = sqlite3.connect(self.v_service, self.v_timeout)
#self.v_con.row_factory = sqlite3.Row
self.v_cur = self.v_con.cursor()
if self.v_foreignkeys:
self.v_cur.execute('PRAGMA foreign_keys = ON')
self.v_start = True
except sqlite3.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def Query(self, p_sql, p_alltypesstr=False):
try:
if self.v_con is None:
raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.')
else:
self.v_cur.execute(p_sql)
v_table = DataTable()
if self.v_cur.description:
for c in self.v_cur.description:
v_table.Columns.append(c[0])
v_row = self.v_cur.fetchone()
while v_row is not None:
if p_alltypesstr:
v_rowtmp = list(v_row)
for j in range(0, len(v_table.Columns)):
if v_rowtmp[j] != None:
v_rowtmp[j] = str(v_rowtmp[j])
else:
v_rowtmp[j] = ''
v_row = tuple(v_rowtmp)
v_table.Rows.append(OrderedDict(zip(v_table.Columns, v_row)))
v_row = self.v_cur.fetchone()
return v_table
except Spartacus.Database.Exception as exc:
raise exc
except sqlite3.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def Close(self):
try:
if self.v_con:
self.v_con.commit()
if self.v_cur:
self.v_cur.close()
self.v_cur = None
self.v_con.close()
self.v_con = None
except sqlite3.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def Cancel(self):
try:
if self.v_con:
self.v_con.cancel()
if self.v_cur:
self.v_cur.close()
self.v_cur = None
self.v_con.close()
self.v_con = None
except sqlite3.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def Query(self, p_sql, p_alltypesstr=False):
try:
v_keep = None
if self.v_con is None:
self.Open()
v_keep = False
else:
v_keep = True
self.v_cur.execute(p_sql)
v_table = DataTable()
if self.v_cur.description:
for c in self.v_cur.description:
v_table.Columns.append(c[0])
v_table.Rows = self.v_cur.fetchall()
if p_alltypesstr:
for i in range(0, len(v_table.Rows)):
for j in range(0, len(v_table.Columns)):
if v_table.Rows[i][j] != None:
v_table.Rows[i][j] = str(v_table.Rows[i][j])
else:
v_table.Rows[i][j] = ''
return v_table
except Spartacus.Database.Exception as exc:
raise exc
except psycopg2.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
finally:
if not v_keep:
self.Close()
def Close(self):
try:
if self.v_con:
self.v_con.commit()
if self.v_cur:
self.v_cur.close()
self.v_cur = None
self.v_con.close()
self.v_con = None
except psycopg2.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def GetPID(self):
try:
if self.v_con is None:
raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.')
else:
return self.v_con.get_backend_pid()
except psycopg2.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def Terminate(self, p_pid):
try:
self.Execute('select pg_terminate_backend({0})'.format(p_pid))
except Spartacus.Database.Exception as exc:
raise exc
except psycopg2.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def GetNotices(self):
try:
if self.v_con is None:
raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.')
else:
return self.v_con.notices.v_list
except Spartacus.Database.Exception as exc:
raise exc
except psycopg2.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def ClearNotices(self):
try:
if self.v_con is None:
raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.')
else:
del self.v_con.notices.v_list[:]
except Spartacus.Database.Exception as exc:
raise exc
except psycopg2.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def QueryBlock(self, p_sql, p_blocksize, p_alltypesstr=False):
try:
if self.v_con is None:
raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.')
else:
if self.v_start:
self.v_cur.execute(p_sql)
v_table = DataTable()
if self.v_cur.description:
for c in self.v_cur.description:
v_table.Columns.append(c[0])
if p_blocksize > 0:
v_table.Rows = self.v_cur.fetchmany(p_blocksize)
else:
v_table.Rows = self.v_cur.fetchall()
if p_alltypesstr:
for i in range(0, len(v_table.Rows)):
for j in range(0, len(v_table.Columns)):
if v_table.Rows[i][j] != None:
v_table.Rows[i][j] = str(v_table.Rows[i][j])
else:
v_table.Rows[i][j] = ''
if self.v_start:
self.v_start = False
return v_table
except Spartacus.Database.Exception as exc:
raise exc
except psycopg2.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def Open(self, p_autocommit=True):
try:
self.v_con = pymysql.connect(
host=self.v_host,
port=int(self.v_port),
db=self.v_service,
user=self.v_user,
password=self.v_password,
cursorclass=pymysql.cursors.DictCursor)
self.v_cur = self.v_con.cursor()
self.v_start = True
except pymysql.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
def Query(self, p_sql, p_alltypesstr=False):
try:
v_keep = None
if self.v_con is None:
self.Open()
v_keep = False
else:
v_keep = True
self.v_cur.execute(p_sql)
v_table = DataTable()
if self.v_cur.description:
for c in self.v_cur.description:
v_table.Columns.append(c[0])
v_table.Rows = self.v_cur.fetchall()
if p_alltypesstr:
for i in range(0, len(v_table.Rows)):
for j in range(0, len(v_table.Columns)):
if v_table.Rows[i][j] != None:
v_table.Rows[i][j] = str(v_table.Rows[i][j])
else:
v_table.Rows[i][j] = ''
return v_table
except Spartacus.Database.Exception as exc:
raise exc
except pymysql.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))
finally:
if not v_keep:
self.Close()
def Close(self):
try:
if self.v_con:
self.v_con.commit()
if self.v_cur:
self.v_cur.close()
self.v_cur = None
self.v_con.close()
self.v_con = None
except pymysql.Error as exc:
raise Spartacus.Database.Exception(str(exc))
except Exception as exc:
raise Spartacus.Database.Exception(str(exc))