def retry(conf, action):
    """Call *action* repeatedly until it succeeds or retries are exhausted.

    conf: a mapping supplying (as strings)
        'N_RETRIES'    -- total attempts before giving up (default 5)
        'RESET_PERIOD' -- seconds of quiet after which the failure counter
                          resets to zero (default 3600)
        'ERROR_PAUSE'  -- seconds to sleep between failed attempts (default 30)
    action: zero-argument callable to invoke.

    Returns whatever action() returns on first success.
    Raises ValueError after the final failed attempt.

    BUGFIX: converted the Python-2-only ``except (...), e`` comma syntax and
    bare ``print`` statements to the ``except ... as e`` / ``print(...)``
    forms, which are valid on Python 2.6+ and Python 3 alike.
    """
    n_retries = int(conf.get('N_RETRIES', '5'))
    reset_period = int(conf.get('RESET_PERIOD', '3600'))
    error_pause = int(conf.get('ERROR_PAUSE', '30'))
    reset = int(time.time())
    i = 0
    while True:
        try:
            ret = action()
        # These are the exception types that justify a retry -- extend this list as needed
        except (httplib.IncompleteRead, socket.error, boto.exception.BotoClientError, ValueErrorRetry) as e:
            now = int(time.time())
            if now > reset + reset_period:
                # It has been quiet long enough: treat this failure as fresh.
                print("******* RETRY RESET")
                i = 0
                reset = now
            i += 1
            print("******* RETRY %d/%d: %s" % (i, n_retries, e))
            if i < n_retries:
                print("******* WAITING %d seconds..." % (error_pause,))
                time.sleep(error_pause)
            else:
                raise ValueError("FAIL after %d retries" % (n_retries,))
        else:
            return ret
# Python usage examples for the IncompleteRead() class (scraped listing header).
def retry(conf, action):
    """Call *action* repeatedly until it succeeds or retries are exhausted.

    conf: a mapping supplying (as strings)
        'N_RETRIES'    -- total attempts before giving up (default 5)
        'RESET_PERIOD' -- seconds of quiet after which the failure counter
                          resets to zero (default 3600)
        'ERROR_PAUSE'  -- seconds to sleep between failed attempts (default 30)
    action: zero-argument callable to invoke.

    Returns whatever action() returns on first success.
    Raises ValueError after the final failed attempt.

    BUGFIX: converted the Python-2-only ``except (...), e`` comma syntax and
    bare ``print`` statements to the ``except ... as e`` / ``print(...)``
    forms, which are valid on Python 2.6+ and Python 3 alike.
    """
    n_retries = int(conf.get('N_RETRIES', '5'))
    reset_period = int(conf.get('RESET_PERIOD', '3600'))
    error_pause = int(conf.get('ERROR_PAUSE', '30'))
    reset = int(time.time())
    i = 0
    while True:
        try:
            ret = action()
        # These are the exception types that justify a retry -- extend this list as needed
        except (httplib.IncompleteRead, socket.error, boto.exception.BotoClientError, ValueErrorRetry) as e:
            now = int(time.time())
            if now > reset + reset_period:
                # It has been quiet long enough: treat this failure as fresh.
                print("******* RETRY RESET")
                i = 0
                reset = now
            i += 1
            print("******* RETRY %d/%d: %s" % (i, n_retries, e))
            if i < n_retries:
                print("******* WAITING %d seconds..." % (error_pause,))
                time.sleep(error_pause)
            else:
                raise ValueError("FAIL after %d retries" % (n_retries,))
        else:
            return ret
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
def download_disk_image(connection, module):
    """Download a disk image through the oVirt image-transfer proxy into
    the local file named by module.params['download_image_path']."""
    def _transfer(transfer_service, proxy_connection, proxy_url, transfer_ticket):
        # Look up the disk so we know how many bytes to fetch in total.
        disk_svc = connection.system_service().disks_service()
        disk = disk_svc.disk_service(module.params['id']).get()
        total = disk.actual_size
        headers = {
            'Authorization': transfer_ticket,
        }
        request_bytes = 1024 * 1024 * 8  # 8 MiB per ranged GET
        with open(module.params['download_image_path'], "wb") as image_file:
            offset = 0
            while offset < total:
                # Keep the transfer ticket alive while we work.
                transfer_service.extend()
                end = min(total, offset + request_bytes) - 1
                headers['Range'] = 'bytes=%d-%d' % (offset, end)
                proxy_connection.request(
                    'GET',
                    proxy_url.path,
                    headers=headers,
                )
                resp = proxy_connection.getresponse()
                if resp.status >= 300:
                    raise Exception("Error: %s" % resp.read())
                try:
                    image_file.write(resp.read())
                except IncompleteRead as e:
                    # Keep whatever partial data arrived, then stop.
                    image_file.write(e.partial)
                    break
                offset += request_bytes
    return transfer(
        connection,
        module,
        otypes.ImageTransferDirection.DOWNLOAD,
        transfer_func=_transfer,
    )
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
def __init__(self, e, uri, format, uriparts):
    """Wrap HTTPError *e* from a Twitter API call, capturing as much of
    the error body as can be read (gunzipping it when the response was
    gzip-encoded) into ``self.response_data``."""
    self.e = e
    self.uri = uri
    self.format = format
    self.uriparts = uriparts
    try:
        raw = self.e.fp.read()
    except http_client.IncompleteRead as partial_err:
        # The error body was truncated mid-stream; salvage what arrived.
        raw = partial_err.partial
    if self.e.headers.get('Content-Encoding') == 'gzip':
        # Transparently decompress a gzip-encoded error body.
        self.response_data = gzip.GzipFile(fileobj=StringIO(raw)).read()
    else:
        self.response_data = raw
    super(TwitterHTTPError, self).__init__(str(self))
