def __init__(self, endpoint, api_path, event_handler, principal=None, secret=None):
    """Store connection settings and create the HTTP clients.

    :param endpoint: stored unchanged on ``self.endpoint``.
    :param api_path: stored unchanged on ``self.api_path``.
    :param event_handler: stored unchanged on ``self.event_handler``.
    :param principal: optional user part of the Basic credential.
    :param secret: optional password part of the Basic credential.

    An ``Authorization`` header is only prepared when both ``principal``
    and ``secret`` are given.
    """
    self.endpoint = endpoint
    self.api_path = api_path

    # Two separate client objects are requested: one for the subscription
    # side and one for outbound requests.
    self.subscription_client = AsyncHTTPClient()
    self.outbound_client = AsyncHTTPClient()

    self.event_handler = event_handler
    self.pending = {}
    self.buffer = deque()
    self.mesos_stream_id = None
    self.closing = False
    self.connection_successful = False
    self._headers = HTTPHeaders()

    if principal is None or secret is None:
        return

    # b2a_base64 appends a trailing newline, hence the strip().
    raw_pair = ('%s:%s' % (principal, secret)).encode('ascii')
    encoded_pair = b2a_base64(raw_pair).decode('ascii').strip()
    credential = 'Basic %s' % (encoded_pair)

    # NOTE(review): this indexes ``self.headers`` while the attribute set
    # above is ``self._headers`` -- presumably a property defined elsewhere
    # on the class exposes it; confirm.
    self.headers['Authorization'] = credential
    self._basic_credential = credential
# Example source code for the Python class AsyncHTTPClient()
def call(self, request):
    """Send ``request`` over HTTP and return a future for the response.

    The HTTP client is created lazily on first use.  The returned
    ``gen.Future`` is resolved with a ``Response`` built from the headers
    and body of the HTTP result.
    """
    if self.client is None:
        self.client = AsyncHTTPClient()

    answer = gen.Future()
    outgoing = _to_http_request(self.url, request)

    # presumably ``fail_to`` forwards exceptions raised inside the callback
    # to ``answer`` -- confirm against its definition.
    @fail_to(answer)
    def on_fetch(fetch_future):
        error = fetch_future.exception()
        if error:
            _handle_request_exception(error)

        http_result = fetch_future.result()
        response = Response(
            headers=headers.from_http_headers(http_result.headers),
            body=http_result.body,
        )
        answer.set_result(response)

    pending_fetch = self.client.fetch(outgoing)
    pending_fetch.add_done_callback(on_fetch)
    return answer
def test_bad_request_error(req, msg):
    """POST ``req`` with an empty body to a handler-less inbound.

    Expects an ``HTTPError`` whose code lies in the 400..500 range and
    whose response body equals ``msg``.
    """
    inbound = HTTPInbound()
    inbound.start(None)

    req.url = 'http://localhost:%s' % inbound.port
    req.method = 'POST'
    req.body = ''

    http_client = AsyncHTTPClient()
    with pytest.raises(HTTPError) as exc_info:
        yield http_client.fetch(req)

    error = exc_info.value
    assert 400 <= error.code <= 500
    assert error.response.body == msg
def test_inbound_headers(http_headers, rpc_headers, http_resp_headers):
    """Round-trip headers through an ``HTTPInbound``.

    The handler asserts it received ``rpc_headers`` and echoes them back;
    the test then checks every entry of ``http_resp_headers`` against the
    HTTP response headers.
    """

    class Handler(object):
        def handle(self, request):
            assert rpc_headers == request.headers
            return Response(headers=rpc_headers, body=request.body)

    inbound = HTTPInbound()
    inbound.start(Handler())

    target = 'http://localhost:%s' % inbound.port
    outgoing = HTTPRequest(
        url=target,
        method='POST',
        headers=http_headers,
        body='',
    )

    res = yield AsyncHTTPClient().fetch(outgoing)
    assert 200 == res.code

    for name in http_resp_headers:
        assert http_resp_headers[name] == res.headers[name]
