python类Search()的实例源码

api.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def execute_search(search_obj):
    """
    Run an elasticsearch_dsl search after verifying it targets an index.

    Args:
        search_obj (Search): elasticsearch_dsl Search object

    Returns:
        elasticsearch_dsl.result.Response: ES response

    Raises:
        ImproperlyConfigured: if the Search object has no index set
    """
    index_is_set = search_obj._index is not None  # pylint: disable=protected-access
    if not index_is_set:
        # Reaching this point means Search() was created by hand rather than
        # through create_search_obj, which sets important fields like the
        # index and doc_type.
        raise ImproperlyConfigured("search object is missing an index")

    # make sure there is a live connection before querying
    get_conn()
    response = search_obj.execute()
    return response
report.py 文件源码 项目:source-code-to-name 作者: zironycho 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def first_words(index='codetoname', language='python'):
    """
    Fill in missing ``first_name`` values, then report how often each one occurs.

    Args:
        index (str): name of the index to read from and write to
        language (str): used as the doc_type and passed on to ``firstname``

    Returns:
        list: one ``{'word': ..., 'percentage': ...}`` dict per bucket of the
        ``first_name`` terms aggregation; the percentage is the bucket's
        doc_count relative to the total number of hits
    """
    client = elasticsearch.Elasticsearch()

    # Pass 1: every document that has a feature but no first_name gets one
    # computed from the feature's name and is re-indexed under the same id.
    needs_first_name = Q('exists', field='feature') & Q('missing', field='first_name')
    pending = elasticsearch_dsl.Search(using=client, index=index, doc_type=language)
    pending = pending.query('bool', filter=needs_first_name)
    for hit in pending.scan():
        document = hit.to_dict()
        parsed_feature = json.loads(document['feature'])
        document['first_name'] = firstname(parsed_feature['name'], language)
        client.index(index=index, doc_type=language, id=hit.meta.id, body=document)
    client.indices.refresh(index=index)

    # Pass 2: terms aggregation over first_name for all documents with a feature.
    counted = elasticsearch_dsl.Search(using=client, index=index, doc_type=language)
    counted = counted.query('bool', filter=Q('exists', field='feature'))
    counted.aggs.bucket('first_name_terms', A('terms', field='first_name'))
    response = counted.execute()

    return [
        {
            'word': bucket.key,
            'percentage': bucket.doc_count / float(response.hits.total) * 100,
        }
        for bucket in response.aggregations.first_name_terms.buckets
    ]
default.py 文件源码 项目:Stream4Flow 作者: CSIRT-MU 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_summary_statistics():
    """
    Obtains statistics about current sum of flows, packets, bytes.

    Sums the ``flows``, ``packets`` and ``bytes`` fields over documents whose
    ``@type`` is ``protocols_statistics`` and whose ``@timestamp`` lies within
    the last five minutes.

    :return: JSON string with status "Ok" or "Error". On success ``data`` holds
             a header and one row ("Timestamp, Flows, Packets, Bytes;Last 5
             Minutes, <flows>, <packets>, <bytes>"); on failure it holds the
             escaped exception message.
    """

    try:
        # Elastic query
        client = elasticsearch.Elasticsearch(
            [{'host': myconf.get('consumer.hostname'), 'port': myconf.get('consumer.port')}])
        elastic_bool = [
            {'range': {'@timestamp': {'gte': "now-5m", 'lte': "now"}}},
            {'term': {'@type': 'protocols_statistics'}},
        ]

        qx = Q({'bool': {'must': elastic_bool}})
        s = Search(using=client, index='_all').query(qx)
        s.aggs.bucket('sum_of_flows', 'sum', field='flows')
        s.aggs.bucket('sum_of_packets', 'sum', field='packets')
        s.aggs.bucket('sum_of_bytes', 'sum', field='bytes')
        # No sort is applied: only the aggregation values are read below. The
        # previous bare ``s.sort('@timestamp')`` call discarded its return
        # value, so it never affected the executed query.
        result = s.execute()

        # Result parsing into CSV in format: timestamp, flows, packets, bytes
        aggregations = result.aggregations
        data = "Timestamp, Flows, Packets, Bytes;"
        timestamp = "Last 5 Minutes"
        data += timestamp + ', ' +\
                str(int(aggregations.sum_of_flows['value'])) + ', ' +\
                str(int(aggregations.sum_of_packets['value'])) + ', ' +\
                str(int(aggregations.sum_of_bytes['value']))

        json_response = '{"status": "Ok", "data": "' + data + '"}'
        return json_response

    except Exception as e:
        # Any failure (connection, query, missing aggregation value) is
        # reported to the caller as an error payload instead of propagating.
        json_response = '{"status": "Error", "data": "Elasticsearch query exception: ' + escape(str(e)) + '"}'
        return json_response