def _handle_response(self, req, uri, arg_data, _timeout=None):
    """Open *req* and decode the API response.

    Returns the raw handle for image responses, a wrapped parsed object
    when ``self.format`` is "json", otherwise the wrapped decoded text.
    A 304 Not Modified maps to []; any other HTTPError is re-raised as
    TwitterHTTPError.
    """
    open_kwargs = {}
    if _timeout:
        open_kwargs['timeout'] = _timeout
    try:
        handle = urllib_request.urlopen(req, **open_kwargs)
        if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']:
            return handle
        try:
            body = handle.read()
        except http_client.IncompleteRead as e:
            # Even if we don't get all the bytes we should have there
            # may be a complete response in e.partial
            body = e.partial
        if handle.info().get('Content-Encoding') == 'gzip':
            # Handle gzip decompression
            body = gzip.GzipFile(fileobj=StringIO(body)).read()
        if self.format == "json":
            parsed = json.loads(body.decode('utf8'))
            return wrap_response(parsed, handle.headers)
        return wrap_response(body.decode('utf8'), handle.headers)
    except urllib_error.HTTPError as e:
        if e.code == 304:
            return []
        raise TwitterHTTPError(e, uri, self.format, arg_data)
def _handle_response(self, req, uri, arg_data, _timeout=None):
    """Open *req* and decode the API response.

    Returns the raw handle for image responses, a wrapped parsed object
    when ``self.format`` is "json", otherwise the wrapped decoded text.
    A 304 Not Modified maps to []; any other HTTPError is re-raised as
    TwitterHTTPError.
    """
    open_kwargs = {}
    if _timeout:
        open_kwargs['timeout'] = _timeout
    try:
        handle = urllib_request.urlopen(req, **open_kwargs)
        if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']:
            return handle
        try:
            body = handle.read()
        except http_client.IncompleteRead as e:
            # Even if we don't get all the bytes we should have there
            # may be a complete response in e.partial
            body = e.partial
        if handle.info().get('Content-Encoding') == 'gzip':
            # Handle gzip decompression
            body = gzip.GzipFile(fileobj=StringIO(body)).read()
        if self.format == "json":
            parsed = json.loads(body.decode('utf8'))
            return wrap_response(parsed, handle.headers)
        return wrap_response(body.decode('utf8'), handle.headers)
    except urllib_error.HTTPError as e:
        if e.code == 304:
            return []
        raise TwitterHTTPError(e, uri, self.format, arg_data)
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
# response.py -- source from project tf_aws_ecs_instance_draining_on_scale_in
# (author: terraform-community-modules); scraped listing metadata:
# views 17, favorites 0, likes 0, comments 0.
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
# response.py -- source from project tf_aws_ecs_instance_draining_on_scale_in
# (author: terraform-community-modules); scraped listing metadata:
# views 16, favorites 0, likes 0, comments 0.
def _update_chunk_length(self):
# First, we'll figure out length of a chunk and then
# we'll try to read it from socket.
if self.chunk_left is not None:
return
line = self._fp.fp.readline()
line = line.split(b';', 1)[0]
try:
self.chunk_left = int(line, 16)
except ValueError:
# Invalid chunked protocol response, abort.
self.close()
raise httplib.IncompleteRead(line)
def __init__(self, queue, database,
             username=None, password=None,
             dbadmin_username=None, dbadmin_password=None,
             line_format='single-line', tags=None,
             unit_system=None, augment_record=True,
             inputs=dict(), obs_to_upload='all', append_units_label=True,
             server_url=_DEFAULT_SERVER_URL, skip_upload=False,
             manager_dict=None,
             post_interval=None, max_backlog=sys.maxint, stale=None,
             log_success=True, log_failure=True,
             timeout=60, max_tries=3, retry_wait=5):
    """Initialize the Influx uploader thread and ensure the target
    database exists.

    Issues ``CREATE DATABASE <database>`` against the InfluxDB /query
    endpoint, preferring the dedicated admin credentials when supplied
    and falling back to the regular username/password.  A failure to
    create the database is logged but not raised -- the thread still
    starts.

    BUGFIX: converted the Python-2-only ``except (...), e`` comma
    syntax to the ``except ... as e`` form (valid on Python 2.6+).
    """
    super(InfluxThread, self).__init__(queue,
                                       protocol_name='Influx',
                                       manager_dict=manager_dict,
                                       post_interval=post_interval,
                                       max_backlog=max_backlog,
                                       stale=stale,
                                       log_success=log_success,
                                       log_failure=log_failure,
                                       max_tries=max_tries,
                                       timeout=timeout,
                                       retry_wait=retry_wait)
    self.database = database
    self.username = username
    self.password = password
    self.tags = tags
    # upload every observation unless an explicit subset was configured
    self.upload_all = True if obs_to_upload.lower() == 'all' else False
    self.append_units_label = append_units_label
    self.inputs = inputs
    self.server_url = server_url
    self.skip_upload = to_bool(skip_upload)
    self.unit_system = unit_system
    self.augment_record = augment_record
    self.templates = dict()
    self.line_format = line_format
    # ensure that the database exists
    qstr = urllib.urlencode({'q': 'CREATE DATABASE %s' % self.database})
    url = '%s/query?%s' % (self.server_url, qstr)
    req = urllib2.Request(url)
    req.add_header("User-Agent", "weewx/%s" % weewx.__version__)
    # prefer dedicated admin credentials for the CREATE DATABASE call
    uname = None
    pword = None
    if dbadmin_username is not None:
        uname = dbadmin_username
        pword = dbadmin_password
    elif username is not None:
        uname = username
        pword = password
    if uname is not None:
        # NOTE(review): base64.encodestring is deprecated in Python 3
        # (use encodebytes); kept here because this module is Python 2.
        b64s = base64.encodestring(
            '%s:%s' % (uname, pword)).replace('\n', '')
        req.add_header("Authorization", "Basic %s" % b64s)
    try:
        self.post_request(req)
    except (urllib2.URLError, socket.error,
            httplib.BadStatusLine, httplib.IncompleteRead) as e:
        logerr("create database failed: %s" % e)
