def run_in_tx(self, batch, chunk_count=None, dry_run=False):
if not chunk_count:
chunk_count = sys.maxsize
if isinstance(batch, Generator):
it = batch
elif isinstance(batch, Iterable):
def gen():
for j in batch:
yield j
it = gen()
else:
err = "batch_job must be iterable or callable but {0} passed"
err = err.format(type(batch))
logger.error(err)
raise ValueError(err)
if dry_run:
return list(it)
session = self.neo4j_driver.session()
try:
result_set = []
consumed_result = None
more_chunks = True
while more_chunks:
logger.debug('neo4j transaction beginning')
with session.begin_transaction() as tx:
chunk_i = 0
try:
while chunk_i < chunk_count:
# noinspection PyNoneFunctionAssignment
query, params = it.send(consumed_result)
logger.debug('chunk %s will run query %s'
'in transaction', chunk_i, query)
result = tx.run(query, params)
consumed_result = list(result)
result_set.append(consumed_result)
chunk_i += 1
except StopIteration:
more_chunks = False
tx.success = True
logger.debug('neo4j transaction committed')
return result_set
finally:
session.close()
评论列表
文章目录