def do_index_command(self, index, **options):
    """Drop, recreate and repopulate the named search index.

    When ``options['interactive']`` is truthy the user is asked to confirm
    first; declining aborts the rebuild and returns ``None``. Otherwise the
    return value is a dict holding the results of the ``delete``, ``create``
    and ``update`` steps, in that order.
    """
    interactive = options['interactive']
    if interactive:
        logger.warning("This will permanently delete the index '%s'.", index)
        if not self._confirm_action():
            logger.warning("Aborting rebuild of index '%s' at user's request.", index)
            return None

    results = {}
    try:
        results['delete'] = delete_index(index)
    except TransportError:
        # A transport failure on delete is treated as "nothing to delete".
        results['delete'] = {}
        logger.info("Index %s does not exist, cannot be deleted.", index)
    results['create'] = create_index(index)
    results['update'] = update_index(index)
    return results
# Example source code for the Python class TransportError()
def translate_es_errors(func):
    """Decorate *func* so Elasticsearch transport errors surface as SearchError.

    Any ``exceptions.TransportError`` raised by the wrapped callable is
    printed and re-raised as ``SearchError``. Return values and every other
    exception type pass through untouched.
    """
    import functools  # function-scope import keeps this block self-contained

    @functools.wraps(func)  # keep func's __name__/__doc__ visible on the wrapper
    def wrapper(*a, **k):
        try:
            return func(*a, **k)
        except exceptions.TransportError as e:
            print(str(e))
            raise SearchError("Elasticsearch refused connection: " + str(e))
    return wrapper
def translate_es_errors(func):
    """Decorate *func* so Elasticsearch transport errors surface as SearchError.

    Any ``exceptions.TransportError`` raised by the wrapped callable is
    printed and re-raised as ``SearchError``. Return values and every other
    exception type pass through untouched.
    """
    import functools  # function-scope import keeps this block self-contained

    @functools.wraps(func)  # keep func's __name__/__doc__ visible on the wrapper
    def wrapper(*a, **k):
        try:
            return func(*a, **k)
        except exceptions.TransportError as e:
            print(str(e))
            raise SearchError("Elasticsearch refused connection: " + str(e))
    return wrapper
def test_handle(self, mock_do, mock_log):
    """Check that handle() dispatches to do_index_command for each index."""
    command = BaseSearchCommand()
    index_names = ['foo', 'bar']
    command.handle(indexes=index_names)
    # do_index_command should have been called once per index, in order
    expected_calls = [mock.call(name) for name in index_names]
    mock_do.assert_has_calls(expected_calls)

    # when the command raises a TransportError, a warning must be logged
    mock_do.reset_mock()
    transport_error = TransportError(123, "oops", {'error': {'reason': 'no idea'}})
    mock_do.side_effect = transport_error
    command.handle(indexes=['baz'])
    mock_do.assert_called_once_with('baz')
    mock_log.warning.assert_called_once()
def test_rebuild_search_index(self, mock_update, mock_create, mock_delete):
    """Exercise Command.do_index_command using the supplied mocks."""
    command = rebuild_search_index.Command()
    # interactive must stay False: True would hang the tests
    outcome = command.do_index_command('foo', interactive=False)
    steps = (
        ('delete', mock_delete),
        ('create', mock_create),
        ('update', mock_update),
    )
    for _, patched in steps:
        patched.assert_called_once_with('foo')
    for key, patched in steps:
        self.assertEqual(outcome[key], patched.return_value)

    # a TransportError from the delete step must yield an empty delete result
    mock_delete.side_effect = TransportError("Index not found")
    outcome = command.do_index_command('foo', interactive=False)
    self.assertEqual(outcome['delete'], {})
def handle(self, *args, **options):
    """Invoke do_index_command for every requested index and log each outcome.

    The ``indexes`` entry is removed from ``options``; the remaining options
    are forwarded unchanged to ``do_index_command``. A ``TransportError`` is
    logged as a warning and summarised instead of being propagated.
    """
    index_names = options.pop('indexes')
    for index in index_names:
        outcome = {}
        try:
            outcome = self.do_index_command(index, **options)
        except TransportError as err:
            logger.warning("ElasticSearch threw an error: %s", err)
            outcome = dict(index=index, status=err.status_code, reason=err.error)
        finally:
            # runs on every path, including when another exception propagates
            logger.info(outcome)
