python类request()的实例源码

exchange.py 文件源码 项目:Pancake-Cogs 作者: UltimatePancake 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def moneyconvert(self, amount: float, base: str, to: str):
        """Currency converter
        Set the amount, currency FROM (base) and currency TO 
        Available currencies for conversion:
        AUD BGN BRL CAD CHF CNY CZK DKK EUR GBP HKD HRK HUF IDR ILS INR JPY KRW MXN MYR NOK NZD PHP PLN RON RUB SEK SGD THB TRY USD ZAR
        ***WARNING***
        Conversion may not be exact"""
        if base.upper() not in self.currencies or to.upper() not in self.currencies:
            await self.bot.say('One or both of the currencies selected are invalid')
        else:
            api = 'http://api.fixer.io/latest?base={}'.format(base.upper())
            async with aiohttp.request("GET", api) as r:
                result = await r.json()
                rate = result['rates'][to.upper()]
                converted_amount = amount * rate
                pre_conv = '{0:.2f}'.format(amount)
                post_conv = '{0:.2f}'.format(converted_amount)
                await self.bot.say('`{} {} = {} {}`'.format(base.upper(), pre_conv, to.upper(), post_conv))
china_bond.py 文件源码 项目:chinabond 作者: muxuezi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_async(ym, x):
    '''
    ??GET????xls????
    parameters
    ==========
    ym : str
        ??
    x : dict
        ???????
    '''
    with (await sem):
        name, sid = x['name'], x['sid']
        url = 'http://www.chinabond.com.cn/DownLoadxlsx?sId={}&sBbly={}&sMimeType=4'.format(
            sid, ym)
        outfilename = 'data/{}{}{}.xls'.format(sid, name, ym)
        logging.info('downloading %s', outfilename)
        response = await aiohttp.request('GET', url, headers=headers)
        with closing(response), open(outfilename, 'wb') as file:
            while True:  # ????
                chunk = await response.content.read(1024)
                if not chunk:
                    break
                file.write(chunk)
        logging.info('done %s', outfilename)
bus.py 文件源码 项目:trellio 作者: artificilabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def send_http_request(self, app: str, service: str, version: str, method: str, entity: str, params: dict):
        """
        A convenience method that allows you to send a well formatted http request to another service
        """
        host, port, node_id, service_type = self._registry_client.resolve(service, version, entity, HTTP)

        url = 'http://{}:{}{}'.format(host, port, params.pop('path'))

        http_keys = ['data', 'headers', 'cookies', 'auth', 'allow_redirects', 'compress', 'chunked']
        kwargs = {k: params[k] for k in http_keys if k in params}

        query_params = params.pop('params', {})

        if app is not None:
            query_params['app'] = app

        query_params['version'] = version
        query_params['service'] = service

        response = yield from aiohttp.request(method, url, params=query_params, **kwargs)
        return response
bus.py 文件源码 项目:trellio 作者: artificilabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _request_sender(self, packet: dict):
        """
        Sends a request to a server from a ServiceClient
        auto dispatch method called from self.send()
        """
        node_id = self._get_node_id_for_packet(packet)
        client_protocol = self._client_protocols.get(node_id)
        if node_id and client_protocol:
            if client_protocol.is_connected():
                packet['to'] = node_id
                client_protocol.send(packet)
                return True
            else:
                self._logger.error('Client protocol is not connected for packet %s', packet)
                raise ClientDisconnected()
        else:
            # No node found to send request
            self._logger.error('Out of %s, Client Not found for packet %s, restarting server...',
                               self._client_protocols.keys(), packet)
            raise ClientNotFoundError()
flags2_asyncio_executor.py 文件源码 项目:notebooks 作者: fluentpython 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 200:
            image = yield from resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)


# BEGIN FLAGS2_ASYNCIO_EXECUTOR
flags3_asyncio.py 文件源码 项目:notebooks 作者: fluentpython 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def http_get(url):
    res = yield from aiohttp.request('GET', url)
    if res.status == 200:
        ctype = res.headers.get('Content-type', '').lower()
        if 'json' in ctype or url.endswith('json'):
            data = yield from res.json()  # <1>
        else:
            data = yield from res.read()  # <2>
        return data

    elif res.status == 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.errors.HttpProcessingError(
            code=res.status, message=res.reason,
            headers=res.headers)
use_proxy.py 文件源码 项目:utils 作者: Ctrlsman 项目源码 文件源码 阅读 120 收藏 0 点赞 0 评论 0
def do_request():
    proxy_url = 'http://198.23.237.213:1080'  # your proxy address
    response = yield from aiohttp.request(
        'GET', 'http://google.com',
        proxy=proxy_url,
    )
    return response
