def Query(self, p_sql, p_alltypesstr=False):
    """Execute ``p_sql`` and return its full result set as a ``DataTable``.

    If ``self.v_con`` is ``None`` when the call starts, the connection is
    opened with ``self.Open()`` and closed again with ``self.Close()``
    before returning (or raising). If a connection is already present it
    is used as-is and left open.

    Args:
        p_sql: Statement handed unchanged to ``self.v_cur.execute``.
        p_alltypesstr: When true, every value of every row is converted
            with ``str()``; ``None`` values become the empty string.

    Returns:
        A ``DataTable`` whose ``Columns`` holds the first element of each
        entry in the cursor description and whose ``Rows`` holds one
        ``OrderedDict`` (column name -> value) per fetched row. When the
        cursor has no description, the table is returned empty.

    Raises:
        Spartacus.Database.Exception: For any failure. Exceptions of this
            type are propagated untouched; every other exception is
            wrapped, with the original attached as ``__cause__``.
    """
    # Stays falsy until we know a connection already existed, so a failure
    # while opening still triggers the cleanup in ``finally``.
    v_keep = False
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        v_table = DataTable()
        if self.v_cur.description:
            for c in self.v_cur.description:
                v_table.Columns.append(c[0])
            # iter(callable, sentinel) keeps calling fetchone() until it
            # returns None, i.e. until the result set is exhausted.
            for v_row in iter(self.v_cur.fetchone, None):
                if p_alltypesstr:
                    v_row = tuple(
                        '' if v_value is None else str(v_value)
                        for v_value in v_row
                    )
                v_table.Rows.append(OrderedDict(zip(v_table.Columns, v_row)))
        return v_table
    except Spartacus.Database.Exception:
        # Already the library's own exception type: re-raise unchanged.
        raise
    except Exception as exc:
        # Driver errors (sqlite3.Error derives from Exception) and anything
        # else are wrapped; chaining preserves the original traceback.
        raise Spartacus.Database.Exception(str(exc)) from exc
    finally:
        if not v_keep:
            self.Close()
评论列表
文章目录