elasticsearch.py 文件源码 项目:falcon-api 作者: Opentopic 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def filter_by(self, query, conditions, order_criteria=None):
        """
        Apply filter conditions to a search query.

        :param query: Search object
        :type query: elasticsearch.Search

        :param conditions: conditions dictionary
        :type conditions: dict

        :param order_criteria: optional order criteria
        :type order_criteria: list

        :return: modified query, or the query untouched when the conditions
                 produce no filter expressions
        :rtype: elasticsearch.Search
        """
        filter_expr = self._build_filter_expressions(conditions, None)
        if filter_expr is None:
            return query

        # The expressions are used directly as the query when no ordering is
        # given or when the ordering refers to _score; otherwise they are
        # wrapped in a constant_score filter.
        if not order_criteria or '_score' in order_criteria or '-_score' in order_criteria:
            body = {'query': filter_expr}
        else:
            body = {'query': {'constant_score': {'filter': filter_expr}}}
        return query.update_from_dict(body)
utilities.py 文件源码 项目:mordecai 作者: openeventdata 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setup_es(es_ip, es_port):
    """
    Build a Search object bound to the "geonames" index of an Elasticsearch host.

    Parameters
    ----------
    es_ip: string
            IP address for elasticsearch instance
    es_port: string
            Port for elasticsearch instance
    Returns
    -------
    es_conn: an elasticsearch_dsl Search connection object.
    """
    hosts = [{'host' : es_ip, 'port' : es_port}]
    client = Elasticsearch(hosts)
    return Search(using=client, index="geonames")
api.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def prepare_and_execute_search(user, search_param_dict=None, search_func=execute_search,
                               filter_on_email_optin=False):
    """
    Builds a Search object for the user and hands it to the executing function

    Args:
        user (User): User object
        search_param_dict (dict): A dict representing the body of an ES query
        search_func (callable): The function that executes the search
        filter_on_email_optin (bool): If true, filter out profiles where email_optin != True

    Returns:
        elasticsearch_dsl.result.Response: ES response
    """
    create_kwargs = {
        'search_param_dict': search_param_dict,
        'filter_on_email_optin': filter_on_email_optin,
    }
    prepared_search = create_search_obj(user, **create_kwargs)
    return search_func(prepared_search)
test_search.py 文件源码 项目:open-ledger 作者: creativecommons 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_work_types_dont_override_provider(self):
        """[#122] Selecting work types should be a subset of providers, not override them"""
        flickr_img = models.Image.objects.create(url='example.com/1', title='hello', provider='flickr')
        nypl_img = models.Image.objects.create(url='example.com/2', title='hello', provider='nypl')
        for image in (flickr_img, nypl_img):
            self._index_img(image)

        # Search by provider=flickr but work type=cultural should limit by Flickr first
        params = {
            'search_fields': 'title',
            'search': 'hello',
            'providers': 'flickr',
            'work_types': 'cultural',
        }
        resp = self.client.get(self.url, params)

        def count_matches(selector):
            """Number of nodes in the response matching the selector."""
            return len(select_nodes(resp, selector))

        # One result, the correct one
        self.assertEqual(1, count_matches('.t-image-result'))
        # We now have also img[data-identifier], which is used by photoswipe
        self.assertEqual(1, count_matches('div[data-identifier="' + flickr_img.identifier +'"]'))
        self.assertEqual(0, count_matches('div[data-identifier="' + nypl_img.identifier +'"]'))