hubot.py 文件源码 项目:hangoutsbot 作者: das7pad 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _send_to_external_chat(self, bot, event, config):
        if event.from_bot:
            # don't send my own messages
            return

        conversation_id = event.conv_id
        conversation_text = event.text

        user_id = event.user_id

        url = config["HUBOT_URL"] + conversation_id
        payload = {"from" : str(user_id.chat_id), "message" : conversation_text}
        headers = {'content-type': 'application/json'}

        connector = aiohttp.TCPConnector(verify_ssl=False)
        asyncio.ensure_future(
            aiohttp.request('post', url, data=json.dumps(payload),
                            headers=headers, connector=connector)
        )
horoscope.py 文件源码 项目:ax-cogs 作者: Aioxas 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _cookie(self):
        """Retrieves a random fortune cookie fortune."""
        regex = ["class=\"cookie-link\">([^`]*?)<\/a>", "<p>([^`]*?)<\/p>",
                 "(?:\\\\['])", "<strong>([^`]*?)<\/strong>",
                 "<\/strong><\/a>([^`]*?)<br>",
                 "3\)<\/strong><\/a>([^`]*?)<\/div>"]
        url = "http://www.fortunecookiemessage.com"
        await self.file_check()
        async with aiohttp.request("GET", url, headers={"encoding": "utf-8"}) as resp:
            test = str(await resp.text())
            fortune = re.findall(regex[0], test)
            fortest = re.match("<p>", fortune[0])
            if fortest is not None:
                fortune = re.findall(regex[1], fortune[0])
            title = re.findall(regex[3], test)
            info = re.findall(regex[4], test)
            info[0] = html.unescape(info[0])
            dailynum = re.findall(regex[5], test)
            self.fortune_process(fortune[0])
            await self.bot.say("Your fortune is:")
            await self.bot.upload("data/horoscope/cookie-edit.png")
            await self.bot.say("\n" + title[1] +
                               info[1] + "\n" + title[2] + dailynum[0])
            os.remove("data/horoscope/cookie-edit.png")
horoscope.py 文件源码 项目:ax-cogs 作者: Aioxas 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _font(self, url: str=None):
        """Allows you to set the font that the fortune cookies are shown in.
           Only accepts ttf."""
        option = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0)'
                  ' Gecko/20100101 Firefox/40.1'}

        if url is None :
            url = "https://cdn.discordapp.com/attachments/218222973557932032/240223136447070208/FortuneCookieNF.ttf"
            if os.is_file("data/horoscope/FortuneCookieNF.ttf"):
                return
            else:
                async with aiohttp.request("GET", url, headers=option) as resp:
                    test = await resp.read()
                    with open("data/horoscope/FortuneCookieNF.ttf", "wb") as f:
                        f.write(test)
        elif not url.endswith("ttf"):
            await self.bot.say("This is not a .ttf font, please use a .ttf font. Thanks")
            return
        await self.bot.say("Font has been saved")
strawpoll.py 文件源码 项目:ax-cogs 作者: Aioxas 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _strawpoll(self, ctx, *, question, options=None):
        """Makes a poll based on questions and choices or options. must be divided by "; "
            Examples:
            [p]strawpoll What is this person?; Who is this person?; Where is this person?; When is this person coming?
            [p]strawpoll What; Who?; Where?; When?; Why?"""
        options_list = question.split('; ')
        title = options_list[0]
        options_list.remove(title)
        if len(options_list) < 2:
            await self.bot.say("You need to specify 2 or more options")
        else:
            normal = {"title": title, "options": options_list}
            request = dict(normal, **self.settings)
            async with aiohttp.request('POST', 'http://strawpoll.me/api/v2/polls',
                                       headers={'content-type': 'application/json'},
                                       data=json.dumps(request)) as resp:
                test = await resp.content.read()
                test = json.loads(test.decode())
                sid = test["id"]
                await self.bot.say("Here's your strawpoll link: http://strawpoll.me/{}".format(sid))
utils_proxy.py 文件源码 项目:cockatiel 作者: raphaelm 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle_request(self, message, payload):
        url = message.path

        logger.info('{0} {1}'.format(message.method, url))

        if message.method in ('POST', 'PUT', 'PATCH'):
            data = yield from payload.read()
        else:
            data = None

        message, data = self.intercept_request(message, data)
        if not message:
            return

        response = yield from aiohttp.request(message.method, url, headers=message.headers,
                                              data=data)
        response_content = yield from response.content.read()

        response, response_content = self.intercept_response(response, response_content)
        yield from self.response_to_proxy_response(response, response_content)
