python类head()的实例源码

fast.py 文件源码 项目:moescan 作者: RicterZ 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _scan(self):
        """Worker loop: probe every queued sub-path of the target with HEAD.

        Items are pulled from ``self.queue`` until it is empty.  For each item
        a HEAD request is sent to ``self.target + sub`` and the response is
        logged when its status code is contained in ``self.status_code``.
        ``self.thread_count`` is decremented once the loop ends.
        """
        while not self.queue.empty():
            try:
                sub = self.queue.get_nowait()
            except Exception:
                # Another worker emptied the queue between empty() and
                # get_nowait(); nothing is left for this worker.
                break

            try:
                domain = self.target + sub
                r = requests.head(domain, headers=header, timeout=5, stream=True)
                code = r.status_code
                if code in self.status_code:
                    logger.info('status code {} -> {}'.format(code, domain))
            except Exception as e:
                # Best-effort scan: one failing probe must not stop the
                # worker, but the error is recorded instead of being dropped.
                logger.debug('probe of {!r} failed: {}'.format(sub, e))
            finally:
                # Report the item as done only after it was really processed,
                # so anything waiting on the queue does not wake up early.
                self.queue.task_done()

        self.thread_count -= 1
Registry.py 文件源码 项目:Breezes 作者: staugur 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _from_image_tag_getId(self, ImageName, tag, url, version=1):
        """Resolve an image tag to its image id / digest on a registry.

        :param ImageName: repository name of the image
        :param tag: tag to resolve
        :param url: base URL of the registry; a falsy value skips the lookup
        :param version: registry API version.  ``1`` queries
            ``/v1/repositories/<image>/tags/<tag>`` with GET, any other value
            queries ``/v2/<image>/manifests/<tag>`` with HEAD
        :return: the decoded JSON body for version 1; the value of the
            ``Docker-Content-Digest`` response header (or ``""``) otherwise;
            ``""`` when ``url`` is empty or the request raised
        """
        if not url:
            return ""

        base = url.strip("/")
        if version == 1:
            ReqUrl = base + "/v1/repositories/{}/tags/{}".format(ImageName, tag)
        else:
            ReqUrl = base + "/v2/{}/manifests/{}".format(ImageName, tag)
        logger.info("_from_image_tag_getId for url {}".format(ReqUrl))

        try:
            if version == 1:
                r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
            else:
                # The manifest media type must be requested through "Accept";
                # "Content-Type" describes a request body and does not take
                # part in content negotiation.  The old header is still sent
                # so servers that looked at it keep seeing it.
                manifest_type = "application/vnd.docker.distribution.manifest.v2+json"
                r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify,
                                  allow_redirects=True,
                                  headers={"Accept": manifest_type,
                                           "Content-Type": manifest_type})
        except Exception as e:
            logger.error(e, exc_info=True)
            return ""

        if version == 1:
            return r.json()
        return r.headers.get("Docker-Content-Digest", "")
NCARDataset.py 文件源码 项目:PyGEOMET 作者: pygeomet 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setURL(self,update=None):
        """Build ``self.url`` from the dataset parameters and verify it exists.

        The URL is ``<path>/<grid>/<var>.<ext><year>.nc``.  When any of
        ``path``, ``grid``, ``var``, ``ext`` or ``year`` is ``None`` the URL
        is set to ``None`` and nothing else happens.

        Otherwise a HEAD request is sent to ``<url>.html``.  On a status code
        of 400 or above the first entry of ``self.yearList`` is dropped,
        ``self.year`` is taken from ``yearList[currentYearIndex + 1]`` and the
        URL is rebuilt with that year.

        :param update: when equal to ``True``, ``self.NCARfile()`` is called
            after the URL has been set
        """
        #if no valid path, do nothing:
        parts = (self.path, self.grid, self.var, self.ext, self.year)
        if any(part is None for part in parts):
            self.url = None
            # Without a URL there is nothing to probe or load; continuing
            # would fail on None + '.html'.
            return

        self.url = self.path + '/' + self.grid + '/' + self.var + "." +\
        self.ext + self.year + ".nc"

        #Check if the file exists
        fexist = requests.head(self.url+'.html')
        if (fexist.status_code >= 400):
            self.yearList = self.yearList[1:]
            self.year = self.yearList[self.currentYearIndex+1]
            self.url = self.path + '/' + self.grid + '/' + self.var + "." +\
            self.ext + self.year + ".nc"
        if update == True:
            self.NCARfile()
