async def json(self, url, params, timeout=None):
    """GET *url* with *params* and return the decoded JSON response body.

    Args:
        url: Target URL.
        params: Query parameters forwarded to ``session.get``.
        timeout: Per-call timeout in seconds; defaults to ``self.timeout``.
    """
    # Must be a coroutine: the body uses ``async with`` / ``await``.
    with aiohttp.Timeout(timeout or self.timeout):
        async with self.session.get(url, params=params) as response:
            return await response.json()
# Collected example usages of aiohttp.Timeout()
async def get_text(self, url, params, timeout=None):
    """GET *url* with *params* and return ``(status, body_text)``.

    Args:
        url: Target URL.
        params: Query parameters forwarded to ``session.get``.
        timeout: Per-call timeout in seconds; defaults to ``self.timeout``.
    """
    # ``async with`` ensures the response/connection is released even on error,
    # which the original bare ``await session.get(...)`` did not guarantee.
    with aiohttp.Timeout(timeout or self.timeout):
        async with self.session.get(url, params=params) as response:
            return response.status, await response.text()
async def get_bin(self, url, params, timeout=None):
    """GET *url* with *params* and return the raw response bytes.

    Args:
        url: Target URL.
        params: Query parameters forwarded to ``session.get``.
        timeout: Per-call timeout in seconds; defaults to ``self.timeout``.
    """
    # ``async with`` releases the connection back to the pool when done.
    with aiohttp.Timeout(timeout or self.timeout):
        async with self.session.get(url, params=params) as response:
            return await response.read()
async def post_text(self, url, data, timeout=None):
    """POST *data* to *url* and return ``(final_url, body_text)``.

    Args:
        url: Target URL.
        data: Request payload forwarded to ``session.post``.
        timeout: Per-call timeout in seconds; defaults to ``self.timeout``.

    Returns:
        Tuple of the response URL (after any redirects) and the body text.
    """
    with aiohttp.Timeout(timeout or self.timeout):
        async with self.session.post(url, data=data) as response:
            return response.url, await response.text()
async def do_push(push_url, body, address, signing_key, reviewee_id):
    """POST *body* to *push_url* with signed Toshi headers, retrying on failure.

    Retries up to 10 times with a linear backoff capped at 30 seconds.
    Treats HTTP 200/204 as success; anything else is logged and retried.

    Args:
        push_url: Full URL to POST to.
        body: JSON-encoded request payload (also covered by the signature).
        address: Value for the ID-address header.
        signing_key: Key used by ``sign_request`` to sign each attempt.
        reviewee_id: User id, used only for error logging.
    """
    # Only the path component participates in the signature.
    path = '/' + push_url.split('/', 3)[-1]
    method = 'POST'
    backoff = 5
    retries = 10
    terminate = False
    async with aiohttp.ClientSession() as session:
        while not terminate:
            # Re-sign every attempt: the timestamp is part of the signature.
            timestamp = int(time.time())
            signature = sign_request(signing_key, method, path, timestamp, body)
            with aiohttp.Timeout(10):
                async with session.post(push_url,
                                        headers={
                                            'content-type': 'application/json',
                                            TOSHI_SIGNATURE_HEADER: signature,
                                            TOSHI_ID_ADDRESS_HEADER: address,
                                            TOSHI_TIMESTAMP_HEADER: str(timestamp)},
                                        data=body) as response:
                    if response.status == 204 or response.status == 200:
                        terminate = True
                    else:
                        log.error("Error updating user details")
                        log.error("URL: {}".format(push_url))
                        log.error("User Address: {}".format(reviewee_id))
                        retries -= 1
                        if retries <= 0:
                            terminate = True
            if not terminate:
                # Sleep outside the response context so the connection is not
                # held during the backoff, and never after success/give-up.
                await asyncio.sleep(backoff)
                backoff = min(backoff + 5, 30)
async def get_resource(self, api_path, query_params=None, **kwargs):
    """
    Helper method for HTTP GET API requests.

    Args:
        api_path(str): REST API path

    Keyword Args:
        query_params: Query parameters to add to the API URL
        kwargs: Other keyword args used for replacing items in the API path

    Returns:
        Response JSON data
    """
    # Avoid a mutable default argument; None means "no query parameters".
    if query_params is None:
        query_params = {}
    get_headers = {
        'Accept': 'application/json',
        **self.headers
    }
    url = self.build_api_url(
        api_path.format(
            tenant=self.tenant,
            controllerId=self.controller_id,
            **kwargs))
    self.logger.debug('GET {}'.format(url))
    with aiohttp.Timeout(self.timeout):
        async with self.session.get(url, headers=get_headers,
                                    params=query_params) as resp:
            await self.check_http_status(resp)
            # Renamed from ``json`` so it no longer shadows the json module.
            payload = await resp.json()
            self.logger.debug(payload)
            return payload
