python类head()的实例源码

utils.py 文件源码 项目:faceless 作者: LiuRoy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def qiniu_upload_img(img_url):
    """?????????

    Args:
        img_url (string): ????
    """
    response = requests.get(img_url)
    image = response.content
    md5 = calc_md5(image)

    qiniu_url = 'http://{}/{}'.format(QINIU_HOSTNAME, md5)
    if requests.head(qiniu_url).ok:
        return qiniu_url

    q = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
    token = q.upload_token(QINIU_BUCKET, md5, 10)
    put_data(token, md5, image, mime_type='image/jpeg')
    return qiniu_url
cadastra_recurso_servidores.py 文件源码 项目:scripts-dadosgovbr 作者: dadosgovbr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def register(self):
        u"Register resource into CKAN"
        import ckanapi
        ckansite = ckanapi.RemoteCKAN(self.ckan_url, apikey=self.apikey)

        # resource url responds?
        resource = requests.head(self.url)
        self.size = int(resource.headers["content-length"])

        # resource exists?
        resources = ckansite.action.resource_search(query=u"url:%s" % self.url)
        if resources[u"count"] == 0:
            ckansite.action.resource_create(
                package_id = self.package_id,
                url = self.url,
                name = self.name,
                description = self.description,
                format = self.format,
                mimetype = self.mimetype,
                size = self.size,
            )
app.py 文件源码 项目:opendxl-maxmind-service-python 作者: opendxl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _is_update_needed(self):
        """
        Determines if an update for the database is necessary

        :return: Whether or not the database should be updated
        """

        # Retrieve the headers of the response to compare the MD5 checksums of the file
        headers_response = requests.head(self.MAXMIND_FREE_DB_URL)
        logger.debug(headers_response.headers)
        headers_response.raise_for_status()
        response_checksum = headers_response.headers.get('X-Database-MD5')
        logger.info("Database MD5 received in response headers: " + str(response_checksum))
        # Compare the current file checksum to the one received from MaxMind
        if response_checksum is not None and response_checksum == self._database_checksum:
            return False
        self._response_checksum = response_checksum
        return True
manager.py 文件源码 项目:ucloud-storage-python-wrapper 作者: wishket 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def head_account_metadata(self):
        # get account's metadata
        url = self.url

        # request api
        response = requests.head(url, headers=self.base_headers)

        # formatting result
        result = dict()
        for key in response.headers:
            if key == 'X-Account-Container-Count':
                result['container_count'] = \
                    response.headers['X-Account-Container-Count']
            elif key == 'X-Account-Object-Count':
                result['object_count'] = \
                    response.headers['X-Account-Object-Count']
            elif key == 'X-Account-Bytes-Used':
                result['used_bytes'] = replace_bytes_to_readable(
                    response.headers['X-Account-Bytes-Used']
                )
            else:
                result[key] = response.headers[key]
        return result
manager.py 文件源码 项目:ucloud-storage-python-wrapper 作者: wishket 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def head_container_metadata(self, container_name):
        """
        get container's metadata
        :param container_name: target container name
        :return: container's metadata dict
        """
        # check container metadata
        url = self.url + '/' + container_name

        # request api
        response = requests.head(url, headers=self.base_headers)

        # formatting result
        result = dict()
        for key in response.headers:
            if key == 'X-Container-Object-Count':
                result['object_count'] = \
                    response.headers['X-Container-Object-Count']
            elif key == 'X-Container-Bytes-Used':
                result['used_bytes'] = replace_bytes_to_readable(
                    response.headers['X-Container-Bytes-Used']
                )
            else:
                result[key] = response.headers[key]
        return result
imageinspector.py 文件源码 项目:inspect-docker-image 作者: giantswarm 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_layer_size(self, layer_hash):
        """
        Attempt to return the size of the given layer
        """
        url = "{base_url}/v2/{name}/blobs/{layer_hash}".format(
              base_url=self.base_url,
              name=self.repository_name,
              layer_hash=layer_hash)
        headers = {}
        if self.token is not None:
            headers["Authorization"] = "Bearer %s" % self.token
        r = requests.head(url, headers=headers, allow_redirects=True, timeout=(3.05,5))
        r.raise_for_status()
        if "content-length" in r.headers:
            self.layer_sizes[layer_hash] = int(r.headers["content-length"])
        else:
            self.layer_sizes[layer_hash] = None
        return self.layer_sizes[layer_hash]
