python类add_header()的实例源码

plugin.py 文件源码 项目:domoticz-plugins 作者: milanvo 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def _try_send(self, ip, port, body, header, data):
        try:
            request = urllib.request.Request('http://%s:%s/upnp/control/basicevent1' % (ip, port))
            request.add_header('Content-type', 'text/xml; charset="utf-8"')
            request.add_header('SOAPACTION', header)
            request_body = '<?xml version="1.0" encoding="utf-8"?>'
            request_body += '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
            request_body += '<s:Body>%s</s:Body></s:Envelope>' % body
            request.data = request_body.encode()
            result = urllib.request.urlopen(request, timeout=3)
            return self._extract(result.read().decode(), data)
        except Exception as e:
#        except:
#            raise
            print(str(e))
            return None
steamgifts.py 文件源码 项目:AutoSteamGifts 作者: joaopsys 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def getWebPage(url, headers, cookies, postData=None):
    try:
        if (postData):
            params = urllib.parse.urlencode(postData)
            params = params.encode('utf-8')
            request = urllib.request.Request(url, data=params, headers=headers)
        else:
            print('Fetching '+url)
            request = urllib.request.Request(url, None, headers)
        request.add_header('Cookie', cookies)
        if (postData):
            response = urllib.request.build_opener(urllib.request.HTTPCookieProcessor).open(request)
        else:
            response = urllib.request.urlopen(request)
        if response.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(response.read())
            f = gzip.GzipFile(fileobj=buf)
            r = f.read()
        else:
            r = response.read()

        return r
    except Exception as e:
        print("Error processing webpage: "+str(e))
        return None

## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
naverTTS.py 文件源码 项目:googleSpeech_with_NaverTTS 作者: chandong83 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def play(self, txt):
        encText = urllib.parse.quote(txt)
        data = "speaker=" + self.speaker + "&speed=" + self.speed + "&text=" + encText;

        request = urllib.request.Request(url)
        request.add_header("X-Naver-Client-Id",client_id)
        request.add_header("X-Naver-Client-Secret",client_secret)
        response = urllib.request.urlopen(request, data=data.encode('utf-8'))
        rescode = response.getcode()
        if(rescode==200):
            response_body = response.read()
            with open(tmpPlayPath, 'wb') as f:
                f.write(response_body)

            #?? ???? ?? vlc
            os.system('cvlc ' + tmpPlayPath + ' --play-and-exit')
            #??????
            #os.system('omxplayer ' + tmpPlayPath)
        else:
            print("Error Code:" + rescode)
WaApi.py 文件源码 项目:prms-door 作者: prmakerspace 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def authenticate_with_apikey(self, api_key, scope=None):
        """perform authentication by api key and store result for execute_request method
        api_key -- secret api key from account settings
        scope -- optional scope of authentication request. If None full list of API scopes will be used.
        """
        scope = "auto" if scope is None else scope
        data = {
            "grant_type": "client_credentials",
            "scope": scope
        }
        encoded_data = urllib.parse.urlencode(data).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        request.add_header("ContentType", "application/x-www-form-urlencoded")
        request.add_header("Authorization", 'Basic ' + base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode())
        response = urllib.request.urlopen(request)
        self._token = WaApiClient._parse_response(response)
        self._token.retrieved_at = datetime.datetime.now()
WaApi.py 文件源码 项目:prms-door 作者: prmakerspace 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def authenticate_with_contact_credentials(self, username, password, scope=None):
        """perform authentication by contact credentials and store result for execute_request method
        username -- typically a contact email
        password -- contact password
        scope -- optional scope of authentication request. If None full list of API scopes will be used.
        """
        scope = "auto" if scope is None else scope
        data = {
            "grant_type": "password",
            "username": username,
            "password": password,
            "scope": scope
        }
        encoded_data = urllib.parse.urlencode(data).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        request.add_header("ContentType", "application/x-www-form-urlencoded")
        auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
        request.add_header("Authorization", 'Basic ' + auth_header)
        response = urllib.request.urlopen(request)
        self._token = WaApiClient._parse_response(response)
        self._token.retrieved_at = datetime.datetime.now()
