def set_cluster(self, config_file, key=None):
    """
    Load a config file to the registry and make it a cluster.

    The file contents are sent as the ``description`` query argument of a
    ``new_cluster`` request to ``<self.uri>/cluster``, with ``self.uk``
    in the ``Key`` header.

    :param config_file: Path to the config file to be loaded
    :param key: Key that identifies the cluster
    :return: Key that identifies the cluster. It can be generated if it is
     not given as a parameter. Use only while debugging.
    :raises ValueError: If the response code is not 200; the message is the
     decoded response body.
    """
    with open(config_file) as f:
        cluster = f.read()

    arguments = {
        'method': 'new_cluster',
        'description': cluster
    }
    if key:
        arguments['key'] = key

    client = HTTPClient()
    try:
        response = client.fetch('{}/cluster?{}'.format(
            self.uri, parse.urlencode(arguments)),
            headers={'Key': self.uk}
        )
    finally:
        # Close the blocking client whether or not the fetch raised.
        client.close()

    if response.code == 200:
        return response.body.decode('utf-8')
    else:
        raise ValueError(response.body.decode('utf-8'))
python类HTTPClient()的实例源码
def send(self, text):
    """
    Send a log line to the registry.

    The line is POSTed as the request body to ``<self.uri>/logs`` with
    ``self.cluster`` passed as the ``cluster`` query argument. The response
    is not inspected.

    :param text: Text of the log line
    """
    arguments = {
        'cluster': self.cluster,
    }
    client = HTTPClient()
    try:
        client.fetch('{}/logs?{}'.format(
            self.uri, parse.urlencode(arguments)),
            method='POST',
            body=text
        )
    finally:
        # Close the blocking client whether or not the fetch raised.
        client.close()
def new_admin_account(uri, master_key, admin_name, key=None):
    """
    Creates a new client account with the master key. The master key is part
    of the static configuration of the registry service.

    :param uri: URI of the registry service
    :param master_key: Master key for the registry service
    :param admin_name: Name of the admin account
    :param key: Key of the admin account
    :return: Key assigned to the admin account. It may be generated automatically
    :raises ValueError: If the response code is not 200; the message is the
     decoded response body.
    """
    arguments = {
        'method': 'new_admin',
        'name': admin_name,
    }
    if key:
        arguments['key'] = key

    client = HTTPClient()
    try:
        response = client.fetch('{}/admin?{}'.format(
            uri, parse.urlencode(arguments)),
            headers={'Key': master_key}
        )
    finally:
        # Close the blocking client whether or not the fetch raised.
        client.close()

    if response.code == 200:
        return response.body.decode('utf-8')
    else:
        raise ValueError(response.body.decode('utf-8'))
def synchronous_fetch(url):
    """Fetch ``url`` with the blocking client and return the decoded body.

    :param url: Address to request
    :return: Response body decoded with ``bytes.decode()`` defaults
    """
    http_client = HTTPClient()
    try:
        # The blocking client's fetch hands back the response object itself.
        response = http_client.fetch(url)
    finally:
        # Close the client whether or not the fetch raised.
        http_client.close()
    return response.body.decode()
# ?????????????
def register_volume(self, volume):
    """Register ``volume`` locally and announce its token to the server.

    The volume is stored in the module-level ``volumes`` mapping under
    ``volume.token``; then ``/register_token/<token>/<connection file>`` is
    requested on the URL returned by ``self.get_server_url()``.

    :param volume: Object exposing a bytes ``token`` attribute
    :raises RuntimeError: If the registration request fails for any reason
    """
    # globally register volume
    global volumes
    volumes[volume.token] = volume

    # globally register kernel client for this volume in the Jupyter server
    cf = url_escape(find_connection_file())
    http_client = HTTPClient()
    try:
        http_client.fetch(self.get_server_url() + '/register_token/' + volume.token.decode('utf8') + '/' + cf)
    except Exception as e:
        raise RuntimeError("could not register token: " + str(e))
    finally:
        # Runs on both the success and the failure path, so the client is
        # closed even when the RuntimeError above is raised.
        http_client.close()
httpclient_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_sync_client_error(self):
    # The blocking HTTPClient raises errors directly, so the test only
    # needs to capture the exception; response.rethrow() is not involved.
    with self.assertRaises(HTTPError) as raised:
        missing_url = self.get_url('/notfound')
        self.http_client.fetch(missing_url)
    status = raised.exception.code
    self.assertEqual(status, 404)
def __request(input0):
    # POST ``input0`` to the tokeniser service and return the parsed JSON
    # reply. Failures are printed rather than raised; ``output`` then keeps
    # whatever value it had reached (None, raw bytes, or decoded text).
    output = None
    http = httpclient.HTTPClient()
    try:
        print(colored('tokenising.. ','yellow') + input0)
        request = httpclient.HTTPRequest(tokeniser_serv, method='POST', body=input0)
        reply = http.fetch(request)
        # Raw bytes first, then text, then the parsed JSON object; each
        # step overwrites ``output`` so a later failure keeps the earlier form.
        output = reply.body
        output = output.decode('utf-8')
        output = json.loads(output)
    except httpclient.HTTPError as http_err:
        # Error reported by the HTTP layer
        print(colored('HTTP Error : ' + str(http_err),'red'))
    except Exception as err:
        # Anything else that went wrong
        print(colored('ERROR : ' + str(err), 'red'))
    http.close()

    print(colored('{Tokenizer received:}','white'))
    print(colored(output,'white'))
    return output
