def _sync_token_ranges(pieces):
    """Yield ``(start, end, inclusive)`` tuples covering the whole token ring.

    Every range is half-open ``[start, end)`` except the last, which is closed
    ``[start, 2**63 - 1]``; ``inclusive`` is True only for that last range.
    The bounds assume a Murmur3-style signed 64-bit token space, as the
    original code did.
    """
    token_min, token_max = -2 ** 63, 2 ** 63 - 1
    # Floor division keeps ``step`` an int on Python 2 and 3. Because it rounds
    # down, the last range must be extended to token_max explicitly -- otherwise
    # the top (2**64 % pieces) tokens, and always token_max itself, are skipped.
    step = 2 ** 64 // pieces
    for i in range(pieces):
        start = token_min + step * i
        if i == pieces - 1:
            yield start, token_max, True
        else:
            yield start, start + step, False


def _sync_row_params(fields, row):
    """Map column names to the non-null values of ``row`` for an INSERT.

    A ``date`` value is only kept when it is an 8-byte blob, which is decoded
    as a big-endian signed 64-bit integer; any other ``date`` value is dropped.
    """
    params = {}
    for name, value in zip(fields, row):
        if name == 'date':
            if value is None or len(value) != 8:
                continue
            value = struct.unpack('!q', value)[0]
        if value is not None:
            params[name] = value
    return params


def sync_table(table, fields):
    """Copy the rows of ``table`` from ``db1`` into the same table on ``db2``.

    The table is scanned in contiguous ``token(fields[0])`` ranges; the number
    of ranges is larger for the big ``ataobao2`` tables listed below (1 for any
    other table). For ``*_by_date`` tables that select a ``datestr`` column,
    only rows with ``datestr`` on or after two days ago (UTC+8) are read.

    Each row is re-inserted naming only its non-null columns. Rows carrying
    none of ``id``, ``datestr`` or ``name`` are skipped, as are
    ``ataobao2.item_by_date`` rows without a valid ``date``. Inserts are handed
    to ``pool.spawn`` and are not awaited by this function.

    Args:
        table: fully-qualified table name, e.g. ``'ataobao2.item'``.
        fields: column names to copy. ``fields[0]`` is passed to ``token()``,
            so it is presumably the partition key -- confirm per table.

    Raises:
        IndexError: if ``fields`` is empty.
    """
    columns = ', '.join(fields)
    key = fields[0]
    pieces = {
        'ataobao2.item': 100,
        'ataobao2.item_by_date': 1000,
        'ataobao2.brand_by_date': 10,
        'ataobao2.shop_by_date': 10,
    }.get(table, 1)
    print('migrating {} {}'.format(table, columns))

    date_filtered = table.endswith('_by_date') and 'datestr' in fields
    if date_filtered:
        # Computed once so every piece uses the same cutoff, even if the
        # migration runs across midnight.
        d0 = (datetime.utcnow() + timedelta(hours=8)
              - timedelta(days=2)).strftime('%Y-%m-%d')

    for i, (start, end, inclusive) in enumerate(_sync_token_ranges(pieces)):
        upper_op = '<=' if inclusive else '<'
        query = 'select {} from {} where token({})>=:v1 and token({}){}:v2'.format(
            columns, table, key, key, upper_op)
        values = dict(v1=start, v2=end)
        if date_filtered:
            query += ' and datestr>=:d0 allow filtering'
            values['d0'] = d0

        with db1.connection() as cur:
            print('piece {}'.format(i + 1))
            cur.execute(query, values, consistency_level='ONE')
            for j, row in enumerate(cur):
                if j % 1000 == 0:
                    print('syncd {}'.format(j))
                params = _sync_row_params(fields, row)
                if not ('id' in params or 'datestr' in params or 'name' in params):
                    continue
                if table == 'ataobao2.item_by_date' and 'date' not in params:
                    continue
                names = list(params)
                statement = 'insert into {} ({}) values ({})'.format(
                    table, ', '.join(names), ', '.join(':' + n for n in names))
                pool.spawn(db2.execute, statement, params)
评论列表
文章目录