spct_downloader.py 文件源码 项目:specton 作者: somesortoferror 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def getDLsize(self):
        debug_log("getDLsize called")
        it = QTreeWidgetItemIterator(self.tw)
        while it.value():
            item = it.value()
            url_test = item.data(0, dataURL)
            if url_test is not None:
                try:
                    r = requests.head(url_test)
                    r.raise_for_status()
                    try:
                        size = (int(r.headers['Content-Length']) / 1024) / 1024
                    except ValueError:
                        size = 0
                    if size > 0:
                        item.setText(2, "{} MiB".format(round(size, 2)))
                except requests.exceptions.HTTPError:
                    debug_log("Error {} getting DL size: {}".format(r.status_code, r.headers))
                    item.setText(2, r.status_code)
                except requests.exceptions.RequestException as e:
                    item.setText(2, self.tr("Error"))
                    debug_log(e, logging.ERROR)
            it += 1
network.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def head(self):
        try:
            response = requests.head(self.url, timeout=self.request_timeout)
            self.res = response # assign the response object from requests to a property on the instance of HTTP class
            return response.headers
        except requests.exceptions.ConnectionError as ce:
            return False
        except Exception as e:
            if DEBUG_FLAG:
                sys.stderr.write("Naked Framework Error: Unable to perform a HEAD request with the head() method (Naked.toolshed.network.py).")
            raise e


    #------------------------------------------------------------------------------
    # [ post method ] (string)
    #  HTTP POST request for text
    #  returns text from the URL as a string
    #------------------------------------------------------------------------------
network.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def head(self):
        try:
            response = requests.head(self.url, timeout=self.request_timeout)
            self.res = response # assign the response object from requests to a property on the instance of HTTP class
            return response.headers
        except requests.exceptions.ConnectionError as ce:
            return False
        except Exception as e:
            if DEBUG_FLAG:
                sys.stderr.write("Naked Framework Error: Unable to perform a HEAD request with the head() method (Naked.toolshed.network.py).")
            raise e


    #------------------------------------------------------------------------------
    # [ post method ] (string)
    #  HTTP POST request for text
    #  returns text from the URL as a string
    #------------------------------------------------------------------------------
api.py 文件源码 项目:python-wp 作者: myles 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_wp_api_url(self, url):
        """
        Private function for finding the WP-API URL.

        Arguments
        ---------

        url : str
            WordPress instance URL.
        """
        resp = requests.head(url)

        # Search the Links for rel="https://api.w.org/".
        wp_api_rel = resp.links.get('https://api.w.org/')

        if wp_api_rel:
            return wp_api_rel['url']
        else:
            # TODO: Rasie a better exception to the rel doesn't exist.
            raise Exception
precache.py 文件源码 项目:precache_dev 作者: carlashley 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def already_cached(self, asset_url):
        '''Checks if an item is already cached. This is indicated in the
        headers of the file being checked.'''
        try:
            req = requests.head(asset_url, headers={'user-agent': self.user_agent})  # NOQA
            if req.headers.get('Content-Type') is not None:
                # Item is not in cache
                self.log.debug('Not in cache: %s' % asset_url)
                return False
            else:
                # Item is already cached
                self.log.info('Already in cache: %s' % asset_url)
                return True
        except:
            # In case there is an error, we should return false anyway as there
            # is no harm in re-downloading the item if it's already downloaded.
            return False
link.py 文件源码 项目:arisu 作者: Appleman1234 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def get_normal_title(self,url):
        return_message = ""
        head = requests.head(url)
        max_size = 5e6
        if 'content-length' in head.headers and  int(head.headers['content-length']) > max_size:
            return_message = "File too big for link preview\r\n"
        else:
            with eventlet.Timeout(60, False):
                response = requests.get(url,timeout=30)
                if response.status_code == 200:
                    if 'text/html' in response.headers['content-type']:
                            soup = BeautifulSoup(response.content,"lxml")
                            if soup.title is not None:
                                return_message += soup.title.string + "\r\n"
                    else:
                        return_message += response.headers['content-type'] + " Size: " + response.headers['content-length'] + "\r\n"
        return return_message
