def to_sqla_table_idempotent(table, data):
    '''
    Idempotently load rows into an SQLA table using one upsert per row.

    Every row is sent as an INSERT that, on a conflict on the table's primary
    key, updates the existing record with the row's values instead. Integrity
    errors do not abort the load: when the error text matches
    ``INTEGRITY_DETAILS`` the offending keys are collected in the returned
    ``missing`` mapping (nothing is written to disk here); otherwise the
    failure is logged and the loop moves on to the next row.

    :param table: SQLAlchemy table to load into. Statements are executed
        through ``table.metadata.bind``.
    :param data: a ``list``, ``map`` or ``filter`` object yielding one mapping
        of column name to value per row (each row is expanded into
        ``.values(**row)``).
    :returns: tuple ``(results, missing)``. ``results`` is the list of values
        returned by ``execute`` for the rows that succeeded. ``missing`` is a
        ``defaultdict(set)`` keyed by table name: under ``table.name`` the
        primary-key values of the rows that failed, and under the table named
        in the error message the key value reported by that message.
    :raises TypeError: if ``data`` is not a list, map or filter object.
    '''
    if not isinstance(data, (list, map, filter)):
        # TypeError subclasses Exception, so callers that caught the former
        # bare Exception keep working; the message text is unchanged.
        raise TypeError('`data` arg is not a list, map or filter object')

    # NOTE(review): presumably the primary-key column *name*, since it is used
    # both as a conflict target and as a key into each row -- confirm against
    # etl.utils.primary_key.
    primary_key = etl.utils.primary_key(table)
    results = []
    missing = collections.defaultdict(set)

    for row in data:
        # NOTE(review): on_conflict_do_update implies `insert` is a
        # dialect-specific construct (e.g. PostgreSQL) -- confirm the import.
        upsert = (
            insert(table)
            .values(**row)
            .on_conflict_do_update(index_elements=[primary_key], set_=row)
        )
        try:
            result = table.metadata.bind.execute(upsert)
        except sqla_exc.IntegrityError as exc:
            message = str(exc)
            # INTEGRITY_DETAILS must expose the named groups 'table' and
            # 'pkey'; they identify the record the failed row depends on.
            parsed = re.search(INTEGRITY_DETAILS, message)
            if parsed:
                missing[table.name].add(row[primary_key])
                missing[parsed.group('table')].add(parsed.group('pkey'))
                continue
            # Unrecognised integrity error: log it and carry on with the
            # remaining rows rather than failing the whole load.
            LOGGER.error(
                '%s %s (%s) failed on :',
                datetime.datetime.now(), table.name, row[primary_key]
            )
            LOGGER.error(message.split('\n')[0])
        else:
            results.append(result)

    return results, missing
评论列表
文章目录