file_getter.py 文件源码 项目:Pentest 作者: n3k 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self):
        """Consume word-list chunks from the queue and probe each candidate URL.

        Every queue item is a newline-separated chunk of words.  For each word
        a HEAD request is sent to ``self.target + word + self.suffix``; URLs
        answering with status 200 are printed, failing requests are reported.
        The loop has no exit condition and blocks on ``self.queue.get()``.
        """
        while True:
            piece = self.queue.get()
            try:
                for word in piece.split("\n"):
                    url = self.target + word + self.suffix
                    #print "[*] Trying: " + url
                    try:
                        r = requests.head(url)
                        if r.status_code == 200:
                            print("[+] 200 - " + url)
                    except Exception:
                        # "Exception" rather than a bare except, so Ctrl-C and
                        # interpreter shutdown are not swallowed.
                        print("[*] Request Error: " + url)
            finally:
                # Always balance the get() above, even if the chunk could not
                # be processed.
                self.queue.task_done()

# http://stackoverflow.com/questions/519633/lazy-method-for-reading-big-file-in-python
httprequestlib.py 文件源码 项目:cyouapitestframework 作者: ChrisWang1985 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def send_http_request_used_exec(self, url, method, request_body="", header="", cookie=None):
        """Send an HTTP request through the ``requests`` function named by ``method``.

        The previous implementation assembled Python source text and ran it
        with ``exec``.  The generated call placed a positional argument after
        keyword arguments (a SyntaxError), used ``header=`` where ``requests``
        expects ``headers=``, and evaluated caller-supplied text as code.  The
        call is now made directly.

        :param url: target URL
        :param method: one of "get", "post", "put", "delete", "head", "options"
        :param request_body: value passed as ``data``
        :param header: request headers; the default ``""`` means "send none".
            NOTE(review): presumably a dict of header names to values --
            confirm against callers.
        :param cookie: value passed as ``cookies``
        :return: the response object returned by ``requests``
        :raises Exception: if ``method`` is not supported
        """
        if method not in ["get", "post", "put", "delete", "head", "options"]:
            raise Exception("Not supported method: %s" % method)

        request_kwargs = {"data": request_body, "cookies": cookie}
        # Compare by value; identity comparison against a literal is unreliable.
        if header != "":
            request_kwargs["headers"] = header

        return getattr(requests, method)(url, **request_kwargs)
rtma_source.py 文件源码 项目:wrfxpy 作者: openwfm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _is_var_ready(self, cycle, var):
        """
        Tell whether the remote file of a variable is newer than the cycle.

        A HEAD request is sent to the URL built by ``_remote_var_url`` and the
        ``Last-Modified`` response header, parsed by
        ``_parse_header_timestamp``, is compared with ``cycle``.

        :param cycle: which cycle are we working with (UTC)
        :param var: the variable identifier
        :return: true if the parsed Last-Modified time is later than the cycle
        :raises ValueError: if the HEAD request does not answer with 200
        """
        remote_url = self._remote_var_url(cycle.hour, var)
        response = requests.head(remote_url)

        if response.status_code == 200:
            modified_at = self._parse_header_timestamp(response.headers['Last-Modified'])
            return modified_at > cycle

        raise ValueError('Cannot find variable %s for hour %d at url %s' % (var, cycle.hour, remote_url))
beatle.py 文件源码 项目:cassandra-backup-and-restore 作者: WenyuChang 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setup_args():
    '''
    Build the command line parser of the Beatle request utility.

    Options: ``--debug`` (flag) and ``--shard``; positionals: ``request``
    and ``filename``.  Returns the configured ``ArgumentParser``.
    '''
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Beatle request utility")

    # (flags, keyword options) in the order they are registered
    argument_specs = (
        (('--debug',), dict(action='store_true', default=False,
                            help="enable debugging log")),
        (('--shard',), dict(action='store',
                            help='shard name or list shards by "list" value')),
        (('request',), dict(action='store',
                            help='request: post/get/head/delete/list')),
        (('filename',), dict(action='store',
                             help='file name')),
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)

    return parser
domains.py 文件源码 项目:script.reddit.reader 作者: gedisony 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sitesManager( media_url ):
    """Return an instance of the first ``sitesBase`` subclass whose regex matches the URL.

    URLs containing a known shortener domain are first resolved with a HEAD
    request that follows redirects.  Returns ``None`` implicitly when no
    subclass matches.
    """
    # resolve url shorteners before matching
    for shortener in ('bit.ly','goo.gl','tinyurl.com'):
        if shortener in media_url:
            resolved = requests.head( media_url, timeout=REQUEST_TIMEOUT, allow_redirects=True )
            log('  short url(%s)=%s'%(media_url,repr(resolved.url)))
            media_url = resolved.url
            break

    # pick the class that handles media identification and extraction
    for handler_cls in sitesBase.__subclasses__():
        pattern = handler_cls.regex
        if not pattern:
            continue
        if re.findall( pattern, media_url, re.I ):
            return handler_cls( media_url )