def _perform_request(func, instance, args, kwargs):
    """Call ``func(*args, **kwargs)`` inside an "elasticsearch.query" span.

    :param func: the wrapped request callable.
    :param instance: object the ``Pin`` is looked up on; its ``serializer``
        is used to dump the body of GET requests.
    :param args: positional arguments; must be exactly ``(method, url)``.
    :param kwargs: keyword arguments; ``params`` and ``body`` are read if
        present.
    :returns: whatever ``func`` returns, unmodified.
    :raises TransportError: re-raised from ``func`` after the status code has
        been recorded on the span.
    """
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return func(*args, **kwargs)

    with pin.tracer.trace("elasticsearch.query") as span:
        # Don't instrument if the trace is not sampled
        if not span.sampled:
            return func(*args, **kwargs)

        method, url = args
        params = kwargs.get('params')
        body = kwargs.get('body')

        span.service = pin.service
        span.span_type = SPAN_TYPE
        span.set_tag(metadata.METHOD, method)
        span.set_tag(metadata.URL, url)
        # ``params`` is optional; urlencode(None) raises TypeError, so fall
        # back to an empty mapping (which encodes to an empty string).
        span.set_tag(metadata.PARAMS, urlencode(params or {}))
        if method == "GET":
            span.set_tag(metadata.BODY, instance.serializer.dumps(body))
        status = None

        span = quantize(span)

        try:
            result = func(*args, **kwargs)
        except TransportError as e:
            span.set_tag(http.STATUS_CODE, getattr(e, 'status_code', 500))
            raise

        try:
            # Optional metadata extraction with soft fail: tracing must never
            # break a request that itself succeeded.
            if isinstance(result, tuple) and len(result) == 2:
                # elasticsearch<2.4; it returns both the status and the body
                status, data = result
            else:
                # elasticsearch>=2.4; internal change for ``Transport.perform_request``
                # that just returns the body
                data = result

            took = data.get("took")
            if took:
                span.set_metric(metadata.TOOK, int(took))
        except Exception:
            pass

        if status:
            span.set_tag(http.STATUS_CODE, status)

        return result
def get_traced_transport(datadog_tracer, datadog_service=DEFAULT_SERVICE):
    """Build a ``Transport`` subclass that traces every request.

    Registers the service info on ``datadog_tracer`` and returns a class (not
    an instance) whose ``perform_request`` wraps the parent implementation in
    an "elasticsearch.query" span.

    :param datadog_tracer: tracer used to create the spans.
    :param datadog_service: service name set on each span.
    :returns: the ``TracedTransport`` class.
    """
    datadog_tracer.set_service_info(
        service=datadog_service,
        app=SPAN_TYPE,
        app_type=AppTypes.db,
    )

    class TracedTransport(Transport):
        """Extend the elasticsearch transport layer to allow the Datadog
        tracer to catch any performed request.
        """

        _datadog_tracer = datadog_tracer
        _datadog_service = datadog_service

        def perform_request(self, method, url, params=None, body=None):
            """Perform the request via the parent class, recording a span."""
            with self._datadog_tracer.trace("elasticsearch.query") as s:
                # Don't instrument if the trace is not sampled
                if not s.sampled:
                    return super(TracedTransport, self).perform_request(
                        method, url, params=params, body=body)

                s.service = self._datadog_service
                s.span_type = SPAN_TYPE
                s.set_tag(metadata.METHOD, method)
                s.set_tag(metadata.URL, url)
                # ``params`` defaults to None and urlencode(None) raises
                # TypeError; encode an empty mapping instead.
                s.set_tag(metadata.PARAMS, urlencode(params or {}))
                if method == "GET":
                    s.set_tag(metadata.BODY, self.serializer.dumps(body))
                s = quantize(s)

                try:
                    result = super(TracedTransport, self).perform_request(
                        method, url, params=params, body=body)
                except TransportError as e:
                    s.set_tag(http.STATUS_CODE, e.status_code)
                    raise

                status = None
                if isinstance(result, tuple) and len(result) == 2:
                    # elasticsearch<2.4; it returns both the status and the body
                    status, data = result
                else:
                    # elasticsearch>=2.4; internal change for ``Transport.perform_request``
                    # that just returns the body
                    data = result

                if status:
                    s.set_tag(http.STATUS_CODE, status)

                # The body is not guaranteed to be a dict; skip the optional
                # metric rather than fail a request that succeeded.
                took = data.get("took") if isinstance(data, dict) else None
                if took:
                    s.set_metric(metadata.TOOK, int(took))

                return result

    return TracedTransport
