def dorequest(url, data="", method='GET'):
    try:
        if method == 'GET':
            response = urllib.request.urlopen(url, timeout=10).read()
        else:
            # use PUT/DELETE/POST; data should be encodable as ascii bytes
            request = urllib.request.Request(url, data=data.encode('ascii'), method=method)
            response = urllib.request.urlopen(request, timeout=10).read()
    # etcd may return a json body together with an HTTP error status code;
    # an HTTP error code makes urlopen raise an exception,
    # so catch the HTTPError and read the json result from it
    except urllib.error.HTTPError as e:
        # e.fp must be read() inside this except block:
        # e is deleted and e.fp is closed once the block exits
        response = e.fp.read()
    # response is bytes; decode it as utf-8 and parse it as json
    result = json.loads(str(response, encoding='utf-8'))
    return result
# client for talking to etcd
# not all APIs are implemented below, only the ones we need
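# A hedged usage sketch (not part of the original client), assuming an etcd v2 "keys"
# endpoint at http://127.0.0.1:2379; the host, port, and key names below are illustrative.
import urllib.parse

def etcd_set(key, value, base="http://127.0.0.1:2379/v2/keys"):
    # PUT a value under the key; dorequest() returns etcd's json reply as a dict
    return dorequest(base + key, data=urllib.parse.urlencode({'value': value}), method='PUT')

def etcd_get(key, base="http://127.0.0.1:2379/v2/keys"):
    # GET a key; etcd answers with json even on error status codes such as 404
    return dorequest(base + key)

# e.g. etcd_set('/message', 'hello') and then etcd_get('/message')['node']['value']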
def _validate_captcha(data):
    """
    Validates a captcha with google's reCAPTCHA.

    Args:
        data: the posted form data
    """
    settings = api.config.get_settings()["captcha"]
    post_data = urllib.parse.urlencode({
        "secret": api.config.reCAPTCHA_private_key,
        "response": data["g-recaptcha-response"],
        "remoteip": flask.request.remote_addr
    }).encode("utf-8")
    request = urllib.request.Request(api.config.captcha_url, post_data, method='POST')
    response = urllib.request.urlopen(request).read().decode("utf-8")
    parsed_response = json.loads(response)
    return parsed_response['success'] == True
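# For reference, the siteverify reply that this parses typically looks like the JSON
# below (field names per Google's reCAPTCHA documentation; the values are illustrative),
# which is why only the 'success' flag is checked:
# {"success": true, "challenge_ts": "2017-01-01T00:00:00Z", "hostname": "example.com", "error-codes": []}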
def create_request_object(rel_url, req_type, req_to, job_id):
    """
    Creates request strings to use for batch_requests,
    based on rel_url

    type: can be used to determine the type of request when reading
        the response
    to: can be used to link certain attributes (like 'reactions')
        to the post they belong to
    """
    # print(rel_url)
    return {
        'req_type': req_type,
        'req_to': req_to,
        'job_id': job_id,
        'req': {
            "method": "GET",
            "relative_url": "{}".format(rel_url)
        }
    }
def create_post_request(self, post_id, job_id):
    """
    Creates a request string for a post to use in
    batch_requests based on post_id

    Note: could add limit as well?
    """
    return self.create_request_object((
        '{}?fields={},{},{},{}').format(
            post_id,
            self.str_reactions_query(),
            self.str_comments_query(),
            self.str_sharedposts_query(),
            self.str_attachments_query()),
        req_type='post',
        req_to='',
        job_id=job_id)
# @staticmethod
# def encode_batch(batch):
#     """
#     URL encodes the batch to prepare it for a graph API request
#     """
#     _json = json.dumps(batch)
#     _url = urllib.parse.urlparse(_json)
#     return _url
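# Note: urllib.parse.urlparse() above parses a URL, it does not escape one, so the
# commented-out helper would not actually URL-encode the batch. A minimal sketch of
# what the encoding step could look like (an assumption, not the project's own code):
# the Graph API accepts the batch as a JSON array passed in a form-encoded "batch"
# parameter, and urlencode handles the percent-escaping.
def encode_batch(batch):
    # returns a form-encoded body such as "batch=%5B%7B...%7D%5D"
    return urllib.parse.urlencode({'batch': json.dumps(batch)})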
def extend_token(self):
    """
    Extends the access token and replaces the previously used one
    Logs an error if the API Key or API Secret is not defined
    TODO: Replace also the config file once that file is defined
    TODO: Additional checks on the response
    """
    if not self.api_key or not self.api_secret:
        logging.error('No API Key and/or API Secret defined')
        return None
    resp = self.request(
        req='oauth/access_token?grant_type=fb_exchange_token&client_id={}'
            '&client_secret={}&fb_exchange_token={}'.format(
                self.api_key, self.api_secret, self.access_token))
    msg = json.loads(resp.read().decode('utf-8'))
    self.access_token = msg['access_token']
    logging.info('Extended Access Token: \n%s', self.access_token)
    return self.access_token
# Source: sciencedirect_collect.py (project: scientific-paper-summarisation, author: EdCo95)
def getJournalURL(jname):
    # get the journal URL given the journal name, for retrieving article PIIs
    urlstr = "http://api.elsevier.com/sitemap/page/sitemap/" + jname[0].lower() + ".html"
    retl = ""
    with urllib.request.urlopen(urlstr) as url:
        response = url.read()
        linkcnt = 0
        for link in BeautifulSoup(response, parse_only=SoupStrainer("a")):
            if linkcnt == 0:
                linkcnt += 1
                continue
            if link.has_attr("href"):
                if link.text.lower() == jname.lower():
                    # print(link["href"])
                    retl = link["href"]
                    break
            linkcnt += 1
    return retl
def __init__(self, username=None, password=None, version=None, debug=None,
             requesttype=None, baseurl=None, site=None):
    if username:
        self.username = username
    if password:
        self.password = password
    if version:
        self.version = version
    if debug:
        self.debug = debug
    if requesttype:
        self.requesttype = requesttype
    if baseurl:
        self.baseurl = baseurl
    if site:
        self.site = site
    ssl._create_default_https_context = ssl._create_unverified_context  # This is the way to allow unverified SSL
    self.cj = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(
        urllib.request.HTTPHandler(debuglevel=1 if self.debug else 0),
        urllib.request.HTTPSHandler(debuglevel=1 if self.debug else 0),
        urllib.request.HTTPCookieProcessor(self.cj))
    opener.addheaders = [('User-agent', 'Mozilla/5.0')]
    urllib.request.install_opener(opener)
def __init__(self, username=None, password=None, debug=None,
             requesttype=None, baseurl=None):
    if username:
        self.username = username
    if password:
        self.password = password
    if debug:
        self.debug = debug
    if requesttype:
        self.requesttype = requesttype
    if baseurl:
        self.baseurl = baseurl
    ssl._create_default_https_context = ssl._create_unverified_context  # This is the way to allow unverified SSL
    self.cj = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(
        urllib.request.HTTPHandler(debuglevel=1 if self.debug else 0),
        urllib.request.HTTPSHandler(debuglevel=1 if self.debug else 0),
        urllib.request.HTTPCookieProcessor(self.cj))
    opener.addheaders = [('User-agent', 'Mozilla/5.0')]
    urllib.request.install_opener(opener)
def request(self, url, data=None, headers=None, method='POST', baseurl=None):
    headers = headers or {
        'Content-type': 'application/json',
        'Referer': 'https://account.ubnt.com/login?redirect=https%3A%2F%2Funifi.ubnt.com',
        'Origin': 'https://account.ubnt.com',
        'dnt': 1
    }
    if not baseurl:
        baseurl = self.baseurl
    self.log('Request to %s with data %s' % (baseurl + url, data))
    if data:
        req = urllib.request.Request(url=baseurl + url, data=json.dumps(data).encode("utf8"),
                                     headers=headers, method=method)
    else:
        req = urllib.request.Request(url=baseurl + url, headers=headers, method='GET')
    return urllib.request.urlopen(req)
# Source: addon_updater.py (project: Blender-WMO-import-export-scripts, author: WowDevTools)
def get_api_raw(self, url):
    request = urllib.request.Request(self._api_url + url)
    try:
        result = urllib.request.urlopen(request)
    except urllib.error.HTTPError as e:
        self._error = "HTTP error"
        self._error_msg = str(e.code)
        self._update_ready = None
    except urllib.error.URLError as e:
        self._error = "URL error, check internet connection"
        self._error_msg = str(e.reason)
        self._update_ready = None
        return None
    else:
        result_string = result.read()
        result.close()
        return result_string.decode()
    # if we didn't get here, return or raise something else
    # result of all api calls, decoded into json format
def onSkeyAvailable(self, request=None):
    """
    A new SKey became available
    """
    self.isRequesting = False
    try:
        skey = NetworkService.decode(request)
    except:
        SecurityTokenProvider.errorCount += 1
        self.isRequesting = False
        return
    if SecurityTokenProvider.errorCount > 0:
        SecurityTokenProvider.errorCount = 0
    self.isRequesting = False
    if not skey:
        return
    try:
        self.queue.put((skey, time.time()), False)
    except QFull:
        print("Err: Queue FULL")
def onFinished(self):
    self.hasFinished = True
    if self.request.error() == self.request.NoError:
        self.requestSucceeded.emit(self)
    else:
        try:
            errorDescr = NetworkErrorDescrs[self.request.error()]
        except:  # Unknown error
            errorDescr = None
        if errorDescr:
            QtGui.QMessageBox.warning(None, "Networkrequest Failed",
                                      "The request to \"%s\" failed with: %s" % (self.url, errorDescr))
        self.requestFailed.emit(self, self.request.error())
    self.finished.emit(self)
    self.logger.debug("Request finished: %s", str(self))
    self.logger.debug("Remaining requests: %s", len(NetworkService.currentRequests))
def _get_movie_url_by_name(movie_name, year=None):
    query = SEARCH_URL.format(movie_name=_parse_name_for_search(movie_name))
    request = urllib.request.Request(query, headers=_HEADERS)
    search_res = bs(urllib.request.urlopen(request), "html.parser")
    results = search_res.find_all("li", {"class": "result"})
    correct_result = None
    for result in results:
        title = result.find_all(
            "h3", {"class": "product_title"})[0].contents[0].contents[0]
        title_match = title.strip().lower() == movie_name.strip().lower()
        if year is None and title_match:
            correct_result = result
        else:
            year_match = str(year) in str(result)
            if title_match and year_match:
                correct_result = result
    movie_url_suffix = correct_result.find_all("a")[0]['href']
    return METACRITIC_URL + movie_url_suffix
# === critics reviews page ===
def _get_critics_reviews_props(movie_url):
    critics_url = movie_url + CRITICS_REVIEWS_URL_SUFFIX
    critics_request = urllib.request.Request(critics_url, headers=_HEADERS)
    critics_page = bs(urllib.request.urlopen(critics_request), "html.parser")
    critics_props = {}
    critics_props['metascore'] = int(critics_page.find_all(
        "span", {"class": SCORE_CLASSES})[0].contents[0])
    critic_reviews = []
    for review in critics_page.find_all("div", {"class": "review"}):
        try:
            critic_reviews.append(_get_critic_review_props(review))
        except Exception:
            continue
    critics_props['pro_critic_reviews'] = critic_reviews
    return critics_props
# === user reviews page ===
def _get_user_reviews_from_page(users_page):
    review_elements = users_page.find_all("div", {"class": "review"})
    user_reviews = []
    for review in review_elements:
        try:
            user_reviews.append(_get_user_review_props(review))
        except Exception:
            continue
    # print("Extracted {} reviews.".format(len(user_reviews)))
    nexts = users_page.find_all("a", {"class": "action", "rel": "next"})
    if len(nexts) > 0:
        next_url = METACRITIC_URL + nexts[0]['href']
        next_request = urllib.request.Request(next_url, headers=_HEADERS)
        next_page = bs(urllib.request.urlopen(next_request), "html.parser")
        user_reviews += _get_user_reviews_from_page(next_page)
    return user_reviews
def _get_user_reviews_props(movie_url):
    users_url = movie_url + USERS_REVIEWS_URL_SUFFIX
    users_request = urllib.request.Request(users_url, headers=_HEADERS)
    users_page = bs(urllib.request.urlopen(users_request), "html.parser")
    users_props = {}
    users_props['movie_name'] = users_page.find_all(
        "meta", {"property": "og:title"})[0]['content']
    user_score = float(users_page.find_all(
        "span", {"class": USER_SCORE_CLASSES})[0].contents[0])
    users_props['avg_user_score'] = user_score
    for rating in ['positive', 'mixed', 'negative']:
        users_props['{}_rating_frequency'.format(
            rating)] = _get_user_rating_freq(users_page, rating)
    users_props['user_reviews'] = _get_user_reviews_from_page(users_page)
    return users_props
# === metacritic crawling ===
def soupit(j, genre):
    try:
        url = "https://www.wikiart.org/en/paintings-by-genre/" + genre + "/" + str(j)
        html = urllib.request.urlopen(url)
        soup = BeautifulSoup(html)
        found = False
        urls = []
        for i in str(soup.findAll()).split():
            if i == 'data':
                found = True
            if found == True:
                if '}];' in i:
                    break
                if 'https' in i:
                    web = "http" + i[6:-2]
                    urls.append(web)
        j = j + 1
        return urls
    except Exception as e:
        print('Failed to find the following genre page combo: ' + genre + str(j))
# Given a url for an image, we download and save the image while also recovering
# information about the painting in the saved file name, depending on the length of
# the file.split('/') information (which corresponds to how much information is available).
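# A minimal sketch of such a downloader (an assumption, not the project's own code):
# the output directory, the genre argument, and the way painting information is
# recovered from the URL path are illustrative only.
import os

def download_image(url, genre, out_dir="images"):
    os.makedirs(os.path.join(out_dir, genre), exist_ok=True)
    parts = url.split('/')
    # the last path components usually carry artist/title information; fall back to
    # the bare file name when less information is available in the URL
    name = parts[-2] + "_" + parts[-1] if len(parts) >= 2 else parts[-1]
    try:
        urllib.request.urlretrieve(url, os.path.join(out_dir, genre, name))
    except Exception:
        print('Failed to download: ' + url)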
def files(self):
    if not self._files:
        path = '/ajax_details_filelist.php'
        url = self.url.path(path).query_param('id', self.id)
        request = urllib.request.Request(
            url, headers={'User-Agent': "Magic Browser"})
        response = urllib.request.urlopen(request).read()
        root = html.document_fromstring(response)
        rows = root.findall('.//tr')
        if len(rows) == 1 and rows[0].find('td').get('colspan') == str(2):
            self._files = {}
        else:
            for row in rows:
                name, size = [str(v.text_content())
                              for v in row.findall('.//td')]
                self._files[name] = size.replace('\xa0', ' ')
    return self._files
def connect(username, password):
    global token, userid, files
    token = None
    userid = None
    files = None
    token_req = urllib.request.Request(base_url + token_url % (urllib.parse.quote(username, safe=""),
                                                               urllib.parse.quote(password, safe="")))
    with urllib.request.urlopen(token_req) as response:
        result = json.loads(response.read().decode("utf-8"))
    if "errorcode" in result:
        raise Exception(result["errorcode"])
    token = result["token"]
    siteinfo = call_wsfunction("moodle_webservice_get_siteinfo")
    userid = siteinfo["userid"]
    try:
        os.makedirs(download_folder)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(download_folder):
            pass
        else:
            raise
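# connect() relies on call_wsfunction(), which is not shown in this snippet. A minimal
# sketch of how it could look against Moodle's standard REST endpoint
# (/webservice/rest/server.php); the parameter handling here is an assumption, not the
# project's own implementation:
def call_wsfunction(function, **params):
    query = {
        'wstoken': token,              # module-level token set by connect()
        'wsfunction': function,
        'moodlewsrestformat': 'json',
    }
    query.update(params)
    url = base_url + "/webservice/rest/server.php?" + urllib.parse.urlencode(query)
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))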
def textanalyze(self, index_name, analyzer, text):
    # Create the JSON string for the request body
    reqobject = {}
    reqobject['text'] = text
    reqobject['analyzer'] = analyzer
    io = StringIO()
    json.dump(reqobject, io)
    req_body = io.getvalue()
    # HTTP request to the Azure Search REST API
    conn = httplib.HTTPSConnection(self.__api_url)
    conn.request("POST",
                 u"/indexes/{0}/analyze?api-version={1}".format(index_name, _AZURE_SEARCH_API_VERSION),
                 req_body, self.headers)
    response = conn.getresponse()
    # print("status:", response.status, response.reason)
    data = (response.read()).decode('utf-8')
    # print("data:{}".format(data))
    conn.close()
    return data
def _do_pip_check():
    request = urllib.request.Request('https://api.github.com/repos/UAVCAN/gui_tool/tags',
                                     headers={
                                         'Accept': 'application/vnd.github.v3+json',
                                     })
    with urllib.request.urlopen(request) as response:
        data = response.read()
    data = json.loads(data.decode('utf8'))
    newest_tag_name = data[0]['name']
    logger.debug('Newest tag: %r', newest_tag_name)
    match = re.match(r'^.*?(\d{1,3})\.(\d{1,3})', newest_tag_name)
    version_tuple = int(match.group(1)), int(match.group(2))
    logger.debug('Parsed version tuple: %r', version_tuple)
    if _version_tuple_to_int(version_tuple) > _version_tuple_to_int(__version__):
        git_url = 'https://github.com/UAVCAN/gui_tool'
        return 'pip3 install --upgrade git+<a href="{0}">{0}</a>@{1}'.format(git_url, newest_tag_name)
# noinspection PyBroadException
def put_status(self):
    host = os.getenv('HOST')
    url = "https://{host}/api/task/{id}".format(host=host, id=self.request.id)
    payload = {
        'status': self.status,
        'steps': self.steps,
        'file_name': self.zim_file_name,
        'time_stamp': {
            'started': self.start_time,
            'ended': self.ended_time
        }
    }
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'token': self.token
    }
    request = urllib.request.Request(url, json.dumps(payload, cls=JSONEncoder).encode('utf-8'),
                                     headers, method='PUT')
    try:
        with urllib.request.urlopen(request) as response:
            code = response.code
    except HTTPError as error:
        code = error.code
def fixUrl(destinationPort, transport, url, peerType):
    """
    fixes the URL (original request string)
    """
    transportProtocol = ""
    if transport.lower() in "udp" or transport.lower() in "tcp":
        transportProtocol = "/" + transport
    if "honeytrap" in peerType:
        return "Attack on port " + str(destinationPort) + transportProtocol
    # prepared dionaea to output additional information in ticker
    if "Dionaea" in peerType:
        return "Attack on port " + str(destinationPort) + transportProtocol
    return url
def _call_ACIS(self, kwargs, **moreKwargs):
    '''
    Core method for calling the ACIS services.
    Returns a python dictionary by de-serializing the json response
    '''
    # self._formatInputDict(**kwargs)
    kwargs.update(moreKwargs)
    self._input_dict = self._stripNoneValues(kwargs)
    self.url = self.baseURL + self.webServiceSource
    if pyVersion == 2:  # python 2.x
        params = urllib.urlencode({'params': json.dumps(self._input_dict)})
        request = urllib2.Request(self.url, params, {'Accept': 'application/json'})
        response = urllib2.urlopen(request)
        jsonData = response.read()
    elif pyVersion == 3:  # python 3.x
        params = urllib.parse.urlencode({'params': json.dumps(self._input_dict)})
        params = params.encode('utf-8')
        req = urllib.request.urlopen(self.url, data=params)
        jsonData = req.read().decode()
    return json.loads(jsonData)
def _fetch_img_urls(self, keyword, safe_search=False):
    # bing img search, https://gist.github.com/stephenhouser/c5e2b921c3770ed47eb3b75efbc94799
    url = self._get_bing_url(keyword, safe_search=safe_search)
    self.logger.debug('search url {}'.format(url))
    header = {
        'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/43.0.2357.134 Safari/537.36"}
    soup = BeautifulSoup(urllib.request.urlopen(urllib.request.Request(url, headers=header)), 'html.parser')
    imgs = []  # contains the link for Large original images, type of image
    for a in soup.find_all("a", {"class": "iusc"}):
        mad = json.loads(a["mad"])
        turl = mad["turl"]
        m = json.loads(a["m"])
        murl = m["murl"]
        image_name = urllib.parse.urlsplit(murl).path.split("/")[-1]
        imgs.append((image_name, turl, murl))
    return imgs
def search(query):
    request = {
        'order': 'desc',
        'sort': 'relevance',
        'q': query,
        'answers': 1,
        'site': 'stackoverflow',
        'filter': 'withbody',
        'pagesize': 1
    }
    response = send_request("search/advanced", request)
    question = response["items"][0]
    answer = get_answer(question)
    return (question["link"] + "\n\n"
            + "<b>" + question["title"] + "</b>\n\n"
            + format_user_data(question) + "\n\n"
            + "<b>Answer:</b>\n\n"
            + format_user_data(answer) + "\n\n"
            + "© StackOverflow users, CC-BY-SA 3.0")
def getDanmu(cid):
    if not cid:
        return "cid not found"
    try:
        cid_url = "http://comment.bilibili.com/%s.xml" % cid
        danmu_xml = urllib.request.urlopen(cid_url).read()
        # the response is raw-deflate compressed; decompress and decode it
        xml = zlib.decompressobj(-zlib.MAX_WBITS).decompress(danmu_xml).decode()
        return xml  # the danmaku xml document
    except Exception:
        pass
def camouflageBrowser(url):
    myHeaders = [
        "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.111 Safari/537.36",
        "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:37.0) Gecko/20100101 Firefox/37.0",
        "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:30.0) Gecko/20100101 Firefox/30.0",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/3.5 Safari/536.11"
    ]  # candidate User-Agent strings (not actually attached to the request below)
    try:
        content = urllib.request.urlopen(url).read()
        return content
    except Exception:
        pass
def _validate_captcha(data):
    """
    Validates a captcha with google's reCAPTCHA.

    Args:
        data: the posted form data
    """
    post_data = urllib.parse.urlencode({
        "secret": api.config.reCAPTCHA_private_key,
        "response": data["g-recaptcha-response"],
        "remoteip": flask.request.remote_addr
    }).encode("utf-8")
    request = urllib.request.Request(api.config.captcha_url, post_data, method='POST')
    response = urllib.request.urlopen(request).read().decode("utf-8")
    parsed_response = json.loads(response)
    return parsed_response['success'] == True