def qiniu_upload_img(img_url):
    """Upload an image to Qiniu, keyed by the MD5 of its content.

    Args:
        img_url (string): source image URL
    """
    response = requests.get(img_url)
    image = response.content
    md5 = calc_md5(image)
    qiniu_url = 'http://{}/{}'.format(QINIU_HOSTNAME, md5)
    # Skip the upload if the object is already stored.
    if requests.head(qiniu_url).ok:
        return qiniu_url
    q = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
    token = q.upload_token(QINIU_BUCKET, md5, 10)
    put_data(token, md5, image, mime_type='image/jpeg')
    return qiniu_url
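# calc_md5 is not defined in this snippet; a minimal sketch, assuming it simply
# hashes the raw image bytes so the digest can double as the Qiniu object key:
import hashlib

def calc_md5(data):
    """Hex MD5 digest of a bytes payload (assumed helper)."""
    return hashlib.md5(data).hexdigest()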
def register(self):
    u"""Register resource into CKAN."""
    import ckanapi
    ckansite = ckanapi.RemoteCKAN(self.ckan_url, apikey=self.apikey)
    # does the resource url respond?
    resource = requests.head(self.url)
    self.size = int(resource.headers["content-length"])
    # does the resource already exist?
    resources = ckansite.action.resource_search(query=u"url:%s" % self.url)
    if resources[u"count"] == 0:
        ckansite.action.resource_create(
            package_id=self.package_id,
            url=self.url,
            name=self.name,
            description=self.description,
            format=self.format,
            mimetype=self.mimetype,
            size=self.size,
        )
def _is_update_needed(self):
    """
    Determines if an update for the database is necessary
    :return: Whether or not the database should be updated
    """
    # Retrieve the headers of the response to compare the MD5 checksums of the file
    headers_response = requests.head(self.MAXMIND_FREE_DB_URL)
    logger.debug(headers_response.headers)
    headers_response.raise_for_status()
    response_checksum = headers_response.headers.get('X-Database-MD5')
    logger.info("Database MD5 received in response headers: " + str(response_checksum))
    # Compare the current file checksum to the one received from MaxMind
    if response_checksum is not None and response_checksum == self._database_checksum:
        return False
    self._response_checksum = response_checksum
    return True
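# _database_checksum is populated elsewhere in the class; a plausible sketch,
# assuming it holds the hex MD5 of the previously downloaded database file:
import hashlib

def calc_file_md5(path, chunk_size=8192):
    """Stream a file through MD5 so large databases are not loaded into memory."""
    digest = hashlib.md5()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()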
def head_account_metadata(self):
    # get account's metadata
    url = self.url
    # request api
    response = requests.head(url, headers=self.base_headers)
    # formatting result
    result = dict()
    for key in response.headers:
        if key == 'X-Account-Container-Count':
            result['container_count'] = \
                response.headers['X-Account-Container-Count']
        elif key == 'X-Account-Object-Count':
            result['object_count'] = \
                response.headers['X-Account-Object-Count']
        elif key == 'X-Account-Bytes-Used':
            result['used_bytes'] = replace_bytes_to_readable(
                response.headers['X-Account-Bytes-Used']
            )
        else:
            result[key] = response.headers[key]
    return result
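# replace_bytes_to_readable is an external helper; a minimal sketch, assuming it
# formats the raw byte count from the Swift-style headers as a readable string:
def replace_bytes_to_readable(num_bytes):
    num = float(num_bytes)
    for unit in ('B', 'KiB', 'MiB', 'GiB', 'TiB'):
        if num < 1024.0:
            return '{:.1f} {}'.format(num, unit)
        num /= 1024.0
    return '{:.1f} PiB'.format(num)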
def head_container_metadata(self, container_name):
    """
    get container's metadata
    :param container_name: target container name
    :return: container's metadata dict
    """
    # check container metadata
    url = self.url + '/' + container_name
    # request api
    response = requests.head(url, headers=self.base_headers)
    # formatting result
    result = dict()
    for key in response.headers:
        if key == 'X-Container-Object-Count':
            result['object_count'] = \
                response.headers['X-Container-Object-Count']
        elif key == 'X-Container-Bytes-Used':
            result['used_bytes'] = replace_bytes_to_readable(
                response.headers['X-Container-Bytes-Used']
            )
        else:
            result[key] = response.headers[key]
    return result
def get_layer_size(self, layer_hash):
    """
    Attempt to return the size of the given layer
    """
    url = "{base_url}/v2/{name}/blobs/{layer_hash}".format(
        base_url=self.base_url,
        name=self.repository_name,
        layer_hash=layer_hash)
    headers = {}
    if self.token is not None:
        headers["Authorization"] = "Bearer %s" % self.token
    r = requests.head(url, headers=headers, allow_redirects=True, timeout=(3.05, 5))
    r.raise_for_status()
    if "content-length" in r.headers:
        self.layer_sizes[layer_hash] = int(r.headers["content-length"])
    else:
        self.layer_sizes[layer_hash] = None
    return self.layer_sizes[layer_hash]
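# allow_redirects=True matters here: registries commonly answer a blob HEAD with
# a 307 redirect to backing storage, and only the final response carries
# content-length. A hypothetical call (client wrapper and digest are illustrative):
#   client = RegistryClient("https://registry.example.com", "my/repo", token)
#   size = client.get_layer_size("sha256:0123abcd...")  # bytes, or None if unknown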
def getDLsize(self):
    debug_log("getDLsize called")
    it = QTreeWidgetItemIterator(self.tw)
    while it.value():
        item = it.value()
        url_test = item.data(0, dataURL)
        if url_test is not None:
            try:
                r = requests.head(url_test)
                r.raise_for_status()
                try:
                    size = (int(r.headers['Content-Length']) / 1024) / 1024
                except (KeyError, ValueError):  # header missing or not numeric
                    size = 0
                if size > 0:
                    item.setText(2, "{} MiB".format(round(size, 2)))
            except requests.exceptions.HTTPError:
                debug_log("Error {} getting DL size: {}".format(r.status_code, r.headers))
                item.setText(2, str(r.status_code))  # setText expects a string
            except requests.exceptions.RequestException as e:
                item.setText(2, self.tr("Error"))
                debug_log(e, logging.ERROR)
        it += 1
def head(self):
    try:
        response = requests.head(self.url, timeout=self.request_timeout)
        # keep the requests response object on the HTTP instance
        self.res = response
        return response.headers
    except requests.exceptions.ConnectionError:
        return False
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to perform a HEAD request with the head() method (Naked.toolshed.network.py).")
        raise e
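# Usage sketch for head() above; the class and constructor are assumed from
# Naked.toolshed.network:
#   from Naked.toolshed.network import HTTP
#   http = HTTP('https://example.com')
#   headers = http.head()   # headers mapping, or False on connection error
#   if headers:
#       print(headers.get('Content-Type'))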
#------------------------------------------------------------------------------
# [ post method ] (string)
# HTTP POST request for text
# returns text from the URL as a string
#------------------------------------------------------------------------------
def _get_wp_api_url(self, url):
    """
    Private function for finding the WP-API URL.
    Arguments
    ---------
    url : str
        WordPress instance URL.
    """
    resp = requests.head(url)
    # Search the Links for rel="https://api.w.org/".
    wp_api_rel = resp.links.get('https://api.w.org/')
    if wp_api_rel:
        return wp_api_rel['url']
    else:
        # TODO: Raise a better exception when the rel doesn't exist.
        raise Exception
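# requests parses the HTTP Link response header into resp.links, keyed by rel.
# For a WordPress site the header and its parsed form look roughly like this
# (illustrative values):
#   Link: <https://example.com/wp-json/>; rel="https://api.w.org/"
#   resp.links == {'https://api.w.org/': {'url': 'https://example.com/wp-json/',
#                                         'rel': 'https://api.w.org/'}}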
def already_cached(self, asset_url):
    '''Checks if an item is already cached. This is indicated in the
    headers of the file being checked.'''
    try:
        req = requests.head(asset_url, headers={'user-agent': self.user_agent})  # NOQA
        if req.headers.get('Content-Type') is not None:
            # Item is not in cache
            self.log.debug('Not in cache: %s' % asset_url)
            return False
        else:
            # Item is already cached
            self.log.info('Already in cache: %s' % asset_url)
            return True
    except Exception:
        # If there is an error, return False anyway: there is no harm in
        # re-downloading an item that is already downloaded.
        return False
def get_normal_title(self, url):
    return_message = ""
    head = requests.head(url)
    max_size = 5e6
    if 'content-length' in head.headers and int(head.headers['content-length']) > max_size:
        return_message = "File too big for link preview\r\n"
    else:
        with eventlet.Timeout(60, False):
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                if 'text/html' in response.headers['content-type']:
                    soup = BeautifulSoup(response.content, "lxml")
                    if soup.title is not None:
                        return_message += soup.title.string + "\r\n"
                else:
                    return_message += response.headers['content-type'] + " Size: " + response.headers['content-length'] + "\r\n"
    return return_message
def find_decl_doc(self, name):
    # NOTE: the unconditional raise below disables the remote lookup;
    # the rest of the body is kept from the original source but never runs.
    raise IncludeError(name)

    import requests
    from requests.exceptions import InvalidSchema
    url = METATAB_ASSETS_URL + name + '.csv'
    try:
        # See if it exists online in the official repo
        r = requests.head(url, allow_redirects=False)
        if r.status_code == requests.codes.ok:
            return url
    except InvalidSchema:
        pass  # It's probably FTP
def get_minutes_url(config, metadata):
    start_date = parse_timestamp_naively(metadata['start']).astimezone(get_tz(config))
    meeting_type = None
    for key, value in config.get('minutes_abbrs', {}).items():
        if key in metadata[config['minutes_abbrs_for']]:
            meeting_type = value
            break
    minutes_url = metadata.get('minutes_url')
    if minutes_url:
        requests.head(minutes_url).raise_for_status()
        return minutes_url
    elif config['id'] == 'vancouver':
        if not meeting_type:
            return 'N/A'
        if metadata['title'] == 'Inaugural Council Meeting':
            meeting_type = 'inau'
        mins = 'http://council.vancouver.ca/{dt:%Y%m%d}/{type}{dt:%Y%m%d}ag.htm'.format(
            type=meeting_type, dt=start_date)
        requests.head(mins).raise_for_status()
        return mins
    return 'N/A'
def check_pnda_mirror():
    def raise_error(reason):
        CONSOLE.info('PNDA mirror...... ERROR')
        CONSOLE.error(reason)
        CONSOLE.error(traceback.format_exc())
        sys.exit(1)

    try:
        mirror = PNDA_ENV['mirrors']['PNDA_MIRROR']
        response = requests.head(mirror)
        # expect 200 (open mirror), 403 (no listing allowed),
        # or any redirect (in case of proxy/redirect)
        if response.status_code not in [200, 403, 301, 302, 303, 307, 308]:
            raise_error("PNDA mirror configured and present "
                        "but responded with unexpected status code (%s). " % response.status_code)
        CONSOLE.info('PNDA mirror...... OK')
    except KeyError:
        raise_error('PNDA mirror was not defined in pnda_env.yaml')
    except Exception:  # a bare except here would also swallow the SystemExit raised above
        raise_error("Failed to connect to PNDA mirror. Verify connection "
                    "to %s, check mirror in pnda_env.yaml and try again." % mirror)
def _len(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36',
        'Range': 'bytes=0-0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    }
    with requests.get(url, headers=headers) as r:
        length = r.headers['Content-Range']
        if length.find('0-0/') == -1:
            length = None
        else:
            length = length.split('0-0/')[-1]
    length = int(length) if length else 0
    if not length:
        # fall back to a plain HEAD request without the Range header
        del headers['Range']
        with requests.head(url, headers=headers) as r:
            length = r.headers['Content-Length']
        length = int(length) if length else 0
    return length
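# The Range: bytes=0-0 trick requests a single byte; a compliant server replies
# 206 with a Content-Range such as "bytes 0-0/4096", whose trailing figure is
# the total size. A minimal illustration of the parsing above:
assert 'bytes 0-0/4096'.split('0-0/')[-1] == '4096'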
def _scan_target_normal(self, target):
    try:
        r = requests.head(target)
        # it may be an eponymous directory
        if self._check_eponymous_dir(r, target):
            return target + '/;'
        if self._check_exist_code(r.status_code):
            # check content
            r = requests.get(target)
            if self._check_exist_code(r.status_code):
                for cur in self._not_exist_flag:
                    if r.text.find(cur) != -1:
                        return ''
                return target + ';'
        return ''
    except Exception:
        return ''
def validate_media_url(self):
    """
    Validates the media URL

    Raises:
        3PlayMediaUrlError: on invalid media url or content type
    """
    if not self.media_url:
        raise ThreePlayMediaUrlError('Invalid media URL "{media_url}".'.format(media_url=self.media_url))

    response = requests.head(url=self.media_url)
    if not response.ok:
        raise ThreePlayMediaUrlError('The URL "{media_url}" is not accessible.'.format(media_url=self.media_url))
    elif response.headers['Content-Type'] != self.allowed_content_type:
        raise ThreePlayMediaUrlError(
            'Media content-type should be "{allowed_type}". URL was "{media_url}", content-type was "{type}"'.format(  # pylint: disable=line-too-long
                allowed_type=self.allowed_content_type,
                media_url=self.media_url,
                type=response.headers['Content-Type'],
            )
        )
def _check_website(url, code):
    try:
        response = requests.head(url, headers={'User-Agent': DEFAULT_HEADERS})
        website_status_code = response.status_code
        if code:
            return website_status_code
        else:
            if website_status_code == 200:
                return True
            else:
                return False
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        if code:
            return -1
        else:
            return False
def sync(self, attempt_perceptual_hash=False):
    """Attempt to sync the image with the remote location. Optionally does a
    number of other actions:

    * If the image exists and we have not constructed a fingerprint for it,
      do so at this time and populate the image model's perceptual_hash field
      (if attempt_perceptual_hash is True; defaults to False because it's slow)
    * If the image does not exist, mark it as removed_from_source
    * If the image is removed_from_source, delete it from the search engine
    * In all cases, update the last_synced_with_source timestamp on the image"""
    req = requests.head(self.url)
    if req.status_code == 200:
        if not self.perceptual_hash and attempt_perceptual_hash:
            self.perceptual_hash = self.generate_hash()
    else:
        self.removed_from_source = True
    self.last_synced_with_source = timezone.now()
    self.save()
def do_http_request(self, method, endpoint, data=None, files=None, timeout=100, only_headers=False, custom_header=None):
    if only_headers is True:
        return requests.head(endpoint)

    if method == 'post':
        action = requests.post
    else:
        action = requests.get

    if custom_header:
        headers = {'user-agent': custom_header}
    else:
        headers = {'user-agent': str(Etiquette())}

    if method == 'post':
        result = action(endpoint, data=data, files=files, timeout=timeout, headers=headers)
    else:
        result = action(endpoint, params=data, timeout=timeout, headers=headers)

    if self.throttle is True:
        self._update_rate_limits(result.headers)
        sleep(self.throttling_time)
    return result
def get_url_file_size(url, proxies=None):
    """Return the size, in KB, of the file behind a URL without downloading it.

    :param proxies: optional requests proxy mapping
    :param url: target URL
    :return: file size in KB
    """
    r = requests.head(url=url, proxies=proxies, timeout=3.0)
    while r.is_redirect:  # follow redirects manually
        r = requests.head(url=r.headers['Location'], proxies=proxies, timeout=3.0)
    return int(r.headers['Content-Length']) / 1024
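# A shorter equivalent, assuming the manual loop above exists only to follow
# redirects: requests.head() does not follow redirects by default, but can be
# told to.
#   r = requests.head(url, proxies=proxies, timeout=3.0, allow_redirects=True)
#   size_kb = int(r.headers['Content-Length']) / 1024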
def load(self, url):
    """Load script from URL."""
    # Check for a redirect to get a final base_url where to start
    resp = requests.head(url, allow_redirects=True)
    if url != resp.url:
        # Followed a redirect
        url = resp.url
    fname = get_response_fname(resp)
    _, ext = os.path.splitext(fname)
    if ext == ".py":
        py_file = self.fetch_file(url, url)
        return py_file
    else:
        self.crawl(url, url)
def fuzzworth_contentl(self, head, strict=False):
    '''Check the content-length before moving on. If strict is set to True,
    content-length must be validated before moving on. If strict is set to
    False (default), the URL will be fuzzed even if no content-length is found
    or no head is returned. The idea behind this method is to ensure that huge
    files are not downloaded, slowing down fuzzing.'''
    if head and "content-length" in head:
        content_length = int(head["content-length"])
        return content_length < self.pagesize_limit
    # no HEAD data or no content-length: fuzz anyway unless strict
    return not strict
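# Hypothetical usage (fuzzer instance and pagesize_limit assumed):
#   head = requests.head(candidate_url).headers
#   if fuzzer.fuzzworth_contentl(head, strict=False):
#       fuzz(candidate_url)  # small or unknown size: safe to fetch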
def fuzzworth_contentl_with_type_fallback(self, head, allowed_types_lst, full_req_match=False):
    '''Check the content-length header strictly: if one is found, defer to
    fuzzworth_contentl in strict mode. If no content-length is available, fall
    back to the content-type and return True if it is one of the allowed types.
    If neither header can be read, return False.'''
    # if a content-length header is found, judge on it strictly
    if head and "content-length" in head:
        return self.fuzzworth_contentl(head, strict=True)
    # no content-length: fall back to content-type, allowing only listed types
    # (full_req_match is forwarded; the original passed a literal False here)
    if head and "content-type" in head:
        return self.fuzzworth_content_type(head, allowed_types_lst, full_req_match=full_req_match, strict=True)
    return False
def exce(indexOfResult, indexOfChar, queryASCII):
    # content-start
    url = "http://127.0.0.1/Less-9/?id="
    tempurl = url + getPayload(indexOfResult, indexOfChar, queryASCII)
    before_time = time.time()
    requests.head(tempurl)
    after_time = time.time()
    # content-end
    use_time = after_time - before_time
    # judge-start
    # If the injected sleep() fired, the guess is correct (the payload is built
    # so the back-end SQL only calls sleep() when the injected condition holds).
    # The elapsed time therefore tells true from false: a response slower than
    # error_time means sleep() ran.
    if abs(use_time) > error_time:
        return True
    else:
        return False
    # judge-end
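# getPayload and error_time are defined elsewhere; a hypothetical sketch for
# this time-based blind probe (sqli-labs Less-9 is a single-quote, time-based
# lab where sleep() only fires when the guessed character matches):
#   error_time = 2  # seconds; anything slower than a normal response
#   def getPayload(indexOfResult, indexOfChar, queryASCII):
#       return ("1' and if(ascii(substr((select database()),{},1))={},"
#               "sleep(3),0)-- -".format(indexOfChar, queryASCII))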
def validate_href(image_href):
    """Validate HTTP image reference.

    :param image_href: Image reference.
    :raises: exception.ImageRefValidationFailed if HEAD request failed or
        returned response code not equal to 200.
    :returns: Response to HEAD request.
    """
    try:
        response = requests.head(image_href)
        if response.status_code != http_client.OK:
            raise exception.ImageRefValidationFailed(
                image_href=image_href,
                reason=("Got HTTP code %s instead of 200 in response to "
                        "HEAD request." % response.status_code))
    except requests.RequestException as e:
        raise exception.ImageRefValidationFailed(image_href=image_href,
                                                 reason=e)
    return response
def get_remote_source_length():
    url = os.environ.get('SP_REMOTE_SOURCE')
    try:
        response = requests.head(url, allow_redirects=True, timeout=10)
    except requests.exceptions.RequestException as e:
        puts(colored.red('[HEAD] %s' % url))
        puts(colored.red('Failed to get remote installation size: %s' % e))
        sys.exit(1)
    # requests headers are case-insensitive, so one lookup suffices
    size = response.headers.get('Content-Length')
    if not size or not size.isdigit():
        puts(colored.red('Could not fetch the remote Content-Length.'))
        sys.exit(1)
    return int(size)  # isdigit() above guarantees this cannot raise
def _get_imageId(self, ImageName, tag="latest"):
    """Query the imageId (v1) or digest (v2) for the given image tag."""
    if self.version == 1:
        ReqUrl = self._baseUrl + "/repositories/{}/tags/{}".format(ImageName, tag)
    else:
        ReqUrl = self._baseUrl + "/{}/manifests/{}".format(ImageName, tag)
    logger.info("_get_imageId for url {}".format(ReqUrl))
    try:
        if self.version == 1:
            r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
        else:
            # note: the v2 API conventionally expects this media type in the
            # Accept header; the Content-Type header is kept from the original
            r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify, allow_redirects=True, headers={"Content-Type": "application/vnd.docker.distribution.manifest.v2+json"})
    except Exception as e:
        logger.error(e, exc_info=True)
        return False
    else:
        if self.version == 1:
            return r.json()
        return r.headers.get("Docker-Content-Digest", "")
def check_download_session(url, download_dir, cookies):
    r = requests.head(url, cookies=cookies)
    if r.status_code != 200 or 'Content-Disposition' not in r.headers:
        return False
    m = re.search('filename="(.+)"', r.headers['Content-Disposition'])
    if not m:
        return False
    f = m.group(1)
    filename = os.path.join(download_dir, f)
    if os.path.isfile(filename):
        return True
    # get it
    print("Fetching %s" % f)
    r2 = requests.get(url, cookies=cookies)
    if r2.status_code != 200:
        return False
    with open(filename, 'wb') as fh:  # avoid shadowing the parsed filename
        fh.write(r2.content)
    return True
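# A hypothetical call, reusing cookies from an authenticated requests session:
#   s = requests.Session()
#   s.post(LOGIN_URL, data=credentials)  # assumed login step
#   check_download_session(FILE_URL, '/tmp/downloads', s.cookies)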