def _scan(self):
    """Worker loop: pop subdomain candidates off the queue, probe each with an
    HTTP HEAD request, and log any response whose status code is interesting.

    Drains ``self.queue`` until empty, then decrements ``self.thread_count`` so
    the owner can tell when all workers have finished.
    """
    while True:
        if self.queue.empty():
            break
        try:
            sub = self.queue.get_nowait()
            self.queue.task_done()
            domain = self.target + sub
            # HEAD + stream=True avoids downloading any body content.
            r = requests.head(domain, headers=header, timeout=5, stream=True)
            code = r.status_code
            # self.status_code is presumably a collection of "report these"
            # codes — TODO confirm against the class constructor.
            if code in self.status_code:
                logger.info('status code {} -> {}'.format(code, domain))
        except Exception:
            # BUG FIX: was Python-2-only `except Exception, e` (SyntaxError on
            # Py3). Best-effort scan: network errors are deliberately ignored.
            pass
    self.thread_count -= 1
# Example source code for requests.head() usage
def _from_image_tag_getId(self, ImageName, tag, url, version=1):
    """Resolve an image tag to its identifier.

    For a v1 registry, returns the JSON body of the tag endpoint (the image
    id). For a v2 registry, returns the ``Docker-Content-Digest`` header from
    a HEAD request on the manifest endpoint. Returns ``""`` when no url is
    given or the request fails.

    :param ImageName: repository name, e.g. "library/ubuntu"
    :param tag: tag to resolve, e.g. "latest"
    :param url: registry base url; falsy values short-circuit to ""
    :param version: registry API version, 1 or 2
    """
    if url:
        ReqUrl = url.strip("/") + "/v1/repositories/{}/tags/{}".format(ImageName, tag) if version == 1 else url.strip("/") + "/v2/{}/manifests/{}".format(ImageName, tag)
        logger.info("_from_image_tag_getId for url {}".format(ReqUrl))
        try:
            if version == 1:
                r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
            else:
                # BUG FIX: the v2 registry API selects the manifest schema via
                # the *Accept* request header; the previous "Content-Type" key
                # had no effect, so the returned digest could be for the wrong
                # (schema1) manifest.
                r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify, allow_redirects=True, headers={"Accept": "application/vnd.docker.distribution.manifest.v2+json"})
        except Exception as e:
            # BUG FIX: was Python-2-only `except Exception,e` syntax.
            logger.error(e, exc_info=True)
        else:
            if version == 1:
                return r.json()
            else:
                return r.headers.get("Docker-Content-Digest", "")
    return ""
def setURL(self, update=None):
    """Build ``self.url`` from path/grid/var/ext/year components and verify
    that the remote file exists; on a 4xx/5xx answer, fall back to the next
    year in ``self.yearList`` and rebuild the url.

    :param update: when truthy, re-load the file via ``self.NCARfile()``
                   after falling back to a newer year.
    """
    # If any component is missing there is no valid path — do nothing.
    if self.path is None or self.grid is None or self.var is None or\
       self.ext is None or self.year is None:
        self.url = None
    else:
        self.url = self.path + '/' + self.grid + '/' + self.var + "." +\
            self.ext + self.year + ".nc"
        # Check if the file exists (OPeNDAP servers expose an .html page).
        fexist = requests.head(self.url + '.html')
        if fexist.status_code >= 400:
            # File not there: drop the oldest year and retry with the next one.
            # NOTE(review): indexing with currentYearIndex+1 after slicing off
            # the first element looks off-by-one-prone — confirm against caller.
            self.yearList = self.yearList[1:]
            self.year = self.yearList[self.currentYearIndex + 1]
            self.url = self.path + '/' + self.grid + '/' + self.var + "." +\
                self.ext + self.year + ".nc"
            if update:
                self.NCARfile()
def run(self):
    """Worker loop for directory brute-forcing: take newline-separated word
    chunks from the queue, probe ``target + word + suffix`` with HEAD, and
    report urls answering 200.
    """
    while True:
        piece = self.queue.get()
        words = piece.split("\n")
        for word in words:
            url = self.target + word + self.suffix
            try:
                r = requests.head(url)
                if r.status_code == 200:
                    # BUG FIX: was a Python-2-only print statement.
                    print("[+] 200 - " + url)
            except Exception:
                # BUG FIX: bare `except:` also swallowed SystemExit /
                # KeyboardInterrupt; narrowed to Exception.
                print("[*] Request Error: " + url)
        self.queue.task_done()