def osu_get(conn, endpoint, paramsdict=None):
    '''GETs /api/endpoint?paramsdict&k=args.key from conn.
    return json object, exits process on api errors'''
    global osu_treset, osu_ncalls, args
    sys.stderr.write("%s %s\n" % (endpoint, str(paramsdict)))
    paramsdict["k"] = args.key
    path = "/api/%s?%s" % (endpoint, urllib.urlencode(paramsdict))
    while True:
        # Throttle to the 60-calls-per-minute API rate limit; the
        # counter resets once the current 60-second window expires.
        while True:
            if time.time() >= osu_treset:
                osu_ncalls = 0
                osu_treset = time.time() + 60
                sys.stderr.write("\napi ready\n")
            if osu_ncalls < 60:
                break
            sys.stderr.write("waiting for api cooldown...\r")
            time.sleep(1)
        try:
            conn.request("GET", path)
            osu_ncalls += 1
            resp = conn.getresponse()
            payload = ""
            # Accumulate the body, tolerating truncated reads by
            # appending the partial data and trying again.
            while True:
                try:
                    payload += resp.read()
                    break
                except httplib.IncompleteRead as err:
                    payload += err.partial
            parsed = json.loads(payload)
            if "error" in parsed:
                sys.stderr.write("%s\n" % parsed["error"])
                sys.exit(1)
            return parsed
        except (httplib.HTTPException, ValueError) as err:
            sys.stderr.write("%s\n" % (traceback.format_exc()))
            # prevents exceptions on next request if the
            # response wasn't previously read due to errors
            try:
                conn.getresponse().read()
            except httplib.HTTPException:
                pass
            time.sleep(5)
