Python scan() usage examples
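elasticsearch.helpers.scan() wraps the Elasticsearch scroll API in a generator, yielding every hit for a query without the caller having to manage scroll ids or pagination. The snippets below are collected from open-source projects. First, a minimal sketch of the basic call; the client address, index name, and query are placeholder assumptions:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch()  # assumes a node on localhost:9200

# scan() returns a generator; each item is a raw hit dict with _id, _source, etc.
for hit in scan(es, index='my-index', query={'query': {'match_all': {}}}):
    print(hit['_id'], hit['_source'])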

sentiment.py (project: fccforensics, author: RagtagOpen)
def tag_by_phrase(self, tag_query, source):
        print('query=%s source=%s' % (json.dumps(tag_query), source))
        resp = self.es.search(index='fcc-comments', body=tag_query, size=0)
        total = resp['hits']['total']
        print('tagging %s / %s matches' % (self.limit, total))
        docs = []
        for doc in scan(self.es, index='fcc-comments', query=tag_query, size=1000):
            docs.append(lib.bulk_update_doc(doc['_id'], {'source': source}))
            if not len(docs) % 1000:
                print('\tfetched %s\n%s\t%s' % (len(docs), doc['_id'], doc['_source']['text_data'][:400]))
            if len(docs) >= self.limit:
                break

        print('indexing %s' % (len(docs)))
        tagged = lib.bulk_update(self.es, docs)
        print('tagged %s / %s matches' % (tagged, total))
        return tagged
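tag_by_phrase() pairs scan() with the project's own helpers (lib.bulk_update_doc / lib.bulk_update, not shown here) to tag matching documents in bulk. A generic sketch of the same scan-then-bulk-update pattern using elasticsearch.helpers.bulk directly; the index name and tagged field are assumptions for illustration:

from elasticsearch.helpers import scan, bulk

def tag_all(es, query, source):
        # build partial-update actions from the scan results
        actions = ({
            '_op_type': 'update',
            '_index': 'fcc-comments',
            '_type': hit['_type'],
            '_id': hit['_id'],
            'doc': {'source': source},
        } for hit in scan(es, index='fcc-comments', query=query))
        success, _ = bulk(es, actions)
        return success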
sentiment.py (project: fccforensics, author: RagtagOpen)
def preview(self, fraction=0.1):
        fetched = 0
        scores = []
        mod_print = int(1 / fraction)
        while fetched < self.limit:
            '''
                use search instead of scan because keeping an ordered scan cursor
                open negates the performance benefits
            '''
            resp = self.es.search(index='fcc-comments', body=self.query, size=self.limit)
            print('total=%s mod_print=%s' % (resp['hits']['total'], mod_print))
            for doc in resp['hits']['hits']:
                fetched += 1
                scores.append(doc['_score'])
                if not fetched % mod_print:
                    print('\n--- comment %s\t%s\t%s\t%s' % (fetched, doc['_id'],
                        doc['_score'], doc['_source']['text_data'][:1000]))
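The inline comment above describes a real tradeoff: scan() deliberately returns hits in arbitrary _doc order, because keeping an ordered scroll cursor open is expensive. When ranking matters for a small preview, a plain search (as here) is the right tool; scan() can also be forced to respect ordering. A sketch, with the index and query as placeholder assumptions:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch()
query = {'query': {'match': {'text_data': 'net neutrality'}}}  # placeholder

# default: fastest; hits arrive in arbitrary _doc order and _score is not tracked
fast_hits = scan(es, index='fcc-comments', query=query)

# preserve_order=True keeps the query's sort/score order, at a performance cost
ordered_hits = scan(es, index='fcc-comments', query=query, preserve_order=True)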
indexer.py (project: graph-data-experiment, author: occrp-attic)
def delete_dataset(dataset_name):
    """Delete all entries from a particular dataset."""
    q = {'query': {'term': {'dataset': dataset_name}}, '_source': False}

    def deletes():
        for i, res in enumerate(scan(es, query=q, index=es_index)):
            yield {
                '_op_type': 'delete',
                '_index': str(es_index),
                '_type': res.get('_type'),
                '_id': res.get('_id')
            }
            if i > 0 and i % 10000 == 0:
                log.info("Delete %s: %s", dataset_name, i)
    es.indices.refresh(index=es_index)
    bulk(es, deletes(), stats_only=True, chunk_size=DATA_PAGE)
    optimize_search()
crossref.py (project: graph-data-experiment, author: occrp-attic)
def _scan_fingerprints(dataset_name=None):
    if dataset_name:
        q = {'term': {'dataset': dataset_name}}
    else:
        q = {'match_all': {}}
    q = {
        'query': q,
        '_source': ['fingerprints', 'dataset']
    }

    scan_iter = scan(es, query=q, index=es_index, doc_type=Schema.ENTITY)
    for i, doc in enumerate(scan_iter):
        source = doc.get('_source')
        fps = source.get('fingerprints')
        if fps is None:
            continue
        for fp in fps:
            if fp is None:
                continue
            yield fp, source.get('dataset')
        if i != 0 and i % 10000 == 0:
            log.info("Crossref: %s entities...", i)
index_document.py (project: data-store, author: HumanCellAtlas)
def find_matching_subscriptions(self, index_name: str) -> set:
        percolate_document = {
            'query': {
                'percolate': {
                    'field': "query",
                    'document_type': ESDocType.doc.name,
                    'document': self
                }
            }
        }
        subscription_ids = set()
        for hit in scan(ElasticsearchClient.get(self.logger),
                        index=index_name,
                        query=percolate_document):
            subscription_ids.add(hit["_id"])
        self.logger.debug("Found matching subscription count: %i", len(subscription_ids))
        return subscription_ids
index.py (project: elasticsearch-django, author: yunojuno)
def scan_index(index, model):
    """
    Yield all documents of model type in an index.

    This function calls the elasticsearch.helpers.scan function,
    and yields all the documents in the index that match the doc_type
    produced by a specific Django model.

    Args:
        index: string, the name of the index to scan, must be a configured
            index as returned from settings.get_index_names.
        model: a Django model type, used to filter the documents that
            are scanned.

    Yields each document of type model in index, one at a time.

    """
    # see https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-type-query.html
    query = {"query": {"type": {"value": model._meta.model_name}}}
    client = get_client()
    for hit in helpers.scan(client, index=index, query=query):
        yield hit
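A usage sketch for scan_index(); the index name and Django model here are hypothetical:

# hypothetical usage: Article is a placeholder Django model
from myapp.models import Article

for hit in scan_index('blog', Article):
    print(hit['_id'], hit['_source'])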
esstorage.py (project: snovault, author: ENCODE-DCC)
def get_rev_links(self, model, rel, *item_types):
        filter_ = {'term': {'links.' + rel: str(model.uuid)}}
        if item_types:
            filter_ = [
                filter_,
                {'terms': {'item_type': item_types}},
            ]
        query = {
            'stored_fields': [],
            'query': {
                'bool': {
                    'filter': filter_,
                }
            }
        }

        return [
            hit['_id'] for hit in scan(self.es, query=query)
        ]
sentiment.py (project: fccforensics, author: RagtagOpen)
def run(self):
        '''
            get documents without a sentiment tag that match significant terms:
            - significant terms from positive regex tagged vs others
            - extra multi match clause for stronger terms (in multiple term sets:
                positive vs negative, untagged, and all)
            - phrase match net neutrality since both terms score high
        '''

        index_queue = multiprocessing.Queue()

        bulk_index_process = multiprocessing.Process(
            target=self.bulk_index, args=(index_queue,),
        )
        bulk_index_process.start()
        fetched = 0
        try:
            while fetched < self.limit:
                '''
                    use search instead of scan because keeping an ordered scan cursor
                    open negates the performance benefits
                '''
                resp = self.es.search(index='fcc-comments', body=self.query, size=self.limit)
                for doc in resp['hits']['hits']:
                    index_queue.put(doc['_id'])
                    fetched += 1
                    if not fetched % 100:
                        print('%s\t%s\t%s' % (fetched, doc['_score'],
                            doc['_source']['text_data']))
        except ConnectionTimeout:
            print('error fetching: connection timeout')

        index_queue.put(None)
        bulk_index_process.join()
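run() is the producer half of a producer/consumer pair: it feeds document ids into a multiprocessing.Queue and signals completion with a None sentinel. The project's bulk_index consumer is not shown here; a minimal sketch of what such a consumer could look like, with the batch size and index_batch helper as assumptions:

def bulk_index(self, index_queue):
        batch = []
        while True:
            doc_id = index_queue.get()   # blocks until the producer puts an item
            if doc_id is None:           # None is the producer's shutdown sentinel
                break
            batch.append(doc_id)
            if len(batch) >= 500:        # assumed batch size
                self.index_batch(batch)  # hypothetical helper that bulk-indexes ids
                batch = []
        if batch:
            self.index_batch(batch)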
elastic.py (project: defplorex, author: trendmicro)
def scan(self, index, query, limit=None, id_only=False):
        size = self.bulk_size
        max_records = None
        cnt = 0

        if isinstance(limit, int):
            if limit > 0:
                size = min(limit, size)
                max_records = limit

        kw = dict(
            index=index,
            query=query,
            size=size
        )

        if id_only:
            kw['_source'] = ['_id']

        log.debug('Scanning for %s (size = %d, index = %s)',
                  query, size, index)

        for hit in helpers.scan(self.client, **kw):
            if max_records:
                if cnt >= max_records:
                    log.debug('Stopping after pulling %d records'
                              ' as requested', cnt)
                    return  # PEP 479: raising StopIteration inside a generator is an error in Python 3.7+

            log.debug('Yielding %s', hit['_id'])
            cnt += 1

            if id_only:
                yield hit.get('_id')
            else:
                yield hit
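A usage sketch for this wrapper, assuming `backend` is an instance of the class above:

# hypothetical usage of the scan() wrapper above
q = {'query': {'match_all': {}}}
for doc_id in backend.scan('my-index', q, limit=100, id_only=True):
    print(doc_id)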
logIoc.py (project: log-ioc, author: willylong275)
def get_hits(self, start_time, stop_time):
        time_filter_query = {"query": {"bool": {"filter": {"range": {"@timestamp": {
            "gte": start_time,
            "lte": stop_time,
            "format": "epoch_millis"
        }}}}}}
        scan_generator = helpers.scan(self.es, query=time_filter_query, index=self.es_index)
        new_hits = []
        for item in scan_generator:
            # convert the ISO-8601 @timestamp (trailing 'Z' stripped) to epoch milliseconds
            utc_record = item['_source']['@timestamp'][:-1]
            new_value = calendar.timegm(datetime.datetime.strptime(
                utc_record, "%Y-%m-%dT%H:%M:%S.%f").timetuple()) * 1000
            item['@timestamp'] = new_value
            new_hits.append(item)
        return new_hits
es.py (project: mygene.info, author: biothings)
def doc_feeder(self, index_type=None, index_name=None, step=10000,
                   verbose=True, query=None, scroll='10m', **kwargs):
        conn = self.conn
        index_name = index_name or self.ES_INDEX_NAME
        doc_type = index_type or self.ES_INDEX_TYPE

        n = self.count(query=query)['count']
        cnt = 0
        t0 = time.time()
        if verbose:
            print('\ttotal docs: {}'.format(n))

        _kwargs = kwargs.copy()
        _kwargs.update(dict(size=step, index=index_name, doc_type=doc_type))
        res = helpers.scan(conn, query=query, scroll=scroll, **_kwargs)
        t1 = time.time()
        for doc in res:
            if verbose and cnt % step == 0:
                if cnt != 0:
                    print('done.[%.1f%%,%s]' % (cnt*100./n, timesofar(t1)))
                print('\t{}-{}...'.format(cnt+1, min(cnt+step, n)), end='')
                t1 = time.time()
            yield doc
            cnt += 1
        if verbose:
            print('done.[%.1f%%,%s]' % (cnt*100./n, timesofar(t1)))
            print("Finished! [{}]".format(timesofar(t0)))
schema.py (project: lagendacommun, author: ecreall)
def get_venues_by_location(location, radius, hour_for_cache):
    """Return a list of venues oid that are in location ('latitude,longitude')
    in the given radius. Cache results for one hour.
    """
    lat_lon = location.split(',')
    body = {
        "query": {
            "filtered": {
                "query": {"match_all": {}},
                "filter": {
                    "geo_distance": {
                        "distance": str(radius) + 'km',
                        "location": {
                            "lat": float(lat_lon[0]),
                            "lon": float(lat_lon[1])
                        }
                    }
                }
            }
        },
        "_source": {
            "include": ["oid"]
        }
    }
    try:
        result = scan(
            es,
            index='lac',
            doc_type='geo_location',
            query=body,
            size=500)
        return [v['_source']['oid'] for v in result]
    except Exception as e:
        log.exception(e)
        return []
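Note that the `filtered` query above is Elasticsearch 1.x/2.x syntax; it was deprecated in 2.0 and removed in 5.0. On newer clusters the equivalent is a bool query with a filter clause; a sketch of the same geo filter in that form, reusing radius and lat_lon from the function above:

body = {
    "query": {
        "bool": {
            "filter": {
                "geo_distance": {
                    "distance": str(radius) + 'km',
                    "location": {"lat": float(lat_lon[0]), "lon": float(lat_lon[1])}
                }
            }
        }
    },
    "_source": ["oid"]
}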
indexers.py (project: django-rest-search, author: wemap)
def scan(self, **kwargs):
        es = get_elasticsearch(self)
        return scan(es, index=es._index, doc_type=self.doc_type, **kwargs)
elastic.py (project: python3-utils, author: soldni)
def get_scroll(query_dsl, es_client, index_name=None, keep_alive='1m'):
    """Returns an iterator for results matching query_dsl."""

    if index_name is None:
        index_name = es_client.index_name

    scroll = scan(
        es_client, query=query_dsl, scroll=keep_alive, index=index_name)

    return scroll
tags.py (project: repoxplorer, author: morucci)
def get_tags(self, repos, fromdate=None, todate=None):

        filter = {
            "bool": {
                "must": [],
                "should": [],
                }
            }

        for repo in repos:
            should_repo_clause = {
                "bool": {
                    "must": []
                }
            }
            should_repo_clause["bool"]["must"].append(
                {"term": {"repo": repo}}
            )
            filter["bool"]["should"].append(should_repo_clause)

        body = {
            "filter": filter
        }

        body["filter"]["bool"]["must"].append(
            {
                "range": {
                    "date": {
                        "gte": fromdate,
                        "lt": todate,
                    }
                }
            }
        )

        return [t for t in scanner(self.es, query=body,
                index=self.index, doc_type=self.dbname)]
mc_neighbors.py (project: mediachain-indexer, author: mediachain)
def scan_all(self,
                 scroll = '5m', #TODO - hard coded timeout.
                 ):
        """
        Most efficient way to scan all documents.
        """

        rr = es_scan(client = self.es,
                     index = self.index_name,
                     doc_type = self.doc_type,
                     scroll = scroll,
                     query = {"query": {'match_all': {}}},
                     )

        return rr
mc_models.py (project: mediachain-indexer, author: mediachain)
def typeahead_generate():
    """
    Re-generate typeahead search. This consists of a weighted set of completions for every possible query.

    Weighting ideas:
        - query frequency.
        - query results quality / count.
        - language model.

    TODO: Consider having the `NearestNeighborsBase` storage create this incrementally? 
          Is that approach really better in a clustered setup?
    """

    assert False, 'WIP'

    if mc_config.LOW_LEVEL:
        es = mc_neighbors.low_level_es_connect()    

        res = scan(client = es,
                   index = index_name,
                   doc_type = doc_type,
                   scroll = '5m', #TODO - hard coded.
                   query = {"query": {'match_all': {}
                                     },
                           #'from':0,
                           #'size':1,                           
                           },
                   )
    else:
        nes = mc_neighbors.high_level_connect(index_name = index_name,
                                              doc_type = doc_type,
                                              )

        res = nes.scan_all()
api_operations.py (project: match3d, author: ascribe)
def list_designs(self):
        """
        Return a list of designs in the corpus

        :return: a list of all design ids
        """
        result = {}
        s = scan(self.es, index=self.index_name)
        for r in s:
            result.update({r['_source']['stl_id']: True})

        return result.keys()
elasticsearch5.py (project: django-haystack-elasticsearch, author: CraveFood)
def clear(self, models=None, commit=True):
        """
        Clears the backend of all documents/objects for a collection of models.

        :param models: List or tuple of models to clear.
        :param commit: Not used.
        """
        if models is not None:
            assert isinstance(models, (list, tuple))

        try:
            if models is None:
                self.conn.indices.delete(index=self.index_name, ignore=404)
                self.setup_complete = False
                self.existing_mapping = {}
                self.content_field_name = None
            else:
                models_to_delete = []

                for model in models:
                    models_to_delete.append("%s:%s" % (DJANGO_CT, get_model_ct(model)))

                # Delete using scroll API
                query = {'query': {'query_string': {'query': " OR ".join(models_to_delete)}}}
                generator = scan(self.conn, query=query, index=self.index_name, doc_type='modelresult')
                actions = ({
                    '_op_type': 'delete',
                    '_id': doc['_id'],
                } for doc in generator)
                bulk(self.conn, actions=actions, index=self.index_name, doc_type='modelresult')
                self.conn.indices.refresh(index=self.index_name)

        except elasticsearch.TransportError as e:
            if not self.silently_fail:
                raise

            if models is not None:
                self.log.error("Failed to clear Elasticsearch index of models '%s': %s",
                               ','.join(models_to_delete), e, exc_info=True)
            else:
                self.log.error("Failed to clear Elasticsearch index: %s", e, exc_info=True)
elasticsearch2.py (project: django-haystack-elasticsearch, author: CraveFood)
def clear(self, models=None, commit=True):
        """
        Clears the backend of all documents/objects for a collection of models.

        :param models: List or tuple of models to clear.
        :param commit: Not used.
        """
        if models is not None:
            assert isinstance(models, (list, tuple))

        try:
            if models is None:
                self.conn.indices.delete(index=self.index_name, ignore=404)
                self.setup_complete = False
                self.existing_mapping = {}
                self.content_field_name = None
            else:
                models_to_delete = []

                for model in models:
                    models_to_delete.append("%s:%s" % (DJANGO_CT, get_model_ct(model)))

                # Delete using scroll API
                query = {'query': {'query_string': {'query': " OR ".join(models_to_delete)}}}
                generator = scan(self.conn, query=query, index=self.index_name, doc_type='modelresult')
                actions = ({
                    '_op_type': 'delete',
                    '_id': doc['_id'],
                } for doc in generator)
                bulk(self.conn, actions=actions, index=self.index_name, doc_type='modelresult')
                self.conn.indices.refresh(index=self.index_name)

        except elasticsearch.TransportError as e:
            if not self.silently_fail:
                raise

            if models is not None:
                self.log.error("Failed to clear Elasticsearch index of models '%s': %s",
                               ','.join(models_to_delete), e, exc_info=True)
            else:
                self.log.error("Failed to clear Elasticsearch index: %s", e, exc_info=True)
index_document.py (project: data-store, author: HumanCellAtlas)
def _refresh_percolate_queries(self, index_name: str) -> None:
        # When dynamic templates are used and queries for percolation have been added
        # to an index before the index contains mappings of fields referenced by those queries,
        # the queries must be reloaded when the mappings are present for the queries to match.
        # See: https://github.com/elastic/elasticsearch/issues/5750
        subscription_index_name = Config.get_es_index_name(ESIndexType.subscriptions, self.replica)
        es_client = ElasticsearchClient.get(self.logger)
        if not es_client.indices.exists(subscription_index_name):
            return
        subscription_queries = [{'_index': index_name,
                                 '_type': ESDocType.query.name,
                                 '_id': hit['_id'],
                                 '_source': hit['_source']['es_query']
                                 }
                                for hit in scan(es_client,
                                                index=subscription_index_name,
                                                doc_type=ESDocType.subscription.name,
                                                query={'query': {'match_all': {}}})
                                ]

        if subscription_queries:
            try:
                bulk(es_client, iter(subscription_queries), refresh=True)
            except BulkIndexError as ex:
                self.logger.error("Error occurred when adding subscription queries to index %s Errors: %s",
                                  index_name, ex.errors)
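For find_matching_subscriptions() and the refresh above to work, the subscription index has to map its query field with the percolator field type, and each registered subscription stores its query there. A rough sketch against the Elasticsearch 5.x percolator API; the index name, doc type, field names, and example query are assumptions:

from elasticsearch import Elasticsearch

es_client = Elasticsearch()

# map a 'query' field as type percolator (ES 5.x); fields referenced by
# registered queries must also be mapped, which is the pitfall the comment
# above works around
es_client.indices.create(index='subscriptions', body={
    'mappings': {
        'query': {                          # doc type holding registered queries
            'properties': {
                'query': {'type': 'percolator'},
                'state': {'type': 'keyword'}  # field referenced by the example query
            }
        }
    }
})

# register a subscription; future percolate searches will match against it
es_client.index(index='subscriptions', doc_type='query', id='sub-1',
                body={'query': {'match': {'state': 'complete'}}})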
test_elastic2.py (project: elastic2-doc-manager, author: mongodb-labs)
def _remove(self):
        bulk_deletes = []
        for result in scan(self.elastic_conn,
                           index="test",
                           doc_type="test"):
            result['_op_type'] = 'delete'
            bulk_deletes.append(result)
        bulk(self.elastic_conn, bulk_deletes)
elastic2_doc_manager.py (project: elastic2-doc-manager, author: mongodb-labs)
def handle_command(self, doc, namespace, timestamp):
        # Flush buffer before handle command
        self.commit()
        db = namespace.split('.', 1)[0]
        if doc.get('dropDatabase'):
            dbs = self.command_helper.map_db(db)
            for _db in dbs:
                self.elastic.indices.delete(index=_db.lower())

        if doc.get('renameCollection'):
            raise errors.OperationFailed(
                "elastic_doc_manager does not support renaming a mapping.")

        if doc.get('create'):
            db, coll = self.command_helper.map_collection(db, doc['create'])
            if db and coll:
                self.elastic.indices.put_mapping(
                    index=db.lower(), doc_type=coll,
                    body={
                        "_source": {"enabled": True}
                    })

        if doc.get('drop'):
            db, coll = self.command_helper.map_collection(db, doc['drop'])
            if db and coll:
                # This will delete the items in coll, but not get rid of the
                # mapping.
                warnings.warn("Deleting all documents of type %s on index %s."
                              "The mapping definition will persist and must be"
                              "removed manually." % (coll, db))
                responses = streaming_bulk(
                    self.elastic,
                    (dict(result, _op_type='delete') for result in scan(
                        self.elastic, index=db.lower(), doc_type=coll)))
                for ok, resp in responses:
                    if not ok:
                        LOG.error(
                            "Error occurred while deleting ElasticSearch docum"
                            "ent during handling of 'drop' command: %r" % resp)
elastic2_doc_manager.py (project: elastic2-doc-manager, author: mongodb-labs)
def _stream_search(self, *args, **kwargs):
        """Helper method for iterating over ES search results."""
        for hit in scan(self.elastic, query=kwargs.pop('body', None),
                        scroll='10m', **kwargs):
            hit['_source']['_id'] = hit['_id']
            yield hit['_source']
helpers.py (project: elasticsearch-fabric, author: KunihikoKido)
def scan(index=None, doc_type=None, **kwargs):
    es = get_client(env.elasticsearch_alias)
    docs = helpers.scan(es, index=index, doc_type=doc_type, ignore=IGNORE, **kwargs)
    for doc in docs:
        jsonprint(doc)
core.py (project: memex-dossier-open, author: dossier)
def get_all_urls(self, limit=None):
        '''get all urls in the index
        '''
        res = scan(
            self.conn, index=self.index, doc_type=RECORD_TYPE,
            _source_include=[],
            query={'query': {'match_all': {}}})
        for item in islice(res, limit):
            yield item['_id']
core.py (project: memex-dossier-open, author: dossier)
def get_children(self, node):
        '''get child URLs of `node`
        '''
        assert node.replica is not None
        res = scan(
            self.conn, index=self.index, doc_type=UNION_FIND_TYPE,
            _source_include=[],
            query={'query': {'term': {'parent': node.get_id()}}})
        for item in res:
            yield AKANode.from_record(item['_source']['child'])
core.py (project: memex-dossier-open, author: dossier)
def get_all_unions(self):
        '''Yield the _source of every union-find record in the index.'''
        res = scan(
            self.conn, index=self.index, doc_type=UNION_FIND_TYPE)
        for item in res:
            yield item['_source']
elastic_doc_manager.py (project: elastic-doc-manager, author: mongodb-labs)
def _stream_search(self, *args, **kwargs):
        """Helper method for iterating over ES search results."""
        for hit in scan(self.elastic, query=kwargs.pop('body', None),
                        scroll='10m', **kwargs):
            hit['_source']['_id'] = hit['_id']
            yield hit['_source']
es.py (project: xunfengES, author: superhuahua)
def get_all_plugins(self):
        index = ElasticConfig.INDEX_VULTASKS["index"]
        doc_type = ElasticConfig.INDEX_VULTASKS["type"]
        scroll = "2m"
        size = 30
        body = {"query" : {"match_all": {}}}
        data = helpers.scan(client=self.es, query=body, index=index, doc_type=doc_type, scroll=scroll, size=size)
        return data

    #search assets