# http://stackoverflow.com/questions/519633/lazy-method-for-reading-big-file-in-python
def send_http_request_used_exec(self, url, method, request_body="", header="", cookie=None):
    """Send an HTTP request with the given method and return the response.

    :param url: target url string
    :param method: one of get/post/put/delete/head/options
    :param request_body: payload passed as the request's ``data``
    :param header: headers mapping, or "" for none
    :param cookie: cookies object/mapping passed through to requests
    :raises Exception: when ``method`` is not a supported verb
    """
    if method not in ["get", "post", "put", "delete", "head", "options"]:
        raise Exception("Not supported method: %s" % method)
    _cookie_obj = cookie
    # BUG FIX: the original built a source string and ran it through `exec`,
    # which (a) is an injection hazard — url/body/header were interpolated
    # unquoted into code — and (b) was broken anyway: `header=` is not a
    # requests keyword (it is `headers=`) and `_cookie_obj` appeared
    # positionally after keyword arguments, a SyntaxError at exec time.
    # Dispatching via getattr keeps the same interface without exec.
    requester = getattr(requests, method)
    if header != "":
        # BUG FIX: was `header is not ""` — identity, not equality, comparison.
        _response = requester(url, data=request_body, headers=header, cookies=_cookie_obj)
    else:
        _response = requester(url, data=request_body, cookies=_cookie_obj)
    return _response
def _is_var_ready(self, cycle, var):
    """
    Checks if the variable var is ready for the given forecast hour by comparing its
    filetime to the timestamp given by the forecast hour. If the filetime is newer
    (later) then the variable is ready.
    :param cycle: which cycle are we working with (UTC)
    :param var: the variable identifier
    :return: true if the variable is ready
    """
    # Probe the remote file with HEAD and read its Last-Modified time (UTC).
    url = self._remote_var_url(cycle.hour, var)
    resp = requests.head(url)
    if resp.status_code != 200:
        raise ValueError('Cannot find variable %s for hour %d at url %s' % (var, cycle.hour, url))
    file_time = self._parse_header_timestamp(resp.headers['Last-Modified'])
    # Ready once the file was modified after the cycle's timestamp.
    return file_time > cycle