resolver.py 文件源码 项目:metatab 作者: Metatab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def find_decl_doc(self, name):


        raise IncludeError(name)

        import requests
        from requests.exceptions import InvalidSchema
        url = METATAB_ASSETS_URL + name + '.csv'
        try:
            # See if it exists online in the official repo
            r = requests.head(url, allow_redirects=False)
            if r.status_code == requests.codes.ok:
                return url

        except InvalidSchema:
            pass  # It's probably FTP
youtube.py 文件源码 项目:councillor-party 作者: carsonyl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_minutes_url(config, metadata):
    start_date = parse_timestamp_naively(metadata['start']).astimezone(get_tz(config))
    meeting_type = None
    for key, value in config.get('minutes_abbrs', {}).items():
        if key in metadata[config['minutes_abbrs_for']]:
            meeting_type = value
            break

    minutes_url = metadata.get('minutes_url')
    if minutes_url:
        requests.head(minutes_url).raise_for_status()
        return minutes_url
    elif config['id'] == 'vancouver':
        if not meeting_type:
            return 'N/A'
        if metadata['title'] == 'Inaugural Council Meeting':
            meeting_type = 'inau'
        mins = 'http://council.vancouver.ca/{dt:%Y%m%d}/{type}{dt:%Y%m%d}ag.htm'.format(
            type=meeting_type, dt=start_date)
        requests.head(mins).raise_for_status()
        return mins

    return 'N/A'
pnda-cli.py 文件源码 项目:pnda-aws-templates 作者: pndaproject 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def check_pnda_mirror():

    def raise_error(reason):
        CONSOLE.info('PNDA mirror...... ERROR')
        CONSOLE.error(reason)
        CONSOLE.error(traceback.format_exc())
        sys.exit(1)

    try:
        mirror = PNDA_ENV['mirrors']['PNDA_MIRROR']
        response = requests.head(mirror)
        # expect 200 (open mirror) 403 (no listing allowed)
        # or any redirect (in case of proxy/redirect)
        if response.status_code not in [200, 403, 301, 302, 303, 307, 308]:
            raise_error("PNDA mirror configured and present "
                        "but responded with unexpected status code (%s). " % response.status_code)
        CONSOLE.info('PNDA mirror...... OK')
    except KeyError:
        raise_error('PNDA mirror was not defined in pnda_env.yaml')
    except:
        raise_error("Failed to connect to PNDA mirror. Verify connection "
                    "to %s, check mirror in pnda_env.yaml and try again." % mirror)
dl_multithreading.py 文件源码 项目:163mooc_dumper 作者: wsdzl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _len(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML\
        , like Gecko) Chrome/50.0.2661.102 Safari/537.36',
        'Range': 'bytes=0-0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    }
    with requests.get(url, headers=headers) as r:
        length = r.headers['Content-Range']
        if length.find('0-0/') == -1:
            length = None
        else:
            length = length.split('0-0/')[-1]
            length = int(length) if length else 0
    if not length:
        del headers['Range']
        with requests.head(url, headers=headers) as r:
            length = r.headers['Content-Length']
            length = int(length) if length else 0
    return length
dirsc.py 文件源码 项目:swarm 作者: a7vinx 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _scan_target_normal(self,target):
        try:
            r=requests.head(target)
            #it maybe a directory
            if self._check_eponymous_dir(r,target):
                return target+'/;'

            if self._check_exist_code(r.status_code):
                # check content
                r=requests.get(target)
                if self._check_exist_code(r.status_code):
                    for cur in self._not_exist_flag:
                        if r.text.find(cur)!=-1:
                            return ''
                    return target+';'
            return ''
        except Exception as e:
            return ''
veda_deliver_3play.py 文件源码 项目:edx-video-pipeline 作者: edx 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def validate_media_url(self):
        """
        Validates the media URL

        Raises:
            3PlayMediaUrlError: on invalid media url or content type
        """
        if not self.media_url:
            raise ThreePlayMediaUrlError('Invalid media URL "{media_url}".'.format(media_url=self.media_url))

        response = requests.head(url=self.media_url)
        if not response.ok:
            raise ThreePlayMediaUrlError('The URL "{media_url}" is not Accessible.'.format(media_url=self.media_url))
        elif response.headers['Content-Type'] != self.allowed_content_type:
            raise ThreePlayMediaUrlError(
                'Media content-type should be "{allowed_type}". URL was "{media_url}", content-type was "{type}"'.format(  # pylint: disable=line-too-long
                    allowed_type=self.allowed_content_type,
                    media_url=self.media_url,
                    type=response.headers['Content-Type'],
                )
            )