SourceLeakHacker.py 文件源码 项目:SourceLeakHacker 作者: WangYihang 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def check(url, timeout):
    """Send a HEAD request to ``url`` and print the outcome under ``screenLock``.

    :param url: URL to probe
    :param timeout: request timeout; values <= 0 are replaced by 20

    Status 200 is printed in green, 404 and 405 are ignored, any other status
    is printed in blue.  A failing request prints the exception.  The lock is
    taken only around the printing and is acquired exactly once on every path;
    the previous version re-acquired it in the ``except`` branch, which blocks
    forever if the failure happens while the lock is already held.
    """
    try:
        if timeout <= 0:
            timeout = 20
        response = requests.head(url,timeout = timeout)
    except Exception as e:
        screenLock.acquire()
        try:
            print(e)
        finally:
            screenLock.release()
        return

    code = response.status_code
    if code == 404 or code == 405:
        # Nothing to report for these codes.
        return

    screenLock.acquire()
    try:
        if code == 200:
            ColorPrinter.print_green_text("[ " + str(code) + " ]")
            print("Checking : " + url)
            # NOTE(review): a HEAD response normally carries no body, so this
            # heuristic is unlikely to trigger -- confirm whether a GET was
            # intended here.
            if "404" in response.text:
                ColorPrinter.print_blue_text(url + "\tMaybe every page same!")
        else:
            ColorPrinter.print_blue_text("[ " + str(code) + " ]")
            print("Checking : " + url)
    finally:
        screenLock.release()
hac_gateway_info_disclosure.py 文件源码 项目:AngelSword 作者: Lucifer1993 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        """Probe the export endpoints of ``self.url`` with HEAD requests.

        For each payload path a HEAD request is sent; when the response's
        Content-Type contains ``application/vnd.ms-excel`` a finding is
        printed.  Any request error stops the run and prints a single
        failure line.
        """
        headers = {
        "User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
        }
        payloads = ["/excel/sso_user_export.php",
                    "/excel/user_export.php",
                    "/excel/server_export.php"]

        try:
            for payload in payloads:
                vulnurl = self.url + payload
                req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
                # A response without Content-Type is simply "not a match"; it
                # must not abort the remaining payloads with a KeyError.
                if r"application/vnd.ms-excel" in req.headers.get("Content-Type", ""):
                    cprint("[+]???????????????...(??)\tpayload: "+vulnurl, "yellow")

        except Exception:
            # "Exception" rather than a bare except, so Ctrl-C still stops
            # the scanner.
            cprint("[-] "+__file__+"====>????", "cyan")
dkcms_database_disclosure.py 文件源码 项目:AngelSword 作者: Lucifer1993 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run(self):
        """Probe known database file paths of ``self.url`` with HEAD requests.

        For each payload path a HEAD request is sent; when the response's
        Content-Type equals ``application/x-msaccess`` a finding is printed.
        A failing request prints a failure line and the next payload is tried.
        """
        headers = {
            "User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
        }
        payloads = ["/data/dkcm_ssdfhwejkfs.mdb",
                    "/_data/___dkcms_30_free.mdb",
                    "/_data/I^(()UU()H.mdb"]
        for payload in payloads:
            vulnurl = self.url + payload
            try:
                req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
                # A response without Content-Type is "not a match", not a
                # request failure, so look it up without raising KeyError.
                if req.headers.get("Content-Type") == "application/x-msaccess":
                    cprint("[+]??dkcms???????...(??)\tpayload: "+vulnurl, "red")

            except Exception:
                # "Exception" rather than a bare except, so Ctrl-C still
                # stops the scanner.
                cprint("[-] "+__file__+"====>????", "cyan")
punk_fuzz.py 文件源码 项目:punkspider 作者: aiwennba 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def fuzzworth_contentl(self, head, strict = False):
        '''Decide from the content-length header whether a URL is worth fuzzing.

        When ``head`` contains "content-length", return True only if that
        length is below ``self.pagesize_limit``.  When the header (or ``head``
        itself) is missing, the answer depends on ``strict``: False in strict
        mode, True otherwise.  The point is to avoid downloading huge files,
        which would slow fuzzing down.'''

        header_missing = not head or "content-length" not in head
        if header_missing:
            # no length available: only the lenient mode lets the URL through
            return not strict

        declared_size = int(head["content-length"])
        return declared_size < self.pagesize_limit
