def _to_ssize_tuple(self, addr):
from ctypes import cast, POINTER, c_ssize_t
if addr is None:
return None
return tuple(cast(addr, POINTER(c_ssize_t))[0:self._view.ndim])
# Example source code using the Python class c_ssize_t()
def _NumOfRows(self):
    """Fetch the row count of the current statement and cache it.

    Calls ``SQLRowCount`` on ``self.stmt_h``, stores the reported value in
    ``self.rowcount`` and returns that attribute.
    """
    if not self.connection:
        # NOTE(review): presumably close() raises when there is no
        # connection; otherwise execution simply continues below - confirm.
        self.close()

    row_counter = c_ssize_t()
    status = SQLRowCount(self.stmt_h, ADDR(row_counter))
    if status != SQL_SUCCESS:
        check_success(self, status)

    self.rowcount = row_counter.value
    return self.rowcount
def repr_offset():
    "Byte distance from id(type) to the tp_repr pointer."
    # tp_repr is treated as the 12th ssize_t-sized slot (index 11).
    slot_size = ctypes.sizeof(ctypes.c_ssize_t)
    return slot_size * 11
def str_offset():
    "Byte distance from id(type) to the tp_str pointer."
    # In the type specification tp_str sits 6 slots after tp_repr.
    slots_after_repr = 6
    return repr_offset() + slots_after_repr * ctypes.sizeof(ctypes.c_ssize_t)
def assure_generic_c_level_function(tt, offset):
    """
    Point one c-level slot of type tt at the generic implementation.

    Copies the ssize_t-sized value found ``offset`` bytes after
    ``id(Object)`` over the value ``offset`` bytes after ``id(tt)``, so that
    magic methods such as repr(), str(), etc. use the generic c-level
    function.
    """
    # NOTE(review): relies on id() being the object's memory address - confirm
    # this is only run on interpreters where that holds.
    slot_at = ctypes.c_ssize_t.from_address
    generic_slot = slot_at(id(Object) + offset)
    slot_at(id(tt) + offset).value = generic_slot.value
