def index_worker(self, queue, size=200):
    actions = []
    indexed = 0
    while True:
        item = queue.get()
        if item is None:  # sentinel value: the producer is done
            break
        id_submission, analysis = item
        doc = {
            '_index': 'fcc-comments',
            '_type': 'document',
            '_op_type': 'update',
            '_id': id_submission,
            'doc': {'analysis': analysis},
        }
        actions.append(doc)
        if len(actions) >= size:  # >= so a batch that failed below is retried on the next pass
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    response = bulk(self.es, actions)
                    indexed += response[0]
                    print('\tanalyzed %s/%s\t%s%%' % (indexed, self.limit,
                        int(indexed / self.limit * 100)))
                    actions = []
                except ConnectionTimeout:
                    print('error indexing: connection timeout')
    # flush whatever is left in the final, partial batch
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        response = bulk(self.es, actions)
        indexed += response[0]
        print('indexed %s' % indexed)
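
# A minimal usage sketch (an assumption, not from the source): the worker is
# fed (id, analysis) tuples over a multiprocessing.Queue and told to finish
# with a None sentinel, mirroring how `run` below drives its bulk_index worker.
import multiprocessing

def drive_index_worker(indexer, analyses):
    # `indexer` and `analyses` are hypothetical: an object exposing
    # index_worker/es/limit, and an iterable of (id_submission, analysis) pairs
    queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=indexer.index_worker, args=(queue,))
    worker.start()
    for id_submission, analysis in analyses:
        queue.put((id_submission, analysis))
    queue.put(None)  # sentinel: the worker flushes its last batch and exits
    worker.join()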
def run(self):
    '''
    Get documents without a sentiment tag that match significant terms:
    - significant terms from positive regex-tagged documents vs. the others
    - an extra multi_match clause for stronger terms (terms that appear in
      multiple term sets: positive vs. negative, untagged, and all)
    - a phrase match on "net neutrality", since both terms score high on their own
    '''
    index_queue = multiprocessing.Queue()
    bulk_index_process = multiprocessing.Process(
        target=self.bulk_index, args=(index_queue,),
    )
    bulk_index_process.start()
    fetched = 0
    try:
        while fetched < self.limit:
            # use search instead of scan, because keeping an ordered scan
            # cursor open negates the performance benefits
            resp = self.es.search(index='fcc-comments', body=self.query,
                                  size=self.limit)
            for doc in resp['hits']['hits']:
                index_queue.put(doc['_id'])
                fetched += 1
                if not fetched % 100:
                    print('%s\t%s\t%s' % (fetched, doc['_score'],
                        doc['_source']['text_data']))
    except ConnectionTimeout:
        print('error fetching: connection timeout')
    index_queue.put(None)  # sentinel: tell the indexing process to finish up
    bulk_index_process.join()
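
# `self.query` is only described in the docstring above; a hedged sketch of
# what such a body could look like. The term strings are hypothetical,
# harvested beforehand from a significant_terms aggregation:
EXAMPLE_QUERY = {
    'query': {
        'bool': {
            'must_not': [
                {'exists': {'field': 'analysis.sentiment_sig_terms_ordered'}},
            ],
            'should': [
                # ordinary significant terms
                {'match': {'text_data': 'repeal title ii'}},
                # stronger terms (present in several term sets) get a second clause
                {'multi_match': {'query': 'oppose repeal',
                                 'fields': ['text_data']}},
                # both words score high individually, so match them as a phrase
                {'match_phrase': {'text_data': 'net neutrality'}},
            ],
            'minimum_should_match': 1,
        }
    }
}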
def bulk_index(self, queue, size=20):
    actions = []
    indexed = 0
    ids = set()
    while True:
        item = queue.get()
        if item is None:  # sentinel value: the producer is done
            break
        doc_id = item
        doc = {
            '_index': 'fcc-comments',
            '_type': 'document',
            '_op_type': 'update',
            '_id': doc_id,
            'doc': {'analysis.sentiment_sig_terms_ordered': True},
        }
        actions.append(doc)
        ids.add(doc_id)
        if len(actions) >= size:  # >= so a batch that failed below is retried on the next pass
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    response = bulk(self.es, actions)
                    indexed += response[0]
                    if not indexed % 200:
                        print('\tindexed %s/%s\t%s%%' % (indexed, self.limit,
                            int(indexed / self.limit * 100)))
                    actions = []
                except ConnectionTimeout:
                    print('error indexing: connection timeout')
    # flush the final, partial batch
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        response = bulk(self.es, actions)
        indexed += response[0]
        print('indexed %s' % indexed)
    ids = list(ids)
    # print('%s\n%s' % (len(ids), ' '.join(ids)))
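
# Note on the helper used above (assuming elasticsearch.helpers.bulk): it
# returns a (success_count, errors) tuple, which is why both workers read
# response[0] to track progress:
#     success_count, errors = bulk(self.es, actions)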
def create(self, *args, **kwargs):
    """
    The create method allows documents to be passed to the Elasticsearch
    handler directly.
    """
    # date = kwargs.get("date", None)
    # parser = kwargs.get('parser', 'default')
    # doctype = kwargs.get('type', 'default')
    # document = kwargs.get('document', {})
    #
    # if date:
    #     # Index the document into an index based on the index_date field
    #     options = {
    #         "index": "dminer-%s-%s" % (parser, date),
    #         "doc_type": doctype,
    #         "body": document
    #     }
    # else:
    #     options = {
    #         "index": "dminer-%s" % parser,
    #         "doc_type": doctype,
    #         "body": document
    #     }
    while True:
        try:
            value = self.es.index(*args, **kwargs)
        except ConnectionTimeout:
            print("Connection Timeout")
            continue  # retry until the index call succeeds
        break
    return value
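
# Usage sketch (index and body are illustrative): the kwargs pass straight
# through to Elasticsearch.index, so a call could look like
#     handler.create(index='dminer-default', doc_type='default',
#                    body={'title': 'example document'})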
# pre-async/await coroutine style: callers drive this with `yield from`,
# typically under an @asyncio.coroutine decorator
def perform_request(self, method, url, params=None, body=None, timeout=None,
                    ignore=(), headers=None):
    url_path = url
    if params:
        url_path = '%s?%s' % (url, urlencode(params or {}))
    url = self.base_url + url_path
    start = self.loop.time()
    response = None
    try:
        with aiohttp.Timeout(timeout or self.timeout, loop=self.loop):
            response = yield from self.session.request(method, url, data=body,
                                                       headers=headers)
            raw_data = yield from response.text()
        duration = self.loop.time() - start
    except Exception as e:
        self.log_request_fail(method, url, url_path, body,
                              self.loop.time() - start, exception=e)
        if isinstance(e, ServerFingerprintMismatch):
            raise SSLError('N/A', str(e), e)
        if isinstance(e, asyncio.TimeoutError):
            raise ConnectionTimeout('TIMEOUT', str(e), e)
        raise ConnectionError('N/A', str(e), e)
    finally:
        if response is not None:
            yield from response.release()
    # raise errors based on http status codes; let the client handle those if needed
    if not (200 <= response.status < 300) and response.status not in ignore:
        self.log_request_fail(method, url, url_path, body, duration,
                              status_code=response.status, response=raw_data)
        self._raise_error(response.status, raw_data)
    self.log_request_success(method, url, url_path, body, response.status,
                             raw_data, duration)
    return response.status, response.headers, raw_data
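
# Wiring sketch (an assumption; the exact plumbing depends on the async client
# in use): transport connection classes like this one are normally handed to
# the client constructor rather than called directly, e.g.
#     es = Elasticsearch(hosts=['localhost:9200'],
#                        connection_class=AIOHttpConnection)
# where AIOHttpConnection is the (hypothetical) class defining perform_request.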
def lookup_portal_item(esclient, index_name, item_id):
    """
    Returns an array of the collections that this item belongs to, according
    to Portal's ES index.
    :param esclient: Elasticsearch client object to use
    :param index_name: name of the index to search within
    :param item_id: item ID to look for
    :return: array of collection names the item belongs to, or None if it
        belongs to no collection. Raises PortalItemNotFound if the item
        does not exist.
    """
    parts = id_xplodr.match(item_id)
    query = {
        'query': {
            'filtered': {
                'filter': {
                    'term': {
                        'vidispine_id_str_ex': item_id
                    }
                }
            }
        }
    }
    wait_time = 2
    while True:
        try:
            result = esclient.search(index=index_name, doc_type='item', body=query)
            break
        except ReadTimeoutError as e:
            logger.warning(str(e))
            sleep(wait_time)
            wait_time *= 2  # exponential backoff before retrying
        except ConnectionTimeout as e:
            logger.warning(str(e))
            sleep(wait_time)
            wait_time *= 2
    hits = result['hits']['hits']
    if len(hits) == 0:
        raise PortalItemNotFound(item_id)
    # pprint(hits[0]['_source'])
    if 'f___collection_str' not in hits[0]['_source']:
        return None
    return hits[0]['_source']['f___collection_str']  # this is an array
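
# Usage sketch (index name and item ID are illustrative):
#     collections = lookup_portal_item(esclient, 'portal_1', 'VX-1234')
#     if collections is None:
#         pass  # the item exists but belongs to no collection
# lookup_portal_item raises PortalItemNotFound when nothing matches the ID.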
def collect(self):
    try:
        response = self.es_client.cluster.health(level=self.level,
                                                 request_timeout=self.timeout)
        metrics = cluster_health_parser.parse_response(response, self.metric_name_list)
    except ConnectionTimeout:
        logging.warning('Timeout while fetching %s (timeout %ss).',
                        self.description, self.timeout)
        yield collector_up_gauge(self.metric_name_list, self.description, succeeded=False)
    except Exception:
        logging.exception('Error while fetching %s.', self.description)
        yield collector_up_gauge(self.metric_name_list, self.description, succeeded=False)
    else:
        yield from gauge_generator(metrics)
        yield collector_up_gauge(self.metric_name_list, self.description)
def collect(self):
    try:
        response = self.es_client.nodes.stats(metric=self.metrics,
                                              request_timeout=self.timeout)
        metrics = nodes_stats_parser.parse_response(response, self.metric_name_list)
    except ConnectionTimeout:
        logging.warning('Timeout while fetching %s (timeout %ss).',
                        self.description, self.timeout)
        yield collector_up_gauge(self.metric_name_list, self.description, succeeded=False)
    except Exception:
        logging.exception('Error while fetching %s.', self.description)
        yield collector_up_gauge(self.metric_name_list, self.description, succeeded=False)
    else:
        yield from gauge_generator(metrics)
        yield collector_up_gauge(self.metric_name_list, self.description)
def collect(self):
    try:
        response = self.es_client.indices.stats(metric=self.metrics, fields=self.fields,
                                                request_timeout=self.timeout)
        metrics = indices_stats_parser.parse_response(response, self.parse_indices,
                                                      self.metric_name_list)
    except ConnectionTimeout:
        logging.warning('Timeout while fetching %s (timeout %ss).',
                        self.description, self.timeout)
        yield collector_up_gauge(self.metric_name_list, self.description, succeeded=False)
    except Exception:
        logging.exception('Error while fetching %s.', self.description)
        yield collector_up_gauge(self.metric_name_list, self.description, succeeded=False)
    else:
        yield from gauge_generator(metrics)
        yield collector_up_gauge(self.metric_name_list, self.description)
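
# Registration sketch (assuming these collect() methods belong to
# prometheus_client custom collectors; the class name is hypothetical):
#     from prometheus_client.core import REGISTRY
#     REGISTRY.register(ClusterHealthCollector(es_client, level, timeout))
# The registry then calls collect() on every scrape, so a timeout only marks
# the collector down via collector_up_gauge instead of crashing the exporter.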
def test_failed_es_request(self):
    request = mock_request()
    exc = es_exceptions.ConnectionTimeout()
    request.es.cluster.health.side_effect = exc
    with pytest.raises(views.FailedHealthcheck) as e:
        views.healthcheck(request)
    assert e.value.__cause__ == exc
def exists_es_index(str_valid_index):
    """Returns whether the given index exists in the Elasticsearch cluster"""
    connection_attempts = 0
    while connection_attempts < 3:
        try:
            es = get_es_object()
            return es.indices.exists(index=str_valid_index)
        except exceptions.ConnectionTimeout:
            # count failed attempts; give up only after three timeouts
            connection_attempts += 1
    sys.exit('Elasticsearch connection timeout, exiting now...')
async def perform_request(
        self, method, url, params=None, body=None,
        timeout=None, ignore=()):
    url = self.url_prefix + url
    if params:
        url = '{}?{}'.format(url, urlencode(params))
    full_url = self.host + url
    start = self._loop.time()
    try:
        with async_timeout.timeout(timeout, loop=self._loop):
            async with self._session.request(
                    method, full_url, data=body) as resp:
                raw_data = await resp.text()
    except Exception as e:
        self.log_request_fail(
            method, full_url, url, body,
            self._loop.time() - start, exception=e)
        if isinstance(e, asyncio.TimeoutError):
            raise ConnectionTimeout('TIMEOUT', str(e), e)
        raise ConnectionError('N/A', str(e), e)
    duration = self._loop.time() - start
    # raise errors based on http status codes;
    # let the client handle those if needed
    if not (200 <= resp.status < 300) and resp.status not in ignore:
        self.log_request_fail(
            method, full_url, url, body, duration, resp.status, raw_data)
        self._raise_error(resp.status, raw_data)
    self.log_request_success(
        method, full_url, url, body, resp.status, raw_data, duration)
    return resp.status, resp.headers, raw_data
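
# Call sketch (method and path are illustrative): the coroutine returns the
# raw (status, headers, body) triple and leaves JSON decoding to the caller:
#     status, headers, raw = await conn.perform_request('GET', '/_cluster/health')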