async def access_token(self, code, state):
    """Exchange an authorization ``code`` for an access token.

    POSTs a form-encoded payload (client id/secret, grant type, redirect
    URI, ``code`` and ``state``) to ``Twitch.TOKEN_URL`` and returns the
    ``access_token`` field of the JSON response.

    Declared ``async def`` because the body uses ``await``, which is a
    syntax error inside a plain ``def``.

    :param code: authorization code sent as the ``code`` form field.
    :param state: value sent as the ``state`` form field.
    :returns: ``data['access_token']`` from the decoded response body.
    :raises KeyError: if the decoded response has no ``access_token`` key.
    """
    payload = (
        ('client_id', self.client_id),
        ('client_secret', self.client_secret),
        ('grant_type', 'authorization_code'),
        ('redirect_uri', Twitch.REDIRECT_URI),
        ('code', code),
        ('state', state),
    )
    request = HTTPRequest(
        url=Twitch.TOKEN_URL,
        method='POST',
        body=urlencode(payload),
    )

    client = AsyncHTTPClient()
    # Bridge the Tornado future into asyncio so it can be awaited here.
    response = await to_asyncio_future(client.fetch(request))

    data = json_decode(response.body)
    return data['access_token']
def get_links_from_url(url):
    """Fetch the page at `url` and collect the links found in it.

    Each link has its `#` fragment stripped and is resolved against `url`,
    so 'gen.html#tornado.gen.coroutine' becomes
    'http://www.tornadoweb.org/en/stable/gen.html'.

    The result is delivered through ``gen.Return``; any exception while
    fetching or parsing is printed and an empty list is delivered instead.
    """
    try:
        page = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        # The body may be bytes or str; normalise to str before parsing.
        body = page.body
        if not isinstance(body, str):
            body = body.decode()

        links = []
        for raw_link in get_links(body):
            links.append(urljoin(url, remove_fragment(raw_link)))
    except Exception as exc:
        print('Exception: %s %s' % (exc, url))
        raise gen.Return([])

    raise gen.Return(links)
def get_user_list():
    '''
    Fetch the openid list and look up details for every openid.

    Delivers (via ``Return``) ``False`` when no access token is available,
    otherwise a dict keyed by openid whose values hold the ``headimgurl``,
    ``openid`` and ``nickname`` fields of each user's detail record.
    '''
    access_token = yield find_access_token()
    if access_token is None:
        raise Return(False)

    list_url = tornado_options.get_user_list_url.format(access_token, '')
    resp = yield AsyncHTTPClient().fetch(list_url)
    payload = json.loads(resp.body)
    openids = payload.get('data').get('openid')

    wanted_fields = ('headimgurl', 'openid', 'nickname')
    users = {}
    for openid in openids:
        # Details are fetched one user at a time.
        detail = yield get_user_detail(openid)
        users[openid] = {field: detail.get(field) for field in wanted_fields}

    raise Return(users)
def is_running(self):
    '''Check if our proxied process is still running.

    Returns False when no process is recorded in ``self.state``, when the
    process' ``poll()`` reports exit code 0, or when an HTTP request to
    ``http://localhost:<self.port>`` fails; returns True otherwise.

    The body contains ``yield``, so this is a generator meant to be driven
    by a coroutine runner (the decorator is not visible here).
    '''
    if 'proc' not in self.state:
        return False

    # Check if the process is still around
    proc = self.state['proc']
    # NOTE(review): only exit code 0 is treated as "gone"; a non-zero exit
    # code falls through to the HTTP probe below -- confirm this is intended.
    if proc.proc.poll() == 0:
        self.log.info('Cannot poll on process.')
        return False

    client = httpclient.AsyncHTTPClient()
    req = httpclient.HTTPRequest('http://localhost:{}'.format(self.port))

    try:
        yield client.fetch(req)
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt, SystemExit
        # and GeneratorExit are no longer swallowed by the probe.
        self.log.debug('Got negative response from rstudio server')
        return False

    self.log.debug('Got positive response from rstudio server')
    return True
def post_message(msg, endpoint, channel, username='BBTornado', unfurl_links=False, icon=":robot_face:"):
    """
    Post a message on slack.

    The JSON payload is POSTed to ``endpoint`` from a callback spawned on
    the current IOLoop with ``raise_error=False``.  This is
    "fire-and-forget", so nothing is returned.
    """
    payload = {
        'icon_emoji': icon,
        'text': msg,
        'username': username,
        'unfurl_links': unfurl_links,
        'channel': channel,
    }
    request = HTTPRequest(
        endpoint,
        method='POST',
        headers={'Content-Type': 'application/json'},
        body=json.dumps(payload),
    )

    http_client = AsyncHTTPClient()
    loop = IOLoop.current()
    loop.spawn_callback(http_client.fetch, request, raise_error=False)