def makeDataObject(num):
    """
    Declares the ctypes layout of the memory shared by both processes.

    The layout holds three float arrays (``num*3``, ``num//10*3`` and
    ``num//100*3`` elements) followed by three ssize_t fields.

    NOTE(review): the DataBlock class is built but neither instantiated nor
    returned; the function hands back ``num`` unchanged - confirm whether an
    instance of DataBlock was meant to be returned.
    """
    full_array = ctypes.c_float * (num * 3)
    tenth_array = ctypes.c_float * (num // 10 * 3)
    hundredth_array = ctypes.c_float * (num // 100 * 3)

    class DataBlock(ctypes.Structure):
        _fields_ = [
            ("y", full_array),
            ("y10", tenth_array),
            ("y100", hundredth_array),
            ("endpoint", ctypes.c_ssize_t),
            ("startpoint", ctypes.c_ssize_t),
            ("pltcurrent", ctypes.c_ssize_t),
        ]

    return num
def _NumOfRows(self):
    """Fetch the row count of the current statement and cache it.

    Calls ``SQLRowCount`` on ``self.stmt_h``, stores the reported value in
    ``self.rowcount`` and returns that attribute.
    """
    if not self.connection:
        # NOTE(review): presumably close() raises when there is no
        # connection; otherwise execution simply continues below - confirm.
        self.close()

    row_counter = c_ssize_t()
    status = SQLRowCount(self.stmt_h, ADDR(row_counter))
    if status != SQL_SUCCESS:
        check_success(self, status)

    self.rowcount = row_counter.value
    return self.rowcount
def _NumOfRows(self):
    """Fetch the row count of the current statement and cache it.

    Calls ``SQLRowCount`` on ``self.stmt_h``, stores the reported value in
    ``self.rowcount`` and returns that attribute.
    """
    if not self.connection:
        # NOTE(review): presumably close() raises when there is no
        # connection; otherwise execution simply continues below - confirm.
        self.close()

    row_counter = c_ssize_t()
    status = SQLRowCount(self.stmt_h, ADDR(row_counter))
    if status != SQL_SUCCESS:
        check_success(self, status)

    self.rowcount = row_counter.value
    return self.rowcount
def _NumOfRows(self):
    """Fetch the row count of the current statement and cache it.

    Calls ``SQLRowCount`` on ``self.stmt_h``, stores the reported value in
    ``self.rowcount`` and returns that attribute.
    """
    if not self.connection:
        # NOTE(review): presumably close() raises when there is no
        # connection; otherwise execution simply continues below - confirm.
        self.close()

    row_counter = c_ssize_t()
    status = SQLRowCount(self.stmt_h, ADDR(row_counter))
    if status != SQL_SUCCESS:
        check_success(self, status)

    self.rowcount = row_counter.value
    return self.rowcount
def _NumOfRows(self):
    """Fetch the row count of the current statement and cache it.

    Calls ``SQLRowCount`` on ``self.stmt_h``, stores the reported value in
    ``self.rowcount`` and returns that attribute.
    """
    if not self.connection:
        # NOTE(review): presumably close() raises when there is no
        # connection; otherwise execution simply continues below - confirm.
        self.close()

    row_counter = c_ssize_t()
    status = SQLRowCount(self.stmt_h, ADDR(row_counter))
    if status != SQL_SUCCESS:
        check_success(self, status)

    self.rowcount = row_counter.value
    return self.rowcount
def libvlc_media_discoverer_list_get(p_inst, i_cat, ppp_services):
    '''Fetch the media discoverer services that belong to a category.
    @param p_inst: libvlc instance.
    @param i_cat: category of services to fetch.
    @param ppp_services: address to store an allocated array of media discoverer services (must be freed with L{libvlc_media_discoverer_list_release}() by the caller) [OUT].
    @return: the number of media discoverer services or -1 on error.
    @version: LibVLC 3.0.0 and later.
    '''
    f = _Cfunctions.get('libvlc_media_discoverer_list_get', None)
    if not f:
        # No wrapper registered under this name yet: build one with a
        # ssize_t result and the three argument types.
        f = _Cfunction(
            'libvlc_media_discoverer_list_get',
            ((1,), (1,), (1,),),
            None,
            ctypes.c_ssize_t,
            Instance,
            MediaDiscovererCategory,
            ctypes.POINTER(ctypes.POINTER(MediaDiscovererDescription)),
        )
    return f(p_inst, i_cat, ppp_services)
def _NumOfRows(self):
    """Fetch the row count of the current statement and cache it.

    Calls ``SQLRowCount`` on ``self.stmt_h``, stores the reported value in
    ``self.rowcount`` and returns that attribute.
    """
    if not self.connection:
        # NOTE(review): presumably close() raises when there is no
        # connection; otherwise execution simply continues below - confirm.
        self.close()

    row_counter = c_ssize_t()
    status = SQLRowCount(self.stmt_h, ADDR(row_counter))
    if status != SQL_SUCCESS:
        check_success(self, status)

    self.rowcount = row_counter.value
    return self.rowcount
def _NumOfRows(self):
    """Fetch the row count of the current statement and cache it.

    Calls ``SQLRowCount`` on ``self.stmt_h``, stores the reported value in
    ``self.rowcount`` and returns that attribute.
    """
    if not self.connection:
        # NOTE(review): presumably close() raises when there is no
        # connection; otherwise execution simply continues below - confirm.
        self.close()

    row_counter = c_ssize_t()
    status = SQLRowCount(self.stmt_h, ADDR(row_counter))
    if status != SQL_SUCCESS:
        check_success(self, status)

    self.rowcount = row_counter.value
    return self.rowcount
def __init__(self,
shape,
format=None,
strides=None,
readonly=None,
itemsize=None):
if format is None:
format = 'B'
if readonly is None:
readonly = False
prefix = ''
typecode = ''
i = 0
if i < len(format):
try:
prefix = self.prefixes[format[i]]
i += 1
except LookupError:
pass
if i < len(format) and format[i] == '1':
i += 1
if i == len(format) - 1:
typecode = format[i]
if itemsize is None:
try:
itemsize = ctypes.sizeof(self.types[prefix + typecode])
except KeyError:
raise ValueError("Unknown item format '" + format + "'")
self.readonly = bool(readonly)
self.format = format
self._format = ctypes.create_string_buffer(format.encode('latin_1'))
self.ndim = len(shape)
self.itemsize = itemsize
self.len = reduce(operator.mul, shape, 1) * self.itemsize
self.shape = tuple(shape)
self._shape = (ctypes.c_ssize_t * self.ndim)(*self.shape)
if strides is None:
self._strides = (ctypes.c_ssize_t * self.ndim)()
self._strides[self.ndim - 1] = itemsize
for i in range(self.ndim - 1, 0, -1):
self._strides[i - 1] = self.shape[i] * self._strides[i]
self.strides = tuple(self._strides)
elif len(strides) == self.ndim:
self.strides = tuple(strides)
self._strides = (ctypes.c_ssize_t * self.ndim)(*self.strides)
else:
raise ValueError("Mismatch in length of strides and shape")
buflen = max(d * abs(s) for d, s in zip(self.shape, self.strides))
self.buflen = buflen
self._buf = (ctypes.c_ubyte * buflen)()
offset = sum((d - 1) * abs(s)
for d, s in zip(self.shape, self.strides) if s < 0)
self.buf = ctypes.addressof(self._buf) + offset
def _CreateColBuf(self):
    """Build one result buffer per column and bind the leading columns.

    Unbinds the statement first, then fills ``self._ColBufferList`` with one
    entry per column: ``[name, target type, length holder, ADDR(length
    holder), data buffer, ADDR(data buffer), buffer size, converter, bound
    flag]``.  Columns are bound with ``SQLBindCol`` only until the first
    dynamic-length column is met; that column and every later one stay
    unbound.
    """
    if not self.connection:
        self.close()
    self._free_stmt(SQL_UNBIND)
    column_count = self._NumOfCols()
    self._ColBufferList = []
    still_binding = True
    for col_num in range(column_count):
        col_name = self.description[col_num][0]
        col_size = self.description[col_num][2]
        sql_type = self._ColTypeCodeList[col_num]
        type_info = SQL_data_type_dict[sql_type]

        target_type = type_info[2]
        is_dynamic = type_info[5]

        # Start from the default size of the column's sql data type.  The
        # generic override (key None) applies to "large columns" only; a
        # pre-set size for this particular column always wins.
        buf_size = type_info[4]
        if buf_size > 20500:
            buf_size = self._outputsize.get(None, buf_size)
        buf_size = self._outputsize.get(col_num, buf_size)

        # Very long columns are not bound: a large buffer decreases
        # performance and sometimes only yields a NULL value, so such
        # columns are meant to be read with sqlgetdata instead.
        if col_size >= 1024:
            is_dynamic = True

        data_buf = type_info[3](buf_size)
        length_holder = c_ssize_t()

        wants_unicode = self.connection.unicode_results
        if wants_unicode and sql_type in (SQL_CHAR, SQL_VARCHAR, SQL_LONGVARCHAR):
            target_type = SQL_C_WCHAR
            data_buf = create_buffer_u(buf_size)

        converter = self.connection.output_converter[sql_type]

        if still_binding and is_dynamic:
            still_binding = False
        self._ColBufferList.append([
            col_name, target_type, length_holder, ADDR(length_holder),
            data_buf, ADDR(data_buf), buf_size, converter, still_binding,
        ])

        if still_binding:
            status = ODBC_API.SQLBindCol(
                self.stmt_h, col_num + 1, target_type,
                ADDR(data_buf), buf_size, ADDR(length_holder))
            if status != SQL_SUCCESS:
                check_success(self, status)
def _UpdateDesc(self):
    """Refresh ``self.description`` from the statement's column metadata.

    Each column yields a 7-tuple ``(name, type_code, display_size,
    internal_size, col_precision, scale, null_ok)``; the raw SQL type codes
    are also collected in ``self._ColTypeCodeList``.  With at least one
    column the row type is created through ``self.row_type_callable``;
    otherwise ``self.description`` becomes None.  Finishes by calling
    ``self._CreateColBuf()``.
    """
    if not self.connection:
        self.close()

    wants_unicode = self.connection.unicode_results
    name_buf = create_buffer_u(1024) if wants_unicode else create_buffer(1024)

    name_len = c_short()
    type_code = c_short()
    col_size = ctypes.c_size_t()
    display_size = c_ssize_t(0)
    decimal_digits = c_short()
    nullable = c_short()
    columns = []
    self._ColTypeCodeList = []
    column_count = self._NumOfCols()
    for col in range(1, column_count + 1):
        status = ODBC_API.SQLColAttribute(
            self.stmt_h, col, SQL_DESC_DISPLAY_SIZE,
            ADDR(create_buffer(10)), 10, ADDR(c_short()), ADDR(display_size))
        if status != SQL_SUCCESS:
            check_success(self, status)

        # Both describe variants are called with the same arguments.
        if wants_unicode:
            describe = ODBC_API.SQLDescribeColW
        else:
            describe = ODBC_API.SQLDescribeCol
        status = describe(
            self.stmt_h, col, name_buf, len(name_buf), ADDR(name_len),
            ADDR(type_code), ADDR(col_size), ADDR(decimal_digits),
            ADDR(nullable))
        if status != SQL_SUCCESS:
            check_success(self, status)

        col_name = name_buf.value
        if lowercase:
            col_name = col_name.lower()

        raw_type = type_code.value
        columns.append((
            col_name,
            # Fall back to the raw type code when the type is not in the dict.
            SQL_data_type_dict.get(raw_type, (raw_type,))[0],
            display_size.value,
            col_size.value,
            col_size.value,
            decimal_digits.value,
            nullable.value == 1,
        ))
        self._ColTypeCodeList.append(raw_type)

    if columns:
        self.description = columns
        # The row type has to exist before any row is fetched.
        self._row_type = self.row_type_callable(self)
    else:
        self.description = None
    self._CreateColBuf()
def _CreateColBuf(self):
    """Build one result buffer per column and bind the leading columns.

    Unbinds the statement first, then fills ``self._ColBufferList`` with one
    entry per column: ``[name, target type, length holder, ADDR(length
    holder), data buffer, ADDR(data buffer), buffer size, converter, bound
    flag]``.  Columns are bound with ``SQLBindCol`` only until the first
    dynamic-length column is met; that column and every later one stay
    unbound.
    """
    if not self.connection:
        self.close()
    self._free_stmt(SQL_UNBIND)
    column_count = self._NumOfCols()
    self._ColBufferList = []
    still_binding = True
    for col_num in range(column_count):
        col_name = self.description[col_num][0]
        col_size = self.description[col_num][2]
        sql_type = self._ColTypeCodeList[col_num]
        type_info = SQL_data_type_dict[sql_type]

        target_type = type_info[2]
        is_dynamic = type_info[5]

        # Start from the default size of the column's sql data type.  The
        # generic override (key None) applies to "large columns" only; a
        # pre-set size for this particular column always wins.
        buf_size = type_info[4]
        if buf_size > 20500:
            buf_size = self._outputsize.get(None, buf_size)
        buf_size = self._outputsize.get(col_num, buf_size)

        # Very long columns are not bound: a large buffer decreases
        # performance and sometimes only yields a NULL value, so such
        # columns are meant to be read with sqlgetdata instead.
        if col_size >= 1024:
            is_dynamic = True

        data_buf = type_info[3](buf_size)
        length_holder = c_ssize_t()

        wants_unicode = self.connection.unicode_results
        if wants_unicode and sql_type in (SQL_CHAR, SQL_VARCHAR, SQL_LONGVARCHAR):
            target_type = SQL_C_WCHAR
            data_buf = create_buffer_u(buf_size)

        converter = self.connection.output_converter[sql_type]

        if still_binding and is_dynamic:
            still_binding = False
        self._ColBufferList.append([
            col_name, target_type, length_holder, ADDR(length_holder),
            data_buf, ADDR(data_buf), buf_size, converter, still_binding,
        ])

        if still_binding:
            status = ODBC_API.SQLBindCol(
                self.stmt_h, col_num + 1, target_type,
                ADDR(data_buf), buf_size, ADDR(length_holder))
            if status != SQL_SUCCESS:
                check_success(self, status)
def _UpdateDesc(self):
    """Refresh ``self.description`` from the statement's column metadata.

    Each column yields a 7-tuple ``(name, type_code, display_size,
    internal_size, col_precision, scale, null_ok)``; the raw SQL type codes
    are also collected in ``self._ColTypeCodeList``.  With at least one
    column the row type is created through ``self.row_type_callable``;
    otherwise ``self.description`` becomes None.  Finishes by calling
    ``self._CreateColBuf()``.
    """
    if not self.connection:
        self.close()

    wants_unicode = self.connection.unicode_results
    name_buf = create_buffer_u(1024) if wants_unicode else create_buffer(1024)

    name_len = c_short()
    type_code = c_short()
    col_size = ctypes.c_size_t()
    display_size = c_ssize_t(0)
    decimal_digits = c_short()
    nullable = c_short()
    columns = []
    self._ColTypeCodeList = []
    column_count = self._NumOfCols()
    for col in range(1, column_count + 1):
        status = ODBC_API.SQLColAttribute(
            self.stmt_h, col, SQL_DESC_DISPLAY_SIZE,
            ADDR(create_buffer(10)), 10, ADDR(c_short()), ADDR(display_size))
        if status != SQL_SUCCESS:
            check_success(self, status)

        # Both describe variants are called with the same arguments.
        if wants_unicode:
            describe = ODBC_API.SQLDescribeColW
        else:
            describe = ODBC_API.SQLDescribeCol
        status = describe(
            self.stmt_h, col, name_buf, len(name_buf), ADDR(name_len),
            ADDR(type_code), ADDR(col_size), ADDR(decimal_digits),
            ADDR(nullable))
        if status != SQL_SUCCESS:
            check_success(self, status)

        col_name = name_buf.value
        if lowercase:
            col_name = col_name.lower()

        raw_type = type_code.value
        columns.append((
            col_name,
            # Fall back to the raw type code when the type is not in the dict.
            SQL_data_type_dict.get(raw_type, (raw_type,))[0],
            display_size.value,
            col_size.value,
            col_size.value,
            decimal_digits.value,
            nullable.value == 1,
        ))
        self._ColTypeCodeList.append(raw_type)

    if columns:
        self.description = columns
        # The row type has to exist before any row is fetched.
        self._row_type = self.row_type_callable(self)
    else:
        self.description = None
    self._CreateColBuf()
def _CreateColBuf(self):
    """Build one result buffer per column and bind the leading columns.

    Unbinds the statement first, then fills ``self._ColBufferList`` with one
    entry per column: ``[name, target type, length holder, ADDR(length
    holder), data buffer, ADDR(data buffer), buffer size, converter, bound
    flag]``.  Columns are bound with ``SQLBindCol`` only until the first
    dynamic-length column is met; that column and every later one stay
    unbound.
    """
    if not self.connection:
        self.close()
    self._free_stmt(SQL_UNBIND)
    column_count = self._NumOfCols()
    self._ColBufferList = []
    still_binding = True
    for col_num in range(column_count):
        col_name = self.description[col_num][0]
        col_size = self.description[col_num][2]
        sql_type = self._ColTypeCodeList[col_num]
        type_info = SQL_data_type_dict[sql_type]

        target_type = type_info[2]
        is_dynamic = type_info[5]

        # Start from the default size of the column's sql data type.  The
        # generic override (key None) applies to "large columns" only; a
        # pre-set size for this particular column always wins.
        buf_size = type_info[4]
        if buf_size > 20500:
            buf_size = self._outputsize.get(None, buf_size)
        buf_size = self._outputsize.get(col_num, buf_size)

        # Very long columns are not bound: a large buffer decreases
        # performance and sometimes only yields a NULL value, so such
        # columns are meant to be read with sqlgetdata instead.
        if col_size >= 1024:
            is_dynamic = True

        data_buf = type_info[3](buf_size)
        length_holder = c_ssize_t()

        wants_unicode = self.connection.unicode_results
        if wants_unicode and sql_type in (SQL_CHAR, SQL_VARCHAR, SQL_LONGVARCHAR):
            target_type = SQL_C_WCHAR
            data_buf = create_buffer_u(buf_size)

        converter = self.connection.output_converter[sql_type]

        if still_binding and is_dynamic:
            still_binding = False
        self._ColBufferList.append([
            col_name, target_type, length_holder, ADDR(length_holder),
            data_buf, ADDR(data_buf), buf_size, converter, still_binding,
        ])

        if still_binding:
            status = ODBC_API.SQLBindCol(
                self.stmt_h, col_num + 1, target_type,
                ADDR(data_buf), buf_size, ADDR(length_holder))
            if status != SQL_SUCCESS:
                check_success(self, status)
def _UpdateDesc(self):
    """Refresh ``self.description`` from the statement's column metadata.

    Each column yields a 7-tuple ``(name, type_code, display_size,
    internal_size, col_precision, scale, null_ok)``; the raw SQL type codes
    are also collected in ``self._ColTypeCodeList``.  With at least one
    column the row type is created through ``self.row_type_callable``;
    otherwise ``self.description`` becomes None.  Finishes by calling
    ``self._CreateColBuf()``.
    """
    if not self.connection:
        self.close()

    wants_unicode = self.connection.unicode_results
    name_buf = create_buffer_u(1024) if wants_unicode else create_buffer(1024)

    name_len = c_short()
    type_code = c_short()
    col_size = ctypes.c_size_t()
    display_size = c_ssize_t(0)
    decimal_digits = c_short()
    nullable = c_short()
    columns = []
    self._ColTypeCodeList = []
    column_count = self._NumOfCols()
    for col in range(1, column_count + 1):
        status = ODBC_API.SQLColAttribute(
            self.stmt_h, col, SQL_DESC_DISPLAY_SIZE,
            ADDR(create_buffer(10)), 10, ADDR(c_short()), ADDR(display_size))
        if status != SQL_SUCCESS:
            check_success(self, status)

        # Both describe variants are called with the same arguments.
        if wants_unicode:
            describe = ODBC_API.SQLDescribeColW
        else:
            describe = ODBC_API.SQLDescribeCol
        status = describe(
            self.stmt_h, col, name_buf, len(name_buf), ADDR(name_len),
            ADDR(type_code), ADDR(col_size), ADDR(decimal_digits),
            ADDR(nullable))
        if status != SQL_SUCCESS:
            check_success(self, status)

        col_name = name_buf.value
        if lowercase:
            col_name = col_name.lower()

        raw_type = type_code.value
        columns.append((
            col_name,
            # Fall back to the raw type code when the type is not in the dict.
            SQL_data_type_dict.get(raw_type, (raw_type,))[0],
            display_size.value,
            col_size.value,
            col_size.value,
            decimal_digits.value,
            nullable.value == 1,
        ))
        self._ColTypeCodeList.append(raw_type)

    if columns:
        self.description = columns
        # The row type has to exist before any row is fetched.
        self._row_type = self.row_type_callable(self)
    else:
        self.description = None
    self._CreateColBuf()
def _CreateColBuf(self):
    """Build one result buffer per column and bind the leading columns.

    Unbinds the statement first, then fills ``self._ColBufferList`` with one
    entry per column: ``[name, target type, length holder, ADDR(length
    holder), data buffer, ADDR(data buffer), buffer size, converter, bound
    flag]``.  Columns are bound with ``SQLBindCol`` only until the first
    dynamic-length column is met; that column and every later one stay
    unbound.
    """
    if not self.connection:
        self.close()
    self._free_stmt(SQL_UNBIND)
    column_count = self._NumOfCols()
    self._ColBufferList = []
    still_binding = True
    for col_num in range(column_count):
        col_name = self.description[col_num][0]
        col_size = self.description[col_num][2]
        sql_type = self._ColTypeCodeList[col_num]
        type_info = SQL_data_type_dict[sql_type]

        target_type = type_info[2]
        is_dynamic = type_info[5]

        # Start from the default size of the column's sql data type.  The
        # generic override (key None) applies to "large columns" only; a
        # pre-set size for this particular column always wins.
        buf_size = type_info[4]
        if buf_size > 20500:
            buf_size = self._outputsize.get(None, buf_size)
        buf_size = self._outputsize.get(col_num, buf_size)

        # Very long columns are not bound: a large buffer decreases
        # performance and sometimes only yields a NULL value, so such
        # columns are meant to be read with sqlgetdata instead.
        if col_size >= 1024:
            is_dynamic = True

        data_buf = type_info[3](buf_size)
        length_holder = c_ssize_t()

        wants_unicode = self.connection.unicode_results
        if wants_unicode and sql_type in (SQL_CHAR, SQL_VARCHAR, SQL_LONGVARCHAR):
            target_type = SQL_C_WCHAR
            data_buf = create_buffer_u(buf_size)

        converter = self.connection.output_converter[sql_type]

        if still_binding and is_dynamic:
            still_binding = False
        self._ColBufferList.append([
            col_name, target_type, length_holder, ADDR(length_holder),
            data_buf, ADDR(data_buf), buf_size, converter, still_binding,
        ])

        if still_binding:
            status = ODBC_API.SQLBindCol(
                self.stmt_h, col_num + 1, target_type,
                ADDR(data_buf), buf_size, ADDR(length_holder))
            if status != SQL_SUCCESS:
                check_success(self, status)
def _UpdateDesc(self):
    """Refresh ``self.description`` from the statement's column metadata.

    Each column yields a 7-tuple ``(name, type_code, display_size,
    internal_size, col_precision, scale, null_ok)``; the raw SQL type codes
    are also collected in ``self._ColTypeCodeList``.  With at least one
    column the row type is created through ``self.row_type_callable``;
    otherwise ``self.description`` becomes None.  Finishes by calling
    ``self._CreateColBuf()``.
    """
    if not self.connection:
        self.close()

    wants_unicode = self.connection.unicode_results
    name_buf = create_buffer_u(1024) if wants_unicode else create_buffer(1024)

    name_len = c_short()
    type_code = c_short()
    col_size = ctypes.c_size_t()
    display_size = c_ssize_t(0)
    decimal_digits = c_short()
    nullable = c_short()
    columns = []
    self._ColTypeCodeList = []
    column_count = self._NumOfCols()
    for col in range(1, column_count + 1):
        status = ODBC_API.SQLColAttribute(
            self.stmt_h, col, SQL_DESC_DISPLAY_SIZE,
            ADDR(create_buffer(10)), 10, ADDR(c_short()), ADDR(display_size))
        if status != SQL_SUCCESS:
            check_success(self, status)

        # Both describe variants are called with the same arguments.
        if wants_unicode:
            describe = ODBC_API.SQLDescribeColW
        else:
            describe = ODBC_API.SQLDescribeCol
        status = describe(
            self.stmt_h, col, name_buf, len(name_buf), ADDR(name_len),
            ADDR(type_code), ADDR(col_size), ADDR(decimal_digits),
            ADDR(nullable))
        if status != SQL_SUCCESS:
            check_success(self, status)

        col_name = name_buf.value
        if lowercase:
            col_name = col_name.lower()

        raw_type = type_code.value
        columns.append((
            col_name,
            # Fall back to the raw type code when the type is not in the dict.
            SQL_data_type_dict.get(raw_type, (raw_type,))[0],
            display_size.value,
            col_size.value,
            col_size.value,
            decimal_digits.value,
            nullable.value == 1,
        ))
        self._ColTypeCodeList.append(raw_type)

    if columns:
        self.description = columns
        # The row type has to exist before any row is fetched.
        self._row_type = self.row_type_callable(self)
    else:
        self.description = None
    self._CreateColBuf()
def _CreateColBuf(self):
    """Build one result buffer per column and bind the leading columns.

    Unbinds the statement first, then fills ``self._ColBufferList`` with one
    entry per column: ``[name, target type, length holder, ADDR(length
    holder), data buffer, ADDR(data buffer), buffer size, converter, bound
    flag]``.  Columns are bound with ``SQLBindCol`` only until the first
    dynamic-length column is met; that column and every later one stay
    unbound.
    """
    if not self.connection:
        self.close()
    self._free_stmt(SQL_UNBIND)
    column_count = self._NumOfCols()
    self._ColBufferList = []
    still_binding = True
    for col_num in range(column_count):
        col_name = self.description[col_num][0]
        col_size = self.description[col_num][2]
        sql_type = self._ColTypeCodeList[col_num]
        type_info = SQL_data_type_dict[sql_type]

        target_type = type_info[2]
        is_dynamic = type_info[5]

        # Start from the default size of the column's sql data type.  The
        # generic override (key None) applies to "large columns" only; a
        # pre-set size for this particular column always wins.
        buf_size = type_info[4]
        if buf_size > 20500:
            buf_size = self._outputsize.get(None, buf_size)
        buf_size = self._outputsize.get(col_num, buf_size)

        # Very long columns are not bound: a large buffer decreases
        # performance and sometimes only yields a NULL value, so such
        # columns are meant to be read with sqlgetdata instead.
        if col_size >= 1024:
            is_dynamic = True

        data_buf = type_info[3](buf_size)
        length_holder = c_ssize_t()

        wants_unicode = self.connection.unicode_results
        if wants_unicode and sql_type in (SQL_CHAR, SQL_VARCHAR, SQL_LONGVARCHAR):
            target_type = SQL_C_WCHAR
            data_buf = create_buffer_u(buf_size)

        converter = self.connection.output_converter[sql_type]

        if still_binding and is_dynamic:
            still_binding = False
        self._ColBufferList.append([
            col_name, target_type, length_holder, ADDR(length_holder),
            data_buf, ADDR(data_buf), buf_size, converter, still_binding,
        ])

        if still_binding:
            status = ODBC_API.SQLBindCol(
                self.stmt_h, col_num + 1, target_type,
                ADDR(data_buf), buf_size, ADDR(length_holder))
            if status != SQL_SUCCESS:
                check_success(self, status)
def _UpdateDesc(self):
    """Refresh ``self.description`` from the statement's column metadata.

    Each column yields a 7-tuple ``(name, type_code, display_size,
    internal_size, col_precision, scale, null_ok)``; the raw SQL type codes
    are also collected in ``self._ColTypeCodeList``.  With at least one
    column the row type is created through ``self.row_type_callable``;
    otherwise ``self.description`` becomes None.  Finishes by calling
    ``self._CreateColBuf()``.
    """
    if not self.connection:
        self.close()

    wants_unicode = self.connection.unicode_results
    name_buf = create_buffer_u(1024) if wants_unicode else create_buffer(1024)

    name_len = c_short()
    type_code = c_short()
    col_size = ctypes.c_size_t()
    display_size = c_ssize_t(0)
    decimal_digits = c_short()
    nullable = c_short()
    columns = []
    self._ColTypeCodeList = []
    column_count = self._NumOfCols()
    for col in range(1, column_count + 1):
        status = ODBC_API.SQLColAttribute(
            self.stmt_h, col, SQL_DESC_DISPLAY_SIZE,
            ADDR(create_buffer(10)), 10, ADDR(c_short()), ADDR(display_size))
        if status != SQL_SUCCESS:
            check_success(self, status)

        # Both describe variants are called with the same arguments.
        if wants_unicode:
            describe = ODBC_API.SQLDescribeColW
        else:
            describe = ODBC_API.SQLDescribeCol
        status = describe(
            self.stmt_h, col, name_buf, len(name_buf), ADDR(name_len),
            ADDR(type_code), ADDR(col_size), ADDR(decimal_digits),
            ADDR(nullable))
        if status != SQL_SUCCESS:
            check_success(self, status)

        col_name = name_buf.value
        if lowercase:
            col_name = col_name.lower()

        raw_type = type_code.value
        columns.append((
            col_name,
            # Fall back to the raw type code when the type is not in the dict.
            SQL_data_type_dict.get(raw_type, (raw_type,))[0],
            display_size.value,
            col_size.value,
            col_size.value,
            decimal_digits.value,
            nullable.value == 1,
        ))
        self._ColTypeCodeList.append(raw_type)

    if columns:
        self.description = columns
        # The row type has to exist before any row is fetched.
        self._row_type = self.row_type_callable(self)
    else:
        self.description = None
    self._CreateColBuf()
def _CreateColBuf(self):
    """Build one result buffer per column and bind the leading columns.

    Unbinds the statement first, then fills ``self._ColBufferList`` with one
    entry per column: ``[name, target type, length holder, ADDR(length
    holder), data buffer, ADDR(data buffer), buffer size, converter, bound
    flag]``.  Columns are bound with ``SQLBindCol`` only until the first
    dynamic-length column is met; that column and every later one stay
    unbound.
    """
    if not self.connection:
        self.close()
    self._free_stmt(SQL_UNBIND)
    column_count = self._NumOfCols()
    self._ColBufferList = []
    still_binding = True
    for col_num in range(column_count):
        col_name = self.description[col_num][0]
        col_size = self.description[col_num][2]
        sql_type = self._ColTypeCodeList[col_num]
        type_info = SQL_data_type_dict[sql_type]

        target_type = type_info[2]
        is_dynamic = type_info[5]

        # Start from the default size of the column's sql data type.  The
        # generic override (key None) applies to "large columns" only; a
        # pre-set size for this particular column always wins.
        buf_size = type_info[4]
        if buf_size > 20500:
            buf_size = self._outputsize.get(None, buf_size)
        buf_size = self._outputsize.get(col_num, buf_size)

        # Very long columns are not bound: a large buffer decreases
        # performance and sometimes only yields a NULL value, so such
        # columns are meant to be read with sqlgetdata instead.
        if col_size >= 1024:
            is_dynamic = True

        data_buf = type_info[3](buf_size)
        length_holder = c_ssize_t()

        wants_unicode = self.connection.unicode_results
        if wants_unicode and sql_type in (SQL_CHAR, SQL_VARCHAR, SQL_LONGVARCHAR):
            target_type = SQL_C_WCHAR
            data_buf = create_buffer_u(buf_size)

        converter = self.connection.output_converter[sql_type]

        if still_binding and is_dynamic:
            still_binding = False
        self._ColBufferList.append([
            col_name, target_type, length_holder, ADDR(length_holder),
            data_buf, ADDR(data_buf), buf_size, converter, still_binding,
        ])

        if still_binding:
            status = ODBC_API.SQLBindCol(
                self.stmt_h, col_num + 1, target_type,
                ADDR(data_buf), buf_size, ADDR(length_holder))
            if status != SQL_SUCCESS:
                check_success(self, status)
def _UpdateDesc(self):
    """Refresh ``self.description`` from the statement's column metadata.

    Each column yields a 7-tuple ``(name, type_code, display_size,
    internal_size, col_precision, scale, null_ok)``; the raw SQL type codes
    are also collected in ``self._ColTypeCodeList``.  With at least one
    column the row type is created through ``self.row_type_callable``;
    otherwise ``self.description`` becomes None.  Finishes by calling
    ``self._CreateColBuf()``.
    """
    if not self.connection:
        self.close()

    wants_unicode = self.connection.unicode_results
    name_buf = create_buffer_u(1024) if wants_unicode else create_buffer(1024)

    name_len = c_short()
    type_code = c_short()
    col_size = ctypes.c_size_t()
    display_size = c_ssize_t(0)
    decimal_digits = c_short()
    nullable = c_short()
    columns = []
    self._ColTypeCodeList = []
    column_count = self._NumOfCols()
    for col in range(1, column_count + 1):
        status = ODBC_API.SQLColAttribute(
            self.stmt_h, col, SQL_DESC_DISPLAY_SIZE,
            ADDR(create_buffer(10)), 10, ADDR(c_short()), ADDR(display_size))
        if status != SQL_SUCCESS:
            check_success(self, status)

        # Both describe variants are called with the same arguments.
        if wants_unicode:
            describe = ODBC_API.SQLDescribeColW
        else:
            describe = ODBC_API.SQLDescribeCol
        status = describe(
            self.stmt_h, col, name_buf, len(name_buf), ADDR(name_len),
            ADDR(type_code), ADDR(col_size), ADDR(decimal_digits),
            ADDR(nullable))
        if status != SQL_SUCCESS:
            check_success(self, status)

        col_name = name_buf.value
        if lowercase:
            col_name = col_name.lower()

        raw_type = type_code.value
        columns.append((
            col_name,
            # Fall back to the raw type code when the type is not in the dict.
            SQL_data_type_dict.get(raw_type, (raw_type,))[0],
            display_size.value,
            col_size.value,
            col_size.value,
            decimal_digits.value,
            nullable.value == 1,
        ))
        self._ColTypeCodeList.append(raw_type)

    if columns:
        self.description = columns
        # The row type has to exist before any row is fetched.
        self._row_type = self.row_type_callable(self)
    else:
        self.description = None
    self._CreateColBuf()
def _CreateColBuf(self):
    """Build one result buffer per column and bind the leading columns.

    Unbinds the statement first, then fills ``self._ColBufferList`` with one
    entry per column: ``[name, target type, length holder, ADDR(length
    holder), data buffer, ADDR(data buffer), buffer size, converter, bound
    flag]``.  Columns are bound with ``SQLBindCol`` only until the first
    dynamic-length column is met; that column and every later one stay
    unbound.
    """
    if not self.connection:
        self.close()
    self._free_stmt(SQL_UNBIND)
    column_count = self._NumOfCols()
    self._ColBufferList = []
    still_binding = True
    for col_num in range(column_count):
        col_name = self.description[col_num][0]
        col_size = self.description[col_num][2]
        sql_type = self._ColTypeCodeList[col_num]
        type_info = SQL_data_type_dict[sql_type]

        target_type = type_info[2]
        is_dynamic = type_info[5]

        # Start from the default size of the column's sql data type.  The
        # generic override (key None) applies to "large columns" only; a
        # pre-set size for this particular column always wins.
        buf_size = type_info[4]
        if buf_size > 20500:
            buf_size = self._outputsize.get(None, buf_size)
        buf_size = self._outputsize.get(col_num, buf_size)

        # Very long columns are not bound: a large buffer decreases
        # performance and sometimes only yields a NULL value, so such
        # columns are meant to be read with sqlgetdata instead.
        if col_size >= 1024:
            is_dynamic = True

        data_buf = type_info[3](buf_size)
        length_holder = c_ssize_t()

        wants_unicode = self.connection.unicode_results
        if wants_unicode and sql_type in (SQL_CHAR, SQL_VARCHAR, SQL_LONGVARCHAR):
            target_type = SQL_C_WCHAR
            data_buf = create_buffer_u(buf_size)

        converter = self.connection.output_converter[sql_type]

        if still_binding and is_dynamic:
            still_binding = False
        self._ColBufferList.append([
            col_name, target_type, length_holder, ADDR(length_holder),
            data_buf, ADDR(data_buf), buf_size, converter, still_binding,
        ])

        if still_binding:
            status = ODBC_API.SQLBindCol(
                self.stmt_h, col_num + 1, target_type,
                ADDR(data_buf), buf_size, ADDR(length_holder))
            if status != SQL_SUCCESS:
                check_success(self, status)
def _UpdateDesc(self):
    """Refresh ``self.description`` from the statement's column metadata.

    Each column yields a 7-tuple ``(name, type_code, display_size,
    internal_size, col_precision, scale, null_ok)``; the raw SQL type codes
    are also collected in ``self._ColTypeCodeList``.  With at least one
    column the row type is created through ``self.row_type_callable``;
    otherwise ``self.description`` becomes None.  Finishes by calling
    ``self._CreateColBuf()``.
    """
    if not self.connection:
        self.close()

    wants_unicode = self.connection.unicode_results
    name_buf = create_buffer_u(1024) if wants_unicode else create_buffer(1024)

    name_len = c_short()
    type_code = c_short()
    col_size = ctypes.c_size_t()
    display_size = c_ssize_t(0)
    decimal_digits = c_short()
    nullable = c_short()
    columns = []
    self._ColTypeCodeList = []
    column_count = self._NumOfCols()
    for col in range(1, column_count + 1):
        status = ODBC_API.SQLColAttribute(
            self.stmt_h, col, SQL_DESC_DISPLAY_SIZE,
            ADDR(create_buffer(10)), 10, ADDR(c_short()), ADDR(display_size))
        if status != SQL_SUCCESS:
            check_success(self, status)

        # Both describe variants are called with the same arguments.
        if wants_unicode:
            describe = ODBC_API.SQLDescribeColW
        else:
            describe = ODBC_API.SQLDescribeCol
        status = describe(
            self.stmt_h, col, name_buf, len(name_buf), ADDR(name_len),
            ADDR(type_code), ADDR(col_size), ADDR(decimal_digits),
            ADDR(nullable))
        if status != SQL_SUCCESS:
            check_success(self, status)

        col_name = name_buf.value
        if lowercase:
            col_name = col_name.lower()

        raw_type = type_code.value
        columns.append((
            col_name,
            # Fall back to the raw type code when the type is not in the dict.
            SQL_data_type_dict.get(raw_type, (raw_type,))[0],
            display_size.value,
            col_size.value,
            col_size.value,
            decimal_digits.value,
            nullable.value == 1,
        ))
        self._ColTypeCodeList.append(raw_type)

    if columns:
        self.description = columns
        # The row type has to exist before any row is fetched.
        self._row_type = self.row_type_callable(self)
    else:
        self.description = None
    self._CreateColBuf()