def set_snapshot(xmin, snapshot_id):
    """Switch this process to the given ``(xmin, snapshot_id)`` pair.

    Returns immediately when the module-level ``current_xmin_snapshot_id``
    already equals the requested pair.  Otherwise ``clear_snapshot()`` is
    called, the new pair is recorded, and the function polls the database
    until the xmin it reports is at least ``xmin``.  Finally a blank
    ``/_indexing_pool`` request is prepared and pushed onto ``manager``.

    Args:
        xmin: Lower bound the database's snapshot xmin must reach before
            the function proceeds.
        snapshot_id: Value attached to the transaction as extended info
            under the key ``'snapshot_id'``; skipped when ``None``.
    """
    global current_xmin_snapshot_id

    wanted = (xmin, snapshot_id)
    if current_xmin_snapshot_id == wanted:
        return

    clear_snapshot()
    current_xmin_snapshot_id = wanted

    xmin_query = "SELECT txid_snapshot_xmin(txid_current_snapshot());"
    caught_up = False
    while not caught_up:
        # Each attempt runs in a fresh transaction that is doomed right away.
        doomed_txn = transaction.begin()
        doomed_txn.doom()
        if snapshot_id is not None:
            doomed_txn.setExtendedInfo('snapshot_id', snapshot_id)

        db_session = app.registry[DBSESSION]()
        observed_xmin = db_session.connection().execute(xmin_query).scalar()

        caught_up = observed_xmin >= xmin
        if not caught_up:
            # Not there yet: drop this transaction and retry after a pause.
            transaction.abort()
            log.info('Waiting for xmin %r to reach %r', observed_xmin, xmin)
            time.sleep(0.1)

    # The last transaction begun above is left open (not aborted) on exit.
    reg = app.registry
    pool_request = app.request_factory.blank('/_indexing_pool')
    pool_request.registry = reg
    pool_request.datastore = 'database'
    apply_request_extensions(pool_request)
    pool_request.invoke_subrequest = app.invoke_subrequest
    pool_request.root = app.root_factory(pool_request)
    pool_request._stats = {}
    manager.push({'request': pool_request, 'registry': reg})
# (Web-page residue, not code: "Comment list")
# (Web-page residue, not code: "Table of contents")