python类HTTPClient()的实例源码

registry.py 文件源码 项目:pylm-registry 作者: nfqsolutions 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set_cluster(self, config_file, key=None):
        """
        Load a config file to the registry and make it a cluster

        :param config_file: Path to the config file to be loaded
        :param key: Key that identifies the cluster
        :return: Key that identifies the cluster. It can be generated if it is
            not given as a parameter. Use only while debugging.
        :raises ValueError: If the response code is not 200; the message is
            the decoded response body.
        """
        with open(config_file) as f:
            cluster = f.read()

        arguments = {
            'method': 'new_cluster',
            'description': cluster
        }

        if key:
            arguments['key'] = key

        client = HTTPClient()
        try:
            response = client.fetch(
                '{}/cluster?{}'.format(self.uri, parse.urlencode(arguments)),
                headers={'Key': self.uk}
            )
        finally:
            # Free the client's resources even when fetch() raises.
            client.close()

        if response.code == 200:
            return response.body.decode('utf-8')

        raise ValueError(response.body.decode('utf-8'))
logs.py 文件源码 项目:pylm-registry 作者: nfqsolutions 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def send(self, text):
        """
        Send a log line to the registry

        The line is POSTed to the ``/logs`` endpoint, with the cluster
        passed as a query argument.

        :param text: Text of the log line
        """
        arguments = {
            'cluster': self.cluster,
        }
        client = HTTPClient()
        try:
            client.fetch(
                '{}/logs?{}'.format(self.uri, parse.urlencode(arguments)),
                method='POST',
                body=text
            )
        finally:
            # Free the client's resources even when fetch() raises.
            client.close()
__init__.py 文件源码 项目:pylm-registry 作者: nfqsolutions 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def new_admin_account(uri, master_key, admin_name, key=None):
    """
    Creates a new admin account with the master key. The master key is part
    of the static configuration of the registry service.

    :param uri: URI of the registry service
    :param master_key: Master key for the registry service
    :param admin_name: Name of the admin account
    :param key: Key of the admin account
    :return: Key assigned to the admin account. It may be generated automatically
    :raises ValueError: If the response code is not 200; the message is the
        decoded response body.
    """
    arguments = {
        'method': 'new_admin',
        'name': admin_name,
    }
    if key:
        arguments['key'] = key

    client = HTTPClient()
    try:
        response = client.fetch(
            '{}/admin?{}'.format(uri, parse.urlencode(arguments)),
            headers={'Key': master_key}
        )
    finally:
        # Free the client's resources even when fetch() raises.
        client.close()

    if response.code == 200:
        return response.body.decode('utf-8')

    raise ValueError(response.body.decode('utf-8'))
??????IO.py 文件源码 项目:Python_Study 作者: thsheep 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def synchronous_fetch(url):
    """Fetch *url* with an HTTPClient and return the decoded response body.

    :param url: Address to request
    :return: The response body decoded with the default codec
    """
    http_client = HTTPClient()
    try:
        # fetch() hands back the response object itself, so the body can be
        # read from it directly.
        response = http_client.fetch(url)
    finally:
        # Free the client's resources even when fetch() raises.
        http_client.close()
    return response.body.decode()

# ?????????????
viewer.py 文件源码 项目:nyroglancer 作者: funkey 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def register_volume(self, volume):
        """
        Register a volume under its token and announce it to the server.

        The volume is stored in the module-level ``volumes`` mapping, then a
        request is made to the server's ``/register_token/`` URL, built from
        the decoded token and the escaped connection file path.

        :param volume: Object exposing a bytes ``token`` attribute
        :raises RuntimeError: If building or performing the request fails
        """
        # globally register volume
        global volumes
        volumes[volume.token] = volume

        # globally register kernel client for this volume in the Jupyter server
        cf = url_escape(find_connection_file())
        http_client = HTTPClient()
        try:
            http_client.fetch(self.get_server_url() + '/register_token/' + volume.token.decode('utf8') + '/' + cf)
        except Exception as e:
            raise RuntimeError("could not register token: " + str(e))
        finally:
            # Close on the failure path too; previously the client was only
            # closed when the fetch succeeded.
            http_client.close()
httpclient_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_sync_client_error(self):
        # The blocking HTTPClient raises the error itself, so there is no
        # response object on which rethrow() would have to be called.
        with self.assertRaises(HTTPError) as caught:
            missing_url = self.get_url('/notfound')
            self.http_client.fetch(missing_url)
        raised = caught.exception
        self.assertEqual(raised.code, 404)
tokenizer.py 文件源码 项目:pantip-libr 作者: starcolon 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __request(input0):
  """
  POST `input0` to the tokeniser service and return the parsed JSON reply.

  Failures are printed rather than raised. The value returned is whatever
  stage had been reached when processing stopped: None, the raw body, the
  decoded text, or the parsed JSON object.
  """
  result = None
  http = httpclient.HTTPClient()
  try:
    print(colored('tokenising.. ','yellow') + input0)
    request = httpclient.HTTPRequest(tokeniser_serv, method='POST', body=input0)
    reply = http.fetch(request)

    # Each stage overwrites `result`, so a failure in a later stage leaves
    # the product of the previous one in place.
    result = reply.body
    result = result.decode('utf-8')
    result = json.loads(result)
  except httpclient.HTTPError as http_err:
    # The service answered with an HTTP error
    print(colored('HTTP Error : ' + str(http_err),'red'))
  except Exception as err:
    # Anything else that went wrong
    print(colored('ERROR : ' + str(err), 'red'))

  http.close()

  print(colored('{Tokenizer received:}','white'))
  print(colored(result,'white'))

  return result