generate_static.py 文件源码 项目:pygameweb 作者: pygame 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def downloadUrls(self, urls):
        url_data = {}
        for u in urls:
            url = self.base_url + u

            request = urllib.request.Request(url)


            # the .htaccess file checks for the header, and if it exists returns unprocessed data.
            request.add_header('User-agent', 'our-web-crawler')
            try:
                response = urllib.request.urlopen(request)
                data = response.read()
            except urllib.request.HTTPError:
                log (url)
                raise
            except urllib.request.URLError:
                log (url)
                raise
            yield (u,data)
sitemap_crawler.py 文件源码 项目:wswp 作者: kjam 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries - 1)
    return html
link_crawler.py 文件源码 项目:wswp 作者: kjam 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries - 1)
    return html
id_iteration_crawler.py 文件源码 项目:wswp 作者: kjam 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries - 1)
    return html
sitemap_crawler.py 文件源码 项目:Python-Web-Scraping-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def download(url, user_agent='wswp', num_retries=2, charset='utf-8'):
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries - 1)
    return html
link_crawler.py 文件源码 项目:Python-Web-Scraping-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def download(url, user_agent='wswp', num_retries=2, charset='utf-8'):
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries - 1)
    return html
id_iteration_crawler.py 文件源码 项目:Python-Web-Scraping-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def download(url, user_agent='wswp', num_retries=2, charset='utf-8'):
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries - 1)
    return html
seamicro.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def put(self, location, params=None):
        """Dispatch a PUT request to a SeaMicro chassis.

        The seamicro box has order-dependent HTTP parameters, so we build
        our own get URL, and use a list vs. a dict for data, as the order is
        implicit.
        """
        opener = urllib.request.build_opener(urllib.request.HTTPHandler)
        url = self.build_url(location, params)
        request = urllib.request.Request(url)
        request.get_method = lambda: 'PUT'
        request.add_header('content-type', 'text/json')
        response = opener.open(request)
        json_data = self.parse_response(url, response)

        return json_data['result']
mando.me.py 文件源码 项目:mando.me 作者: z0noxz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def send(url, headers, timeout):
        request = urllib.request.Request(url)
        data = ""

        # Add password key
        request.add_header(_gs["smplshll_main_password_var"], _gs["smplshll_main_password"])

        for header in headers.keys():
            request.add_header(header, Utility.crypt(_gs["smplshll_input_password"], headers[header]))

        if timeout > 0:
            data = urllib.request.urlopen(request, timeout = timeout).read()
        else:
            data = urllib.request.urlopen(request).read()

        return Utility.crypt(_gs["smplshll_input_password"], data, False) if _gs["smplshll_response_encryption"] else data
remote_service.py 文件源码 项目:gemstone 作者: vladcalin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def build_http_request_obj(self, request_body):
        request = urllib.request.Request(self.url)
        request.add_header("Content-Type", "application/json")
        request.add_header("User-Agent", "gemstone-client")
        request.data = json.dumps(request_body).encode()
        request.method = "POST"
        return request
access.py 文件源码 项目:python-fritzbox 作者: pamapa 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def post(self, path, headers, body):
    uri = "%s/%s" % (self.url.geturl(), path)
    if self.debug: print("post: uri=%s, headers=%s" % (uri, headers))
    request = urllib2.Request(uri)
    for header in headers:
      request.add_header(header, headers[header])
    request.add_data(body)
    resp = urllib2.urlopen(request, cafile=self.cafile)
    if self.debug: print("resp: %s" % resp.info())
    return resp
feedparser.py 文件源码 项目:SublimeRSS 作者: JaredMHall 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if ACCEPT_HEADER:
        request.add_header('Accept', ACCEPT_HEADER)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in list(request_headers.items()):
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
bing.py 文件源码 项目:bing 作者: thatnight 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def open_url(url):
    request = urllib.request.Request(url)
    request.add_header('User-Agent',
                       'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.3357.400 QQBrowser/9.6.11858.400')
    response = urllib.request.urlopen(request)
    return response.read()