async def sendRedPackage(self, openid, amount):
    """Send a red-packet request for ``openid`` and record the outcome.

    Declared ``async def`` because the body uses ``await``, which is a
    syntax error inside a plain ``def``.

    Steps, as performed below:
      * log the attempt through ``self.db.add_log``;
      * POST the body built by ``generateWXParam`` to the sendredpack URL
        using the configured client certificate files;
      * parse the reply with ``parseWeixin`` and log it;
      * when both ``return_code`` and ``result_code`` are ``SUCCESS``,
        record ``amount / 100.0`` in ``config.hasSent`` and store a "Sent"
        order; otherwise bump ``config.sendPackageResponseError`` and store
        a "NotSent" order.

    Any exception raised during the request or the bookkeeping is logged
    and counted in ``config.sendPackageError``; nothing is re-raised.

    :param openid: recipient identifier, used as log/order key.
    :param amount: amount to send; stored divided by 100.0
        (presumably a minor-unit to major-unit conversion -- confirm).
    """
    await self.db.add_log(openid, "try send RedPackage", "retry_Time :" + "none")

    url = "https://api.mch.weixin.qq.com/mmpaymkttransfers/sendredpack"
    strr = generateWXParam(openid, amount)
    request = HTTPRequest(
        url=url,
        method="POST",
        body=strr,
        client_key="/home/coco/cert/apiclient_key.pem",
        ca_certs="/home/coco/cert/rootca.pem",
        client_cert="/home/coco/cert/apiclient_cert.pem",
    )
    client = AsyncHTTPClient()

    try:
        response = await client.fetch(request)
        res = parseWeixin(response.body.decode('utf-8'))
        await self.db.add_log(openid, "send RedPackage response", res)

        converted = amount / 100.0
        if res['return_code'] == 'SUCCESS' and res['result_code'] == 'SUCCESS':
            config.hasSent[config.turn - 1][openid] = converted
            await self.db.add_order(openid, config.turn, converted, "Sent")
        else:
            config.sendPackageResponseError += 1
            await self.db.add_order(openid, config.turn, converted, "NotSent")
    except Exception:
        # Deliberate best-effort: failures are logged and counted, never raised.
        await self.db.add_log(openid, "send RedPackage response", "redpackage Callback failed")
        config.sendPackageError += 1
def async_fetch(url, headers=None, method="GET", data=None, follow_redirects=False):
    """
    Async http fetch

    For ``GET`` with ``data`` the url-encoded data is appended to ``url``
    as a query string; for ``POST`` with ``data`` it is sent as a
    form-encoded body with a matching ``Content-Type`` header.  The fetch
    is issued with ``raise_error=False`` and the response is delivered via
    ``gen.Return``.

    The caller's ``headers`` mapping is copied before use, so the
    ``Content-Type`` header added for POST requests no longer leaks back
    into the caller's object.

    :param url: target URL.
    :param headers: optional mapping of request headers (not modified).
    :param method: HTTP method name.
    :param data: optional data passed to ``urlencode``.
    :param follow_redirects: forwarded to ``HTTPRequest``.
    """
    # NOTE(review): ``HTTPClient`` is yielded on below, so it is presumably
    # bound to an asynchronous client in this module -- confirm.
    client = HTTPClient()

    headers = dict(headers) if headers else {}
    body = None
    if data is not None:
        if method == "GET":
            url = url + '?' + urlencode(data)
        elif method == "POST":
            headers.update({'Content-Type': 'application/x-www-form-urlencoded'})
            body = urlencode(data)

    request = HTTPRequest(url=url, headers=headers,
                          method=method, follow_redirects=follow_redirects,
                          body=body)
    response = yield client.fetch(request, raise_error=False)
    raise gen.Return(response)