httpclient_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_sync_client_error(self):
        # The blocking HTTPClient raises the error itself, so there is no
        # response object on which rethrow() would have to be called.
        with self.assertRaises(HTTPError) as caught:
            missing_url = self.get_url('/notfound')
            self.http_client.fetch(missing_url)
        raised = caught.exception
        self.assertEqual(raised.code, 404)
httpclient_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_sync_client_error(self):
        # The blocking HTTPClient raises the error itself, so there is no
        # response object on which rethrow() would have to be called.
        with self.assertRaises(HTTPError) as caught:
            missing_url = self.get_url('/notfound')
            self.http_client.fetch(missing_url)
        raised = caught.exception
        self.assertEqual(raised.code, 404)
boxAgent.py 文件源码 项目:Mohou_Box-master 作者: mohou 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def getUpdateInfo():
    """
    Request the update metadata from the update host.

    :return: The result of json_decode on the response body, or None when
        the response carries an error.
    """
    url = client.protocol + client.updateHostPort + "/netupdate_ajax?type=meta"
    logging.debug("getUpdateInfo: Start to get update info. url=%s", url)
    cur_client = HTTPClient()
    try:
        response = cur_client.fetch(url, request_timeout=10)
    finally:
        # Free the client's resources even when fetch() raises.
        cur_client.close()
    if response.error:
        # logging.warn is a deprecated alias of logging.warning.
        logging.warning("getUpdateInfo: Failed to get update info. error=%s", response.error)
        return None

    logging.debug("getUpdateInfo: Current update info. reponse.body=%r", response.body)
    return json_decode(response.body)
boxAgent.py 文件源码 项目:Mohou_Box-master 作者: mohou 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def initUpdateInfo():
    """
    Call the update host's ``/pre_update`` endpoint.

    :return: The result of json_decode on the response body, or None when
        the response carries an error.
    """
    url = client.protocol + client.updateHostPort + "/pre_update"
    logging.debug("initUpdateInfo: Start to init update info. url=%s", url)
    cur_client = HTTPClient()
    try:
        response = cur_client.fetch(url, request_timeout=10)
    finally:
        # Free the client's resources even when fetch() raises.
        cur_client.close()
    if response.error:
        # logging.warn is a deprecated alias of logging.warning.
        logging.warning("initUpdateInfo: Failed to init update info. error=%s", response.error)
        return None

    logging.debug("initUpdateInfo: reponse.body=%r", response.body)
    return json_decode(response.body)