async def get_binary(self, url, dl_location,
                     mime='application/octet-stream', chunk_size=512,
                     timeout=3600):
    """
    Actual download method with checksum checking.

    Args:
        url(str): URL of item to download
        dl_location(str): storage path for downloaded artifact

    Keyword Args:
        mime: mimetype of content to retrieve
              (default: 'application/octet-stream')
        chunk_size: size of chunk to retrieve
        timeout: download timeout
                 (default: 3600)

    Returns:
        MD5 hash of downloaded content
    """
    get_bin_headers = {
        'Accept': mime,
        **self.headers
    }
    hash_md5 = hashlib.md5()
    self.logger.debug('GET binary {}'.format(url))
    # Outer timeout bounds the whole download; the inner 60s timeout bounds
    # each individual chunk read so a stalled stream fails fast.
    with aiohttp.Timeout(timeout, loop=self.session.loop):
        async with self.session.get(url, headers=get_bin_headers) as resp:
            await self.check_http_status(resp)
            with open(dl_location, 'wb') as fd:
                while True:
                    with aiohttp.Timeout(60):
                        chunk = await resp.content.read(chunk_size)
                    if not chunk:
                        break
                    fd.write(chunk)
                    hash_md5.update(chunk)
    return hash_md5.hexdigest()
async def send(self, data, headers, timeout=None):
    """POST *data* to the APM server; return the ``Location`` response header.

    Args:
        data: Request payload.
        headers: HTTP headers for the request.
        timeout: Request timeout in seconds (None disables the timeout guard).

    Raises:
        TransportException: on timeout, non-202 status, or any other failure.
    """
    try:
        with aiohttp.Timeout(timeout):
            async with self.client.post(self._url,
                                        data=data,
                                        headers=headers) as response:
                # Anything other than 202 Accepted is treated as a failure.
                assert response.status == 202
    except asyncio.TimeoutError as e:
        print_trace = True
        message = ("Connection to APM Server timed out "
                   "(url: %s, timeout: %s seconds)" % (self._url, timeout))
        raise TransportException(message, data,
                                 print_trace=print_trace) from e
    except AssertionError as e:
        print_trace = True
        # NOTE(review): the response context has already exited here, so the
        # connection may be released before this read — confirm this works
        # with the aiohttp version in use.
        body = await response.read()
        if response.status == 429:
            message = 'Temporarily rate limited: '
            print_trace = False
        else:
            message = 'Unable to reach APM Server: '
        message += '%s (url: %s, body: %s)' % (e, self._url, body)
        raise TransportException(message, data,
                                 print_trace=print_trace) from e
    except Exception as e:
        print_trace = True
        message = 'Unable to reach APM Server: %s (url: %s)' % (
            e, self._url)
        raise TransportException(message, data,
                                 print_trace=print_trace) from e
    else:
        return response.headers.get('Location')
async def call(method, file=None, **kwargs):
    r"""
    Perform an API call to Slack.

    :param file: File pointer
    :type file: file
    :param \**kwargs: see below

    :Keyword Arguments:
        All the arguments required by the method from the `Slack Web API`_.

    :returns: JSON response.
    :rtype: dict
    """
    # A plain ``def`` with ``yield from`` is just a generator, not an asyncio
    # coroutine; converted to a native coroutine (async/await) instead.
    # JSON encode any sub-structure...
    for key, value in kwargs.items():
        # keep str as is.
        if not isinstance(value, (bytes, str)):
            kwargs[key] = json.dumps(value)
    form = FormData(kwargs)
    # Handle file upload
    if file:
        form.add_field('file', file)
    logging.debug('POST (m=%s) /api/%s %s', form.is_multipart, method, kwargs)
    with ClientSession() as session:
        with Timeout(10):
            response = await session.post('https://{0}/api/{1}'
                                          .format(SLACK_DOMAIN, method),
                                          data=form)
        assert 200 == response.status, response
        try:
            body = await response.json()
            logging.debug('Response /api/%s %d %s',
                          method, response.status, body)
            return body
        finally:
            await response.release()
async def fetch(session, url):
    """GET *url* via *session* and return the response body text.

    Times out after 4 minutes.
    """
    with aiohttp.Timeout(60 * 4):
        async with session.get(url) as response:
            return await response.text()
async def fetch_json(session, url):
    """GET *url* via *session* and return the decoded JSON body.

    Raises:
        AssertionError: if the response status is not 200.
        asyncio.TimeoutError: if the request exceeds 10 seconds.
    """
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            assert response.status == 200
            return await response.json()
async def fetch_json(session, url):
    """GET *url* via *session* and return the decoded JSON body.

    Raises:
        AssertionError: if the response status is not 200.
        asyncio.TimeoutError: if the request exceeds 10 seconds.
    """
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            assert response.status == 200
            return await response.json()
