python类Search()的实例源码

elasticsearch.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def title_suggest_view(request: HttpRequest) -> HttpResponse:
    """Return JSON title suggestions for the ``q`` query parameter.

    Runs a fuzzy ``match`` query on the ``title_suggest`` field and answers
    with ``[{"id": ..., "value": ...}, ...]`` for an autocomplete widget.
    """
    term = request.GET.get('q', '')
    suggestion_search = Search(using=es_client, index=es_index_name) \
        .source(['title']) \
        .query("match", title_suggest={'query': term, 'operator': 'and', 'fuzziness': 'AUTO'})
    results = suggestion_search.execute()

    payload = json.dumps(
        [{'id': hit.meta.id, 'value': hit.title} for hit in results]
    )
    return HttpResponse(payload, 'application/json; charset=utf-8')
es_views.py 文件源码 项目:django-rest-elasticsearch 作者: myarik 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_es_search(self):
        """Build the base ``Search`` bound to this view's model, index and client.

        Raises:
            ImproperlyConfigured: when the view defines no ``es_model``.
        """
        if self.es_model is None:
            raise ImproperlyConfigured(
                "Cannot use %s on a view which does not have the 'es_model'"
                % self.__class__.__name__)
        return Search(using=self.get_es_client(),
                      index=self.es_model()._get_index(),
                      doc_type=self.es_model)
tor_elasticsearch.py 文件源码 项目:freshonions-torscraper 作者: dirtyfilthy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def elasticsearch_retrieve_page_by_id(page_id):
    """Fetch a single page document by its numeric ``nid``, or None if absent."""
    search = Search().filter(Q("term", nid=int(page_id)))[:1]
    hits = search.execute().hits
    if hits.total == 0:
        return None
    return hits[0]
tor_elasticsearch.py 文件源码 项目:freshonions-torscraper 作者: dirtyfilthy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def elasticsearch_delete_old(days=30):
    """Delete documents whose ``visited_at`` is older than ``days`` days.

    Args:
        days: age cutoff in days; default 30 preserves the original
            hard-coded behavior.

    Returns:
        The raw delete-by-query response from Elasticsearch (the original
        discarded it in an unused local).
    """
    cutoff = datetime.now() - timedelta(days=days)
    # NEVER is a module-level lower bound (epoch-like sentinel) -- the range
    # spans everything visited before the cutoff.
    query = Search().filter(Q("range", visited_at={'from': NEVER, 'to': cutoff}))
    return query.delete()
tor_elasticsearch.py 文件源码 项目:freshonions-torscraper 作者: dirtyfilthy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def elasticsearch_pages(context, sort, page):
    """Build and execute the paginated page search for the given UI context.

    Filters child "page" documents through a has_parent query over domain
    flags, runs a match / match_phrase query on the stripped body text,
    applies highlighting and sorting, and returns the executed response.

    Args:
        context: dict of UI flags ('is_up', 'show_fh_default',
            'show_subdomains', 'rep', 'phrase', 'more', 'sort', 'search').
        sort: unused here; sorting is driven by context["sort"].
        page: 1-based page number.
    """
    result_limit = int(os.environ['RESULT_LIMIT'])
    # Read (and thereby validate) MAX_RESULT_LIMIT even though pagination
    # below always uses RESULT_LIMIT -- see NOTE further down.
    max_result_limit = int(os.environ['MAX_RESULT_LIMIT'])
    start = (page - 1) * result_limit
    end = start + result_limit

    # Domain-level filters assembled from the request context.
    domain_query = Q("term", is_banned=False)
    if context["is_up"]:
        domain_query &= Q("term", is_up=True)
    if not context["show_fh_default"]:
        domain_query &= Q("term", is_crap=False)
    if not context["show_subdomains"]:
        domain_query &= Q("term", is_subdomain=False)
    if context["rep"] == "genuine":
        domain_query &= Q("term", is_genuine=True)
    if context["rep"] == "fake":
        domain_query &= Q("term", is_fake=True)

    # NOTE(review): the original computed
    #   limit = max_result_limit if context["more"] else result_limit
    # but never used it -- the page size is always result_limit. The dead
    # assignment is dropped here to preserve behavior; confirm whether
    # context["more"] was actually meant to widen `end`.

    has_parent_query = Q("has_parent", type="domain", query=domain_query)
    text_query = (Q("match_phrase", body_stripped=context['search'])
                  if context['phrase']
                  else Q("match", body_stripped=context['search']))
    query = Search().filter(has_parent_query).query(text_query)

    query = query.highlight_options(order='score', encoder='html').highlight('body_stripped')[start:end]
    query = query.source(['title', 'domain_id', 'created_at', 'visited_at']).params(request_cache=True)

    # "visited_at" and "last_seen" intentionally share the same sort key,
    # matching the original if/elif chain.
    sort_keys = {
        "onion": "_parent",
        "visited_at": "-visited_at",
        "created_at": "-created_at",
        "last_seen": "-visited_at",
    }
    sort_key = sort_keys.get(context["sort"])
    if sort_key is not None:
        query = query.sort(sort_key)

    return query.execute()
