def get_folder(client, folder_id):
    """
    Retrieve a Box folder, retrying on transient network errors.

    :param client: Authenticated Box SDK client.
    :param folder_id: ID of the Box folder to retrieve.
    :return: The folder object, or None if every retry failed.
    """
    folder = None
    num_retry = 15
    for x in range(num_retry):
        try:
            folder = client.folder(folder_id=folder_id).get()
            break
        except (ConnectionError, BrokenPipeError, ProtocolError, ConnectionResetError, BoxAPIException):
            crate_logger.debug(traceback.format_exc())
            if x >= num_retry - 1:
                crate_logger.debug('Failed for the last time to get the folder: {}'.format(folder_id))
    return folder
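# Imports the snippet above appears to assume (a sketch; exact module paths can
# differ per project, but these are the standard homes of each exception):
import traceback

from requests.exceptions import ConnectionError   # requests-level connection failures
from urllib3.exceptions import ProtocolError      # mid-transfer disconnects at the urllib3 layer
from boxsdk.exception import BoxAPIException      # Box SDK API errors
# BrokenPipeError and ConnectionResetError are Python 3 builtins.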
def get_params(self, container_id: str) -> Optional[Dict[str, Any]]:
    if self._config.cache_params and container_id in self._params_cache:
        logger.debug("Returning cached params for container {0}".format(container_id))
        return self._params_cache[container_id]
    logger.debug("[{0}] Starting to fetch params for {1}".format(threading.current_thread().name, container_id))
    try:
        params = self._client.inspect_container(container_id)
    except NotFound as e:
        logger.warning("Container {0} not found - {1}.".format(container_id, e))
        return None
    except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
        logger.error("Communication error when fetching params for container {0}: {1}".format(container_id, e))
        return {}
    except Exception as e:
        logger.error("Unexpected error when fetching params for container {0}: {1}".format(container_id, e))
        return {}
    logger.debug("[{0}] Params fetched for {1}".format(threading.current_thread().name, container_id))
    if not self._config.cache_params:
        return params
    logger.debug("[{0}] Storing params of {1} in cache".format(threading.current_thread().name, container_id))
    self._params_cache[container_id] = params
    return params
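# The method above deliberately distinguishes three outcomes; a consumption
# sketch (`monitor` is a hypothetical instance of the surrounding class):
params = monitor.get_params(container_id)
if params is None:
    pass   # container no longer exists - drop it from tracking
elif not params:
    pass   # transient communication error - safe to retry later
else:
    print(params['Config']['Image'])   # full docker-py inspect_container() payload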
def get_box_folder(client, cur_box_folder, folder_id, retry_limit):
    """
    Retrieve a Box folder, retrying on transient network errors.

    :param client: Authenticated Box SDK client.
    :param cur_box_folder: Fallback folder object to return if every retry fails.
    :param folder_id: ID of the Box folder to retrieve.
    :param retry_limit: Maximum number of attempts.
    :return: The freshly fetched folder, or cur_box_folder on failure.
    """
    for i in range(retry_limit):
        try:
            box_folder = client.folder(folder_id=folder_id).get()
            cur_box_folder = box_folder
            break
        except (ConnectionError, BrokenPipeError, ProtocolError, ConnectionResetError, BoxAPIException):
            if i + 1 >= retry_limit:
                crate_logger.warning('Attempt ({retry_count}) out of ({max_count}); Going to give '
                                     'up on the write event because: {trace}'.format(retry_count=i + 1,
                                                                                     max_count=retry_limit,
                                                                                     trace=traceback.format_exc()))
            else:
                crate_logger.warning('Attempt ({retry_count}) '
                                     'out of ({max_count}): {trace}'.format(retry_count=i + 1,
                                                                            max_count=retry_limit,
                                                                            trace=traceback.format_exc()))
    return cur_box_folder
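# A hypothetical helper that factors out the retry loop shared by get_folder and
# get_box_folder above (the names TRANSIENT_ERRORS and fetch_with_retry are not
# from the original project):
TRANSIENT_ERRORS = (ConnectionError, BrokenPipeError, ProtocolError,
                    ConnectionResetError, BoxAPIException)

def fetch_with_retry(fetch, retry_limit=15):
    for attempt in range(1, retry_limit + 1):
        try:
            return fetch()
        except TRANSIENT_ERRORS:
            crate_logger.debug('Attempt %s/%s failed:\n%s', attempt, retry_limit,
                               traceback.format_exc())
    return None

# e.g.: folder = fetch_with_retry(lambda: client.folder(folder_id=folder_id).get())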
def _iter_stream(self):
    """Stream parser.

    :returns: Next item in the stream (may or may not be 'delimited').
    :raises: TwitterConnectionError, StopIteration
    """
    while True:
        item = None
        buf = bytearray()
        stall_timer = None
        try:
            while True:
                # read bytes until item boundary reached
                buf += self.stream.read(1)
                if not buf:
                    # check for stall (i.e. no data for 90 seconds)
                    if not stall_timer:
                        stall_timer = time.time()
                    elif time.time() - stall_timer > STREAMING_TIMEOUT:
                        raise TwitterConnectionError('Twitter stream stalled')
                elif stall_timer:
                    stall_timer = None
                if buf[-2:] == b'\r\n':
                    item = buf[0:-2]
                    if item.isdigit():
                        # use byte size to read next item
                        nbytes = int(item)
                        item = None
                        item = self.stream.read(nbytes)
                    break
            yield item
        except (ConnectionError, ProtocolError, ReadTimeout, ReadTimeoutError,
                SSLError, ssl.SSLError, socket.error) as e:
            raise TwitterConnectionError(e)
        except AttributeError:
            # client closed the connection; end iteration. Plain `return` rather
            # than `raise StopIteration`: PEP 479 turns the latter into a
            # RuntimeError inside generators on Python 3.7+.
            return
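# Sketch: draining the parser above (`stream_iterable` is a hypothetical object
# exposing _iter_stream; empty items are keep-alive reads and should be skipped):
try:
    for item in stream_iterable._iter_stream():
        if item:
            handle_message(item)   # handle_message() is a placeholder
except TwitterConnectionError:
    pass   # reconnect with backoff, per Twitter's streaming guidelines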
def check_container(self, container_id: str, check_source: CheckSource, remove_from_cache: bool=False) \
        -> Optional[Container]:
    try:
        if remove_from_cache:
            self.remove_from_cache(container_id)
        if not self._config.disable_params:
            params = self.get_params(container_id)
        else:
            params = {}
        if not self._config.disable_metrics:
            logger.debug("[{0}] Starting to fetch metrics for {1}".format(threading.current_thread().name,
                                                                          container_id))
            metrics = self._client.stats(container=container_id, decode=True, stream=False)
        else:
            metrics = {}
        logger.debug("[{0}] Fetched data for container {1}".format(threading.current_thread().name, container_id))
    except NotFound as e:
        logger.warning("Container {0} not found - {1}.".format(container_id, e))
        return None
    except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
        logger.error("Communication error when fetching info about container {0}: {1}".format(container_id, e))
        return None
    except Exception as e:
        logger.error("Unexpected error when fetching info about container {0}: {1}".format(container_id, e))
        return None
    if params is None or metrics is None:
        logger.warning("Params or metrics were not fetched for container {}. Not returning container."
                       .format(container_id))
        return None
    return Container(container_id, params, metrics, 0, check_source)
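# Sketch of a one-off probe using the method above. The CheckSource member name
# is an assumption; `container.cid` and `container.params` are attributes this
# class uses elsewhere (see kill_container below):
container = monitor.check_container(container_id, CheckSource.PERIODIC_CHECK, remove_from_cache=True)
if container is not None:
    print(container.cid, container.params.get('Name'))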
def check_containers(self, check_source: CheckSource) -> Iterable[Container]:
    with self._padlock:
        if self._check_in_progress:
            logger.warning("[{0}] Previous check did not yet complete, consider increasing CHECK_INTERVAL_S"
                           .format(threading.current_thread().name))
            return
        self._check_in_progress = True
    logger.debug("Periodic check start: connecting to get the list of containers")
    self.last_check_containers_run_start_timestamp = datetime.datetime.utcnow()
    try:
        containers = self._client.containers(quiet=True)
        logger.debug("[{0}] Fetched containers list from docker daemon".format(threading.current_thread().name))
    except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
        logger.error("Communication error while trying to get list of containers from docker: {0}".format(e))
        with self._padlock:
            self._check_in_progress = False
        self.last_periodic_run_ok = False
        return
    except Exception as e:
        logger.error("Unexpected error while trying to get list of containers from docker: {0}".format(e))
        with self._padlock:
            self._check_in_progress = False
        self.last_periodic_run_ok = False
        return
    ids = [container['Id'] for container in containers]
    for container_id in ids:
        container = self.check_container(container_id, check_source)
        if container is None:
            continue
        yield container
    logger.debug("Containers checked")
    if self._config.cache_params:
        logger.debug("Purging cache")
        self.purge_cache(ids)
    self.last_periodic_run_ok = True
    self.last_check_containers_run_end_timestamp = datetime.datetime.utcnow()
    self.last_check_containers_run_time = self.last_check_containers_run_end_timestamp \
        - self.last_check_containers_run_start_timestamp
    logger.debug("Periodic check done")
    with self._padlock:
        self._check_in_progress = False
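# check_containers() is a generator: the bookkeeping after the yield loop (cache
# purge, timestamps, releasing _check_in_progress) only runs if the caller drains
# it to completion, e.g.:
for container in monitor.check_containers(check_source):
    handle(container)   # handle() is a placeholder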
def get_events_observable(self) -> Iterable[Any]:
    successful = False
    ev = None
    while not successful:
        try:
            ev = self._client.events(decode=True)
            # only mark success once the subscription call returned without
            # error; otherwise the loop would exit after a single failed attempt
            successful = True
        except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
            logger.error("Communication error when subscribing for container events, retrying in 5s: {0}".format(e))
            time.sleep(5)
        except Exception as e:
            logger.error("Unexpected error when subscribing for container events, retrying in 5s: {0}".format(e))
            time.sleep(5)
    return ev
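# Sketch: consuming the blocking event stream returned above; with decode=True,
# docker-py yields each event as a dict (key names per the Docker events API):
for event in monitor.get_events_observable():
    if event.get('Type') == 'container' and event.get('status') == 'die':
        print('container died:', event.get('id'))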
def kill_container(self, container: Container) -> None:
    try:
        self._client.stop(container.params['Id'])
    except (ReadTimeout, ProtocolError) as e:
        logger.error("Communication error when stopping container {0}: {1}".format(container.cid, e))
    except Exception as e:
        logger.error("Unexpected error when stopping container {0}: {1}".format(container.cid, e))
def show_lastload():
    try:
        r = requests.get(
            EDATA_API_URL + '/lastload',
            headers=HEADERS,
        )
        lastload_json = r.json()
    except (ConnectionError, ProtocolError) as e:
        print("Connection error: `{}`".format(e.args[0].args[0]))
        sys.exit(1)
    else:
        d = iso8601_to_date(lastload_json['response']['lastload'],
                            lastload=True)
        print(d)
        sys.exit(0)
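# One hardening worth noting: the GET above has no timeout, so a silent server
# can hang the CLI indefinitely. A sketch with an explicit (assumed) timeout:
r = requests.get(EDATA_API_URL + '/lastload', headers=HEADERS, timeout=10)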
def upload_queue_processor():
    """
    Implements a simple re-try mechanism for pending uploads.
    """
    while True:
        # note: Queue.not_empty is a threading.Condition and is always truthy;
        # the real blocking happens in get() below
        if upload_queue.not_empty:
            callable_up = upload_queue.get()  # blocks
            # TODO: pass in the actual item being updated/uploaded, so we can do more intelligent retry mechanisms
            was_list = isinstance(callable_up, list)
            last_modified_time = oauth = None
            if was_list:
                last_modified_time, callable_up, oauth = callable_up
            args = callable_up.args if isinstance(callable_up, partial) else None
            num_retries = 15
            for x in range(num_retries):
                try:
                    ret_val = callable_up()
                    if was_list:
                        item = ret_val  # is the new/updated item
                        if isinstance(item, File):
                            client = Client(oauth)
                            file_obj = client.file(file_id=item.object_id).get()
                            redis_set(r_c, file_obj, last_modified_time, box_dir_path=BOX_DIR)
                    break
                except BoxAPIException as e:
                    crate_logger.debug('{the_args}, {the_trace}'.format(the_args=args,
                                                                        the_trace=traceback.format_exc()))
                    if e.status == 409:
                        crate_logger.debug('Apparently Box says this item already exists... '
                                           'and we were trying to create it. Need to handle this better. '
                                           'message: {}'.format(e.message))
                        break
                except (ConnectionError, BrokenPipeError, ProtocolError, ConnectionResetError):
                    time.sleep(3)
                    crate_logger.debug('{the_args}, {the_trace}'.format(the_args=args,
                                                                        the_trace=traceback.format_exc()))
                    if x >= num_retries - 1:
                        crate_logger.debug('Upload giving up on: {}'.format(callable_up))
                        # no immediate plans to do anything with this info, yet.
                        uploads_given_up_on.append(callable_up)
                except (TypeError, FileNotFoundError):
                    crate_logger.debug(traceback.format_exc())
                    break
            upload_queue.task_done()
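# Sketch of the producer side implied by the unpacking above (shapes inferred
# from this consumer; the file path and folder object are hypothetical):
upload_queue.put(partial(box_folder.upload, '/tmp/report.pdf'))
# ...or, when the result should be recorded in redis, the 3-item list form:
upload_queue.put([time.time(), partial(box_folder.upload, '/tmp/report.pdf'), oauth])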
# NOTE: Python 2 code (uses `unicode` and the `except Class, name:` syntax).
def upload(self, file_path, folder_id=None):
    mime_type, _ = guess_type(file_path)
    mime_type = 'application/octet-stream' if mime_type is None else mime_type
    file_name = os.path.basename(file_path)
    self._debug('upload - %s(%s) => %s', file_name, mime_type,
                folder_id if folder_id is not None else 'root')
    file_stats = os.stat(file_path)
    file_name_encoded = unicode(file_name, "utf-8", errors="ignore")
    description = dict(name=file_name_encoded, size=str(file_stats.st_size))
    if folder_id is not None:
        description['folder'] = folder_id

    def send():
        with open(file_path, 'rb') as f:
            uri = '/files/content'
            m = MultipartEncoder(
                fields=dict(description=json.dumps(dict(description)),
                            file=(file_name_encoded, f, mime_type))
            )
            response = self._call(self.client.post, '%s%s%s' % (URL_UPLOAD, BASE_URI, uri),
                                  data=m,
                                  headers={'Content-Type': m.content_type})
            self._debug('upload - %s - %s', file_name, response.text)
            return AbstractDomain._read_response(response)

    try:
        return send()
    except ClientError, ex:
        if Files._is_token_expired_on_upload(ex.response):
            self._logger.info('upload - token invalid - refreshing token')
            self.client._refresh_token()
            return send()
        else:
            raise
    except ConnectionError, conn:
        if len(conn.args) == 1 and type(conn.args[0]) == ProtocolError:
            protocol_error = conn.args[0]
            if len(protocol_error.args) == 2:
                str_error, error = protocol_error.args
                if error.errno == errno.ECONNRESET:
                    self._logger.info('upload - connection reset - refreshing token')
                    self.client._refresh_token()
                    self._logger.info('upload - resending the payload')
                    return send()
        raise
    except BaseException, ex:
        self._logger.error('upload - unmanaged exception - %s', type(ex))
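# The nesting unpacked in the ConnectionError branch above, for reference:
# requests wraps urllib3's ProtocolError as args[0] of its ConnectionError, and
# the ProtocolError wraps the OS-level error (exact messages vary by version):
#
#   ConnectionError(
#       ProtocolError('Connection aborted.',
#                     error(errno.ECONNRESET, 'Connection reset by peer')))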
def request(self, resource, params=None, files=None):
    """Request a Twitter REST API or Streaming API resource.

    :param resource: A valid Twitter endpoint (ex. "search/tweets")
    :param params: Dictionary with endpoint parameters or None (default)
    :param files: Dictionary with multipart-encoded file or None (default)
    :returns: TwitterResponse
    :raises: TwitterConnectionError
    """
    resource, endpoint = self._get_endpoint(resource)
    if endpoint not in ENDPOINTS:
        raise Exception('Endpoint "%s" unsupported' % endpoint)
    session = requests.Session()
    session.auth = self.auth
    session.headers = {'User-Agent': USER_AGENT}
    method, subdomain = ENDPOINTS[endpoint]
    url = self._prepare_url(subdomain, resource)
    if 'stream' in subdomain:
        session.stream = True
        timeout = STREAMING_TIMEOUT
        # always use 'delimited' for efficient stream parsing
        if not params:
            params = {}
        params['delimited'] = 'length'
        params['stall_warnings'] = 'true'
    else:
        session.stream = False
        timeout = REST_TIMEOUT
    if method == 'POST':
        data = params
        params = None
    else:
        data = None
    try:
        r = session.request(
            method,
            url,
            data=data,
            params=params,
            timeout=(CONNECTION_TIMEOUT, timeout),
            files=files,
            proxies=self.proxies)
    except (ConnectionError, ProtocolError, ReadTimeout, ReadTimeoutError,
            SSLError, ssl.SSLError, socket.error) as e:
        raise TwitterConnectionError(e)
    return TwitterResponse(r, session.stream)
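# Minimal usage sketch in the style of the TwitterAPI library this method comes
# from (the OAuth credentials are placeholders):
api = TwitterAPI(consumer_key, consumer_secret, access_token_key, access_token_secret)
r = api.request('search/tweets', {'q': 'pizza'})
for item in r:
    print(item.get('text'))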