# download image
crawler.py 文件源码 项目:google-crawler 作者: xibowang 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def fetch_page(query):
    url = REQUEST_URL.format(BASE_URL, query)
    request = urllib.request.Request(url)
    request.add_header('User-agent', _random_user_agent())
    request.add_header('connection', 'keep-alive')
    request.add_header('Accept-Encoding', 'gzip, deflate, sdch, br')
    request.add_header('referer', REQUEST_URL.format(BASE_URL, ""))
    print(url)
    response = urllib.request.urlopen(request)

    data = response.read()
    print(type(data))
    return gzip.decompress(data)
core.py 文件源码 项目:hangoutsbot 作者: das7pad 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def upload_image(self, image_uri, sync, username, userid, channel_name):
        token = self.apikey
        logger.info('downloading %s', image_uri)
        filename = os.path.basename(image_uri)
        request = urllib.request.Request(image_uri)
        request.add_header("Authorization", "Bearer %s" % token)
        image_response = urllib.request.urlopen(request)
        content_type = image_response.info().get_content_type()

        filename_extension = mimetypes.guess_extension(content_type).lower() # returns with "."
        physical_extension = "." + filename.rsplit(".", 1).pop().lower()

        if physical_extension == filename_extension:
            pass
        elif filename_extension == ".jpe" and physical_extension in [ ".jpg", ".jpeg", ".jpe", ".jif", ".jfif" ]:
            # account for mimetypes idiosyncrancy to return jpe for valid jpeg
            pass
        else:
            logger.warning("unable to determine extension: {} {}".format(filename_extension, physical_extension))
            filename += filename_extension

        logger.info('uploading as %s', filename)
        image_id = yield from self.bot._client.upload_image(image_response, filename=filename)

        logger.info('sending HO message, image_id: %s', image_id)
        yield from sync._bridgeinstance._send_to_internal_chat(
            sync.hangoutid,
            "shared media from slack",
            {   "sync": sync,
                "source_user": username,
                "source_uid": userid,
                "source_title": channel_name },
            image_id=image_id )
test_urllib2net.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def test_custom_headers(self):
        url = "http://www.example.com"
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
weixin.py 文件源码 项目:WeixinBot 作者: Urinx 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _get(self, url: object, api: object = None, timeout: object = None) -> object:
        request = urllib.request.Request(url=url)
        request.add_header('Referer', 'https://wx.qq.com/')
        if api == 'webwxgetvoice':
            request.add_header('Range', 'bytes=0-')
        if api == 'webwxgetvideo':
            request.add_header('Range', 'bytes=0-')
        try:
            response = urllib.request.urlopen(request, timeout=timeout) if timeout else urllib.request.urlopen(request)
            if api == 'webwxgetvoice' or api == 'webwxgetvideo':
                data = response.read()
            else:
                data = response.read().decode('utf-8')
            logging.debug(url)
            return data
        except urllib.error.HTTPError as e:
            logging.error('HTTPError = ' + str(e.code))
        except urllib.error.URLError as e:
            logging.error('URLError = ' + str(e.reason))
        except http.client.HTTPException as e:
            logging.error('HTTPException')
        except timeout_error as e:
            pass
        except ssl.CertificateError as e:
            pass
        except Exception:
            import traceback
            logging.error('generic exception: ' + traceback.format_exc())
        return ''
weixin.py 文件源码 项目:WeixinBot 作者: Urinx 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _post(self, url: object, params: object, jsonfmt: object = True) -> object:
        if jsonfmt:
            data = (json.dumps(params)).encode()

            request = urllib.request.Request(url=url, data=data)
            request.add_header(
                'ContentType', 'application/json; charset=UTF-8')
        else:
            request = urllib.request.Request(url=url, data=urllib.parse.urlencode(params).encode(encoding='utf-8'))


        try:
            response = urllib.request.urlopen(request)
            data = response.read()
            if jsonfmt:
                return json.loads(data.decode('utf-8') )#object_hook=_decode_dict)
            return data
        except urllib.error.HTTPError as e:
            logging.error('HTTPError = ' + str(e.code))
        except urllib.error.URLError as e:
            logging.error('URLError = ' + str(e.reason))
        except http.client.HTTPException as e:
            logging.error('HTTPException')
        except Exception:
            import traceback
            logging.error('generic exception: ' + traceback.format_exc())

        return ''
