def run(self):
    """Check contact emails against breach data and tag the matching documents.

    Searches (via ``self.es``) for documents that:

    * have a ``contact_email``,
    * are not yet tagged with ``analysis.breached``, and
    * match ``analysis.source == self.source`` when ``self.source`` is set.

    Results are ordered by a time-seeded ``random_score`` and capped at
    ``self.limit`` hits. Each distinct, non-empty email is checked once with
    ``self.is_breached``. Emails are then grouped into breached and unbreached
    sets. The documents returned by ``self.tag_by_email`` for each group are
    sent to ``lib.bulk_update``.

    Indexing errors are printed rather than raised. If there is nothing to
    tag, no bulk update is attempted.
    """
    emails = {
        'breached': set(),
        'unbreached': set(),
    }

    # contact_email must exist ...
    must = [Q('exists', field='contact_email')]
    # ... and the document must match the source, if one was specified.
    if self.source:
        must.append(Q({'term': {'analysis.source': self.source}}))

    # Exclude documents already tagged with a breached result. The random
    # score is seeded from the current time, presumably so that limited runs
    # sample different documents each time (TODO confirm intent).
    filtered = Q('bool',
                 must=must,
                 must_not=[Q('exists', field='analysis.breached')])
    s = Search(using=self.es).query(
        FunctionScore(
            query=filtered,
            functions=[SF('random_score', seed=int(time.time()))],
        )
    ).source(['contact_email'])

    print('%s breached: source=%s limit=%s' % (datetime.now().isoformat(), self.source,
                                                self.limit))
    print('query=\n%s' % json.dumps(s.to_dict()))

    # Emails already classified in this run; avoids repeat is_breached calls.
    seen = set()
    for filing in s[:self.limit]:
        email = filing['contact_email']
        if not email or email in seen:
            continue
        seen.add(email)
        breached = self.is_breached(email)
        emails['breached' if breached else 'unbreached'].add(email)
    print('done source=%s' % self.source)

    docs = []
    if emails['breached']:
        docs += self.tag_by_email(list(emails['breached']), True)
    if emails['unbreached']:
        docs += self.tag_by_email(list(emails['unbreached']), False)

    if not docs:
        # Nothing to tag: skip the bulk request instead of sending an empty one.
        return

    try:
        lib.bulk_update(self.es, docs)
    except Exception as e:
        # Deliberately best-effort: report the failure instead of raising.
        print('error indexing: %s' % e)
评论列表
文章目录