def test_http_response_early_failure(self):
    """Proxy must answer 500 with the runtime's logs appended when the
    runtime process dies before producing a usable HTTP response
    (getresponse raises IncompleteRead)."""
    # Expected prefix of the 500 body produced by the proxy.
    header = ('the runtime process gave a bad HTTP response: '
              'IncompleteRead(0 bytes read)\n\n')
    def dave_message():
        # Stand-in for the instance-logs getter; its text should be
        # appended verbatim after the error header.
        return "I'm sorry, Dave. I'm afraid I can't do that.\n"
    self.proxy = http_proxy.HttpProxy(
        host='localhost', port=23456,
        instance_died_unexpectedly=lambda: False,
        instance_logs_getter=dave_message,
        error_handler_file=None)
    # --- mox record phase: the exact sequence of calls the proxy is
    # expected to make (order matters to the mock verifier). ---
    login.get_user_info(None).AndReturn(('', False, ''))
    httplib.HTTPConnection.connect()
    httplib.HTTPConnection.request(
        'GET', '/get%20request?key=value', None,
        {'HEADER': 'value',
         http_runtime_constants.REQUEST_ID_HEADER: 'request id',
         'X-AppEngine-Country': 'ZZ',
         'X-Appengine-User-Email': '',
         'X-Appengine-User-Id': '',
         'X-Appengine-User-Is-Admin': '0',
         'X-Appengine-User-Nickname': '',
         'X-Appengine-User-Organization': '',
         'X-APPENGINE-DEV-SCRIPT': 'get.py',
         'X-APPENGINE-SERVER-NAME': 'localhost',
         'X-APPENGINE-SERVER-PORT': '8080',
         'X-APPENGINE-SERVER-PROTOCOL': 'HTTP/1.1',
         })
    # The runtime's response dies mid-read: this is the failure under test.
    httplib.HTTPConnection.getresponse().AndRaise(httplib.IncompleteRead(''))
    httplib.HTTPConnection.close()
    # WSGI environ fed to the proxy handler.
    environ = {'HTTP_HEADER': 'value', 'PATH_INFO': '/get request',
               'QUERY_STRING': 'key=value',
               'HTTP_X_APPENGINE_USER_ID': '123',
               'SERVER_NAME': 'localhost',
               'SERVER_PORT': '8080',
               'SERVER_PROTOCOL': 'HTTP/1.1',
               }
    self.mox.ReplayAll()
    expected_headers = {
        'Content-Type': 'text/plain',
        # Body is the error header followed by the instance logs.
        'Content-Length': '%d' % (len(header) + len(dave_message()))
    }
    self.assertResponse('500 Internal Server Error', expected_headers,
                        header + dave_message(),
                        self.proxy.handle, environ,
                        url_map=self.url_map,
                        match=re.match(self.url_map.url, '/get%20request'),
                        request_id='request id',
                        request_type=instance.NORMAL_REQUEST)
    self.mox.VerifyAll()