async def fetchURL(url, loop):
    """GET *url* on *loop* with its own session; return the body text.

    Times out after 10 seconds.
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        with aiohttp.Timeout(10, loop=session.loop):
            async with session.get(url) as response:
                return await response.text()
async def downloadImage(url, folder, name, loop, chunkSize=20):
    """Download *url* into ``folder/name<ext>`` if it is an image.

    Args:
        url: Image URL to fetch.
        folder: Destination directory (must already exist).
        name: File name without extension; extension is guessed from the
            response content-type.
        loop: Event loop for the client session.
        chunkSize: Bytes per read while streaming the body.

    Returns:
        dict with ``canAccessURL``, ``isImage`` and ``fileSaved`` flags.
    """
    result = {'canAccessURL': False, 'isImage': False, 'fileSaved': False}
    # Browser-like headers: some hosts reject requests with no User-Agent.
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
        'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
        'Accept-Encoding': 'none',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'}
    async with aiohttp.ClientSession(loop=loop) as session:
        with aiohttp.Timeout(10, loop=session.loop):
            async with session.get(url, headers=headers) as response:
                content_type = response.headers['content-type']
                if response.status == 200:
                    result['canAccessURL'] = True
                if "image" in content_type:
                    result['isImage'] = True
                if not result['canAccessURL'] or not result['isImage']:
                    return result
                extension = mimetypes.guess_extension(content_type)
                # guess_extension maps image/jpeg to '.jpe'; prefer '.jpg'.
                if extension == '.jpe':
                    extension = '.jpg'
                with open(folder + "/" + name + extension, 'wb') as fd:
                    while True:
                        chunk = await response.content.read(chunkSize)
                        if not chunk:
                            break
                        fd.write(chunk)
                result['fileSaved'] = True
                return result
async def fetchURL(url, loop):
    """GET *url* on *loop* with its own session; return the body text.

    Times out after 10 seconds.
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        with aiohttp.Timeout(10, loop=session.loop):
            async with session.get(url) as response:
                return await response.text()
async def downloadImage(url, folder, name, loop, chunkSize=20):
    """Download *url* into ``folder/name<ext>`` if it is an image.

    Args:
        url: Image URL to fetch.
        folder: Destination directory (must already exist).
        name: File name without extension; extension is guessed from the
            response content-type.
        loop: Event loop for the client session.
        chunkSize: Bytes per read while streaming the body.

    Returns:
        dict with ``canAccessURL``, ``isImage`` and ``fileSaved`` flags.
    """
    result = {'canAccessURL': False, 'isImage': False, 'fileSaved': False}
    # Browser-like headers: some hosts reject requests with no User-Agent.
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
        'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
        'Accept-Encoding': 'none',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'}
    async with aiohttp.ClientSession(loop=loop) as session:
        with aiohttp.Timeout(10, loop=session.loop):
            async with session.get(url, headers=headers) as response:
                content_type = response.headers['content-type']
                if response.status == 200:
                    result['canAccessURL'] = True
                if "image" in content_type:
                    result['isImage'] = True
                if not result['canAccessURL'] or not result['isImage']:
                    return result
                extension = mimetypes.guess_extension(content_type)
                # guess_extension maps image/jpeg to '.jpe'; prefer '.jpg'.
                if extension == '.jpe':
                    extension = '.jpg'
                with open(folder + "/" + name + extension, 'wb') as fd:
                    while True:
                        chunk = await response.content.read(chunkSize)
                        if not chunk:
                            break
                        fd.write(chunk)
                result['fileSaved'] = True
                return result
async def fetch(url, proxy=None):
    """GET *url* through an optional HTTP *proxy*; return decoded JSON.

    Fixes in this revision: the ``url`` parameter was previously ignored
    (a hard-coded URL was fetched instead), ``headers`` was passed
    positionally (a TypeError), and ``resp.json()`` was not awaited.
    """
    conn = aiohttp.ProxyConnector(proxy=proxy)
    headers = {'user-agent': get_user_agent()}
    with aiohttp.ClientSession(connector=conn) as session:
        with aiohttp.Timeout(TIMEOUT):
            async with session.get(url, headers=headers) as resp:
                return await resp.json()
async def _download_async(self, url, f_handle):
    """Stream *url* into the open binary file object *f_handle*.

    Reads in 1 KiB chunks with a 10-second overall timeout, then releases
    the response.
    """
    DOWNLOAD_TIMEOUT = 10
    DOWNLOAD_CHUNK_SIZE = 1024
    with aiohttp.Timeout(DOWNLOAD_TIMEOUT):
        async with self.aiohttp.get(url) as response:
            while True:
                chunk = await response.content.read(DOWNLOAD_CHUNK_SIZE)
                if not chunk:
                    break
                f_handle.write(chunk)
            return await response.release()
async def _get_feed(self, url):
    """Fetch *url* and return its body text, or None on any failure.

    Best-effort: network errors, timeouts (3s) and bad responses are all
    swallowed and reported as None.
    """
    text = None
    try:
        with aiohttp.ClientSession() as session:
            with aiohttp.Timeout(3):
                async with session.get(url) as r:
                    text = await r.text()
    except Exception:
        # Narrowed from a bare ``except:`` so cancellation/system-exit
        # are not swallowed; still deliberately best-effort otherwise.
        pass
    return text