def test_sync_client_error(self):
    # The blocking HTTPClient raises errors directly, so the test only
    # needs to capture the exception; response.rethrow() is not involved.
    with self.assertRaises(HTTPError) as raised:
        missing_url = self.get_url('/notfound')
        self.http_client.fetch(missing_url)
    status = raised.exception.code
    self.assertEqual(status, 404)
def test_sync_client_error(self):
    # The blocking HTTPClient raises errors directly, so the test only
    # needs to capture the exception; response.rethrow() is not involved.
    with self.assertRaises(HTTPError) as raised:
        missing_url = self.get_url('/notfound')
        self.http_client.fetch(missing_url)
    status = raised.exception.code
    self.assertEqual(status, 404)
def getUpdateInfo():
    """Request the update metadata and return it JSON-decoded.

    The URL is built from ``client.protocol`` and ``client.updateHostPort``
    and fetched with a 10 second request timeout.

    :return: Decoded response body, or None when the response carries an error
    """
    url = client.protocol + client.updateHostPort + "/netupdate_ajax?type=meta"
    logging.debug("getUpdateInfo: Start to get update info. url=%s", url)

    cur_client = HTTPClient()
    try:
        response = cur_client.fetch(url, request_timeout=10)
    finally:
        # Close the blocking client whether or not the fetch raised.
        cur_client.close()

    # NOTE(review): the blocking client normally raises on failure instead of
    # returning a response with ``error`` set -- confirm this branch is reachable.
    if response.error:
        logging.warning("getUpdateInfo: Failed to get update info. error=%s", response.error)
        return None

    logging.debug("getUpdateInfo: Current update info. reponse.body=%r", response.body)
    return json_decode(response.body)
def initUpdateInfo():
    """Call the ``/pre_update`` endpoint and return its JSON-decoded reply.

    The URL is built from ``client.protocol`` and ``client.updateHostPort``
    and fetched with a 10 second request timeout.

    :return: Decoded response body, or None when the response carries an error
    """
    url = client.protocol + client.updateHostPort + "/pre_update"
    logging.debug("initUpdateInfo: Start to init update info. url=%s", url)

    cur_client = HTTPClient()
    try:
        response = cur_client.fetch(url, request_timeout=10)
    finally:
        # Close the blocking client whether or not the fetch raised.
        cur_client.close()

    # NOTE(review): the blocking client normally raises on failure instead of
    # returning a response with ``error`` set -- confirm this branch is reachable.
    if response.error:
        logging.warning("initUpdateInfo: Failed to init update info. error=%s", response.error)
        return None

    logging.debug("initUpdateInfo: reponse.body=%r", response.body)
    return json_decode(response.body)
