def _claim_open_chunk():
    """Atomically flip one 'open' housekeep chunk to 'working' and return it.

    Returns the raw pymongo document as it was *before* the update, or None
    when no chunk is currently in the 'open' state.
    """
    update = {
        '$set': {
            'state': 'working',
            'tstart': datetime.datetime.utcnow(),
            'procname': procname(),
        }
    }
    coll = housekeep._collection
    if pymongo.version_tuple[0] >= 3:
        # find_and_modify is deprecated in pymongo 3 and removed in 4.
        # find_one_and_update returns the pre-update document by default,
        # which matches find_and_modify's default (new=False).
        return coll.find_one_and_update({'state': 'open'}, update)
    return coll.find_and_modify({'state': 'open'}, update)


def _open_chunk_cursor(src_col, qq, sort):
    """Return a cursor over `qq` on `src_col` that does not idle-time-out.

    `sort` is a field name, optionally prefixed with "-" for descending
    order; a falsy `sort` leaves the cursor unsorted.
    """
    # The keyword that disables the cursor idle timeout was renamed in pymongo 3.
    if pymongo.version_tuple[0] == 2:
        cursor = src_col.find(qq, timeout=False)
    else:
        cursor = src_col.find(qq, no_cursor_timeout=True)
    if sort:
        if sort[0] == "-":
            cursor = cursor.sort(sort[1:], pymongo.DESCENDING)
        else:
            cursor = cursor.sort(sort, pymongo.ASCENDING)
    return cursor


def do_chunks(init, proc, src_col, dest_col, query, key, sort, verbose, sleep=60):
    """Claim and process 'open' housekeep chunks until every chunk is 'done'.

    Each iteration atomically claims one 'open' chunk (marking it 'working'),
    selects the documents of `src_col` matching `query` whose `key` lies in
    [chunk.start, chunk.end], and hands that cursor to `_process`. On success
    the chunk is marked 'done'. If no chunk is open while some are still not
    done, the loop sleeps `sleep` seconds and retries.

    Args:
        init, proc: passed through unchanged to `_process` (their meaning
            is defined there, not here).
        src_col: pymongo collection to read chunk documents from.
        dest_col: passed through to `_process`.
        query: mongo filter dict, ANDed with the chunk's range on `key`.
        key: field compared against the chunk's start/end bounds.
        sort: field to sort by; prefix with "-" for descending; falsy for
            no sort.
        verbose: bitmask; bit value 2 prints the element count per chunk.
        sleep: seconds to wait when no chunk is available to claim.

    Raises:
        Exception: if the installed pymongo major version is older than 2.
    """
    # Validate before claiming anything, so a chunk is never flipped to
    # 'working' and then abandoned because of an unsupported driver.
    if pymongo.version_tuple[0] < 2:
        raise Exception("Unknown pymongo version")

    while housekeep.objects(state='done').count() < housekeep.objects.count():
        raw = _claim_open_chunk()
        if raw is None:
            # Not all chunks are done, but none are open (someone else holds
            # them); wait to see whether one re-opens. NOTE(review): whatever
            # resets stalled 'working' chunks to 'open' is not in this file.
            print('Standing by for reopening of "working" job...')
            sys.stdout.flush()
            time.sleep(sleep)
            continue

        raw_id = raw['_id']
        # Reload as a mongoengine object; _id is .start (it is the primary key).
        hko = housekeep.objects(start=raw_id)[0]
        qq = {'$and': [query, {key: {'$gte': hko.start}}, {key: {'$lte': hko.end}}]}
        cursor = _open_chunk_cursor(src_col, qq, sort)
        try:
            if verbose & 2:
                if pymongo.version_tuple >= (3, 7):
                    # Cursor.count is deprecated in 3.7 and removed in 4.
                    n_elems = src_col.count_documents(qq)
                else:
                    n_elems = cursor.count()
                print("mongo_process: %d elements in chunk %s-%s"
                      % (n_elems, hko.start, hko.end))
                sys.stdout.flush()
            # This is where processing happens.
            hko.good = _process(init, proc, cursor, dest_col, verbose,
                                hkstart=raw_id)
        finally:
            # The cursor was opened without the server's idle timeout, so the
            # server will not reap it; close it explicitly even when
            # _process raises or stops before exhausting it.
            cursor.close()

        if hko.good == -1:  # Early-exit signal from _process
            print("Chunk at %s lost to another process; not updating" % raw_id)
        elif housekeep.objects(start=raw_id).only('state')[0].state == 'done':
            # Another worker finished this chunk while we were processing it.
            print("Chunk at %s had already finished; not updating" % raw_id)
        else:
            hko.state = 'done'
            hko.procname = 'none'
            hko.time = datetime.datetime.utcnow()
            hko.save()
        sys.stdout.flush()
评论列表
文章目录