Example source code for Python's add_header() method
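
In Python 3, add_header() is a method of urllib.request.Request that attaches an HTTP header to a request before it is opened. A minimal sketch (the URL is a placeholder):

import urllib.request

# Build a request and attach headers before opening it.
request = urllib.request.Request('http://www.example.com')
request.add_header('User-Agent', 'my-client/1.0')
request.add_header('Accept', 'text/html')

with urllib.request.urlopen(request) as resp:
    body = resp.read()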

feedparser.py (project: MIT-6.0001-Problem-sets-solution, author: cantell)
def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if ACCEPT_HEADER:
        request.add_header('Accept', ACCEPT_HEADER)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in list(request_headers.items()):
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
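
A hypothetical call to the helper above (argument values are illustrative; ACCEPT_HEADER, gzip and zlib are module-level names in feedparser that the excerpt does not show):

req = _build_urllib2_request(
    'http://example.com/feed.xml',
    agent='feedparser-test',
    etag='"abc123"',        # ETag saved from a prior response
    modified=None,
    referrer=None,
    auth=None,
    request_headers={},
)
# urllib.request.urlopen(req) then performs a conditional GET; an
# unchanged feed comes back as an HTTPError with code 304.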
test_urllib2net.py (project: ouroboros, author: pybee)
def test_custom_headers(self):
        url = "http://www.example.com"
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
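
A detail this stdlib test relies on: add_header() stores the header name via str.capitalize(), which is why a header added as 'User-Agent' can be read back as 'User-agent'. A quick offline illustration:

import urllib.request

req = urllib.request.Request('http://www.example.com')
req.add_header('User-Agent', 'Test-Agent')
print(req.get_header('User-agent'))  # Test-Agent: stored under the capitalize()d name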
addon_updater.py (project: blender-addon-updater, author: CGCookie)
def get_raw(self, url):
        # print("Raw request:", url)
        request = urllib.request.Request(url)

        # setup private request headers if appropriate
    if self._engine.token is not None:
        if self._engine.name == "gitlab":
            request.add_header('PRIVATE-TOKEN', self._engine.token)
        else:
            if self._verbose: print("Tokens not setup for engine yet")

        # run the request
        try:
            result = urllib.request.urlopen(request)
    except urllib.error.HTTPError as e:
        self._error = "HTTP error"
        self._error_msg = str(e.code)
        self._update_ready = None
        return None
        except urllib.error.URLError as e:
            self._error = "URL error, check internet connection"
            self._error_msg = str(e.reason)
            self._update_ready = None
            return None
        else:
            result_string = result.read()
            result.close()
            return result_string.decode()


    # result of all api calls, decoded into json format
advanced_link_crawler.py (project: wswp, author: kjam)
def download(url, num_retries=2, user_agent='wswp', charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries=num_retries - 1, user_agent=user_agent, charset=charset, proxy=proxy)
    return html
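
For reference, this snippet presumably relies on module-level imports along these lines (the excerpt does not show them):

import urllib.request
from urllib.error import URLError, HTTPError, ContentTooShortError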
advanced_link_crawler.py (project: wswp, author: kjam)
def download(url, num_retries=2, user_agent='wswp', charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries=num_retries - 1, user_agent=user_agent, charset=charset, proxy=proxy)
    return html
fetch_free_proxyes.py (project: Web-Scraping, author: Martian2Lee)
def get_html(url):
    request = urllib.request.Request(url)
    request.add_header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.99 Safari/537.36")
    html = urllib.request.urlopen(request)
    return html.read()
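
get_html() returns raw bytes; a caller would typically decode them, e.g.:

page = get_html('http://www.example.com')        # placeholder URL
text = page.decode('utf-8', errors='replace')    # charset assumed, not detected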
test_urllib2net.py (project: kbe_server, author: xiaohaoppy)
def test_custom_headers(self):
        url = "http://www.example.com"
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
http.py (project: machine-learning-python, author: pspxiaochen)
def _build_urllib2_request(url, agent, accept_header, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if accept_header:
        request.add_header('Accept', accept_header)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in request_headers.items():
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
advanced_link_crawler.py (project: Python-Web-Scraping-Second-Edition, author: PacktPublishing)
def download(url, user_agent='wswp', num_retries=2, charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, user_agent=user_agent, num_retries=num_retries - 1, charset=charset, proxy=proxy)
    return html
advanced_link_crawler.py (project: Python-Web-Scraping-Second-Edition, author: PacktPublishing)
def download(url, user_agent='wswp', num_retries=2, charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, user_agent=user_agent, num_retries=num_retries - 1, charset=charset, proxy=proxy)
    return html
testrail.py (project: robotframework-testrail, author: ATEME)
def __send_request(self, method, uri, data):
        url = self.__url + uri
        request = urllib.request.Request(url)
        if method == 'POST':
            request.data = bytes(json.dumps(data), 'utf-8')
        auth = str(
            base64.b64encode(
                bytes('%s:%s' % (self.user, self.password), 'utf-8')
            ),
            'ascii'
        ).strip()
        request.add_header('Authorization', 'Basic %s' % auth)
        request.add_header('Content-Type', 'application/json')

        e = None
        try:
            response = urllib.request.urlopen(request).read()
        except urllib.error.HTTPError as ex:
            response = ex.read()
            e = ex

        if response:
            result = json.loads(response.decode())
        else:
            result = {}

        if e is not None:
            if result and 'error' in result:
                error = '"' + result['error'] + '"'
            else:
                error = 'No additional error message received'
            raise APIError('TestRail API returned HTTP %s (%s)' % 
                (e.code, error))

        return result
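
In TestRail-style bindings this private method is normally wrapped by public helpers; a hypothetical calling pattern (class and method names are assumptions, not shown in the excerpt):

client = TestRailAPIClient('https://example.testrail.io/')              # hypothetical constructor
case = client.send_get('get_case/1')                                    # GET  -> __send_request('GET', uri, None)
result = client.send_post('add_result_for_case/1/1', {'status_id': 1})  # POST -> __send_request('POST', uri, data)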
googl.py (project: telegram-urlprobot, author: GabrielRF)
def __call(self, url=API_URL, params={}, data=None, headers={}):
        """Common method for API call.

        url: API URL
        params: query string parameters
        data: POST data
        headers: additional request headers

        Return: parsed JSON structure or raise GooglError.
        """
        params.update(key=self.key)
        if self.userip is not None:
            params.update(userip=self.userip)

        full_url = "%s?%s" % (url % self.api, urllib.parse.urlencode(params))

        if data is not None:
            data = bytes(data, encoding="UTF-8")
        request = urllib.request.Request(full_url, data=data, headers=headers)

        if self.referer is not None:
            request.add_header("Referer", self.referer)
        if self.client_login is not None:
            request.add_header("Authorization", "GoogleLogin auth=%s" % self.client_login)

        try:
            response = urllib.request.urlopen(request)
            return json.loads(str(response.read(), encoding="UTF-8"))
        except urllib.error.HTTPError as e:
            error = json.loads(e.fp.read())
            raise GooglError(error["error"]["code"], error["error"]["message"])
__init__.py (project: mensahd, author: cvzi)
def _getMealsURL():
    """Download meals information from XML feed"""
    request = urllib.request.Request(mealsURL)
    request.add_header("Authorization", "Basic %s" % mealsURL_authorization)
    result = urllib.request.urlopen(request, timeout=__timeoutSeconds)
    return result, 0
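
mealsURL and mealsURL_authorization are module-level values the excerpt does not show; a Basic credential of this kind (the base64 of user:password, without the 'Basic ' prefix, which the code above adds) is presumably built along these lines, with placeholder credentials:

import base64
mealsURL_authorization = base64.b64encode(b'user:password').decode('ascii')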
arachni.py (project: PassiveScanner, author: jjf012)
def post_api(self, api_path):
        options = json.dumps(self.options)
        options = options if isinstance(options, bytes) else options.encode('utf8')
        request = urllib.request.Request(self.arachni_url + api_path, options)
        request.add_header('Content-Type', 'application/json')
        return urllib.request.urlopen(request).read().decode('utf8')
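
The same JSON-POST pattern as a standalone sketch (the endpoint is a placeholder; the Arachni REST server's URL would normally come from configuration):

import json
import urllib.request

payload = json.dumps({'url': 'http://target.example'}).encode('utf8')
req = urllib.request.Request('http://127.0.0.1:7331/scans', payload)  # placeholder endpoint
req.add_header('Content-Type', 'application/json')
response = urllib.request.urlopen(req).read().decode('utf8')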
addon_updater.py (project: blender-addon-updater, author: CGCookie)
def stage_repository(self, url):

        local = os.path.join(self._updater_path,"update_staging")
        error = None

        # make/clear the staging folder
        # ensure the folder is always "clean"
        if self._verbose: print("Preparing staging folder for download:\n",local)
        if os.path.isdir(local):
            try:
                shutil.rmtree(local)
                os.makedirs(local)
            except OSError:
                error = "failed to remove existing staging directory"
        else:
            try:
                os.makedirs(local)
            except OSError:
                error = "failed to create staging directory"

        if error is not None:
            if self._verbose: print("Error: Aborting update, "+error)
            self._error = "Update aborted, staging path error"
            self._error_msg = "Error: {}".format(error)
            return False

        if self._backup_current:
            self.create_backup()
        if self._verbose: print("Now retrieving the new source zip")

        self._source_zip = os.path.join(local,"source.zip")

        if self._verbose: print("Starting download update zip")
        try:
            request = urllib.request.Request(url)

            # setup private token if appropriate
            if self._engine.token is not None:
                if self._engine.name == "gitlab":
                    request.add_header('PRIVATE-TOKEN', self._engine.token)
                else:
                    if self._verbose: print("Tokens not setup for selected engine yet")
            self.urlretrieve(urllib.request.urlopen(request), self._source_zip)
            # add additional checks on file size being non-zero
            if self._verbose: print("Successfully downloaded update zip")
            return True
        except Exception as e:
            self._error = "Error retrieving download, bad link?"
            self._error_msg = "Error: {}".format(e)
            if self._verbose:
                print("Error retrieving download, bad link?")
                print("Error: {}".format(e))
            return False