flags2_asyncio_executor.py 文件源码 项目:Books_SourceCode 作者: activeion 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 200:
            image = yield from resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)


# BEGIN FLAGS2_ASYNCIO_EXECUTOR
flags2_asyncio_executor.py 文件源码 项目:Books_SourceCode 作者: activeion 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 200:
            image = yield from resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)


# BEGIN FLAGS2_ASYNCIO_EXECUTOR
checker3.py 文件源码 项目:repo-checker 作者: 1dot75cm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_page(self, _url):
        ''' ????????
        return str '''

        header = { 'Accept-Encoding': 'gzip' }
        header['User-Agent'] = self.ualist[random.randint(0, len(self.ualist)-1)]
        if opts['user_agent']: header['User-Agent'] = opts['user_agent']

        with (yield from semaphore):
            response = yield from aiohttp.request('GET', _url, headers = header)
            page = yield from response.read()

        try:
            if self.url_type == "2": return "None Content"
            if self.url_type == "4": return gzip.decompress(page).decode('gb2312').encode('utf-8')
            else:                    return gzip.decompress(page)
        except OSError:
            return page
BaseRequest.py 文件源码 项目:PairApi 作者: Louis-me 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get(self, url, param):
        data = {}
        _url = self.req["protocol"] + self.req["host"] + ":" + str(self.req["port"]) + url
        print(_url +" get?????:"+str(param))
        try:
            response = yield from aiohttp.request("GET", _url, headers=self.req["header"], params=param)
            string = (yield from response.read()).decode('utf-8')
            if response.status == 200:
                data = json.loads(string)
            else:
                print("data fetch failed for")
                print(response.content, response.status)
            data["status_code"] = response.status
            print(data)
        except asyncio.TimeoutError:
            print("????")
        except UnicodeDecodeError:
            print("?????")
        return data
BaseRequest.py 文件源码 项目:PairApi 作者: Louis-me 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def post(self,url, param):
        data = {}
        _url = self.req["protocol"] + self.req["host"] + ':' + str(self.req["port"]) + url
        print(_url + " post?????:" + str(param))
        requests.post(_url,files=None, data=json.dumps(param),  headers=self.req["header"])
        response = yield from aiohttp.request('POST', _url, data=json.dumps(param), headers=self.req["header"])
        string = (yield from response.read()).decode('utf-8')
        if response.status == 200:
            data = json.loads(string)
        else:
            print("data fetch failed for")
            print(response.content, response.status)
        data["status_code"] = response.status
        print(data)

        return data
pinger.py 文件源码 项目:trellio 作者: artificilabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ping_coroutine(self, payload=None):
        try:
            res = yield from request('get', self._url)
            if res.status == 200:
                self.pong_received(payload=payload)
                res.close()
        except Exception:
            self.logger.exception('Error while ping')
_test_asyncio.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_aiohttp(self):
        try:
            import aiohttp
        except ImportError:
            raise SkipTest("Requires aiohttp")
        from aiohttp import web

        zmq.asyncio.install()

        @asyncio.coroutine
        def echo(request):
            print(request.path)
            return web.Response(body=str(request).encode('utf8'))

        @asyncio.coroutine
        def server(loop):
            app = web.Application(loop=loop)
            app.router.add_route('GET', '/', echo)

            srv = yield from loop.create_server(app.make_handler(),
                                                '127.0.0.1', 8080)
            print("Server started at http://127.0.0.1:8080")
            return srv

        @asyncio.coroutine
        def client():
            push, pull = self.create_bound_pair(zmq.PUSH, zmq.PULL)

            res = yield from aiohttp.request('GET', 'http://127.0.0.1:8080/')
            text = yield from res.text()
            yield from push.send(text.encode('utf8'))
            rcvd = yield from pull.recv()
            self.assertEqual(rcvd.decode('utf8'), text)

        loop = asyncio.get_event_loop()
        loop.run_until_complete(server(loop))
        print("servered")
        loop.run_until_complete(client())
observer_async.py 文件源码 项目:Software-Architecture-with-Python 作者: PacktPublishing 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def callback(self, channel, data):
        """ Callback method """

        # The data is a URL
        url = data
        # We return the response immediately
        print('Fetching URL',url,'...')
        future = aiohttp.request('GET', url)
        self.futures.append(future)

        return future