def setup_args():
    """Build and return the command-line argument parser for the Beatle
    request utility (two positionals: request verb and filename, plus
    --debug and --shard options)."""
    parser = argparse.ArgumentParser(
        description="Beatle request utility",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Optional flags.
    parser.add_argument(
        '--debug', action='store_true', default=False,
        help="enable debugging log",
    )
    parser.add_argument(
        '--shard', action='store',
        help='shard name or list shards by "list" value',
    )
    # Positional arguments (order matters: request comes before filename).
    parser.add_argument(
        'request', action='store',
        help='request: post/get/head/delete/list',
    )
    parser.add_argument('filename', action='store', help='file name')
    return parser
def sitesManager(media_url):
    """Pick the sitesBase subclass that can identify/extract media for the
    given url, by matching each subclass's ``regex`` against the url.
    Known url shorteners are resolved first by following redirects.
    Returns an instance of the matching subclass, or None when nothing
    matches."""
    # Resolve url shorteners before matching, so regexes see the real url.
    shorteners = ('bit.ly', 'goo.gl', 'tinyurl.com')
    if any(s in media_url for s in shorteners):
        resolved = requests.head( media_url, timeout=REQUEST_TIMEOUT, allow_redirects=True )
        log(' short url(%s)=%s'%(media_url,repr(resolved.url)))
        media_url = resolved.url
    # First subclass whose regex finds a match handles the url.
    for handler in sitesBase.__subclasses__():
        pattern = handler.regex
        if pattern and re.compile(pattern, re.I).findall(media_url):
            return handler(media_url)
def check(url, timeout):
    """Probe ``url`` with a HEAD request and print a colour-coded status line.

    200 responses are highlighted green; 404/405 are silently skipped; any
    other code is printed blue. All console output is serialized through the
    global ``screenLock``.

    :param timeout: request timeout in seconds; values <= 0 fall back to 20
    """
    try:
        if timeout <= 0:
            timeout = 20
        response = requests.head(url, timeout=timeout)
        code = response.status_code
        # Take the lock only once we have something to print.
        screenLock.acquire()
        if code == 200:
            ColorPrinter.print_green_text("[ " + str(code) + " ]")
            # BUG FIX: Python-2-only print statements converted to print().
            print("Checking : " + url)
            # NOTE(review): HEAD responses normally have an empty body, so
            # this soft-404 check rarely fires — confirm intent.
            if "404" in response.text:
                ColorPrinter.print_blue_text(url + "\tMaybe every page same!")
        elif code == 404 or code == 405:
            pass
            # ColorPrinter.print_red_text("[ " + str(code) + " ]")
            # print "Checking : " + url
        else:
            ColorPrinter.print_blue_text("[ " + str(code) + " ]")
            print("Checking : " + url)
    except Exception as e:
        screenLock.acquire()
        print(e)
    finally:
        screenLock.release()
def run(self):
    """Probe a fixed list of export-script paths with HEAD requests and report
    the target as vulnerable when one of them serves an Excel document
    (unauthenticated data export)."""
    headers = {
        "User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
    }
    payloads = ["/excel/sso_user_export.php",
                "/excel/user_export.php",
                "/excel/server_export.php"]
    try:
        for payload in payloads:
            vulnurl = self.url + payload
            req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
            # BUG FIX: req.headers["Content-Type"] raised KeyError when the
            # response had no Content-Type, and the bare except below then
            # silently aborted the remaining payloads. .get() makes the
            # missing-header case a non-match instead.
            if r"application/vnd.ms-excel" in req.headers.get("Content-Type", ""):
                cprint("[+]???????????????...(??)\tpayload: "+vulnurl, "yellow")
    except Exception:
        cprint("[-] "+__file__+"====>????", "cyan")
def run(self):
    """Probe known dkcms database-file paths with HEAD requests and report the
    target as vulnerable when one of them serves an Access (.mdb) database
    (database download exposure)."""
    headers = {
        "User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
    }
    payloads = ["/data/dkcm_ssdfhwejkfs.mdb",
                "/_data/___dkcms_30_free.mdb",
                "/_data/I^(()UU()H.mdb"]
    for payload in payloads:
        vulnurl = self.url + payload
        try:
            req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
            # BUG FIX: req.headers["Content-Type"] raised KeyError when the
            # header was absent; the bare except then misreported the payload
            # as a request failure. .get() treats a missing header as a
            # non-match.
            if req.headers.get("Content-Type") == "application/x-msaccess":
                cprint("[+]??dkcms???????...(??)\tpayload: "+vulnurl, "red")
        except Exception:
            cprint("[-] "+__file__+"====>????", "cyan")
def fuzzworth_contentl(self, head, strict=False):
    '''Decide from the content-length whether a URL is worth fuzzing, so
    that huge files are never downloaded. When the header (or the whole
    head) is missing: strict=True refuses, strict=False (default) allows
    fuzzing anyway.'''
    if not head or "content-length" not in head:
        # No usable size information — strict mode vetoes, lax mode proceeds.
        return not strict
    # Only fuzz responses smaller than the configured page-size limit.
    return int(head["content-length"]) < self.pagesize_limit
def fuzzworth_contentl_with_type_fallback(self, head, allowed_types_lst, full_req_match = False):
    '''Strictly check content-length first: when the header is present,
    defer to fuzzworth_contentl (strict). Otherwise fall back to a strict
    content-type check against allowed_types_lst. When neither header can
    be read, return False.'''
    if head:
        if "content-length" in head:
            return self.fuzzworth_contentl(head, strict = True)
        if "content-type" in head:
            # NOTE(review): full_req_match is forwarded as a literal False,
            # ignoring this function's own parameter — confirm intent.
            return self.fuzzworth_content_type(head, allowed_types_lst, full_req_match = False, strict = True)
    return False
def pull_request_to_github(main_repo_user, repo_name, environment):
    """Open a pull request from the configured user's fork to the main
    repository's master branch, via curl against the GitHub API.
    Returns the curl command list that was executed."""
    # API endpoint for creating pull requests on the main repo.
    api_url = 'https://api.github.com/repos/{}/{}/pulls'.format(
        main_repo_user, repo_name)
    # Credentials for the API call, taken from the environment file.
    # NOTE(review): passing user:password on the command line exposes it to
    # other local processes (e.g. via `ps`) — consider curl's --netrc/-K.
    credentials = '{}:{}'.format(environment['github_username'],
                                 environment['github_password'])
    # JSON body describing the pull request (title, head branch, base).
    payload = ('{{"title": "{}", "head": "{}:master",'
               ' "base": "master"}}'.format(
                   environment['github_pull_request_title'],
                   environment['github_username']))
    command = ['curl', '--user', credentials, api_url, '-d', payload]
    # Fire the request against the main repository.
    subprocess.check_output(command)
    return command
def check_valid_url(repo_zip_url):
    """Return True when repo_zip_url is an http(s) URL whose HEAD request
    answers with a success (2xx) or redirect (3xx) status; otherwise print
    a diagnostic and return False."""
    # Must look like an http/https URL before we even try the network.
    if not (repo_zip_url.startswith('http://') or repo_zip_url.startswith('https://')):
        return False
    # Only the headers are needed to judge reachability.
    response = requests.head(repo_zip_url)
    if 200 <= response.status_code < 400:
        return True
    # Anything outside 2xx/3xx: tell the user what happened.
    print('Could not reach URL provided.\n')
    print('Provided url was {} and resulted in status code {}'.format(
        repo_zip_url, str(response.status_code)))
    return False
#This sends a notification email based on information provided
#in the environment file
def check_project_exist(self, project_name):
    """Ask the registry API (HEAD /api/projects?project_name=...) whether a
    project with the given name exists. Returns True on 200, False on 404,
    and False (with an error log) on any other status."""
    exists = False
    endpoint = '%s://%s/api/projects?project_name=%s' % (
        self.protocol, self.host, project_name)
    # Session cookie authenticates the HEAD probe.
    response = requests.head(endpoint,
                             cookies={'beegosessionID': self.session_id})
    status = response.status_code
    if status == 200:
        exists = True
        logging.debug(
            "Successfully check project exist, result: {}".format(exists))
    elif status == 404:
        exists = False
        logging.debug(
            "Successfully check project exist, result: {}".format(exists))
    else:
        logging.error("Fail to check project exist")
    return exists
# POST /projects
def _is_commit_ref_available(self, package, namespace):
    """
    Check if commit ref is available on git repository
    """
    if not package or not namespace:
        self.error("Missing parameter to check if commit is available")
    # Default to the Fedora cgit mirror unless the package declares its own repo.
    repo_url = "https://src.fedoraproject.org/cgit/%s/%s.git" % (namespace, package.name)
    if package.repository:
        repo_url = package.repository
    # Fall back to HEAD when no explicit ref is configured.
    commit_ref = package.ref if package.ref else "HEAD"
    probe_url = repo_url + "/patch/?id=%s" % commit_ref
    # cgit's /patch/?id=<ref> answers 2xx only when the ref exists.
    resp = requests.head(probe_url)
    if not (200 <= resp.status_code < 300):
        self.error("Could not find ref '%s' on '%s'. returned exit status %d; output:\n%s" %
                   (commit_ref, package.name, resp.status_code, resp.text))
    self.log.info("Found ref: %s for %s" % (commit_ref, package.name))
def url_resp(url):
    """Receives a url returns True in case of 200 or 301 response codes"""
    res = None
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) \AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
        res = requests.head(url, timeout=3, headers=headers)
    except Exception:
        # BUG FIX: was a bare `except:` which also swallowed SystemExit and
        # KeyboardInterrupt. Best-effort probe: failures simply leave res None.
        pass
    # NOTE(review): a requests Response is falsy for 4xx/5xx statuses, so
    # error responses take the '#bad#' branch without printing their code —
    # confirm that is intended.
    if res:
        code = res.status_code
        print('*' + str(code) + '*')
        if code == 200 or code == 301:
            print(url)
            print('#ok#')
            return True
    else:
        print('#bad#')