site_views.py 文件源码 项目:open-ledger 作者: creativecommons 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def about(request):
    """Information about the current site, its goals, and what content is loaded

    Renders ``about.html`` with a ``providers`` list: one entry per provider
    that has at least one indexed record (the provider's settings plus a
    formatted ``hits`` count), followed by a ``Total`` entry. The list is
    cached under ``CACHE_STATS_NAME`` and only rebuilt when the cached value
    is empty.
    """
    # Provider counts
    providers = cache.get_or_set(CACHE_STATS_NAME, [], CACHE_STATS_DURATION)
    if not providers:
        for provider in sorted(settings.PROVIDERS.keys()):
            response = Search().query(Q('term', provider=provider)).execute()
            if response.hits.total > 0:
                # Work on a copy: updating the settings entry in place would
                # leak the 'hits' key into the shared settings.PROVIDERS dict.
                data = dict(settings.PROVIDERS[provider])
                data['hits'] = intcomma(response.hits.total)
                providers.append(data)
        # All results
        response = Search().execute()
        providers.append({'display_name': 'Total', 'hits': intcomma(response.hits.total)})
        # Store the computed stats with the same duration used for the
        # placeholder above rather than the cache backend's default timeout.
        cache.set(CACHE_STATS_NAME, providers, CACHE_STATS_DURATION)
    return render(request, "about.html", {'providers': providers})
util.py 文件源码 项目:open-ledger 作者: creativecommons 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def correct_orphan_records(self, provider='europeana', end=None):
        """[#185] Delete records from the search engine which aren't found in the database

        Scans every search-engine record of ``provider`` and counts those whose
        identifier is not listed in the identifier file.

        NOTE(review): this block only fetches and logs each orphan and
        increments a counter; no delete call is made here, although the final
        log line says "Deleted". Confirm whether that is an intentional dry run.

        :param provider: provider name used in the term query
        :param end: accepted but not used by this block
        """
        s = Search()
        q = Q('term', provider=provider)
        s = s.query(q)
        response = s.execute()
        total = response.hits.total
        # A file extracted from the production database listing all of the europeana identifiers
        identifier_file = '/tmp/europeana-identifiers.json'
        # Context manager so the file handle is closed once the JSON is loaded
        with open(identifier_file) as identifiers_fh:
            db_identifiers = set(json.load(identifiers_fh))
        total_in_db = len(db_identifiers)
        log.info("Using search engine instance %s", settings.ELASTICSEARCH_URL)
        log.info("Total records: %d (search engine), %d (database) [diff=%d]", total, total_in_db, total - total_in_db)
        deleted_count = 0
        for r in s.scan():
            if r.identifier not in db_identifiers:
                img = search.Image.get(id=r.identifier)
                log.debug("Going to delete image %s", img)
                deleted_count += 1
        log.info("Deleted %d from search engine", deleted_count)
test_aggregations.py 文件源码 项目:invenio-stats 作者: inveniosoftware 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_filter_robots(app, es, event_queues, indexed_events, with_robots):
    """Check aggregated download counts with and without the filter_robots modifier."""
    # The robots filter is only installed when robots must be excluded.
    modifiers = [] if with_robots else [filter_robots]
    aggregator = StatAggregator(
        client=current_search_client,
        event='file-download',
        aggregation_field='file_id',
        aggregation_interval='day',
        query_modifiers=modifiers,
    )
    aggregator.run()
    current_search_client.indices.refresh(index='*')

    search = Search(
        using=current_search_client,
        index='stats-file-download',
        doc_type='file-download-day-aggregation',
    )
    search = search[0:30].sort('file_id')
    results = search.execute()

    assert len(results) == 3
    expected_count = 5 if with_robots else 2
    for result in results:
        if 'file_id' in result:
            assert result.count == expected_count
aggregations.py 文件源码 项目:invenio-stats 作者: inveniosoftware 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_oldest_event_timestamp(self):
        """Return the parsed timestamp of the oldest event, or None if no event is found."""
        # Only the single oldest event is needed: aggregation starts from there.
        ascending_by_time = {'timestamp': {'order': 'asc'}}
        oldest_query = Search(using=self.client, index=self.event_index)
        oldest_query = oldest_query[0:1].sort(ascending_by_time)
        hits = oldest_query.execute()
        if len(hits) > 0:
            return parser.parse(hits[0]['timestamp'])
        # Nothing came back: there might not be any events yet if the first
        # event has been indexed but the indices have not been refreshed yet.
        return None
queries.py 文件源码 项目:handelsregister 作者: Amsterdam 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def to_elasticsearch_object(self, client) -> Search:
        """Translate this query description into an elasticsearch_dsl Search.

        The search uses the given client, targets ``self.indexes``, applies
        ``self.query`` and, when present, ``self.sort_fields``. The result
        window starts at 0 and spans ``self.size`` hits, or 15 when no size
        is set.
        """
        assert self.indexes

        es_search = Search().using(client).index(*self.indexes).query(self.query)
        if self.sort_fields:
            es_search = es_search.sort(*self.sort_fields)

        # fall back to the default size of 15 when none is configured
        page_size = self.size if self.size else 15
        return es_search[0:page_size]
