def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
    """Execute a prepared MySQL statement and return its handled result.

    Any value in ``data`` that is an ``IOBase`` instance is first handed
    to ``cmd_stmt_send_long_data``; the positions handled that way are
    then passed to the protocol layer together with the execute request.

    Args:
        statement_id: Identifier of the prepared statement; forwarded to
            ``cmd_stmt_send_long_data`` and ``make_stmt_execute``.
        data: Values indexed by parameter position. Only inspected when
            truthy.
        parameters: Iterable of statement parameters. Here only its
            length decides which positions of ``data`` are inspected; it
            is forwarded to the protocol layer as a tuple.
        flags: Forwarded unchanged to ``make_stmt_execute``.

    Returns:
        Whatever ``self._handle_binary_result`` produces for the packet
        returned by ``self._send_cmd``.
    """
    param_list = list(parameters)
    # Maps parameter position -> one-element tuple holding the flag
    # derived from the stream's ``mode`` (see below).
    streamed = {}

    if data:
        # ``data`` is indexed by position, so a non-empty ``data`` with
        # fewer items than ``parameters`` fails on this lookup.
        for position in range(len(param_list)):
            value = data[position]
            if not isinstance(value, IOBase):
                continue

            # Streams that expose no ``mode`` attribute get True.
            # NOTE(review): the flag is True when 'b' is *absent* from the
            # mode, which reads as the opposite of "binary" -- confirm
            # against how make_stmt_execute interprets it.
            try:
                flag = 'b' not in value.mode
            except AttributeError:
                flag = True

            self.cmd_stmt_send_long_data(statement_id, position, value)
            streamed[position] = (flag,)

    request = self._protocol.make_stmt_execute(
        statement_id,
        data,
        tuple(param_list),
        flags,
        streamed,
        self.charset,
    )
    response = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=request)
    return self._handle_binary_result(response)
评论列表
文章目录