main.py 文件源码 项目:website-status-cli 作者: prabhakar267 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _check_website(url, code):
    try:
        response = requests.head(url, headers={'user_agent': DEFAULT_HEADERS})
        website_status_code = response.status_code
        if code:
            return website_status_code
        else:
            if website_status_code == 200:
                return True
            else:
                return False
    except requests.exceptions.Timeout, requests.exceptions.ConnectionError:
        if code:
            return -1
        else:
            return False
models.py 文件源码 项目:open-ledger 作者: creativecommons 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def sync(self, attempt_perceptual_hash=False):
        """Attempt to sync the image with the remote location. Optionally does a number of
        other actions:
        * If the image exists and we have not constructed a fingerprint for it,
          do so at this time and populate the image model's perceptual_hash field
          (if attempt_perceptual_hash is True; defaults to False because it's slow)
        * If the image does not exist, mark it as removed_from_source
        * If the image is removed_from_source, delete it from the search engine
        * In all cases, update the last_synced_with_source timestamp on the image"""
        req = requests.head(self.url)
        if req.status_code == 200:
            if not self.perceptual_hash and attempt_perceptual_hash:
                self.perceptual_hash = self.generate_hash()
        else:
            self.removed_from_source = True

        self.last_synced_with_source = timezone.now()

        self.save()
restful.py 文件源码 项目:crossrefapi 作者: fabiobatalha 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def do_http_request(self, method, endpoint, data=None, files=None, timeout=100, only_headers=False, custom_header=None):

        if only_headers is True:
            return requests.head(endpoint)

        if method == 'post':
            action = requests.post
        else:
            action = requests.get

        if custom_header:
            headers = {'user-agent': custom_header}
        else:
            headers = {'user-agent': str(Etiquette())}

        if method == 'post':
            result = action(endpoint, data=data, files=files, timeout=timeout, headers=headers)
        else:
            result = action(endpoint, params=data, timeout=timeout, headers=headers)

        if self.throttle is True:
            self._update_rate_limits(result.headers)
            sleep(self.throttling_time)

        return result
common.py 文件源码 项目:utils 作者: jianbing 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def get_url_file_size(url, proxies=None):
    """???????????????KB

    :param proxies:
    :param url:
    :return:
    """
    r = requests.head(url=url, proxies=proxies, timeout=3.0)

    while r.is_redirect:  # ??????
        # print 'got'
        # print r.headers['Location']
        r = requests.head(url=r.headers['Location'], proxies=proxies, timeout=3.0)
    # print r.headers
    # print r.headers['Content-Length']
    return int(r.headers['Content-Length']) / 1024
webloader.py 文件源码 项目:webkivy 作者: miohtama 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load(self, url):
        """Load script from URL."""

        # Check for a redirect to get a final base_url where to start

        resp = requests.head(url, allow_redirects=True)
        if url != resp.url:
            # Followed a redirect
            url = resp.url

        fname = get_response_fname(resp)
        _, ext = os.path.splitext(fname)

        if ext == ".py":
            py_file = self.fetch_file(url, url)
            return py_file
        else:
            self.crawl(url, url)
punk_fuzz.py 文件源码 项目:-PunkScan 作者: swordli 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def fuzzworth_contentl(self, head, strict = False):
        '''Check the content-length before moving on. If strict is set to True content-length
        must be validated before moving on. If strict is set to False (default) the URL will
        be fuzzed if no content-length is found or no head is returned. The idea behind this
        method is to ensure that huge files are not downloaded, slowing down fuzzing.'''

        #no param no fuzzing
        if head and "content-length" in head:

            content_length = int(head["content-length"])

            if content_length < self.pagesize_limit:
                return True

            else:
                return False

        else:
            if strict:
                return False

            else:
                return True
