qmmap.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:qmmap 作者: hiqlabs 项目源码 文件源码
def _open_chunk_cursor(src_col, qq, sort):
    """Open a cursor over ``src_col`` for ``qq`` with the server timeout disabled.

    ``sort`` is a field name, optionally prefixed with ``"-"`` for descending
    order; an empty/None ``sort`` leaves the cursor unsorted.

    The caller must close the returned cursor: per the pymongo docs, cursors
    opened without a server-side timeout are never timed out by the server.

    Raises:
        Exception: if the installed pymongo major version is neither 2 nor 3.
    """
    # The keyword that disables the server-side cursor timeout was renamed
    # between pymongo 2 (timeout=False) and 3 (no_cursor_timeout=True).
    major = pymongo.version_tuple[0]
    if major == 2:
        cursor = src_col.find(qq, timeout=False)
    elif major == 3:
        cursor = src_col.find(qq, no_cursor_timeout=True)
    else:
        raise Exception("Unknown pymongo version")
    if sort:
        if sort.startswith("-"):
            cursor = cursor.sort(sort[1:], pymongo.DESCENDING)
        else:
            cursor = cursor.sort(sort, pymongo.ASCENDING)
    return cursor


def do_chunks(init, proc, src_col, dest_col, query, key, sort, verbose, sleep=60):
    """Claim and process open housekeep chunks until every chunk is 'done'.

    Each iteration flips one housekeep document from state 'open' to
    'working' (stamping ``tstart`` and ``procname``), selects the documents
    of ``src_col`` that match ``query`` and whose ``key`` lies in the chunk's
    inclusive ``[start, end]`` range, and hands that cursor to ``_process``.
    The chunk is then marked 'done' unless ``_process`` returned -1 (early
    exit signal) or another process already marked it 'done'.

    Args:
        init: passed through unchanged to ``_process``.
        proc: passed through unchanged to ``_process``.
        src_col: pymongo collection the chunk's documents are read from.
        dest_col: passed through unchanged to ``_process``.
        query: extra MongoDB filter AND-ed with the chunk's key range.
        key: field compared against the chunk's ``start``/``end`` bounds.
        sort: field to sort each chunk by; a leading ``"-"`` means
            descending. Empty/None leaves the chunk unsorted.
        verbose: bit flags; ``verbose & 2`` prints each chunk's size.
        sleep: seconds to wait when no chunk is open but not all are done.

    Raises:
        Exception: if the installed pymongo major version is neither 2 nor 3.
    """
    while housekeep.objects(state='done').count() < housekeep.objects.count():
        tnow = datetime.datetime.utcnow()
        # findAndModify claims the chunk in one atomic step, so two workers
        # cannot both move the same document from 'open' to 'working'.
        raw = housekeep._collection.find_and_modify(
            {'state': 'open'},
            {
                '$set': {
                    'state': 'working',
                    'tstart': tnow,
                    'procname': procname(),
                }
            }
        )
        if raw is None:
            # Not all done, but none were open for processing (another
            # process may have scooped the last one); wait to see if one
            # re-opens.
            print('Standing by for reopening of "working" job...')
            sys.stdout.flush()
            time.sleep(sleep)
            continue

        raw_id = raw['_id']
        # Reload as mongoengine object -- _id is .start (set as primary_key).
        hko = housekeep.objects(start=raw_id)[0]
        # Recording the git commit is currently disabled:
        # hko.git = git.Git('.').rev_parse('HEAD')
        # hko.save()

        qq = {'$and': [query, {key: {'$gte': hko.start}}, {key: {'$lte': hko.end}}]}
        cursor = _open_chunk_cursor(src_col, qq, sort)
        try:
            if verbose & 2:
                print("mongo_process: %d elements in chunk %s-%s"
                      % (cursor.count(), hko.start, hko.end))
            sys.stdout.flush()
            # This is where processing happens.
            hko.good = _process(init, proc, cursor, dest_col, verbose,
                                hkstart=raw_id)
        finally:
            # The cursor has no server-side timeout, so it must be closed
            # explicitly -- especially on the early-exit path, where
            # _process may leave it unexhausted.
            cursor.close()

        if hko.good == -1:  # Early exit signal
            print("Chunk at %s lost to another process; not updating" % raw_id)
            sys.stdout.flush()
        elif housekeep.objects(start=raw_id).only('state')[0].state == 'done':
            # Another job finished this chunk while we were working on it.
            print("Chunk at %s had already finished; not updating" % raw_id)
            sys.stdout.flush()
        else:
            hko.state = 'done'
            hko.procname = 'none'
            hko.time = datetime.datetime.utcnow()
            hko.save()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号