def query_streets(name=None, locality=None, department=None, state=None,
road=None, max=None, fields=[]):
"""Busca calles según parámetros de búsqueda de una consulta.
Args:
name (str): Nombre de la calle para filtrar (opcional).
locality (str): Nombre de la localidad para filtrar (opcional).
department (str): Nombre de departamento para filtrar (opcional).
state (str): ID o nombre de provincia para filtrar (opcional).
road_type (str): Nombre del tipo de camino para filtrar (opcional).
max (int): Limita la cantidad de resultados (opcional).
fields (list): Campos a devolver en los resultados (opcional).
Returns:
list: Resultados de búsqueda de calles.
"""
index = 'calles-*' # Search in all indexes by default.
terms = []
if name:
condition = build_condition(NAME, get_abbreviated(name), fuzzy=True)
terms.append(condition)
if road:
condition = build_condition(ROAD_TYPE, road, fuzzy=True)
terms.append(condition)
if locality:
condition = build_condition(LOCALITY, locality, fuzzy=True)
terms.append(condition)
if department:
condition = build_condition(DEPT, department, fuzzy=True)
terms.append(condition)
if state:
target_state = query_entity(STATES, state, max=1)
if target_state: # Narrows search to specific index.
index = '-'.join([STREETS, target_state[0][ID]])
if LOCATION in fields:
fields.extend([GEOM, START_R, START_L, END_R, END_L, FULL_NAME])
query = {'query': {'bool': {'must': terms}} if terms else {"match_all": {}},
'size': max or 10, '_source': fields}
try:
result = Elasticsearch().search(index=index, body=query)
except ElasticsearchException as error:
return []
return [parse_es(hit) for hit in result['hits']['hits']]
评论列表
文章目录