def get_authenticated_user(self, callback):
    """Fetches the authenticated user data upon redirect.

    When the request carries an OAuth request token (combined
    OpenID/OAuth), the access token URL is fetched and the result is handed
    to ``_on_access_token``; otherwise the plain ``OpenIdMixin``
    implementation is used.
    """
    oauth_extension = u"http://specs.openid.net/extensions/oauth/1.0"
    ns_prefix = "openid.ns."

    # Find the namespace alias under which the OAuth extension is declared.
    oauth_ns = ""
    for name, values in self.request.arguments.iteritems():
        if not name.startswith(ns_prefix):
            continue
        if values[-1] != oauth_extension:
            continue
        oauth_ns = name[10:]  # drop the 10-character "openid.ns." prefix
        break

    token = self.get_argument("openid." + oauth_ns + ".request_token", "")
    if not token:
        OpenIdMixin.get_authenticated_user(self, callback)
        return

    http = httpclient.AsyncHTTPClient()
    token = dict(key=token, secret="")
    on_token = self.async_callback(self._on_access_token, callback)
    http.fetch(self._oauth_access_token_url(token), on_token)
def get_links_from_url(url):
    """Fetch the page at `url` and collect the links found in it.

    Each link has its `#` fragment stripped and is resolved against `url`,
    so 'gen.html#tornado.gen.coroutine' becomes
    'http://www.tornadoweb.org/en/stable/gen.html'.

    The result is delivered through ``gen.Return``; any exception while
    fetching or parsing is printed and an empty list is delivered instead.
    """
    try:
        page = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        links = []
        for raw_link in get_links(page.body):
            cleaned = remove_fragment(raw_link)
            links.append(urlparse.urljoin(url, cleaned))
    except Exception as exc:
        print('Exception: %s %s' % (exc, url))
        raise gen.Return([])

    raise gen.Return(links)
def get_authenticated_user(self, callback):
    """Fetches the authenticated user data upon redirect.

    A request that includes an OAuth request token (combined OpenID/OAuth)
    triggers a fetch of the access token URL, completed by
    ``_on_access_token``; any other request is delegated to
    ``OpenIdMixin.get_authenticated_user``.
    """
    # Look to see if we are doing combined OpenID/OAuth
    oauth_ns = ""
    for arg_name, arg_values in self.request.arguments.iteritems():
        declares_namespace = arg_name.startswith("openid.ns.")
        if declares_namespace and \
                arg_values[-1] == u"http://specs.openid.net/extensions/oauth/1.0":
            # Keep only the alias that follows the "openid.ns." prefix.
            oauth_ns = arg_name[10:]
            break

    token_argument = "openid." + oauth_ns + ".request_token"
    token = self.get_argument(token_argument, "")

    if token:
        client = httpclient.AsyncHTTPClient()
        token = dict(key=token, secret="")
        access_url = self._oauth_access_token_url(token)
        client.fetch(access_url,
                     self.async_callback(self._on_access_token, callback))
    else:
        OpenIdMixin.get_authenticated_user(self, callback)
def poll(self):
    """Poll for and return the raw status data provided by the Presto REST API.

    The value is delivered through ``Return``, so this generator is meant
    to be driven by a coroutine runner.

    :returns: dict -- JSON status information or ``None`` if the query is done
    :raises: ``ProgrammingError`` when no query has been started

    .. note::
        This is not a part of DB-API.
    """
    if self._state == self._STATE_NONE:
        raise ProgrammingError("No query yet")

    next_uri = self._nextUri
    if next_uri is None:
        assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
        raise Return(None)

    status_request = HTTPRequest(next_uri)
    http_client = AsyncHTTPClient(max_clients=512)
    response = yield http_client.fetch(status_request)

    # Let the cursor update its own state before handing back the raw JSON.
    self._process_response(response)
    status = json_decode(response.body)
    raise Return(status)