pokedex.py 文件源码 项目:TrainerDex-RedCog 作者: TrainerDex 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def pokedex(self, pokemon):
        # Look up a pokemon by fuzzy name match in the "pokemon" index and
        # reply with an embed of its stats, or with a not-found message.
        #
        # Declared ``async`` because the body awaits ``self.bot.say``; ``await``
        # inside a plain ``def`` does not compile. Documentation is kept in
        # comments rather than a docstring so no new docstring text is attached
        # to the function.
        #
        # NOTE(review): s.execute() looks like a synchronous call made inside a
        # coroutine; confirm that blocking here is acceptable.
        s = Search(using=self.client, index="pokemon").query("match", name={'query': pokemon, 'fuzziness': 2})
        response = s.execute()
        if response.hits.total == 0:
            await self.bot.say("I couldn't find that pokemon")
            return
        # Only the best-scoring hit is shown.
        hit = response[0]
        embed = discord.Embed(title=hit.name, url="http://bulbapedia.bulbagarden.net/wiki/{}".format(hit.name), timestamp=(datetime.datetime(2017,7,6)))
        embed.set_thumbnail(url="http://serebii.net/pokemongo/pokemon/{:03d}.png".format(int(hit.meta.id)))
        embed.add_field(name='Base Attack Stat', value=hit.attack_ratio)
        embed.add_field(name='Base Defence Stat', value=hit.defense_ratio)
        embed.add_field(name='Base HP Stat', value=hit.hp_ratio)
        embed.add_field(name='Min CP', value=hit.min_cp_cap)
        embed.add_field(name='Max CP', value=hit.max_cp_cap)
        embed.add_field(name='Best Offensive Moveset', value=hit.basic_attack+' / '+hit.charge_attack)
        # Candidate fields that were previously commented out here and are not
        # displayed: quick_dps, charge_dps, offensive_percent, duel_percent,
        # defensive_percent, full_cycle_dps.
        embed.set_footer(text='Min and Max CP are for level 40. Best Offensive Moveset may be incorrect.')
        await self.bot.say(embed=embed)
subscriptions.py 文件源码 项目:data-store 作者: HumanCellAtlas 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def find(replica: str):
    """List the subscriptions owned by the requesting user for a replica.

    The owner is taken from the request token's email. Returns a JSON body of
    the form ``{'subscriptions': [...]}`` together with the OK status code.
    """
    owner = request.token_info['email']
    es_client = ElasticsearchClient.get(logger)

    index_name = Config.get_es_index_name(ESIndexType.subscriptions, Replica[replica])
    owned = Search(using=es_client, index=index_name, doc_type=ESDocType.subscription.name)
    owned = owned.query({'match': {'owner': owner}})

    subscriptions = []
    for hit in owned.scan():
        subscriptions.append({
            'uuid': hit.meta.id,
            'replica': replica,
            'owner': owner,
            'callback_url': hit.callback_url,
            'es_query': hit.es_query.to_dict(),
        })

    return jsonify({'subscriptions': subscriptions}), requests.codes.okay
reader_tests.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_missing_field_update(self):
        """When a distribution is updated and a previously indexed field is
        missing, the earlier data is not deleted
        """
        missing_field = '212.1_PSCIOS_ERS_0_0_22'

        # The first run indexes the full catalog; the second run is the
        # catalog "update", using the file named for the missing field.
        for catalog_file in ('full_ts_data.json', 'missing_field.json'):
            self._index_catalog(catalog_file)

        search = Search(using=self.elastic, index=self.test_index)
        results = search.filter('match', series_id=missing_field).execute()

        self.assertTrue(len(results))
        self.assertTrue(Field.objects.filter(series_id=missing_field))