get_all_aiohttp.py 文件源码 项目:aquests 作者: hansroh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def fetch (url):
    r = yield from aiohttp.request ('GET', url)
    print ('%s %s' % (r.status, r.reason), end = " ")
    return (yield from r.read ())
get_all_static_aiohttp.py 文件源码 项目:aquests 作者: hansroh 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def fetch (url):
    r = yield from aiohttp.request ('GET', url)
    print ('%s %s' % (r.status, r.reason), end = " ")
    return (yield from r.read ())
flags2_asyncio.py 文件源码 项目:notebooks 作者: fluentpython 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_flag(base_url, cc): # <2>
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 200:
            image = yield from resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
flags2_await.py 文件源码 项目:notebooks 作者: fluentpython 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_flag(base_url, cc): # <2>
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    with closing(await aiohttp.request('GET', url)) as resp:
        if resp.status == 200:
            image = await resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
flags_asyncio2.py 文件源码 项目:notebooks 作者: fluentpython 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    res = yield from aiohttp.request('GET', url)
    image = yield from res.read()
    return image
http_cli0.py 文件源码 项目:notebooks 作者: fluentpython 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def download(url):
    response = yield from aiohttp.request('GET', url)
    for k, v in response.items():
        print('{}: {}'.format(k, v[:80]))

    data = yield from response.read()
    print('\nReceived {} bytes.\n'.format(len(data)))
aio.py 文件源码 项目:pushka 作者: rudyryk 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _request(self, url_, method_, params=None, data=None, **kwargs):
        """Perform request via `aiohttp.request`_.

        .. _aiohttp.request: http://aiohttp.readthedocs.org/en/v0.9.2/client.html#make-a-request
        """
        response = yield from aiohttp.request(method_.lower(), url_,
                                              params=params, data=data, loop=self._loop,
                                              **utils.norm_aiohttp_kwargs(**kwargs))
        return {
            'code': response.status,
            'body': (yield from response.text()),
            'error': None, # TODO: how errors statues are described in aiohttp?
        }
forwarding.py 文件源码 项目:hangoutsbot 作者: das7pad 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _handle_forwarding(bot, event, command):
    """Handle message forwarding"""
    # Test if message forwarding is enabled
    if not bot.get_config_suboption(event.conv_id, 'forwarding_enabled'):
        return

    forward_to_list = bot.get_config_suboption(event.conv_id, 'forward_to')
    if forward_to_list:
        logger.debug("{}".format(forward_to_list))

        for _conv_id in forward_to_list:
            html_identity = "<b><a href='https://plus.google.com/u/0/{}/about'>{}</a></b><b>:</b> ".format(event.user_id.chat_id, event.user.full_name)

            html_message = event.text

            if not event.conv_event.attachments:
                yield from bot.coro_send_message( _conv_id,
                                                  html_identity + html_message )

            for link in event.conv_event.attachments:

                filename = os.path.basename(link)
                r = yield from aiohttp.request('get', link)
                raw = yield from r.read()
                image_data = io.BytesIO(raw)
                image_id = None

                try:
                    image_id = yield from bot._client.upload_image(image_data, filename=filename)
                    if not html_message:
                        html_message = "(sent an image)"
                    yield from bot.coro_send_message( _conv_id,
                                                      html_identity + html_message,
                                                      image_id=image_id )

                except AttributeError:
                    yield from bot.coro_send_message( _conv_id,
                                                      html_identity + html_message + " " + link )
autoreply.py 文件源码 项目:hangoutsbot 作者: das7pad 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def image_upload_single(image_uri, bot):
    logger.info("getting {}".format(image_uri))
    filename = os.path.basename(image_uri)
    r = yield from aiohttp.request('get', image_uri)
    raw = yield from r.read()
    image_data = io.BytesIO(raw)
    image_id = yield from bot._client.upload_image(image_data, filename=filename)
    return image_id
aiohttp_server.py 文件源码 项目:web_develop 作者: dongweiming 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def handle(request):
    coroutines = [aiohttp.request('get', url) for url in REQEUST_URLS]

    results = await asyncio.gather(*coroutines, return_exceptions=True)

    response_data = {
        url: not isinstance(result, Exception) and result.status == 200
        for url, result in zip(REQEUST_URLS, results)
    }

    body = json.dumps(response_data).encode('utf-8')
    return web.Response(body=body, content_type="application/json")


问题


面经


文章

微信
公众号

扫码关注公众号