test_search.py 文件源码 项目:open-ledger 作者: creativecommons 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create an ES connection and index two Image fixtures for search tests.

        Side effects: initializes the Image document mapping in
        Elasticsearch, waits for the cluster to reach at least yellow
        status, saves two ``models.Image`` rows, and creates one extra
        "removed" image row that should not appear in results.
        """
        super().setUp()
        self.es = search.init_es()
        # Register the client as elasticsearch-dsl's default connection so
        # document classes can talk to it implicitly.
        connections.add_connection('default', self.es)
        self.s = Search(index=settings.ELASTICSEARCH_INDEX)
        # Ensure the Image mapping exists before any document is indexed.
        search.Image.init()

        # Block until the cluster can serve requests.
        self.es.cluster.health(wait_for_status='yellow', request_timeout=2000)
        self.img1 = models.Image(title='greyhounds are fast',
                                 creator="Rashid",
                                 url='http://example.com/1',
                                 license='CC0',
                                 provider="flickr",
                                 source="openimages",
                                 tags_list=['greyhound', 'dog', 'object'])
        self.img2 = models.Image(title='pumpkins are orange',
                                 creator='???',
                                 url='http://example.com/2',
                                 license='CC-BY',
                                 provider="rijksmuseum",
                                 source="rijksmuseum",
                                 tags_list=['gourds', 'fruit', 'object'])
        self.img1.save()
        self.img2.save()
        self.url = reverse('index')
        self.removed = models.Image.objects.create(title='removed', url=FOREIGN_URL + TEST_IMAGE_REMOVED, license="cc0")
test_aggregations.py 文件源码 项目:invenio-stats 作者: inveniosoftware 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_date_range(app, es, event_queues, indexed_events):
    """Aggregated file-download events should sum to the 30 indexed events."""
    aggregate_events(['file-download-agg'])
    current_search_client.indices.refresh(index='*')

    search = Search(using=current_search_client,
                    index='stats-file-download')[0:30].sort('file_id')
    hits = search.execute()

    observed = 0
    for hit in hits:
        if 'file_id' in hit:
            observed += hit.count
    assert observed == 30
aggregations.py 文件源码 项目:invenio-stats 作者: inveniosoftware 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_bookmark(self):
        """Return the date to resume aggregation from.

        Falls back to the oldest event timestamp (or today, when there are
        no events at all) if no bookmark document exists yet.
        """
        aggregation_index = Index(self.aggregation_alias, using=self.client)
        if not aggregation_index.exists():
            event_index = Index(self.event_index, using=self.client)
            if not event_index.exists():
                # No aggregations and no events yet: start from today.
                return datetime.date.today()
            return self._get_oldest_event_timestamp()

        # Fetch the single most recent bookmark document.
        bookmarks = Search(
            using=self.client,
            index=self.aggregation_alias,
            doc_type='{0}-bookmark'.format(self.event)
        )[0:1].sort(
            {'date': {'order': 'desc'}}
        ).execute()

        # If no bookmark is found but the index exists, the bookmark was
        # somehow lost or never written, so restart from the beginning.
        if len(bookmarks) == 0:
            return self._get_oldest_event_timestamp()

        # Bookmark dates are stored in the doc_id_suffix format.
        return datetime.datetime.strptime(bookmarks[0].date,
                                          self.doc_id_suffix)
queries.py 文件源码 项目:invenio-stats 作者: inveniosoftware 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def build_query(self, start_date, end_date, **kwargs):
        """Build the elasticsearch query.

        Args:
            start_date: optional lower bound (inclusive, ``gte``) applied to
                ``self.time_field``.
            end_date: optional upper bound (inclusive, ``lte``) applied to
                ``self.time_field``.
            **kwargs: values keyed by ``self.required_filters`` query params;
                each present key adds a term filter on the mapped field.

        Returns:
            A ``Search`` with hits disabled (``[0:0]``), nested terms buckets
            over ``self.aggregated_fields``, a 'total' sum metric on 'count',
            and optionally a single latest 'top_hit' for copied fields.
        """
        agg_query = Search(using=self.client,
                           index=self.index,
                           doc_type=self.doc_type)[0:0]
        if start_date is not None or end_date is not None:
            time_range = {}
            if start_date is not None:
                time_range['gte'] = start_date.isoformat()
            if end_date is not None:
                time_range['lte'] = end_date.isoformat()
            agg_query = agg_query.filter(
                'range',
                **{self.time_field: time_range})

        # Each aggregated field nests inside the previous bucket; metrics
        # below therefore attach to the innermost bucket.
        # NOTE(review): size=0 meant "all buckets" before Elasticsearch 5.x
        # and was removed afterwards -- confirm the targeted ES version.
        term_agg = agg_query.aggs
        for term in self.aggregated_fields:
            term_agg = term_agg.bucket(term, 'terms', field=term, size=0)
        term_agg.metric('total', 'sum', field='count')

        if self.copy_fields:
            # Keep the most recent document so its fields can be copied
            # into the aggregated record.
            term_agg.metric(
                'top_hit', 'top_hits', size=1, sort={'timestamp': 'desc'}
            )

        # Filters added here are cloned into the Search together with the
        # aggregations already attached above.
        for query_param, filtered_field in self.required_filters.items():
            if query_param in kwargs:
                agg_query = agg_query.filter(
                    'term', **{filtered_field: kwargs[query_param]}
                )

        return agg_query
es_download.py 文件源码 项目:scrapy-cdr 作者: TeamHG-Memex 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main():
    """CLI entry point: dump documents from an ES index to a gzipped JSON-lines file."""
    parser = argparse.ArgumentParser(description='Download items from ES index')
    arg = parser.add_argument
    arg('output', help='output in .jl.gz format')
    arg('index', help='ES index name')
    arg('--domain', help='url.domain to filter')
    arg('--id', help='record id')
    arg('--host', default='localhost', help='ES host in host[:port] format')
    arg('--user', help='HTTP Basic Auth user')
    arg('--password', help='HTTP Basic Auth password')
    arg('--chunk-size', type=int, default=100, help='download chunk size')
    args = parser.parse_args()

    client_kwargs = {}
    if args.user or args.password:
        client_kwargs['http_auth'] = (args.user, args.password)
    client = elasticsearch.Elasticsearch(
        [args.host],
        connection_class=elasticsearch.RequestsHttpConnection,
        timeout=600,
        **client_kwargs)
    print(client.info())

    search = Search(using=client, index=args.index)
    if args.domain:
        search = search.filter('term', **{'url.domain': args.domain})
    if args.id:
        search = search.filter('term', **{'_id': args.id})

    n_items = 0
    with tqdm.tqdm(total=search.count()) as pbar, \
            gzip.open(args.output, 'wt') as f:
        # scan() streams every hit in chunk-size batches.
        for hit in search.params(size=args.chunk_size).scan():
            n_items += 1
            pbar.update(1)
            f.write(json.dumps(hit.to_dict()))
            f.write('\n')

    print('{:,} items downloaded to {}'.format(n_items, args.output))
views.py 文件源码 项目:handelsregister 作者: Amsterdam 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def determine_metadata(self, request, view):
        """Extend the base metadata with the documented ``q`` query parameter."""
        metadata = super().determine_metadata(request, view)
        q_param = {
            'type': 'string',
            'description': 'The query to search for',
            'required': True,
        }
        metadata['parameters'] = {'q': q_param}
        return metadata


# =============================================
# Search view sets
# =============================================
views.py 文件源码 项目:handelsregister 作者: Amsterdam 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def search_query(self, client, analyzer: InputQAnalyzer) -> Search:
        """Build the Search object this view set executes.

        Abstract hook: concrete view sets must override it.
        """
        raise NotImplementedError
views.py 文件源码 项目:handelsregister 作者: Amsterdam 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def search_query(self, client, analyzer: InputQAnalyzer) -> Search:
        """Build the vestiging search, restricted to 'vestiging' documents."""
        base_query = vestiging_query(analyzer)
        search = base_query.to_elasticsearch_object(client)
        return search.filter('terms', _type=['vestiging'])
views.py 文件源码 项目:handelsregister 作者: Amsterdam 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def search_query(self, client, analyzer: InputQAnalyzer) -> Search:
        """Build the mac search object for the analyzed input query."""
        return mac_query(analyzer).to_elasticsearch_object(client)
elasticsearch.py 文件源码 项目:jamdb 作者: CenterForOpenScience 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, index, doc_type, connection=None):
        """Bind to the given index/doc_type, creating index and mapping up front."""
        self._connection = connection or ElasticsearchBackend.DEFAULT_CONNECTION
        self._index = index
        self._doc_type = doc_type
        # ignore=400 makes creation idempotent (index-already-exists errors).
        self._connection.indices.create(self._index, ignore=400)
        self._connection.indices.put_mapping(
            body={doc_type: self.ES_MAPPING},
            index=index,
            doc_type=doc_type,
        )
        self.search = elasticsearch_dsl.Search(
            self._connection, index=index, doc_type=doc_type)
gyms.py 文件源码 项目:TrainerDex-RedCog 作者: TrainerDex 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def find_gym(self, gym):
        """Look up a gym marker by fuzzy title match.

        Fix: the body uses ``await``, which is a SyntaxError inside a plain
        ``def`` -- the coroutine must be declared ``async def`` (the scrape
        evidently dropped the ``async`` keyword).

        Returns:
            (hit, monacle_gym) on success, or (None, None) after telling the
            channel the gym was not found.
        """
        s = Search(using=self.client, index="marker").query(
            "match", title={'query': gym, 'fuzziness': 2, 'slop': 1})
        response = s.execute()
        if response.hits.total == 0:
            await self.bot.say("I couldn't find that gym")
            return None, None
        hit = response[0]
        monacle_gym = await self.get_monacle_gym(hit)
        return hit, monacle_gym
crawler.py 文件源码 项目:source-code-to-name 作者: zironycho 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def exists_repos_in_database(self, github_id):
        """Return True when at least one repo with this github_id is indexed."""
        matches = elasticsearch_dsl \
            .Search(using=self._es, index=self._es_index, doc_type=self._language) \
            .query('term', repo__github_id=github_id) \
            .count()
        return matches != 0
crawler.py 文件源码 项目:source-code-to-name 作者: zironycho 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def num_repos(self):
        """Count distinct repos via a cardinality agg on repo.github_id; 0 if no index."""
        if not self._es.indices.exists(index=self._es_index):
            return 0
        search = elasticsearch_dsl.Search(
            using=self._es, index=self._es_index, doc_type=self._language)
        search.aggs.bucket('num_repos', A('cardinality', field='repo.github_id'))
        return search.execute().aggregations.num_repos.value
crawler.py 文件源码 项目:source-code-to-name 作者: zironycho 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_features(self):
        """Return all hits for this language's index, or False when none exist."""
        if self._es.indices.exists(index=self._es_index):
            response = elasticsearch_dsl.Search(
                using=self._es, index=self._es_index, doc_type=self._language
            ).execute()
            if len(response.hits) != 0:
                return response.hits
        # Missing index and empty result both report False (original contract).
        return False
tweetset_loader.py 文件源码 项目:TweetSets 作者: justinlittman 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def update_dataset_stats(dataset):
    """Refresh a dataset's tweet count and first/last created_at from ES, then save."""
    search = Search(index=get_tweets_index_name(dataset.meta.id))
    # [0:0]: hits themselves are not needed, only aggregations and the total.
    search = search.query('term', dataset_id=dataset.meta.id)[0:0]
    search.aggs.metric('created_at_min', 'min', field='created_at')
    search.aggs.metric('created_at_max', 'max', field='created_at')
    response = search.execute()
    aggs = response.aggregations
    # Aggregation values are epoch milliseconds; convert to datetimes.
    dataset.first_tweet_created_at = datetime.utcfromtimestamp(
        aggs.created_at_min.value / 1000.0)
    dataset.last_tweet_created_at = datetime.utcfromtimestamp(
        aggs.created_at_max.value / 1000.0)
    dataset.tweet_count = response.hits.total
    dataset.save()


问题


面经


文章

微信
公众号

扫码关注公众号