esnotifications.py 文件源码 项目:stethoscope 作者: Netflix 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _get_notifications_by_email(self, email):
    """Return the raw hits of the notification search for the given email.

    The search targets the index and doc_type named in ``self.config`` and is
    narrowed by ``self.create_query_for_email``.

    Raises:
      elasticsearch.exceptions.ElasticsearchException: re-raised after being
        logged when the query fails.
    """
    search = elasticsearch_dsl.Search(using=self.client, index=self.config['ELASTICSEARCH_INDEX'],
      doc_type=self.config['ELASTICSEARCH_DOCTYPE'])

    query = self.create_query_for_email(search, email)

    try:
      response = query.execute()
    except elasticsearch.exceptions.ElasticsearchException:
      logger.exception("Exception caught in Elasticsearch query:\n  index: {!r}\n  doc_type: {!r}\n"
                       "  query: {!s}".format(self.config['ELASTICSEARCH_INDEX'],
                         self.config['ELASTICSEARCH_DOCTYPE'], pprint.pformat(query.to_dict())))
      # Without re-raising, `response` would be unbound below and the real
      # failure would be masked by an UnboundLocalError.
      raise

    return response.hits.hits
breached.py 文件源码 项目:fccforensics 作者: RagtagOpen 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def tag_by_email(self, emails, breached):
        """Build bulk-update docs that set ``breached`` on every filing whose
        contact email is in ``emails``.

        Progress is printed every 500 collected docs. Returns the list of
        update docs; nothing is written to the index here.
        """
        email_filter = Q({'terms': {'contact_email.keyword': emails}})
        matching = Search(using=self.es).filter(email_filter).source(['id_submission'])
        print('%s emails breached=%s' % (len(emails), breached))
        updates = []
        for hit in matching.scan():
            updates.append(lib.bulk_update_doc(hit['id_submission'], {'breached': breached}))
            if len(updates) % 500 == 0:
                print('\tfetched %s' % len(updates))
        print('\t%s matches' % len(updates))
        return updates
breached.py 文件源码 项目:fccforensics 作者: RagtagOpen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run(self):
        """Sample untagged filings, check each distinct email for breaches and
        bulk-update all filings sharing those emails.

        Filings are drawn in random order (seeded with the current time) and
        limited to ``self.limit``. A failure of the bulk update is printed,
        not raised.
        """
        # contact_email exists
        required = [Q('exists', field='contact_email')]
        # matches source if specified
        if self.source:
            required.append(Q({'term': {'analysis.source': self.source}}))
        # not already tagged with breached
        untagged = Q('bool',
                     must=required,
                     must_not=[Q('exists', field='analysis.breached')])
        randomized = FunctionScore(
            query=untagged,
            functions=[SF('random_score', seed=int(time.time()))],
        )
        s = Search(using=self.es).query(randomized).source(['contact_email'])
        print('%s breached: source=%s limit=%s' % (datetime.now().isoformat(), self.source,
            self.limit))
        print('query=\n%s' % json.dumps(s.to_dict()))

        breached_emails = set()
        unbreached_emails = set()
        for filing in s[:self.limit]:
            email = filing['contact_email']
            # skip empty emails and emails that were already classified
            if not email or email in breached_emails or email in unbreached_emails:
                continue
            if self.is_breached(email):
                breached_emails.add(email)
            else:
                unbreached_emails.add(email)
        print('done source=%s' % self.source)

        docs = []
        if breached_emails:
            docs += self.tag_by_email(list(breached_emails), True)
        if unbreached_emails:
            docs += self.tag_by_email(list(unbreached_emails), False)
        try:
            lib.bulk_update(self.es, docs)
        except Exception as e:
            print('error indexing: %s' % e)
console.py 文件源码 项目:defplorex 作者: trendmicro 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def monitor(index, delta, query_string):
    """Show a progress bar that polls the match count of a query every 5 seconds.

    Args:
        index: index to count in
        delta: when truthy the bar shows how far the match count has dropped
            from its initial value; otherwise the bar maximum is the index
            total and the bar shows the current match count
        query_string: query_string query whose matches are counted

    The loop has no exit condition of its own; it ends only through an
    exception such as the CTRL-C announced to the user.
    """
    click.clear()

    def cnt():
        q = Q('query_string', query=query_string)
        s = Search(
                using=es.client,
                index=index).query(q)
        return s.count()

    N = cnt()
    tot = Search(using=es.client, index=index).count()

    if not delta:
        N = tot

    log.info('Processing %d records (total: %d)', N, tot)

    click.echo('You can exit by CTRL-C: results will still process')

    bar = SlowOverallFancyBar('', max=N, grand_total=tot)
    try:
        while True:
            time.sleep(5.0)
            try:
                n = cnt()
                if isinstance(n, int):
                    done = N - n if delta else n
                    bar.goto(done)
            except Exception as e:
                # A failed poll is logged and the next poll is attempted.
                log.warn('Cannot count: %s', e)
    finally:
        # Previously placed after the endless loop and therefore unreachable;
        # in a finally clause it runs when the loop is interrupted.
        bar.finish()