def get(self):
    """Report whether the box settings could be pushed to the cloud service.

    The local status endpoint is queried for the box id. When the machine is
    online and the status reply is usable, the box id is POSTed to
    ``yun.mohou.com`` and ``is_access_cloud`` is reported as True; otherwise
    it is reported as False. The written payload always has ``code`` 0.

    :return: Whatever ``self.write`` returns for the result payload
    """
    is_on_line = machine_is_online()

    cur_client = HTTPClient()
    try:
        response = cur_client.fetch("http://127.0.0.1:5000/status", request_timeout=10)
    finally:
        # Close the blocking client whether or not the fetch raised.
        cur_client.close()

    res = None
    if response.error:
        logger.warning("Failed to get current box info. error=%s", response.error)
        is_on_line = False
    else:
        # Only decode a reply that did not report an error; the body of a
        # failed response is not guaranteed to be valid JSON.
        res = json_decode(response.body)
        if res["code"] != 0:
            logger.warning("Failed to get current box info. ret_value=%d", res["ret_value"])
            is_on_line = False

    if is_on_line:
        boxid = res["data"]["boxid"]
        params = urllib.urlencode({
            "token": "box_setting",
            "boxid": boxid,
            "progress": 2
        })
        headers = {"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "Connection": "Keep-Alive"}
        conn = httplib.HTTPConnection("yun.mohou.com")
        try:
            conn.request(method="POST", url="/api/box/init-setting", body=params, headers=headers)
            response = conn.getresponse()
            response_json = response.read()
        finally:
            # Close the connection even if the request or the read fails.
            conn.close()
        logger.info("Box setting result: " + str(response_json))
        is_access_cloud = True
    else:
        is_access_cloud = False

    return self.write({'code': 0, 'msg': 'Success', 'data': {'is_access_cloud': is_access_cloud}})
def get_serial_number():
    """Return the ``boxid`` reported by the local ``/profile2`` endpoint.

    :return: ``res["data"]["boxid"]`` from the decoded reply, or None when the
     response carries an error or its ``code`` field is not 0
    """
    url = "http://127.0.0.1:5000/profile2"

    cur_client = HTTPClient()
    try:
        response = cur_client.fetch(url, request_timeout=10)
    finally:
        # Close the blocking client whether or not the fetch raised.
        cur_client.close()

    if response.error:
        return None

    res = json_decode(response.body)
    if res["code"] != 0:
        return None
    return res["data"]['boxid']
def get_data(self):
    # Generator-style fetch of ``self.url``: the async fetch is yielded and
    # the outcome is delivered through gen.Return -- None when the response
    # carries an error, otherwise a list of dicts with the keys
    # weekday / url / title / update_time.
    page = yield httpclient.AsyncHTTPClient().fetch(self.url)
    if page.error:
        raise gen.Return(None)
    root = html.fromstring(page.body.decode('utf-8'))

    bangumi_info = []
    for tab in range(1, 8):
        # Tabs 1-6 keep their own number as the weekday; tab 7 maps to 0.
        weekday = tab % 7
        links = root.xpath('//*[@id="tab_100895_%d"]//*[@class="v-meta-title"]/a' % tab)
        for link in links:
            # TIME_PATTERN splits the link text into title, hour and minute.
            title, hour, minute = self.TIME_PATTERN.match(link.text).groups()
            update_time = datetime.time(int(hour), int(minute))
            bangumi_info.append({
                'weekday': weekday,
                'url': link.attrib['href'],
                'title': title,
                'update_time': update_time,
            })
    raise gen.Return(bangumi_info)
def get_data(self):
    # Generator-style fetch of ``self.url`` through a fixed proxy: the async
    # fetch is yielded and the outcome is delivered through gen.Return --
    # None when the response carries an error, otherwise a list of dicts
    # with the keys weekday / url / title / update_time (always None here).
    page = yield httpclient.AsyncHTTPClient().fetch(
        self.url, proxy_host='124.88.67.32', proxy_port=843)
    if page.error:
        raise gen.Return(None)
    root = html.fromstring(page.body.decode('utf-8'))

    bangumi_info = []
    day_blocks = root.xpath('//*[@id="scrollContent-day_update"]//*[@class="week-updateList_each"]')
    for day in day_blocks:
        # The day label is translated to a weekday through self.WEEKDAY.
        weekday = self.WEEKDAY[day.xpath('./div/span')[0].text]
        for record in day.xpath('.//li'):
            url = record.xpath('./a')[0].attrib['href']
            title = record.xpath('./a/div/div[@class="week-cont_title"]')[0].text
            bangumi_info.append({
                'weekday': weekday,
                'url': url,
                'title': title,
                'update_time': None,
            })
    raise gen.Return(bangumi_info)
def _clear_headers_for_304(self):
# Tornado strips content-length from 304 responses, but here we
# want to simulate servers that include the headers anyway.
pass
# These tests end up getting run redundantly: once here with the default
# HTTPClient implementation, and then again in each implementation's own
# test suite.
def test_sync_client_error(self):
    # The blocking HTTPClient raises errors directly, so the test only
    # needs to capture the exception; response.rethrow() is not involved.
    with self.assertRaises(HTTPError) as raised:
        missing_url = self.get_url('/notfound')
        self.http_client.fetch(missing_url)
    status = raised.exception.code
    self.assertEqual(status, 404)
def synchronous_fetch(url, callback):
    """Fetch ``url`` with the blocking client, then pass the response on.

    The blocking client returns the response directly, so the callback is
    invoked here once the fetch has completed instead of being passed to
    ``fetch`` as a keyword argument.

    :param url: Address to request
    :param callback: Callable invoked with the response object
    """
    http_client = HTTPClient()
    try:
        response = http_client.fetch(url)
    finally:
        # Close the client whether or not the fetch raised.
        http_client.close()
    callback(response)
# ????
def _load_remote_data(self, url):
    """Download ``url`` with a blocking GET and return the raw body.

    :param url: Address to request
    :return: ``response.body`` exactly as returned by the client
    """
    kwargs = {
        'method': 'GET',
        # The request timeout comes from the instance's load_timeout.
        'request_timeout': self.load_timeout
    }
    http_req = httpclient.HTTPRequest(url, **kwargs)

    client = httpclient.HTTPClient()
    try:
        response = client.fetch(http_req)
    finally:
        # Close the blocking client whether or not the fetch raised.
        client.close()
    return response.body
def test_sync_client_error(self):
    # The blocking HTTPClient raises errors directly, so the test only
    # needs to capture the exception; response.rethrow() is not involved.
    with self.assertRaises(HTTPError) as raised:
        missing_url = self.get_url('/notfound')
        self.http_client.fetch(missing_url)
    status = raised.exception.code
    self.assertEqual(status, 404)