def back_online_notifier(the_url):
    """Announce that a previously-down url is reachable again.

    Loads the pickled status file and, if the url was recorded as "down",
    sends a channel message including how long it was down. Returns early
    (doing nothing further) when the status file cannot be read.
    """
    print("found " + the_url + " back online.")
    try:
        # BUG FIX: pickle.load(open(...)) leaked the file handle; the context
        # manager guarantees it is closed.
        with open("status.p", "rb") as status_fh:
            old_status_file = pickle.load(status_fh)
    except Exception as ex:
        print("The status.p file was not found. it will be recreated." + str(ex))
        return
    if (the_url in old_status_file) and (old_status_file[the_url]['status'] == "down"):
        it_was_down_time = old_status_file[the_url]['time']
        current_time = datetime.datetime.now().replace(microsecond=0)
        send_message(CHANNEL_ID, ":herb: " + the_url + " is back online. It was down for " + str(current_time - it_was_down_time))
    else:
        print("skipping notifying that the url is online")
# --- getting only the head ---
def total_blocks_and_bytes(self):
    """Sum up, across all input urls, the total byte count and the number of
    blocksize-sized blocks needed to cover them (the last partial block of
    each url counts as one). Raises ContentLengthNotSupportedException when
    a url's HEAD response lacks a Content-Length header."""
    blocks, nbytes = 0, 0
    for url in self.input_urls:
        response_headers = requests.head(url).headers
        if 'Content-Length' not in response_headers:
            message = "The url: '{}' doesn't support the 'Content-Length' field.".format(url)
            raise ContentLengthNotSupportedException(message)
        remote_size = int(response_headers['Content-Length'])
        nbytes += remote_size
        # Whole blocks plus one extra for any trailing partial block.
        whole, remainder = divmod(remote_size, self.blocksize)
        blocks += whole + (1 if remainder else 0)
    return blocks, nbytes