console.py 文件源码 项目:defplorex 作者: trendmicro 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def clone_index(use_helper, from_index, to_index):
    """Clone an index

    Copies documents from ``from_index`` into the already existing
    ``to_index`` after asking for confirmation.

    Args:
        use_helper: when truthy, use ``elasticsearch.helpers.reindex``;
            otherwise call the client's ``reindex`` API with
            ``wait_for_completion=False``
        from_index: source index name
        to_index: target index name; must already exist

    Returns:
        1 when the target index does not exist, otherwise None
    """
    from elasticsearch_dsl import Search
    from elasticsearch.helpers import reindex

    click.clear()

    if not es.client.indices.exists(index=to_index):
        # '{}' placeholder: the previous '%s' combined with str.format() never
        # interpolated the index name into the message.
        click.secho('{} not existing!'.format(to_index), fg='red')
        return 1

    cnt = Search(using=es.client, index=to_index).count()
    message = 'Index %s already exists (%d records). Overwrite?' % (
            to_index, cnt)

    # abort=True: declining the prompt aborts instead of returning False
    click.confirm(message, abort=True)

    if use_helper:
        reindex(
                client=es.client,
                source_index=from_index,
                target_index=to_index)
    else:
        es.client.reindex(
                body=dict(
                    source=dict(index=from_index),
                    dest=dict(index=to_index)),
                wait_for_completion=False)
console.py 文件源码 项目:defplorex 作者: trendmicro 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def monitor_clone_index(from_index, to_index):
    """Monitor the size of an index

    Shows a progress bar whose maximum is the document count of
    ``from_index`` (read once) and whose position is the document count of
    ``to_index``, polled every 2 seconds. The loop has no exit condition of
    its own and ends only through an exception such as a keyboard interrupt.
    """
    from elasticsearch_dsl import Search

    click.clear()

    cnt = Search(using=es.client, index=from_index).count()

    bar = SlowFancyBar('', max=cnt)
    try:
        while True:
            time.sleep(2.0)
            _cnt = Search(using=es.client, index=to_index).count()
            bar.goto(_cnt)
    finally:
        # Previously placed after the endless loop and therefore unreachable;
        # in a finally clause it runs when the loop is interrupted.
        bar.finish()
elastic.py 文件源码 项目:defplorex 作者: trendmicro 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def search(self, **kwargs):
        """Run a query_string search against this object's index.

        Recognised keyword arguments:
            q: query string (default ``'*'``)
            sort: sort criterion (default ``'timestamp'``)
            search_after: passed through as ``search_after`` when truthy
            size: number of hits to request (default 50)
            source: source filtering, applied when truthy

        Returns a ``(response, total_hits, took)`` tuple.
        """
        query_text = kwargs.get('q', '*')
        sort_field = kwargs.get('sort', 'timestamp')
        after = kwargs.get('search_after')
        fields = kwargs.get('source')

        extra_params = {'size': kwargs.get('size', 50)}
        if after:
            extra_params['search_after'] = after

        s = Search(using=self.client, index=self.index_name)
        if fields:
            s = s.source(fields)
        s = s.sort(sort_field).query(Q('query_string', query=query_text)).extra(**extra_params)

        log.info('Query: %s', s.to_dict())

        response = s.execute()
        return response, response.hits.total, response.took
elastic.py 文件源码 项目:defplorex 作者: trendmicro 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def count(self, index, query):
        """Count the documents in ``index`` matching the query dict.

        Returns the count, or None when anything goes wrong; in that case the
        exception is logged as a warning instead of being raised.
        """
        try:
            counting = Search(using=self.client, index=index, doc_type=self.doc_type)
            counting = counting.update_from_dict(query)
            log.info('Querying: %s', counting.to_dict())
            return counting.count()
        except Exception as e:
            log.warn('Cannot count: %s', e)
        return None