test_urllib2net.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_custom_headers(self):
        url = "http://www.example.com"
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
plugin.py 文件源码 项目:domoticz 作者: ericstaal 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def volgendeZonondergang(self):
    # sunrise from domoticz... But I don't know how to retrieve it....

    try:
      domoticzurl = 'https://127.0.0.1:8443/json.htm?type=command&param=getSunRiseSet'
      encoding = 'utf-8'

      inlog = '%s:%s' % (self.domoticzusername, self.domoticzpassword) 
      base64string = base64.b64encode(inlog.encode(encoding)).decode(encoding)
      request = urllib.request.Request(domoticzurl)
      request.add_header("Authorization", "Basic %s" % base64string)
      response = urllib.request.urlopen(request)
      data = response.read()

      JSON_object = json.loads(data.decode(encoding))
      time = JSON_object['Sunset'].split(':')
      now = datetime.now()
      ret = datetime(now.year, now.month, now.day, int(time[0]), int(time[1]), 0)
      # when started after sunset use 'now'
      now = now + timedelta(minutes = int(Parameters["Mode4"])) 
      if (now > ret):
        ret = ret + timedelta(days = 1) 
      return ret
    except Exception as e:
      self.LogError("Error retrieving Sunset: "+ str(e))
      now = datetime.now()
      return datetime(now.year, now.month, now.day, 22, 0, 0)
doorbell.py 文件源码 项目:domoticz 作者: ericstaal 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def domoticzrequest (url):
  request = urllib.request.Request(url)
  request.add_header("Authorization", "Basic %s" % base64string)
  response = urllib.request.urlopen(request)
  return response.read().decode('utf-8')
checkonline.py 文件源码 项目:domoticz 作者: ericstaal 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def domoticzrequest (url):
  request = urllib.request.Request(url)
  request.add_header("Authorization", "Basic %s" % base64string)
  response = urllib.request.urlopen(request)
  return response.read().decode('utf-8')
feedparser.py 文件源码 项目:textnews 作者: qznc 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if ACCEPT_HEADER:
        request.add_header('Accept', ACCEPT_HEADER)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in list(request_headers.items()):
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
WaApi.py 文件源码 项目:prms-door 作者: prmakerspace 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def execute_request(self, api_url, api_request_object=None, method=None):
        """
        perform api request and return result as an instance of ApiObject or list of ApiObjects
        api_url -- absolute or relative api resource url
        api_request_object -- any json serializable object to send to API
        method -- HTTP method of api request. Default: GET if api_request_object is None else POST
        """
        if self._token is None:
            raise ApiException("Access token is not abtained. "
                               "Call authenticate_with_apikey or authenticate_with_contact_credentials first.")

        if not api_url.startswith("http"):
            api_url = self.api_endpoint + api_url

        if method is None:
            if api_request_object is None:
                method = "GET"
            else:
                method = "POST"

        request = urllib.request.Request(api_url, method=method)
        if api_request_object is not None:
            request.data = json.dumps(api_request_object, cls=_ApiObjectEncoder).encode()

        request.add_header("Content-Type", "application/json")
        request.add_header("Accept", "application/json")
        request.add_header("Authorization", "Bearer " + self._get_access_token())

        try:
            response = urllib.request.urlopen(request)
            return WaApiClient._parse_response(response)
        except urllib.error.HTTPError as httpErr:
            if httpErr.code == 400:
                raise ApiException(httpErr.read())
            else:
                raise
WaApi.py 文件源码 项目:prms-door 作者: prmakerspace 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _refresh_auth_token(self):
        data = {
            "grant_type": "refresh_token",
            "refresh_token": self._token.refresh_token
        }
        encoded_data = urllib.parse.urlencode(data).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        request.add_header("ContentType", "application/x-www-form-urlencoded")
        auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
        request.add_header("Authorization", 'Basic ' + auth_header)
        response = urllib.request.urlopen(request)
        self._token = WaApiClient._parse_response(response)
        self._token.retrieved_at = datetime.datetime.now()


问题


面经


文章

微信
公众号

扫码关注公众号