boxUpdate.py 文件源码 项目:Mohou_Box-master 作者: mohou 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self):
        """
        Report whether the box could be announced to the cloud service.

        The local status endpoint is queried first. When the machine is
        online and the status reply has code 0, the box id from that reply is
        POSTed to the cloud ``/api/box/init-setting`` endpoint. The handler
        always writes a success envelope whose ``is_access_cloud`` flag tells
        whether that POST was carried out.
        """
        is_on_line = machine_is_online()

        cur_client = HTTPClient()
        try:
            response = cur_client.fetch("http://127.0.0.1:5000/status", request_timeout=10)
        finally:
            # Free the client's resources even when fetch() raises.
            cur_client.close()

        res = None
        if response.error:
            logger.warning("Failed to get current box info. error=%s", response.error)
            is_on_line = False
        else:
            # Only decode the body of a response that carries no error; an
            # errored response may have no usable body.
            res = json_decode(response.body)
            if res["code"] != 0:
                logger.warning("Failed to get current box info. ret_value=%d", res["ret_value"])
                is_on_line = False

        if is_on_line:
            boxid = res["data"]["boxid"]
            params = urllib.urlencode({
                "token": "box_setting",
                "boxid": boxid,
                "progress": 2
            })
            headers = {"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "Connection": "Keep-Alive"}
            conn = httplib.HTTPConnection("yun.mohou.com")
            try:
                conn.request(method="POST", url="/api/box/init-setting", body=params, headers=headers)
                response_json = conn.getresponse().read()
            finally:
                # Close the connection even when the request or read fails.
                conn.close()
            logger.info("Box setting result: " + str(response_json))
            is_access_cloud = True
        else:
            is_access_cloud = False
        return self.write({'code': 0, 'msg': 'Success', 'data': {'is_access_cloud': is_access_cloud}})
network_api.py 文件源码 项目:Mohou_Box-master 作者: mohou 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def get_serial_number():
    """
    Read the box id from the local ``/profile2`` endpoint.

    :return: The ``boxid`` entry of the reply's ``data`` section, or None
        when the response carries an error or the reply code is not 0.
    """
    url = "http://127.0.0.1:5000/profile2"
    cur_client = HTTPClient()
    try:
        response = cur_client.fetch(url, request_timeout=10)
    finally:
        # Free the client's resources even when fetch() raises.
        cur_client.close()
    if response.error:
        return None
    res = json_decode(response.body)
    if res["code"] != 0:
        return None
    return res["data"]['boxid']
bangumi_bot.py 文件源码 项目:bangumi_calendar 作者: coldfog 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def get_data(self):
        """
        Fetch self.url and collect one record per schedule link.

        Yields the asynchronous fetch, then finishes through gen.Return with
        either None (the response carries an error) or a list of dicts with
        the keys 'weekday', 'url', 'title' and 'update_time'.
        """
        content = yield httpclient.AsyncHTTPClient().fetch(self.url)
        if content.error:
            raise gen.Return(None)
        root = html.fromstring(content.body.decode('utf-8'))

        bangumi_info = []
        for tab in range(1, 8):
            # Tabs are numbered 1..7; the seventh is recorded as weekday 0.
            weekday = tab % 7
            links = root.xpath('//*[@id="tab_100895_%d"]//*[@class="v-meta-title"]/a' % tab)
            for link in links:
                title, hour, minute = self.TIME_PATTERN.match(link.text).groups()
                update_time = datetime.time(int(hour), int(minute))
                bangumi_info.append({
                    'weekday': weekday,
                    'url': link.attrib['href'],
                    'title': title,
                    'update_time': update_time,
                })

        raise gen.Return(bangumi_info)
bangumi_bot.py 文件源码 项目:bangumi_calendar 作者: coldfog 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def get_data(self):
        """
        Fetch self.url through a fixed proxy and collect the weekly listing.

        Yields the asynchronous fetch, then finishes through gen.Return with
        either None (the response carries an error) or a list of dicts with
        the keys 'weekday', 'url', 'title' and 'update_time' (always None).
        """
        content = yield httpclient.AsyncHTTPClient().fetch(
            self.url, proxy_host='124.88.67.32', proxy_port=843)
        if content.error:
            raise gen.Return(None)
        root = html.fromstring(content.body.decode('utf-8'))

        bangumi_info = []
        day_blocks = root.xpath('//*[@id="scrollContent-day_update"]//*[@class="week-updateList_each"]')
        for day_block in day_blocks:
            # The first span's text is looked up in self.WEEKDAY.
            weekday = self.WEEKDAY[day_block.xpath('./div/span')[0].text]
            for item in day_block.xpath('.//li'):
                url = item.xpath('./a')[0].attrib['href']
                title = item.xpath('./a/div/div[@class="week-cont_title"]')[0].text
                bangumi_info.append({
                    'weekday': weekday,
                    'url': url,
                    'title': title,
                    'update_time': None,
                })
        raise gen.Return(bangumi_info)
httpclient_test.py 文件源码 项目:get_started_with_respeaker 作者: respeaker 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _clear_headers_for_304(self):
        # Tornado strips content-length from 304 responses, but here we
        # want to simulate servers that include the headers anyway.
        pass


# These tests end up getting run redundantly: once here with the default
# HTTPClient implementation, and then again in each implementation's own
# test suite.
httpclient_test.py 文件源码 项目:get_started_with_respeaker 作者: respeaker 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_sync_client_error(self):
        # The blocking HTTPClient raises the error itself, so there is no
        # response object on which rethrow() would have to be called.
        with self.assertRaises(HTTPError) as caught:
            missing_url = self.get_url('/notfound')
            self.http_client.fetch(missing_url)
        raised = caught.exception
        self.assertEqual(raised.code, 404)
main01.py 文件源码 项目:tornado-asyn-tutorial 作者: zhyq0826 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def synchronous_fetch(url, callback):
    """Fetch *url* with an HTTPClient, handing the response to *callback*."""
    def _forward(result):
        # Pass the response on unchanged; nothing is returned to the client.
        callback(result)

    fetcher = HTTPClient()
    fetcher.fetch(url, callback=_forward)


# ????
transport.py 文件源码 项目:python-zeep 作者: mvantellingen 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _load_remote_data(self, url):
        """
        Download *url* with a GET request and return the raw response body.

        :param url: Address of the remote document
        :return: The response body, as delivered by the HTTP client
        """
        client = httpclient.HTTPClient()
        kwargs = {
            'method': 'GET',
            'request_timeout': self.load_timeout
        }
        http_req = httpclient.HTTPRequest(url, **kwargs)
        try:
            response = client.fetch(http_req)
        finally:
            # Free the client's resources even when fetch() raises.
            client.close()
        return response.body
httpclient_test.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_sync_client_error(self):
        # The blocking HTTPClient raises the error itself, so there is no
        # response object on which rethrow() would have to be called.
        with self.assertRaises(HTTPError) as caught:
            missing_url = self.get_url('/notfound')
            self.http_client.fetch(missing_url)
        raised = caught.exception
        self.assertEqual(raised.code, 404)


问题


面经


文章

微信
公众号

扫码关注公众号