将二进制COPY表FROM与psycopg2一起使用
我有数千万行要从多维数组文件传输到PostgreSQL数据库。我的工具是Python和psycopg2。批量插入数据的最有效方法是使用copy_from
。但是,我的数据主要是32位浮点数(实数或float4),所以我宁愿不从实数→文本→实数转换。这是一个示例数据库DDL:
CREATE TABLE num_data
(
id serial PRIMARY KEY NOT NULL,
node integer NOT NULL,
ts smallint NOT NULL,
val1 real,
val2 double precision
);
这是我在Python中使用字符串(文本)的地方:
# Just one row of data
num_row = [23253, 342, -15.336734, 2494627.949375]
import psycopg2
# Python3:
from io import StringIO
# Python2, use: from cStringIO import StringIO
conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()
# Convert floating point numbers to text, write to COPY input
cpy = StringIO()
cpy.write('\t'.join([repr(x) for x in num_row]) + '\n')
# Insert data; database converts text back to floating point numbers
cpy.seek(0)
curs.copy_from(cpy, 'num_data', columns=('node', 'ts', 'val1', 'val2'))
conn.commit()
是否存在可以使用二进制模式运行的等效项?即,将浮点数保留为二进制?这样不仅可以保持浮点精度,而且可以更快。
(注意:要查看与示例相同的精度,请使用SET extra_float_digits='2'
)
-
这是Python 3的COPY FROM的二进制等效文件:
from io import BytesIO from struct import pack import psycopg2 # Two rows of data; "id" is not in the upstream data source # Columns: node, ts, val1, val2 data = [(23253, 342, -15.336734, 2494627.949375), (23256, 348, 43.23524, 2494827.949375)] conn = psycopg2.connect("dbname=mydb user=postgres") curs = conn.cursor() # Determine starting value for sequence curs.execute("SELECT nextval('num_data_id_seq')") id_seq = curs.fetchone()[0] # Make a binary file object for COPY FROM cpy = BytesIO() # 11-byte signature, no flags, no header extension cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0)) # Columns: id, node, ts, val1, val2 # Zip: (column position, format, size) row_format = list(zip(range(-1, 4), ('i', 'i', 'h', 'f', 'd'), ( 4, 4, 2, 4, 8 ))) for row in data: # Number of columns/fields (always 5) cpy.write(pack('!h', 5)) for col, fmt, size in row_format: value = (id_seq if col == -1 else row[col]) cpy.write(pack('!i' + fmt, size, value)) id_seq += 1 # manually increment sequence outside of database # File trailer cpy.write(pack('!h', -1)) # Copy data to database cpy.seek(0) curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy) # Update sequence on database curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,)) conn.commit()
更新资料
我改写了上面的方法来为COPY编写文件。我在Python中的数据位于NumPy数组中,因此使用它们很有意义。这是一个
data
具有1M行,7列的示例:import psycopg2 import numpy as np from struct import pack from io import BytesIO from datetime import datetime conn = psycopg2.connect("dbname=mydb user=postgres") curs = conn.cursor() # NumPy record array shape = (7, 2000, 500) print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0])) dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] + [('s' + str(x), 'f4') for x in range(shape[0])]) data = np.empty(shape[1]*shape[2], dtype) data['id'] = np.arange(shape[1]*shape[2]) + 1 data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2]) data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1]) data['s0'] = np.random.rand(shape[1]*shape[2]) * 100 prv = 's0' for nxt in data.dtype.names[4:]: data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10 prv = nxt
在我的数据库中,我有两个看起来像的表:
CREATE TABLE num_data_binary ( id integer PRIMARY KEY, node integer NOT NULL, ts smallint NOT NULL, s0 real, s1 real, s2 real, s3 real, s4 real, s5 real, s6 real ) WITH (OIDS=FALSE);
另一个类似的表名为
num_data_text
。以下是一些简单的辅助函数,它们通过使用NumPy记录数组中的信息为COPY(文本和二进制格式)准备数据:
def prepare_text(dat): cpy = BytesIO() for row in dat: cpy.write('\t'.join([repr(x) for x in row]) + '\n') return(cpy) def prepare_binary(dat): pgcopy_dtype = [('num_fields','>i2')] for field, dtype in dat.dtype.descr: pgcopy_dtype += [(field + '_length', '>i4'), (field, dtype.replace('<', '>'))] pgcopy = np.empty(dat.shape, pgcopy_dtype) pgcopy['num_fields'] = len(dat.dtype) for i in range(len(dat.dtype)): field = dat.dtype.names[i] pgcopy[field + '_length'] = dat.dtype[i].alignment pgcopy[field] = dat[field] cpy = BytesIO() cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0)) cpy.write(pgcopy.tostring()) # all rows cpy.write(pack('!h', -1)) # file trailer return(cpy)
这就是我使用帮助程序函数对两种COPY格式方法进行基准测试的方式:
def time_pgcopy(dat, table, binary): print('Processing copy object for ' + table) tstart = datetime.now() if binary: cpy = prepare_binary(dat) else: # text cpy = prepare_text(dat) tendw = datetime.now() print('Copy object prepared in ' + str(tendw - tstart) + '; ' + str(cpy.tell()) + ' bytes; transfering to database') cpy.seek(0) if binary: curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy) else: # text curs.copy_from(cpy, table) conn.commit() tend = datetime.now() print('Database copy time: ' + str(tend - tendw)) print(' Total time: ' + str(tend - tstart)) return time_pgcopy(data, 'num_data_text', binary=False) time_pgcopy(data, 'num_data_binary', binary=True)
这是最后两个
time_pgcopy
命令的输出:Processing copy object for num_data_text Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database Database copy time: 0:00:37.929166 Total time: 0:01:53.217861 Processing copy object for num_data_binary Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database Database copy time: 0:00:23.325952 Total time: 0:00:24.622095
因此,使用二进制方法,NumPy→文件和File→数据库步骤都更快。明显的区别是Python如何准备COPY文件,这对于文本来说确实很慢。通常,二进制格式会以这种格式的文本格式在2/3的时间内将其加载到数据库中。
最后,我比较了数据库中两个表中的值,以查看数字是否不同。大约1.46%的行的column值不同
s0
,并且该比例的值增加到6.17%s6
(可能与我使用的随机方法有关)。所有70M
32位浮点值之间的非零绝对差值介于9.3132257e-010和7.6293945e-006之间。文本和二进制加载方法之间的这些细微差别是由于文本格式方法所需的float→text→float转换而导致精度损失。