punk_fuzz.py 文件源码 项目:punkspider 作者: aiwennba 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def fuzzworth_contentl_with_type_fallback(self, head, allowed_types_lst, full_req_match = False):
        '''Check content-length strictly, falling back to content-type.

        If ``head`` has a "content-length" header, the result is that of
        ``fuzzworth_contentl`` in strict mode.  Otherwise, if it has a
        "content-type" header, the result is that of
        ``fuzzworth_content_type`` in strict mode for ``allowed_types_lst``.
        If neither header can be read, return False.

        ``full_req_match`` is forwarded to ``fuzzworth_content_type``; it used
        to be accepted but ignored (False was always passed on).'''

        if not head:
            return False

        #content-length present: it alone decides
        if "content-length" in head:
            return self.fuzzworth_contentl(head, strict = True)

        #no content-length: decide on the content-type instead
        if "content-type" in head:
            return self.fuzzworth_content_type(head, allowed_types_lst,
                                               full_req_match = full_req_match,
                                               strict = True)

        return False
repo_scan.py 文件源码 项目:spdx-github 作者: spdx 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def pull_request_to_github(main_repo_user, repo_name, environment):
    """Open a pull request against ``main_repo_user/repo_name`` using curl.

    :param main_repo_user: owner of the repository receiving the pull request
    :param repo_name: name of that repository
    :param environment: mapping providing 'github_username',
        'github_password' and 'github_pull_request_title'
    :return: the curl command (argument list) that was executed
    :raises subprocess.CalledProcessError: if curl exits with a non-zero status
    """
    import json  # local import: only needed to encode the request payload

    #This is the API URL to make a pull request
    pull_request_url = ('https://api.github.com/repos/{}/{}/pulls'.format(
                        main_repo_user, repo_name))
    #This has the username and password from the environment file.
    #It is used to log in for API calls.
    #NOTE(review): the credentials are passed on the command line and are
    #part of the returned command -- confirm this exposure is acceptable.
    auth_string = ('{}:{}'.format(environment['github_username'],
                   environment['github_password']))
    #This is the data that will be posted for the pull request.
    #Encoding it with json.dumps keeps the payload valid JSON even when the
    #title contains quotes or backslashes.
    pull_request_data = json.dumps({
        "title": environment['github_pull_request_title'],
        "head": '{}:master'.format(environment['github_username']),
        "base": "master",
    })

    pull_request_command = ['curl', '--user', auth_string, pull_request_url,
                            '-d', pull_request_data]

    #Make the pull request to the main repository
    subprocess.check_output(pull_request_command)
    return pull_request_command
repo_scan.py 文件源码 项目:spdx-github 作者: spdx 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_valid_url(repo_zip_url):
    """Return True if ``repo_zip_url`` is an http(s) URL that answers a HEAD request.

    A response with a success (2xx) or redirect (3xx) status counts as
    reachable.  Any other status, or a request that fails outright, prints a
    message and yields False.
    """
    #Check that it's a URL
    if not repo_zip_url.startswith(('http://', 'https://')):
        return False
    #Get just the head.
    try:
        request = requests.head(repo_zip_url)
    except requests.RequestException as error:
        #An unreachable host is an invalid URL for our purposes, not a crash.
        print('Could not reach URL provided.\n')
        print('Provided url was {} and resulted in error {}'.format(
              repo_zip_url, error))
        return False
    #It should be either success (200's) or redirect(300's).
    #Otherwise, inform the user of failure.
    if (request.status_code >= 400 or request.status_code < 200):
        print('Could not reach URL provided.\n')
        print('Provided url was {} and resulted in status code {}'.format(
              repo_zip_url,str(request.status_code)))
        return False
    return True

#This sends a notification email based on information provided
#in the environment file
harborclient.py 文件源码 项目:harbor-py 作者: tobegit3hub 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def check_project_exist(self, project_name):
        """Ask the server via HEAD whether a project with this name exists.

        Returns True on status 200 and False on 404 (both logged at debug
        level).  Any other status is logged as an error and yields False.
        """
        endpoint = '%s://%s/api/projects?project_name=%s' % (
            self.protocol, self.host, project_name)
        status = requests.head(
            endpoint, cookies={'beegosessionID': self.session_id}).status_code

        if status not in (200, 404):
            logging.error("Fail to check project exist")
            return False

        exists = status == 200
        logging.debug(
            "Successfully check project exist, result: {}".format(exists))
        return exists

    # POST /projects