def test_poll(self, cursor):
    """Exercise ``cursor.poll`` before, during and after a query.

    Polling before any query must raise ``ProgrammingError``; every status
    returned while the query runs must contain ``'stats'``; and once
    polling has returned ``None``, ``fetchall`` must not call
    ``AsyncHTTPClient.fetch`` again.
    """

    @gen.engine
    def poll_before_execute():
        yield cursor.poll()
        self.stop()

    self.assertRaises(presto.ProgrammingError, self.run_gen, poll_before_execute)

    yield cursor.execute('SELECT * FROM one_row')

    status = yield cursor.poll()
    while status is not None:
        self.assertIn('stats', status)
        status = yield cursor.poll()

    def fail(*args, **kwargs):
        self.fail("Should not need requests.get after done polling")  # pragma: no cover

    with mock.patch.object(AsyncHTTPClient, 'fetch') as patched_fetch:
        patched_fetch.side_effect = fail
        rows = yield cursor.fetchall()
        self.assertEqual(rows, [[1]])
def __init__(self, events_url=None, people_url=None, import_url=None, request_timeout=None, ioloop=None):
    """Configure endpoints and start one flush task per endpoint.

    :param events_url: override for the ``events`` endpoint URL.
    :param people_url: override for the ``people`` endpoint URL.
    :param import_url: override for the ``imports`` endpoint URL.
    :param request_timeout: stored on ``self._request_timeout``.
    :param ioloop: IOLoop to keep; defaults to ``IOLoop.current()``.

    For every endpoint an ``asyncio.Queue`` is created and
    ``self.flush(endpoint)`` is scheduled with ``asyncio.ensure_future``.
    """
    self._endpoints = {
        'events': events_url if events_url else 'https://api.mixpanel.com/track',
        'people': people_url if people_url else 'https://api.mixpanel.com/engage',
        'imports': import_url if import_url else 'https://api.mixpanel.com/import',
    }
    self._queues = {}
    self._request_timeout = request_timeout
    self.ioloop = IOLoop.current() if ioloop is None else ioloop
    self._api_key = None
    self._httpclient = AsyncHTTPClient()
    self._tasks = []

    for name in self._endpoints:
        self._queues[name] = asyncio.Queue()
        flusher = asyncio.ensure_future(self.flush(name))
        self._tasks.append(flusher)
def get_links_from_url(url):
    """Download `url` and return the absolute, fragment-free links in it.

    For example the link 'gen.html#tornado.gen.coroutine' is reported as
    'http://www.tornadoweb.org/en/stable/gen.html'.

    Results are delivered with ``gen.Return``.  If anything goes wrong the
    exception is printed together with `url` and an empty list is
    delivered.
    """
    try:
        fetched = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        # Decode only when the body is not already a str.
        if isinstance(fetched.body, str):
            markup = fetched.body
        else:
            markup = fetched.body.decode()

        found = [
            urljoin(url, remove_fragment(candidate))
            for candidate in get_links(markup)
        ]
    except Exception as err:
        print('Exception: %s %s' % (err, url))
        raise gen.Return([])

    raise gen.Return(found)
def get_posts_url_from_page(page_url):
    """
    Collect the post URLs listed on a page.

    Every ``div`` with class ``post floated-thumb`` contributes the ``href``
    of the first link inside its ``post-meta`` block.

    :param page_url: URL of the listing page to fetch.
    :return: list of URLs (via ``gen.Return``); an empty list when the
        fetch raises ``httpclient.HTTPError``.
    """
    try:
        # ``headers`` is a module-level name defined outside this function.
        response = yield httpclient.AsyncHTTPClient().fetch(page_url, headers=headers)
        soup = BeautifulSoup(response.body, 'html.parser')
        post_blocks = soup.find_all('div', class_="post floated-thumb")
        urls = [
            block.find("div", class_="post-meta").p.a['href']
            for block in post_blocks
        ]
    except httpclient.HTTPError as err:
        print('Exception: %s %s' % (err, page_url))
        raise gen.Return([])

    raise gen.Return(urls)
def get_httpclient(self):
    """Return an ``AsyncHTTPClient`` constructed with default arguments."""
    client = AsyncHTTPClient()
    return client
def setUp(self):
    """Create the shared IOLoop and a ``Client`` for three local hosts."""
    self.ioloop = ioloop.IOLoop.instance()

    hosts = ['127.0.0.1:2370', '127.0.0.1:2371', '127.0.0.1:2372']
    http = httpclient.AsyncHTTPClient()
    self.client = Client(host=hosts, httpclient=http, ioloop=self.ioloop)
