dump2111.py 文件源码

python
阅读 17 收藏 0 点赞 0 评论 0

项目:data007 作者: mobishift2011 项目源码 文件源码
def sync_table(table, fields):
    """Copy rows of ``table`` from ``db1`` to ``db2``, one token range at a time.

    The signed 64-bit token ring is split into a table-specific number of
    consecutive pieces.  Each piece is read from ``db1`` with a ``token()``
    range query on ``fields[0]`` and every usable row is re-inserted into
    ``db2`` via ``pool.spawn``.

    Args:
        table: Fully qualified table name.  It is interpolated into the CQL
            text, so it must come from trusted code.
        fields: Column names to copy, also interpolated into the CQL text.
            ``fields[0]`` is the column passed to ``token()`` (presumably the
            partition key -- confirm against the schema).

    Notes:
        * For ``*_by_date`` tables that include a ``datestr`` column only
          rows with ``datestr`` within the last two days (UTC+8) are read.
        * Columns whose value is ``None`` are left out of the insert.
        * A row is inserted only when at least one of ``id``, ``datestr`` or
          ``name`` is present; ``ataobao2.item_by_date`` rows additionally
          need a ``date`` value that decodes from exactly 8 bytes.
    """
    min_token = -2**63
    max_token = 2**63 - 1

    column_list = ', '.join(fields)
    pieces = {
        'ataobao2.item': 100,
        'ataobao2.item_by_date': 1000,
        'ataobao2.brand_by_date': 10,
        'ataobao2.shop_by_date': 10,
    }.get(table, 1)
    # Floor division on purpose: token bounds must stay integers.
    step = 2**64 // pieces
    recent_only = table.endswith('_by_date') and 'datestr' in fields
    print('migrating {} {}'.format(table, column_list))

    for i in range(pieces):
        is_last = i == pieces - 1
        start = min_token + step * i
        # 2**64 is usually not a multiple of ``pieces`` and the upper bound of
        # a piece is exclusive, so the final piece is stretched to the top of
        # the ring and made inclusive; otherwise the highest tokens would
        # never be selected.
        end = max_token if is_last else min_token + step * (i + 1)
        upper_op = '<=' if is_last else '<'

        query = 'select {} from {} where token({})>=:v1 and token({}){}:v2'.format(
            column_list, table, fields[0], fields[0], upper_op)
        bind = dict(v1=start, v2=end)

        with db1.connection() as cur:
            print('piece {}'.format(i + 1))
            if recent_only:
                # Cut-off is "two days ago" in UTC+8, recomputed per piece.
                cutoff = datetime.utcnow() + timedelta(hours=8) - timedelta(days=2)
                query += ' and datestr>=:d0 allow filtering'
                bind['d0'] = cutoff.strftime('%Y-%m-%d')
            cur.execute(query, bind, consistency_level='ONE')

            for j, row in enumerate(cur):
                if j % 1000 == 0:
                    print('syncd {}'.format(j))

                params = {}
                for name, value in zip(fields, row):
                    if name == 'date':
                        # ``date`` arrives as a raw 8-byte big-endian signed
                        # integer; anything else is dropped from the insert.
                        if value is None or len(value) != 8:
                            continue
                        value = struct.unpack('!q', value)[0]
                    if value is not None:
                        params[name] = value

                if not ('id' in params or 'datestr' in params or 'name' in params):
                    continue
                if table == 'ataobao2.item_by_date' and 'date' not in params:
                    continue

                names = list(params)
                insert = 'insert into {} ({}) values ({})'.format(
                    table,
                    ', '.join(names),
                    ', '.join(':' + name for name in names))
                pool.spawn(db2.execute, insert, params)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号