viewsets.py 文件源码 项目:django-elasticsearch-dsl-drf 作者: barseghyanartur 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """Bind the view set to its document's connection, index and mapping.

        Requires ``self.document`` to be set. Stores the default connection,
        the document's index, the ``name`` entry of its mapping properties and
        a Search over that index before delegating to the parent initializer.
        """
        assert self.document is not None

        self.client = connections.get_connection()
        doc_type = self.document._doc_type
        self.index = doc_type.index
        self.mapping = doc_type.mapping.properties.name
        self.search = Search(using=self.client, index=self.index)
        super(BaseDocumentViewSet, self).__init__(*args, **kwargs)
elasticsearch.py 文件源码 项目:falcon-api 作者: Opentopic 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_base_query(self, req, resp):
        """Return a Search over the resource's document class, using the
        resource's connection and the class's index. ``req`` and ``resp`` are
        not used here.
        """
        model = self.objects_class
        return Search(using=self.connection, index=model._doc_type.index, doc_type=model)
test_elasticsearch.py 文件源码 项目:falcon-api 作者: Opentopic 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_filter_by(connection, query_filtered):
    """
    Test that `filter_by` turns the conditions into the expected query
    """
    raw_conditions, raw_expected = query_filtered
    # Fixture values may arrive as JSON text; conditions keep their key order.
    conditions = (json.loads(raw_conditions, object_pairs_hook=OrderedDict)
                  if isinstance(raw_conditions, str) else raw_conditions)
    expected = json.loads(raw_expected) if isinstance(raw_expected, str) else raw_expected
    resource = CollectionResource(objects_class=Model, connection=connection)
    base_search = Search(using=connection).doc_type(Model)
    filtered = resource.filter_by(base_search, conditions)
    assert filtered.to_dict()['query'] == expected
test_elasticsearch.py 文件源码 项目:falcon-api 作者: Opentopic 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_order_by(connection, query_ordered):
    """
    Test that sort criteria are rendered into the expected search dict
    """
    raw_conditions, raw_expected = query_ordered
    # Fixture values may arrive as JSON text; conditions keep their key order.
    conditions = (json.loads(raw_conditions, object_pairs_hook=OrderedDict)
                  if isinstance(raw_conditions, str) else raw_conditions)
    expected = json.loads(raw_expected) if isinstance(raw_expected, str) else raw_expected
    ordered = Search(using=connection, doc_type=Model).sort(*conditions)
    assert ordered.to_dict() == expected
test_elasticsearch.py 文件源码 项目:falcon-api 作者: Opentopic 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_totals(connection, query_totals):
    """
    Test that `_build_total_expressions` produces the expected search dict
    """
    raw_totals, raw_expected = query_totals
    # Fixture values may arrive as JSON text; totals keep their key order.
    totals = (json.loads(raw_totals, object_pairs_hook=OrderedDict)
              if isinstance(raw_totals, str) else raw_totals)
    expected = json.loads(raw_expected) if isinstance(raw_expected, str) else raw_expected
    resource = CollectionResource(objects_class=Model, connection=connection)
    base_search = Search(using=connection).doc_type(Model)
    with_totals = resource._build_total_expressions(base_search, totals)
    assert with_totals.to_dict() == expected
api.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def search_for_field(search_obj, field_name, page_size=DEFAULT_ES_LOOP_PAGE_SIZE):
    """
    Collects the distinct values of one field across all documents matching an ES query

    Args:
        search_obj (Search): Search object
        field_name (str): The name of the field for the value to get
        page_size (int): Number of docs per page of results

    Returns:
        set: Set of unique values
    """
    # Maintaining a consistent sort on '_doc' will help prevent bugs where the
    # index is altered during the loop.
    # This also limits the query to only return the field value.
    paged_search = search_obj.sort('_doc').fields(field_name)
    unique_values = set()
    start = 0
    while True:
        stop = start + page_size
        page = execute_search(paged_search[start: stop])
        # the field value of each hit is indexed with [0] before being stored
        unique_values.update(getattr(hit, field_name)[0] for hit in page.hits)
        # stop once the window has reached the total number of hits
        if stop >= page.hits.total:
            break
        start = stop
    return unique_values
api.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_all_query_matching_emails(search_obj, page_size=DEFAULT_ES_LOOP_PAGE_SIZE):
    """
    Collects the distinct emails of all documents matching an ES query

    Args:
        search_obj (Search): Search object
        page_size (int): Number of docs per page of results

    Returns:
        set: Set of unique emails
    """
    email_field = "email"
    return search_for_field(search_obj, email_field, page_size=page_size)


问题


面经


文章

微信
公众号

扫码关注公众号