def test_es():
"""
Before running other tests, ensure connection to ES is established
"""
es = Elasticsearch()
try:
es.indices.create(INDEX)
es.indices.delete(INDEX)
return True
except RequestError:
print('Index already exists: skipping tests.')
return False
except ConnectionError:
print('The ElasticSearch backend is not running: skipping tests.')
return False
except Exception as e:
print('An unknown error occured connecting to ElasticSearch: %s' % e)
return False
python类ConnectionError()的实例源码
def get_test_client(nowait=False, **kwargs):
# construct kwargs from the environment
kw = {'timeout': 30}
if 'TEST_ES_CONNECTION' in os.environ:
from elasticsearch import connection
kw['connection_class'] = getattr(connection, os.environ['TEST_ES_CONNECTION'])
kw.update(kwargs)
client = Elasticsearch([os.environ.get('TEST_ES_SERVER', {})], **kw)
# wait for yellow status
for _ in range(1 if nowait else 100):
try:
client.cluster.health(wait_for_status='yellow')
return client
except ConnectionError:
time.sleep(.1)
else:
# timeout
raise SkipTest("Elasticsearch failed to start.")
def get_client():
global client
if client is not None:
return client
client = Elasticsearch([os.environ.get('TEST_ES_SERVER', {})], timeout=300)
# wait for yellow status
for _ in range(100):
time.sleep(.1)
try:
client.cluster.health(wait_for_status='yellow')
return client
except ConnectionError:
continue
else:
# timeout
raise SkipTest("Elasticsearch failed to start.")
def setup():
log = logging.getLogger('haystack')
try:
import elasticsearch
if not ((5, 0, 0) <= elasticsearch.__version__ < (6, 0, 0)):
raise ImportError
from elasticsearch import Elasticsearch, exceptions
except ImportError:
log.error("'elasticsearch>=5.0.0,<6.0.0' not installed.", exc_info=True)
raise unittest.SkipTest("'elasticsearch>=5.0.0,<6.0.0' not installed.")
url = settings.HAYSTACK_CONNECTIONS['default']['URL']
es = Elasticsearch(url)
try:
es.info()
except exceptions.ConnectionError as e:
log.error("elasticsearch not running on %r" % url, exc_info=True)
raise unittest.SkipTest("elasticsearch not running on %r" % url, e)
def setup():
log = logging.getLogger('haystack')
try:
import elasticsearch
if not ((2, 0, 0) <= elasticsearch.__version__ < (3, 0, 0)):
raise ImportError
from elasticsearch import Elasticsearch, exceptions
except ImportError:
log.error("'elasticsearch>=2.0.0,<3.0.0' not installed.", exc_info=True)
raise unittest.SkipTest("'elasticsearch>=2.0.0,<3.0.0' not installed.")
url = settings.HAYSTACK_CONNECTIONS['default']['URL']
es = Elasticsearch(url)
try:
es.info()
except exceptions.ConnectionError as e:
log.error("elasticsearch not running on %r" % url, exc_info=True)
raise unittest.SkipTest("elasticsearch not running on %r" % url, e)
def update_template(es, max_retry, template_path, template_name):
with open(template_path) as f:
body = f.read()
for i in range(max_retry, 0, -1):
try:
es.indices.put_template(name=template_name, body=body)
log.info("Updating template {!r} done".format(template_name))
return
except (ConnectionError, NotFoundError):
log.warning(
"Updating template {!r} failed. Waiting for {} sec".format(
template_name, i))
time.sleep(i)
log.error("Updating template {!r} definitely failed".format(template_name))
def check_elastic_status(function):
def wrap(request, *args, **kwargs):
# controllo lo stato della connessione a ElastiSearch
try:
es = Elasticsearch()
es.info()
return function(request, *args, **kwargs)
except es_exceptions.ConnectionError as ce:
return HttpResponseRedirect('/elastic-connection-error')
except Exception as generic_exp:
print str(generic_exp)
return HttpResponseRedirect('/elastic-connection-error')
wrap.__doc__ = function.__doc__
wrap.__name__ = function.__name__
return wrap
def get_test_client(nowait=False):
# construct kwargs from the environment
kw = {}
if 'TEST_ES_CONNECTION' in os.environ:
from elasticsearch import connection
kw['connection_class'] = getattr(connection, os.environ['TEST_ES_CONNECTION'])
client = Elasticsearch([os.environ.get('TEST_ES_SERVER', {})], **kw)
# wait for yellow status
for _ in range(1 if nowait else 100):
try:
client.cluster.health(wait_for_status='yellow')
return client
except ConnectionError:
time.sleep(.1)
else:
# timeout
raise SkipTest("Elasticsearch failed to start.")
def get_test_client(nowait=False, **kwargs):
# construct kwargs from the environment
kw = {'timeout': 30}
if 'TEST_ES_CONNECTION' in os.environ:
from elasticsearch import connection
kw['connection_class'] = getattr(connection, os.environ['TEST_ES_CONNECTION'])
kw.update(kwargs)
client = Elasticsearch([os.environ.get('TEST_ES_SERVER', {})], **kw)
# wait for yellow status
for _ in range(1 if nowait else 100):
try:
client.cluster.health(wait_for_status='yellow')
return client
except ConnectionError:
time.sleep(.1)
else:
# timeout
raise SkipTest("Elasticsearch failed to start.")
def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None):
url_path = url
if params:
url_path = '%s?%s' % (url, urlencode(params or {}))
url = self.base_url + url_path
start = self.loop.time()
response = None
try:
with aiohttp.Timeout(timeout or self.timeout, loop=self.loop):
response = yield from self.session.request(method, url, data=body, headers=headers)
raw_data = yield from response.text()
duration = self.loop.time() - start
except Exception as e:
self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=e)
if isinstance(e, ServerFingerprintMismatch):
raise SSLError('N/A', str(e), e)
if isinstance(e, asyncio.TimeoutError):
raise ConnectionTimeout('TIMEOUT', str(e), e)
raise ConnectionError('N/A', str(e), e)
finally:
if response is not None:
yield from response.release()
# raise errors based on http status codes, let the client handle those if needed
if not (200 <= response.status < 300) and response.status not in ignore:
self.log_request_fail(method, url, url_path, body, duration, status_code=response.status, response=raw_data)
self._raise_error(response.status, raw_data)
self.log_request_success(method, url, url_path, body, response.status, raw_data, duration)
return response.status, response.headers, raw_data
def test_update_non_silent(self):
with self.assertRaises(ConnectionError):
self.bad_sb.update(self.smmi, self.sample_objs)
# TODO mb test logs
def test_remove_non_silent(self):
with self.assertRaises(ConnectionError):
self.bad_sb.remove('test_app.mockmodel.1')
# TODO mb test logs
def test_clear_non_silent(self):
with self.assertRaises(ConnectionError):
self.bad_sb.clear()
# TODO mb test logs
def test_search_non_silent(self):
with self.assertRaises(ConnectionError):
self.bad_sb.search('foo')
def update_nr_replicas(es, max_retry, nr_replicas, index):
for i in range(max_retry, 0, -1):
try:
es.indices.put_settings(
body={"index": {"number_of_replicas": int(nr_replicas)}},
index=index)
log.info("Updating replicas done")
return
except (ConnectionError, NotFoundError):
log.warning(
"Updating replicas failed. Waiting for {} sec".format(i))
time.sleep(i)
log.error("Updating replicas definitely failed")
def perform_request(
self, method, url, params=None, body=None,
timeout=None, ignore=()):
url = self.url_prefix + url
if params:
url = '{}?{}'.format(url, urlencode(params))
full_url = self.host + url
start = self._loop.time()
try:
with async_timeout.timeout(timeout, loop=self._loop):
async with self._session.request(
method, full_url, data=body) as resp:
raw_data = await resp.text()
except Exception as e:
self.log_request_fail(
method, full_url, url, body,
self._loop.time() - start, exception=e)
if isinstance(e, asyncio.TimeoutError):
raise ConnectionTimeout('TIMEOUT', str(e), e)
raise ConnectionError('N/A', str(e), e)
duration = self._loop.time() - start
# raise errors based on http status codes,
# let the client handle those if needed
if not (200 <= resp.status < 300) and resp.status not in ignore:
self.log_request_fail(
method, full_url, url, body, duration, resp.status, raw_data)
self._raise_error(resp.status, raw_data)
self.log_request_success(
method, full_url, url, body, resp.status, raw_data, duration)
return resp.status, resp.headers, raw_data
def _get_es_data(self, sites, telescopes):
date_range_query = {
"query": {
"bool": {
"filter": [
{
"range": {
"timestamp": {
# Retrieve documents 1 hour back to capture the telescope state at the start.
"gte": (self.start - timedelta(hours=1)).strftime(ES_STRING_FORMATTER),
"lte": self.end.strftime(ES_STRING_FORMATTER),
"format": "yyyy-MM-dd HH:mm:ss"
}
}
},
{
"terms": {
"telescope": telescopes
}
},
{
"terms": {
"site": sites
}
}
]
}
}
}
event_data = []
query_size = 10000
try:
data = self.es.search(
index="telescope_events", body=date_range_query, size=query_size, scroll='1m', # noqa
_source=['timestamp', 'telescope', 'enclosure', 'site', 'type', 'reason'],
sort=['site', 'enclosure', 'telescope', 'timestamp']
)
except ConnectionError:
raise ElasticSearchException
event_data.extend(data['hits']['hits'])
total_events = data['hits']['total']
events_read = min(query_size, total_events)
scroll_id = data.get('_scroll_id', 0)
while events_read < total_events:
data = self.es.scroll(scroll_id=scroll_id, scroll='1m') # noqa
scroll_id = data.get('_scroll_id', 0)
event_data.extend(data['hits']['hits'])
events_read += len(data['hits']['hits'])
return event_data