def tag_by_phrase(self, tag_query, source):
    print('query=%s source=%s' % (json.dumps(tag_query), source))
    resp = self.es.search(index='fcc-comments', body=tag_query, size=0)
    total = resp['hits']['total']
    print('tagging %s / %s matches' % (self.limit, total))
    docs = []
    for doc in scan(self.es, index='fcc-comments', query=tag_query, size=1000):
        docs.append(lib.bulk_update_doc(doc['_id'], {'source': source}))
        if not len(docs) % 1000:
            print('\tfetched %s\n%s\t%s' % (len(docs), doc['_id'],
                                            doc['_source']['text_data'][:400]))
        if len(docs) >= self.limit:
            break
    print('indexing %s' % (len(docs)))
    tagged = lib.bulk_update(self.es, docs)
    print('tagged %s / %s matches' % (tagged, total))
    return tagged
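# The lib.bulk_update_doc / lib.bulk_update helpers used above are not shown
# in this snippet; a minimal sketch of what they might look like (the names,
# signatures, and default index are assumptions, not the original module):
from elasticsearch.helpers import bulk

def bulk_update_doc(doc_id, fields):
    # Build a partial-update action for the bulk helper.
    return {
        '_op_type': 'update',
        '_id': doc_id,
        'doc': fields,
    }

def bulk_update(es, docs, index='fcc-comments'):
    # Apply the queued partial updates; stats_only makes bulk() return
    # (success_count, error_count).
    success, _ = bulk(es, docs, index=index, stats_only=True)
    return success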
def preview(self, fraction=0.1):
    fetched = 0
    scores = []
    mod_print = int(1 / fraction)
    while fetched < self.limit:
        '''
        use search instead of scan because keeping an ordered scan cursor
        open negates the performance benefits
        '''
        resp = self.es.search(index='fcc-comments', body=self.query, size=self.limit)
        print('total=%s mod_print=%s' % (resp['hits']['total'], mod_print))
        for doc in resp['hits']['hits']:
            fetched += 1
            scores.append(doc['_score'])
            if not fetched % mod_print:
                print('\n--- comment %s\t%s\t%s\t%s' % (fetched, doc['_id'],
                      doc['_score'], doc['_source']['text_data'][:1000]))
def delete_dataset(dataset_name):
    """Delete all entries from a particular dataset."""
    q = {'query': {'term': {'dataset': dataset_name}}, '_source': False}

    def deletes():
        for i, res in enumerate(scan(es, query=q, index=es_index)):
            yield {
                '_op_type': 'delete',
                '_index': str(es_index),
                '_type': res.get('_type'),
                '_id': res.get('_id')
            }
            if i > 0 and i % 10000 == 0:
                log.info("Delete %s: %s", dataset_name, i)

    es.indices.refresh(index=es_index)
    bulk(es, deletes(), stats_only=True, chunk_size=DATA_PAGE)
    optimize_search()
def _scan_fingerprints(dataset_name=None):
    if dataset_name:
        q = {'term': {'dataset': dataset_name}}
    else:
        q = {'match_all': {}}
    q = {
        'query': q,
        '_source': ['fingerprints', 'dataset']
    }
    scan_iter = scan(es, query=q, index=es_index, doc_type=Schema.ENTITY)
    for i, doc in enumerate(scan_iter):
        source = doc.get('_source')
        fps = source.get('fingerprints')
        if fps is None:
            continue
        for fp in fps:
            if fp is None:
                continue
            yield fp, source.get('dataset')
        if i != 0 and i % 10000 == 0:
            log.info("Crossref: %s entities...", i)
def find_matching_subscriptions(self, index_name: str) -> set:
    percolate_document = {
        'query': {
            'percolate': {
                'field': "query",
                'document_type': ESDocType.doc.name,
                'document': self
            }
        }
    }
    subscription_ids = set()
    for hit in scan(ElasticsearchClient.get(self.logger),
                    index=index_name,
                    query=percolate_document):
        subscription_ids.add(hit["_id"])
    self.logger.debug("Found matching subscription count: %i", len(subscription_ids))
    return subscription_ids
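# The percolate search above only returns hits if percolator queries were
# registered in the index beforehand; a minimal sketch of registering one
# (the percolator-mapped 'query' field and the helper name are assumptions):
def register_subscription(es_client, index_name, subscription_id, es_query):
    # The index mapping must declare 'query' as a percolator field for
    # this stored query to participate in percolation.
    es_client.index(index=index_name,
                    doc_type=ESDocType.query.name,
                    id=subscription_id,
                    body={'query': es_query})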
def scan_index(index, model):
    """
    Yield all documents of model type in an index.

    This function calls the elasticsearch.helpers.scan function,
    and yields all the documents in the index that match the doc_type
    produced by a specific Django model.

    Args:
        index: string, the name of the index to scan, must be a configured
            index as returned from settings.get_index_names.
        model: a Django model type, used to filter the documents that
            are scanned.

    Yields each document of type model in index, one at a time.
    """
    # see https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-type-query.html
    query = {"query": {"type": {"value": model._meta.model_name}}}
    client = get_client()
    for hit in helpers.scan(client, index=index, query=query):
        yield hit
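# A hypothetical usage of scan_index() (the index name and Django model are
# assumptions for illustration only):
def count_article_docs():
    from myapp.models import Article  # hypothetical model
    return sum(1 for _ in scan_index('blog-index', Article))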
def get_rev_links(self, model, rel, *item_types):
    filter_ = {'term': {'links.' + rel: str(model.uuid)}}
    if item_types:
        filter_ = [
            filter_,
            {'terms': {'item_type': item_types}},
        ]
    query = {
        'stored_fields': [],
        'query': {
            'bool': {
                'filter': filter_,
            }
        }
    }
    return [
        hit['_id'] for hit in scan(self.es, query=query)
    ]
def run(self):
    '''
    get documents without a sentiment tag that match significant terms:
    - significant terms from positive regex tagged vs others
    - extra multi match clause for stronger terms (in multiple term sets:
      positive vs negative, untagged, and all)
    - phrase match net neutrality since both terms score high
    '''
    index_queue = multiprocessing.Queue()
    bulk_index_process = multiprocessing.Process(
        target=self.bulk_index, args=(index_queue,),
    )
    bulk_index_process.start()
    fetched = 0
    try:
        while fetched < self.limit:
            '''
            use search instead of scan because keeping an ordered scan cursor
            open negates the performance benefits
            '''
            resp = self.es.search(index='fcc-comments', body=self.query, size=self.limit)
            for doc in resp['hits']['hits']:
                index_queue.put(doc['_id'])
                fetched += 1
                if not fetched % 100:
                    print('%s\t%s\t%s' % (fetched, doc['_score'],
                                          doc['_source']['text_data']))
    except ConnectionTimeout:
        print('error fetching: connection timeout')
    index_queue.put(None)
    bulk_index_process.join()
def scan(self, index, query, limit=None, id_only=False):
    size = self.bulk_size
    max_records = None
    cnt = 0
    if isinstance(limit, int):
        if limit > 0:
            size = min(limit, size)
            max_records = limit
    kw = dict(
        index=index,
        query=query,
        size=size
    )
    if id_only:
        kw['_source'] = ['_id']
    log.debug('Scanning for %s (size = %d, index = %s)',
              query, size, index)
    for hit in helpers.scan(self.client, **kw):
        if max_records:
            if cnt >= max_records:
                log.debug('Stopping after pulling %d records'
                          ' as requested', cnt)
                # PEP 479: raising StopIteration inside a generator is a
                # RuntimeError in Python 3.7+; return ends it cleanly.
                return
        log.debug('Yielding %s', hit['_id'])
        cnt += 1
        if id_only:
            yield hit.get('_id')
        else:
            yield hit
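# A sketch of calling the scan() wrapper above (the client object, index name,
# and query body are assumptions):
def first_hundred_ids(store):
    query = {'query': {'match_all': {}}}
    return list(store.scan('fcc-comments', query, limit=100, id_only=True))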
def get_hits(self, start_time, stop_time):
    time_filter_query = {"query": {"bool": {"filter": {"range": {"@timestamp": {
        "gte": start_time,
        "lte": stop_time,
        "format": "epoch_millis"
    }}}}}}
    scan_generator = helpers.scan(self.es, query=time_filter_query,
                                  index=self.es_index)
    new_hits = []
    for item in scan_generator:
        # Convert the ISO 8601 '@timestamp' (trailing 'Z' stripped) back to
        # epoch milliseconds; timetuple() truncates fractional seconds.
        utc_record = item['_source']['@timestamp'][:-1]
        new_value = calendar.timegm(datetime.datetime.strptime(
            utc_record, "%Y-%m-%dT%H:%M:%S.%f").timetuple()) * 1000
        item['@timestamp'] = new_value
        new_hits.append(item)
    return new_hits
def doc_feeder(self, index_type=None, index_name=None, step=10000,
               verbose=True, query=None, scroll='10m', **kwargs):
    conn = self.conn
    index_name = index_name or self.ES_INDEX_NAME
    doc_type = index_type or self.ES_INDEX_TYPE
    n = self.count(query=query)['count']
    cnt = 0
    t0 = time.time()
    if verbose:
        print('\ttotal docs: {}'.format(n))
    _kwargs = kwargs.copy()
    _kwargs.update(dict(size=step, index=index_name, doc_type=doc_type))
    res = helpers.scan(conn, query=query, scroll=scroll, **_kwargs)
    t1 = time.time()
    for doc in res:
        if verbose and cnt % step == 0:
            if cnt != 0:
                print('done.[%.1f%%,%s]' % (cnt * 100. / n, timesofar(t1)))
            print('\t{}-{}...'.format(cnt + 1, min(cnt + step, n)), end='')
            t1 = time.time()
        yield doc
        cnt += 1
    if verbose:
        print('done.[%.1f%%,%s]' % (cnt * 100. / n, timesofar(t1)))
        print("Finished! [{}]".format(timesofar(t0)))
def get_venues_by_location(location, radius, hour_for_cache):
    """Return a list of venue oids within `radius` km of location
    ('latitude,longitude'). Cache results for one hour.
    """
    lat_lon = location.split(',')
    body = {
        "query": {
            "filtered": {
                "query": {"match_all": {}},
                "filter": {
                    "geo_distance": {
                        "distance": str(radius) + 'km',
                        "location": {
                            "lat": float(lat_lon[0]),
                            "lon": float(lat_lon[1])
                        }
                    }
                }
            }
        },
        "_source": {
            "include": ["oid"]
        }
    }
    try:
        result = scan(
            es,
            index='lac',
            doc_type='geo_location',
            query=body,
            size=500)
        return [v['_source']['oid'] for v in result]
    except Exception as e:
        log.exception(e)
        return []
def scan(self, **kwargs):
    es = get_elasticsearch(self)
    return scan(es, index=es._index, doc_type=self.doc_type, **kwargs)
def get_scroll(query_dsl, es_client, index_name=None, keep_alive='1m'):
    """Returns an iterator for results matching query_dsl."""
    if index_name is None:
        index_name = es_client.index_name
    scroll = scan(
        es_client, query=query_dsl, scroll=keep_alive, index=index_name)
    return scroll
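# A sketch of consuming the scroll iterator returned by get_scroll() (the
# query body is an assumption):
def count_events(es_client):
    events = get_scroll({'query': {'match_all': {}}}, es_client)
    return sum(1 for _ in events)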
def get_tags(self, repos, fromdate=None, todate=None):
    filter = {
        "bool": {
            "must": [],
            "should": [],
        }
    }
    for repo in repos:
        should_repo_clause = {
            "bool": {
                "must": []
            }
        }
        should_repo_clause["bool"]["must"].append(
            {"term": {"repo": repo}}
        )
        filter["bool"]["should"].append(should_repo_clause)
    body = {
        "filter": filter
    }
    body["filter"]["bool"]["must"].append(
        {
            "range": {
                "date": {
                    "gte": fromdate,
                    "lt": todate,
                }
            }
        }
    )
    return [t for t in scanner(self.es, query=body,
                               index=self.index, doc_type=self.dbname)]
def scan_all(self,
             scroll='5m',  # TODO - hard-coded timeout.
             ):
    """
    Most efficient way to scan all documents.
    """
    rr = es_scan(client=self.es,
                 index=self.index_name,
                 doc_type=self.doc_type,
                 scroll=scroll,
                 query={"query": {'match_all': {}}},
                 )
    return rr
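# A sketch of draining the scan_all() generator (the store object is an
# assumption):
def dump_ids(store):
    return [hit['_id'] for hit in store.scan_all()]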
def typeahead_generate():
    """
    Re-generate typeahead search. This consists of a weighted set of
    completions for every possible query.

    Weighting ideas:
    - query frequency.
    - query result quality / count.
    - language model.

    TODO: Consider having the `NearestNeighborsBase` storage create this
    incrementally? Is that approach really better in a clustered setup?
    """
    assert False, 'WIP'
    if mc_config.LOW_LEVEL:
        es = mc_neighbors.low_level_es_connect()
        res = scan(client=es,
                   index=index_name,
                   doc_type=doc_type,
                   scroll='5m',  # TODO - hard-coded.
                   query={"query": {'match_all': {}}},
                   # 'from': 0,
                   # 'size': 1,
                   )
    else:
        nes = mc_neighbors.high_level_connect(index_name=index_name,
                                              doc_type=doc_type)
        res = nes.scan_all()
def list_designs(self):
    """
    Return a list of designs in the corpus.
    :return: a list of all design ids
    """
    result = {}
    s = scan(self.es, index=self.index_name)
    for r in s:
        result.update({r['_source']['stl_id']: True})
    return result.keys()
# elasticsearch5.py, from django-haystack-elasticsearch (CraveFood)
def clear(self, models=None, commit=True):
    """
    Clears the backend of all documents/objects for a collection of models.

    :param models: List or tuple of models to clear.
    :param commit: Not used.
    """
    if models is not None:
        assert isinstance(models, (list, tuple))
    try:
        if models is None:
            self.conn.indices.delete(index=self.index_name, ignore=404)
            self.setup_complete = False
            self.existing_mapping = {}
            self.content_field_name = None
        else:
            models_to_delete = []
            for model in models:
                models_to_delete.append("%s:%s" % (DJANGO_CT, get_model_ct(model)))
            # Delete using scroll API
            query = {'query': {'query_string': {'query': " OR ".join(models_to_delete)}}}
            generator = scan(self.conn, query=query, index=self.index_name,
                             doc_type='modelresult')
            actions = ({
                '_op_type': 'delete',
                '_id': doc['_id'],
            } for doc in generator)
            bulk(self.conn, actions=actions, index=self.index_name,
                 doc_type='modelresult')
            self.conn.indices.refresh(index=self.index_name)
    except elasticsearch.TransportError as e:
        if not self.silently_fail:
            raise
        if models is not None:
            self.log.error("Failed to clear Elasticsearch index of models '%s': %s",
                           ','.join(models_to_delete), e, exc_info=True)
        else:
            self.log.error("Failed to clear Elasticsearch index: %s", e, exc_info=True)
# elasticsearch2.py in the same project defines an identical clear() method.
def _refresh_percolate_queries(self, index_name: str) -> None:
    # When dynamic templates are used and queries for percolation have been
    # added to an index before the index contains mappings of fields referenced
    # by those queries, the queries must be reloaded when the mappings are
    # present for the queries to match.
    # See: https://github.com/elastic/elasticsearch/issues/5750
    subscription_index_name = Config.get_es_index_name(ESIndexType.subscriptions, self.replica)
    es_client = ElasticsearchClient.get(self.logger)
    if not es_client.indices.exists(subscription_index_name):
        return
    subscription_queries = [{'_index': index_name,
                             '_type': ESDocType.query.name,
                             '_id': hit['_id'],
                             '_source': hit['_source']['es_query']}
                            for hit in scan(es_client,
                                            index=subscription_index_name,
                                            doc_type=ESDocType.subscription.name,
                                            query={'query': {'match_all': {}}})]
    if subscription_queries:
        try:
            bulk(es_client, iter(subscription_queries), refresh=True)
        except BulkIndexError as ex:
            self.logger.error("Error occurred when adding subscription queries to index %s. Errors: %s",
                              index_name, ex.errors)
def _remove(self):
    bulk_deletes = []
    for result in scan(self.elastic_conn,
                       index="test",
                       doc_type="test"):
        result['_op_type'] = 'delete'
        bulk_deletes.append(result)
    bulk(self.elastic_conn, bulk_deletes)
# elastic2_doc_manager.py, from elastic2-doc-manager (mongodb-labs)
def handle_command(self, doc, namespace, timestamp):
    # Flush the buffer before handling the command
    self.commit()
    db = namespace.split('.', 1)[0]
    if doc.get('dropDatabase'):
        dbs = self.command_helper.map_db(db)
        for _db in dbs:
            self.elastic.indices.delete(index=_db.lower())
    if doc.get('renameCollection'):
        raise errors.OperationFailed(
            "elastic_doc_manager does not support renaming a mapping.")
    if doc.get('create'):
        db, coll = self.command_helper.map_collection(db, doc['create'])
        if db and coll:
            self.elastic.indices.put_mapping(
                index=db.lower(), doc_type=coll,
                body={
                    "_source": {"enabled": True}
                })
    if doc.get('drop'):
        db, coll = self.command_helper.map_collection(db, doc['drop'])
        if db and coll:
            # This will delete the items in coll, but not get rid of the
            # mapping.
            warnings.warn("Deleting all documents of type %s on index %s. "
                          "The mapping definition will persist and must be "
                          "removed manually." % (coll, db))
            responses = streaming_bulk(
                self.elastic,
                (dict(result, _op_type='delete') for result in scan(
                    self.elastic, index=db.lower(), doc_type=coll)))
            for ok, resp in responses:
                if not ok:
                    LOG.error(
                        "Error occurred while deleting ElasticSearch "
                        "document during handling of 'drop' command: %r" % resp)
def _stream_search(self, *args, **kwargs):
    """Helper method for iterating over ES search results."""
    for hit in scan(self.elastic, query=kwargs.pop('body', None),
                    scroll='10m', **kwargs):
        hit['_source']['_id'] = hit['_id']
        yield hit['_source']
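# A sketch of calling _stream_search() (the index, doc_type, and query body
# are assumptions):
def all_sources(doc_manager):
    return list(doc_manager._stream_search(
        index='mydb', doc_type='mycoll',
        body={'query': {'match_all': {}}}))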
def scan(index=None, doc_type=None, **kwargs):
    es = get_client(env.elasticsearch_alias)
    docs = helpers.scan(es, index=index, doc_type=doc_type, ignore=IGNORE, **kwargs)
    for doc in docs:
        jsonprint(doc)
def get_all_urls(self, limit=None):
    '''get all urls in the index
    '''
    res = scan(
        self.conn, index=self.index, doc_type=RECORD_TYPE,
        _source_include=[],
        query={'query': {'match_all': {}}})
    for item in islice(res, limit):
        yield item['_id']
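# islice(res, None) iterates the entire scan, so limit=None yields every id;
# a sketch of taking just the first ten (the store object is an assumption):
def sample_urls(store):
    return list(store.get_all_urls(limit=10))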
def get_children(self, node):
    '''get child nodes of `node`
    '''
    assert node.replica is not None
    res = scan(
        self.conn, index=self.index, doc_type=UNION_FIND_TYPE,
        # the 'child' field is read below, so it must be kept in _source
        _source_include=['child'],
        query={'query': {'term': {'parent': node.get_id()}}})
    for item in res:
        yield AKANode.from_record(item['_source']['child'])
def get_all_unions(self):
    '''yield the source of every union-find record in the index
    '''
    res = scan(
        self.conn, index=self.index, doc_type=UNION_FIND_TYPE)
    for item in res:
        yield item['_source']
def get_all_plugins(self):
    index = ElasticConfig.INDEX_VULTASKS["index"]
    doc_type = ElasticConfig.INDEX_VULTASKS["type"]
    scroll = "2m"
    size = 30
    body = {"query": {"match_all": {}}}
    data = helpers.scan(client=self.es, query=body, index=index,
                        doc_type=doc_type, scroll=scroll, size=size)
    return data
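# helpers.scan returns a lazy generator, so callers must iterate it to drive
# the scroll; a sketch (the client object and field name are assumptions):
def print_plugins(client):
    for hit in client.get_all_plugins():
        print(hit['_id'], hit['_source'].get('plugin_name'))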
# search assets