def update(self, index, iterable, commit=True):
    """Prepare every object in ``iterable`` and bulk-send the documents.

    Runs ``self.setup()`` first when setup has not completed. Each object is
    prepared via ``index.full_prepare``; the ``ID`` key is stored under
    ``ELASTICSEARCH_ID`` and every value goes through ``self._from_python``.
    ``TransportError`` is re-raised unless ``self.silently_fail`` is set, in
    which case it is logged. When ``commit`` is true the index is refreshed
    after the bulk call.
    """
    if not self.setup_complete:
        try:
            self.setup()
        except TransportError as e:
            if not self.silently_fail:
                raise
            self.log.error(u"Failed to add documents to Elasticsearch: %s", e, exc_info=True)
            return

    documents = []
    for obj in iterable:
        try:
            prepared = index.full_prepare(obj)
            # Rename the 'id' item and convert each value so it is
            # acceptable to the backend.
            document = {}
            for field_name, raw_value in prepared.items():
                target_key = ELASTICSEARCH_ID if field_name == ID else field_name
                document[target_key] = self._from_python(raw_value)
            documents.append(document)
        except SkipDocument:
            self.log.debug(u"Indexing for object `%s` skipped", obj)
        except TransportError as e:
            if not self.silently_fail:
                raise
            # Log only the object's identifier, never the object itself, so
            # that formatting the log record cannot hit encoding errors.
            message = u"%s while preparing object for update" % e.__class__.__name__
            context = {"data": {"index": index,
                                "object": get_identifier(obj)}}
            self.log.error(message, exc_info=True, extra=context)

    bulk(self.conn, documents, index=self.index_name, doc_type='modelresult')
    if commit:
        self.conn.indices.refresh(index=self.index_name)
def post(json_request_body: dict,
         replica: str,
         per_page: int,
         output_format: str,
         _scroll_id: typing.Optional[str] = None) -> dict:
    """Run the posted Elasticsearch query and return one page of results.

    :param json_request_body: request payload; its ``es_query`` entry is the
        query to run.
    :param replica: name of a ``Replica`` member; ``None`` selects
        ``Replica.aws``.
    :param per_page: requested page size, validated by ``PerPageBounds.check``.
    :param output_format: forwarded to the search and formatting helpers.
    :param _scroll_id: scroll identifier of a previous page, if any.
    :returns: a response with status ``ok`` when fewer than ``per_page``
        results came back, otherwise ``partial`` with a ``Link`` header
        pointing at the next page.
    :raises DSSException: for any Elasticsearch failure, with a status code
        derived from the underlying error.
    """
    es_query = json_request_body['es_query']
    per_page = PerPageBounds.check(per_page)

    replica_enum = Replica[replica] if replica is not None else Replica.aws

    # The format string must have exactly one placeholder per argument; a
    # stray "Timeout: %s" placeholder previously made this call fail to format.
    get_logger().debug("Received posted query. Replica: %s Query: %s Per_page: %i Scroll_id: %s",
                       replica_enum.name, json.dumps(es_query, indent=4), per_page, _scroll_id)
    # TODO: (tsmith12) determine if a search operation timeout limit is needed
    # TODO: (tsmith12) allow users to retrieve previous search results
    # TODO: (tsmith12) if page returns 0 hits, then all results have been found. delete search id
    try:
        page = _es_search_page(es_query, replica_enum, per_page, _scroll_id, output_format)
        request_dict = _format_request_body(page, es_query, replica_enum, output_format)
        request_body = jsonify(request_dict)

        if len(request_dict['results']) < per_page:
            # A short page means there is nothing further to fetch.
            response = make_response(request_body, requests.codes.ok)
        else:
            response = make_response(request_body, requests.codes.partial)
            next_url = _build_scroll_url(page['_scroll_id'], per_page, replica_enum, output_format)
            response.headers['Link'] = _build_link_header({next_url: {"rel": "next"}})
        return response
    except TransportError as ex:
        if ex.status_code == requests.codes.bad_request:
            get_logger().debug("%s", f"Invalid Query Recieved. Exception: {ex}")
            raise DSSException(requests.codes.bad_request,
                               "elasticsearch_bad_request",
                               f"Invalid Elasticsearch query was received: {str(ex)}")
        elif ex.status_code == requests.codes.not_found:
            get_logger().debug("%s", f"Search Context Error. Exception: {ex}")
            raise DSSException(requests.codes.not_found,
                               "elasticsearch_context_not_found",
                               "Elasticsearch context has returned all results or timeout has expired.")
        elif ex.status_code == 'N/A':
            get_logger().error("%s", f"Elasticsearch Invalid Endpoint. Exception: {ex}")
            raise DSSException(requests.codes.service_unavailable,
                               "service_unavailable",
                               "Elasticsearch reached an invalid endpoint. Try again later.")
        else:
            get_logger().error("%s", f"Elasticsearch Internal Server Error. Exception: {ex}")
            raise DSSException(requests.codes.internal_server_error,
                               "internal_server_error",
                               "Elasticsearch Internal Server Error")

    except ElasticsearchException as ex:
        get_logger().error("%s", f"Elasticsearch Internal Server Error. Exception: {ex}")
        raise DSSException(requests.codes.internal_server_error,
                           "internal_server_error",
                           "Elasticsearch Internal Server Error")