check_modulemd.py 文件源码 项目:check_modulemd 作者: fedora-modularity 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _is_commit_ref_available(self, package, namespace):
        """
        Check with a HEAD request that a commit ref can be found in a git repository.

        The repository is ``package.repository`` when set, otherwise the
        src.fedoraproject.org cgit URL built from ``namespace`` and
        ``package.name``.  The ref is ``package.ref`` when set, otherwise
        "HEAD".  Problems are reported through ``self.error``.
        """
        if not (package and namespace):
            self.error("Missing parameter to check if commit is available")

        default_repository = "https://src.fedoraproject.org/cgit/%s/%s.git" % (namespace, package.name)
        repository = package.repository or default_repository
        ref = package.ref or "HEAD"

        url = repository + "/patch/?id=%s" % ref
        resp = requests.head(url)
        if not 200 <= resp.status_code < 300:
            self.error("Could not find ref '%s' on '%s'. returned exit status %d; output:\n%s" %
                       (ref, package.name, resp.status_code, resp.text))

        self.log.info("Found ref: %s for %s" % (ref, package.name))
check_links.py 文件源码 项目:devblogs 作者: abdelhai 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def url_resp(url):
    """Receives a url, returns True in case of 200 or 301 response codes.

    A HEAD request with a 3 second timeout is sent.  The status code is
    printed, followed by the url and '#ok#' for 200/301 or by '#bad#' for any
    other status.  Returns False for other statuses and when the request
    fails.
    """
    # The stray backslash before "AppleWebKit" (left over from a line
    # continuation) was removed: it was sent literally in the User-Agent.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
    try:
        res = requests.head(url, timeout=3, headers=headers)
    except Exception:
        return False

    # Do not test the response for truthiness: a Response is falsy for
    # 4xx/5xx statuses, which would skip the '#bad#' report below.
    code = res.status_code
    print('*' + str(code) + '*')
    if code == 200 or code == 301:
        print(url)
        print('#ok#')
        return True

    print('#bad#')
    return False
uptimemonitor.py 文件源码 项目:SlackUptimeMonitor 作者: AndreiD 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def back_online_notifier(the_url):
    """Announce that ``the_url`` is reachable again if it was recorded as down.

    The previous statuses are read from the pickle file ``status.p``.  When
    the entry for ``the_url`` has status "down", a message with the length of
    the outage is sent to ``CHANNEL_ID``; otherwise nothing is sent.  If the
    file cannot be loaded the function prints a notice and returns.
    """
    print("found " + the_url + " back online.")
    try:
        # The context manager closes the file even when unpickling fails.
        with open("status.p", "rb") as status_file:
            old_status_file = pickle.load(status_file)
    except Exception as ex:
        print("The status.p file was not found. it will be recreated." + str(ex))
        return

    if (the_url in old_status_file) and (old_status_file[the_url]['status'] == "down"):
        it_was_down_time = old_status_file[the_url]['time']
        current_time = datetime.datetime.now().replace(microsecond=0)
        send_message(CHANNEL_ID, ":herb: " + the_url + " is back online. It was down for " + str(current_time - it_was_down_time))
    else:
        print("skipping notifying that the url is online")


# --- getting only the head ---
network.py 文件源码 项目:twentybn-dl 作者: TwentyBN 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def total_blocks_and_bytes(self):
        """Sum the sizes of all input URLs, in blocks and in bytes.

        Each URL in ``self.input_urls`` is queried with a HEAD request and its
        'Content-Length' header is read.  A partially filled last block counts
        as a whole block of ``self.blocksize``.

        Returns a ``(total_blocks, total_bytes)`` tuple; raises
        ``ContentLengthNotSupportedException`` when a response lacks the
        'Content-Length' header.
        """
        block_count = 0
        byte_count = 0
        for u in self.input_urls:
            response_headers = requests.head(u).headers
            if 'Content-Length' not in response_headers:
                m = "The url: '{}' doesn't support the 'Content-Length' field.".format(u)
                raise ContentLengthNotSupportedException(m)

            size = int(response_headers['Content-Length'])
            byte_count += size

            block_count += size // self.blocksize
            if size % self.blocksize:
                # remainder occupies one more (partial) block
                block_count += 1
        return block_count, byte_count


问题


面经


文章

微信
公众号

扫码关注公众号