punk_fuzz.py 文件源码 项目:-PunkScan 作者: swordli 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def fuzzworth_contentl_with_type_fallback(self, head, allowed_types_lst, full_req_match = False):
        '''This method will check the content length header strictly. If a content-length header is found
        and the content-length is below a certain amount (set in the fuzzer config) return True if either of those
        is not satisfied,. If it is not check the content-type, if the content-type is of a certain type return 
        True. If not or if content-type can't be read either, return False.'''

        #if content-length header is found return True
        if head and "content-length" in head:
            return self.fuzzworth_contentl(head, strict = True)

        #if content-length header not found, check content-type, if it is of allowed type
        #return True, otherwise false
        if head and "content-type" in head:
            return self.fuzzworth_content_type(head, allowed_types_lst, full_req_match = False, strict = True)

        return False
timeInject.py 文件源码 项目:AutoSQLInjecter 作者: WangYihang 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def exce(indexOfResult,indexOfChar,queryASCII):
    # content-start
    url = "http://127.0.0.1/Less-9/?id="
    tempurl = url + getPayload(indexOfResult,indexOfChar,queryASCII)
    before_time = time.time()
    requests.head(tempurl)
    after_time = time.time()
    # content-end
    use_time = after_time - before_time
    # judge-start
    # ?sleep????? , ???????? (????????????? , ???SQL??????????sleep????????)
    # ?????????? , ????/???????sleep?????????
    if abs(use_time) > error_time: 
        return True
    else:
        return False
    # judge-end
utils.py 文件源码 项目:deb-python-proliantutils 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def validate_href(image_href):
        """Validate HTTP image reference.

        :param image_href: Image reference.
        :raises: exception.ImageRefValidationFailed if HEAD request failed or
            returned response code not equal to 200.
        :returns: Response to HEAD request.
        """
        try:
            response = requests.head(image_href)
            if response.status_code != http_client.OK:
                raise exception.ImageRefValidationFailed(
                    image_href=image_href,
                    reason=("Got HTTP code %s instead of 200 in response to "
                            "HEAD request." % response.status_code))
        except requests.RequestException as e:
            raise exception.ImageRefValidationFailed(image_href=image_href,
                                                     reason=e)
        return response
updates.py 文件源码 项目:siphon-cli 作者: getsiphon 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_remote_source_length():
    url = os.environ.get('SP_REMOTE_SOURCE')
    try:
        response = requests.head(url, allow_redirects=True, timeout=10)
    except requests.exceptions.RequestException as e:
        puts(colored.red('[HEAD] %s' % url))
        puts(colored.red('Failed to get remote installation size: %s' % e))
        sys.exit(1)
    size = response.headers.get('content-length')
    if not size:
        size = response.headers.get('Content-Length')
    if not size or not size.isdigit():
        puts(colored.red('Could not fetch the remote Content-Length.'))
        sys.exit(1)
    try:
        size = int(size)
    except ValueError:
        pass
    return size
Registry.py 文件源码 项目:SwarmOps 作者: staugur 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _get_imageId(self, ImageName, tag="latest"):
        """ ??????tag?imageId/digest """

        ReqUrl = self._baseUrl + "/repositories/{}/tags/{}".format(ImageName, tag) if self.version == 1 else self._baseUrl + "/{}/manifests/{}".format(ImageName, tag)
        logger.info("_get_imageId for url {}".format(ReqUrl))
        try:
            if self.version == 1:
                r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
            else:
                r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify, allow_redirects=True, headers={"Content-Type": "application/vnd.docker.distribution.manifest.v2+json"})
        except Exception,e:
            logger.error(e, exc_info=True)
            return False
        else:
            if self.version == 1:
                return r.json()
            else:
                return r.headers.get("Docker-Content-Digest", "")
        return ""
m_runtastic.py 文件源码 项目:centr 作者: ibiBgOR 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def check_download_session(url, download_dir, cookies):
    r = requests.head(url, cookies=cookies)
    if r.status_code != 200 or 'Content-Disposition' not in r.headers:
        return False
    m = re.search('filename="(.+)"', r.headers['Content-Disposition'])
    if not m:
        return False
    f = m.group(1)
    filename = os.path.join(download_dir, f)
    if os.path.isfile(filename):
        return True
    # get it
    print "Fetching %s" % f
    r2 = requests.get(url, cookies=cookies)
    if r2.status_code != 200:
        return False
    with open(filename, 'wb') as f:
        f.write(r2.content)
    return True


问题


面经


文章

微信
公众号

扫码关注公众号