pypyodbc.py 文件源码

python
阅读 17 收藏 0 点赞 0 评论 0

项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码
def _CreateColBuf(self):
    """Rebuild ``self._ColBufferList`` for the current result set's columns.

    Existing bindings are released through ``_free_stmt(SQL_UNBIND)``, then
    one entry per column is appended to ``self._ColBufferList`` in the form::

        [col_name, target_type, used_buf_len, ADDR(used_buf_len),
         alloc_buffer, ADDR(alloc_buffer), total_buf_len, buf_cvt_func,
         bind_data]

    Columns are bound with ``SQLBindCol`` only up to (not including) the
    first dynamic-length column; that column and every column after it are
    left unbound and carry ``bind_data == False`` in their entry.
    """
    # NOTE(review): execution continues after close() when there is no
    # connection -- confirm this is intended.
    if not self.connection:
        self.close()
    self._free_stmt(SQL_UNBIND)

    column_count = self._NumOfCols()
    self._ColBufferList = []
    # Flips to False at the first dynamic-length column and stays False.
    still_binding = True

    for col_index in range(column_count):
        column_desc = self.description[col_index]
        name = column_desc[0]
        declared_size = column_desc[2]
        sql_type = self._ColTypeCodeList[col_index]
        type_info = SQL_data_type_dict[sql_type]

        c_type = type_info[2]
        is_dynamic = type_info[5]
        # Start from the default buffer size for this SQL data type.
        buf_len = type_info[4]

        # "Large" defaults may be replaced by a pre-set size stored under
        # the None key of the output-size mapping.
        if buf_len > 20500:
            buf_len = self._outputsize.get(None, buf_len)
        # A pre-set size for this particular column index wins over both.
        buf_len = self._outputsize.get(col_index, buf_len)

        # Very long columns are not bound: per the original author's note,
        # a large bound buffer hurts performance and sometimes only yields
        # NULL, so such columns are meant to be read with SQLGetData.
        if declared_size >= 1024:
            is_dynamic = True

        data_buf = type_info[3](buf_len)
        used_len = c_ssize_t()

        want_unicode = self.connection.unicode_results
        if want_unicode and sql_type in (SQL_CHAR, SQL_VARCHAR, SQL_LONGVARCHAR):
            # Narrow character columns are fetched as wide characters when
            # the connection asks for unicode results.
            c_type = SQL_C_WCHAR
            data_buf = create_buffer_u(buf_len)

        converter = self.connection.output_converter[sql_type]

        if still_binding and is_dynamic:
            still_binding = False

        entry = [
            name,
            c_type,
            used_len,
            ADDR(used_len),
            data_buf,
            ADDR(data_buf),
            buf_len,
            converter,
            still_binding,
        ]
        self._ColBufferList.append(entry)

        if not still_binding:
            continue

        # ODBC column numbers are 1-based, hence col_index + 1.
        ret = ODBC_API.SQLBindCol(
            self.stmt_h,
            col_index + 1,
            c_type,
            ADDR(data_buf),
            buf_len,
            ADDR(used_len),
        )
        if ret != SQL_SUCCESS:
            check_success(self, ret)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号