def execute(self, quals, columns, sortkeys=None):
if ISDEBUG:
logger.log("building select statement... Quals: {0}, columns: {1}, sortkeys: {2}, allow filtering: {3}".format(quals, columns, sortkeys, self.allow_filtering))
res = self.build_select_stmt(quals, columns, self.allow_filtering)
if res is None:
yield {}
return
stmt = res[0]
binding_values = res[1]
filtered_columns = res[2]
if self.prepare_select_stmt:
if stmt not in self.prepared_select_stmts:
if ISDEBUG:
logger.log(u"preparing statement...")
self.prepared_select_stmts[stmt] = self.session.prepare(stmt)
elif ISDEBUG:
logger.log(u"statement already prepared")
if ISDEBUG:
logger.log(u"executing statement...")
st = time.time()
elif self.enable_trace:
logger.log(u"executing statement '{0}'".format(stmt))
if self.prepare_select_stmt:
result = self.session.execute(self.prepared_select_stmts[stmt], binding_values)
else:
result = self.session.execute(SimpleStatement(stmt), binding_values)
if ISDEBUG:
logger.log(u"cursor got in {0} ms".format((time.time() - st) * 1000))
for row in result:
line = {}
idx = 0
for column_name in filtered_columns:
value = row[idx]
if self.columnsTypes[column_name].main_type == cassandra_types.cql_timestamp and value is not None:
line[column_name] = u"{0}+00:00".format(value)
elif self.columnsTypes[column_name].main_type == cassandra_types.cql_time and value is not None:
line[column_name] = u"{0}+00:00".format(value)
elif isinstance(value, tuple):
tuple_values = []
for t in value:
tuple_values.append(str(t))
line[column_name] = json.dumps(tuple_values)
elif isinstance(value, OrderedMapSerializedKey):
dict_values = {}
for i in value:
dict_values[str(i)] = str(value[i])
line[column_name] = json.dumps(dict_values)
else:
line[column_name] = value
idx = idx + 1
rowid_values = []
for idcolumn in self.rowIdColumns:
rowid_values.append(unicode(line[idcolumn]))
line[self.ROWIDCOLUMN] = json.dumps(rowid_values)
yield line
评论列表
文章目录