def send_email(to, subject, html):
    """POST an e-mail to the Mailgun API URL.

    ``unicode`` arguments are encoded to UTF-8 first.  The form data is
    sent with HTTP basic auth (user ``api``, password
    ``CONFIG.MAILGUN_API_KEY``).  An ``HTTPError`` is logged together with
    the response body when one is available; it is not re-raised.
    """
    if isinstance(to, unicode):
        to = to.encode('utf-8')
    if isinstance(subject, unicode):
        subject = subject.encode('utf-8')
    if isinstance(html, unicode):
        html = html.encode('utf-8')

    form = urlencode({
        'from': CONFIG.EMAIL_SENDER,
        'to': to,
        'subject': subject,
        'html': html,
    })
    request = HTTPRequest(
        url=_MAILGUN_API_URL,
        method='POST',
        auth_username='api',
        auth_password=CONFIG.MAILGUN_API_KEY,
        body=form,
    )

    try:
        yield AsyncHTTPClient().fetch(request)
    except HTTPError as error:
        # The error may carry no response (or a None response); log None then.
        failed_response = getattr(error, 'response', None)
        response = getattr(failed_response, 'body', None)
        logging.exception('failed to send email:\nto: %s\nsubject: %s\nhtml: %s\nresponse: %s', to, subject, html, response)
def get_http_client(self):
    """Return an ``AsyncHTTPClient`` constructed with ``self.io_loop``."""
    loop = self.io_loop
    return AsyncHTTPClient(io_loop=loop)
def fetch(self, path, **kwargs):
    """Convenience method to synchronously fetch a url.

    ``path`` is turned into a full URL with ``self.get_url`` and fetched
    with ``self.http_client``, using ``self.stop`` as the completion
    callback; the value of ``self.wait()`` is returned.  Any additional
    kwargs are passed directly to `.AsyncHTTPClient.fetch` (and so could be
    used to pass ``method="POST"``, ``body="..."``, etc).
    """
    full_url = self.get_url(path)
    self.http_client.fetch(full_url, self.stop, **kwargs)
    result = self.wait()
    return result
def get_http_client(self):
    """Return a forced new ``AsyncHTTPClient`` with cert validation off.

    The client is constructed with ``self.io_loop``,
    ``force_instance=True`` and ``validate_cert=False`` as a default.
    """
    request_defaults = dict(validate_cert=False)
    return AsyncHTTPClient(
        io_loop=self.io_loop,
        force_instance=True,
        defaults=request_defaults,
    )
def get(self):
    """Fetch the URL given in the ``url`` argument and echo its body.

    The client is built on the IOLoop of the request's connection stream.
    A failed fetch is re-raised through ``response.rethrow()``.
    """
    stream_loop = self.request.connection.stream.io_loop
    http_client = AsyncHTTPClient(io_loop=stream_loop)
    target = self.get_argument('url')

    response = yield gen.Task(http_client.fetch, target)
    response.rethrow()
    self.finish(b"got response: " + response.body)
def tornado_fetch(self, url, runner):
    """Fetch ``url`` with a Tornado client while ``runner`` drives the loop.

    The fetch callback records the response and calls ``self.stop_loop()``.
    After ``runner()`` returns, exactly one response must have been
    recorded; it is re-raised if it holds an error and returned otherwise.
    """
    collected = []

    def on_response(response):
        collected.append(response)
        self.stop_loop()

    http_client = AsyncHTTPClient(self.io_loop)
    http_client.fetch(url, callback=on_response)
    runner()

    self.assertEqual(len(collected), 1)
    only_response = collected[0]
    only_response.rethrow()
    return only_response
def get_auth_http_client(self):
    """Return the `.AsyncHTTPClient` instance used for auth requests.

    Subclasses may override this to supply an HTTP client other than the
    default.
    """
    auth_client = httpclient.AsyncHTTPClient()
    return auth_client
def get_auth_http_client(self):
    """Return the `.AsyncHTTPClient` instance used for auth requests.

    Subclasses may override this to supply an HTTP client other than the
    default.

    .. versionadded:: 4.3
    """
    default_client = httpclient.AsyncHTTPClient()
    return default_client
def get_http_client(self):
    """Return an ``AsyncHTTPClient`` bound to ``self.io_loop``."""
    http_client = AsyncHTTPClient(io_loop=self.io_loop)
    return http_client