import json
import urllib.request
import urllib.error


def get_pypi_src_download(package):
    # Query PyPI's JSON API for the package metadata.
    url = 'https://pypi.python.org/pypi/%s/json' % (package,)
    fp = urllib.request.urlopen(url)
    try:
        try:
            data = fp.read()
        finally:
            fp.close()
    except urllib.error.URLError:
        raise RuntimeError("Cannot determine download link for %s" % (package,))

    pkgdata = json.loads(data.decode('utf-8'))
    if 'urls' not in pkgdata:
        raise RuntimeError("Cannot determine download link for %s" % (package,))

    # Return the MD5 digest and URL of the first sdist tarball.
    for info in pkgdata['urls']:
        if info['packagetype'] == 'sdist' and info['url'].endswith('tar.gz'):
            return (info.get('md5_digest'), info['url'])

    raise RuntimeError("Cannot determine download link for %s" % (package,))
Python urlopen() example source code
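Most of the snippets below share the same basic pattern: obtain urlopen (falling back between Python 2 and Python 3), read the raw bytes, decode, and parse. The following is a minimal sketch of that pattern, not taken from any of the projects below; the PyPI endpoint and package name are placeholders used only for illustration.

import json

try:
    from urllib2 import urlopen          # Python 2
except ImportError:
    from urllib.request import urlopen   # Python 3

# Placeholder endpoint, used only to illustrate the read/decode/parse steps.
response = urlopen('https://pypi.python.org/pypi/requests/json')
try:
    payload = response.read().decode('utf-8')  # urlopen returns bytes
finally:
    response.close()

data = json.loads(payload)
print(sorted(data.keys()))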
def paste(self):
    """Create a paste and return the paste id."""
    data = json.dumps({
        'description': 'Werkzeug Internal Server Error',
        'public': False,
        'files': {
            'traceback.txt': {
                'content': self.plaintext
            }
        }
    }).encode('utf-8')
    try:
        from urllib2 import urlopen
    except ImportError:
        from urllib.request import urlopen
    rv = urlopen('https://api.github.com/gists', data=data)
    resp = json.loads(rv.read().decode('utf-8'))
    rv.close()
    return {
        'url': resp['html_url'],
        'id': resp['id']
    }
reachability-monitor.py (project: securedrop-reachability-monitor, author: freedomofpress)
def read_directory(self, directory_url):
    """Parses the SecureDrop directory into a list of instance
    details."""
    # CloudFlare will block us if we don't set a user-agent.
    dir_req = Request(directory_url)
    dir_req.add_header("User-Agent",
                       "Mozilla/5.0 (Windows NT 6.1; rv:45.0) "
                       "Gecko/20100101 Firefox/45.0")
    directory = urlopen(dir_req).read().decode()
    instances = []
    for line in directory.splitlines()[1:-1]:
        fields = line.split("\t")
        instances.append(dict(organization=fields[0],
                              landing_page=fields[1],
                              ths_address=fields[2]))
    return instances
def test_post_video(self):
    # Reposting https://streamable.com/deltx
    video_info_res = urlopen('https://api.streamable.com/videos/deltx')
    video_info = json.loads(video_info_res.read().decode('utf8'))
    mp4_info = video_info['files']['mp4']
    video_url = ('https:' if mp4_info['url'].startswith('//') else '') + mp4_info['url']
    video_size = (mp4_info['width'], mp4_info['height'])
    thumbnail_url = ('https:' if video_info['thumbnail_url'].startswith('//') else '') + video_info['thumbnail_url']
    duration = mp4_info['duration']
    video_res = urlopen(video_url)
    video_data = video_res.read()
    thumb_res = urlopen(thumbnail_url)
    thumb_data = thumb_res.read()
    results = self.api.post_video(video_data, video_size, duration, thumb_data, caption='<3')
    self.assertEqual(results.get('status'), 'ok')
    self.assertIsNotNone(results.get('media'))
def test_post_video_story(self):
    # Reposting https://streamable.com/08ico
    video_info_res = urlopen('https://api.streamable.com/videos/08ico')
    video_info = json.loads(video_info_res.read().decode('utf8'))
    mp4_info = video_info['files']['mp4']
    video_url = ('https:' if mp4_info['url'].startswith('//') else '') + mp4_info['url']
    video_size = (mp4_info['width'], mp4_info['height'])
    thumbnail_url = ('https:' if video_info['thumbnail_url'].startswith('//') else '') + video_info['thumbnail_url']
    duration = mp4_info['duration']
    video_res = urlopen(video_url)
    video_data = video_res.read()
    thumb_res = urlopen(thumbnail_url)
    thumb_data = thumb_res.read()
    results = self.api.post_video_story(video_data, video_size, duration, thumb_data)
    self.assertEqual(results.get('status'), 'ok')
    self.assertIsNotNone(results.get('media'))
def respond_to_checkpoint(self, response_code):
    headers = {
        'User-Agent': self.USER_AGENT,
        'Origin': 'https://i.instagram.com',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US',
        'Accept-Encoding': 'gzip',
        'Referer': self.endpoint,
        'Cookie': self.cookie,
    }
    req = Request(self.endpoint, headers=headers)
    data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
    res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)

    if res.info().get('Content-Encoding') == 'gzip':
        buf = BytesIO(res.read())
        content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
    else:
        content = res.read().decode('utf-8')

    return res.code, content
def public_ip(self):
    ip_regex = re.compile(r"(([0-9]{1,3}\.){3}[0-9]{1,3})")
    # List of hosts which return the public IP address:
    hosts = """http://www.lawrencegoetz.com/programs/ipinfo/
http://mwburden.com/cgi-bin/getipaddr
http://checkip.eurodyndns.org/
http://checkip.dyndns.org/
http://checkrealip.com/
http://adresseip.com
http://www.ipchicken.com/
http://checkmyip.com/
http://www.naumann-net.org/""".split("\n")
    for i in hosts:
        host = i.strip()
        # print(host)
        try:
            response = request.urlopen(host).read()
            result = ip_regex.findall(response.decode('utf-8'))
            if result:
                return result[0][0]
        except Exception:
            pass
    return "UNKNOWN"
def _woxikon_de_url_handler(target):
    '''
    Query woxikon for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'), timeout=time_out_choice)
        web_content = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # other error
    except socket.timeout:  # timeout error failed to be captured by URLError
        return 1
    return web_content
def _jeck_ru_url_handler(target):
    '''
    Query jeck.ru for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII'), timeout=time_out_choice)
        web_content = StringIO(decode_utf_8(response.read()))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # any other error
    except socket.timeout:  # if timeout error not captured by URLError
        return 1
    return web_content
def getLinks(pageUrl):
    global pages
    html = urlopen("http://en.wikipedia.org" + pageUrl)
    bs = BeautifulSoup(html, "html.parser")
    try:
        print(bs.h1.get_text())
        print(bs.find(id="mw-content-text").findAll("p")[0])
        print(bs.find(id="ca-edit").find("span").find("a").attrs['href'])
    except AttributeError:
        print("This page is missing something!")
    for link in bs.findAll("a", href=re.compile("^(/wiki/)")):
        if 'href' in link.attrs:
            if link.attrs['href'] not in pages:
                newpage = link.attrs["href"]
                print("---------\n" + newpage)
                pages.add(newpage)
                getLinks(newpage)
def mm_heartbeat(self):
    # Stop if shutting down; otherwise schedule the next heartbeat.
    if self.shutdown:
        return
    threading.Timer(self.hb_timer, self.mm_heartbeat).start()

    address = ("http://" + self.mm_host + ":" + self.mm_port + "/alexapi?action=AVSHB")

    logger.debug("Sending MM Heartbeat")
    try:
        response = urlopen(address).read()
    except URLError as err:
        logger.error("URLError: %s", err.reason)
        return
    logger.debug("Response: %s", response)
def query_usage(request):
    """
    This function should only be called when the user is using IE8 or IE9
    :param request:
    :return:
    """
    try:
        from urllib2 import urlopen
    except ImportError:
        from urllib.request import urlopen
    api_bus = get_config('program', 'api', 1) + '/Kb/findSoftwareUsage?software=' + request.POST['software']
    try:
        res_data = urlopen(api_bus)
        res = res_data.read()
        return HttpResponse(res)
    except Exception as e:
        return error(api_bus)
def refresh(self):
    try:
        # open the data url
        self.req = urlopen(self.data_url)
        # read data from the url
        self.raw_data = self.req.read()
        # load in the json
        self.json_data = json.loads(self.raw_data.decode())
        # get time from json
        self.time = datetime.fromtimestamp(self.parser.time(self.json_data))
        # load all the aircraft
        self.aircraft = self.parser.aircraft_data(self.json_data, self.time)
    except Exception:
        print("exception in FlightData.refresh():")
        traceback.print_exc()
def download(dest_path, url):
    try:
        file_name = url.split('/')[-1]
        path = os.path.realpath(os.path.join(dest_path, unquote_plus(file_name)))
        if not os.path.exists(path):
            f = urlopen(url)
            headers = f.headers['content-type'].split('/')
            md = 'w'
            if 'html' in headers:
                file_name = '{}.html'.format(uuid.uuid1())
            else:
                md = 'wb'
            with open(path, md) as local_file:
                local_file.write(f.read())

        if os.path.exists(path):
            return path
        else:
            logger.info("Wasn't able to find the file....!")
            return None
    except Exception as error:
        logger.error('download error %s', error)
def query(location, cty_codes, query_method, fuzzy):
    results = []
    try:
        base_url = get_geonames_base_url()
        username = get_geonames_user_name()
        query_string = base_url + 'username={user}&{query_method}={name}&' \
                                  'style=FULL&orderby={order}&startRow=0&maxRows=5&fuzzy={fuzzy}' \
            .format(user=username, query_method=query_method, name=quote(location),
                    order='relevance', fuzzy=fuzzy)
        if cty_codes and len(cty_codes) > 0:
            query_string = query_string + '&' + '&'.join([('country={}'.format(c)) for c in cty_codes])

        json_decode = json.JSONDecoder()  # used to parse the json response
        response = urlopen(query_string)
        response_string = response.read().decode('utf-8')
        parsed_response = json_decode.decode(response_string)
        if parsed_response.get('geonames') and len(parsed_response.get('geonames')) > 0:
            for item in parsed_response['geonames']:
                results.append(parse(item))
    except URLError as e:
        logger.info("Oops! something didn't go well")
        logger.info(e)
    return results
def fetch_xml(url):
    with request.urlopen(url) as f:
        print('Status:', f.status, f.reason)
        for k, v in f.getheaders():
            print('%s: %s' % (k, v))
        html = f.read().decode('utf-8')

    pattern_one = re.compile(r'<yweather:location.*?city="(.*?)".*?country="(.*?)".*?region="(.*?)".*?/>', re.S)
    pattern_two = re.compile(r'<yweather:forecast.*?date="(.*?)".*?day="(.*?)".*?high="(.*?)".*?low="(.*?)".*?text="(.*?)".*?/>', re.S)

    location_info = re.findall(pattern_one, html)
    items = re.findall(pattern_two, html)

    weather = {}
    weather['city'] = location_info[0][0]
    weather['country'] = location_info[0][1]
    weather['region'] = location_info[0][2]

    for item in items:
        weather[item[1]] = {}
        weather[item[1]]['data'] = item[0]
        weather[item[1]]['high'] = item[2]
        weather[item[1]]['low'] = item[3]
        weather[item[1]]['text'] = item[4]

    return weather
def download_image(image_id, url, x1, y1, x2, y2, output_dir):
    """Downloads one image, crops it, resizes it and saves it locally."""
    output_filename = os.path.join(output_dir, image_id + '.png')
    if os.path.exists(output_filename):
        # Don't download image if it's already there
        return True

    try:
        # Download image
        url_file = urlopen(url)
        if url_file.getcode() != 200:
            return False
        image_buffer = url_file.read()

        # Crop, resize and save image
        image = Image.open(BytesIO(image_buffer)).convert('RGB')
        w = image.size[0]
        h = image.size[1]
        image = image.crop((int(x1 * w), int(y1 * h), int(x2 * w),
                            int(y2 * h)))
        image = image.resize((299, 299), resample=Image.ANTIALIAS)
        image.save(output_filename)
    except IOError:
        return False

    return True
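A brief usage sketch for download_image, assuming the crop box is given as fractions of the image width and height (which the function scales by the pixel size before cropping); the image id, URL, and output directory below are placeholders, not values from the original project.

# Hypothetical call; image_id, url and output_dir are placeholders.
ok = download_image(
    image_id='img_0001',
    url='https://example.com/photos/cat.jpg',
    x1=0.1, y1=0.1, x2=0.9, y2=0.9,   # crop box as fractions of width/height
    output_dir='/tmp/images',
)
print('saved' if ok else 'skipped or failed')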
def getSoup(start, stop):
    try:
        for number in range(start, stop + 1):
            # http://space.bilibili.com/15989779/#!/
            url = 'http://space.bilibili.com/' + str(number) + '/#!/'
            response = request.urlopen(url)
            # print(response.getcode())
            html_cont = response.read()
            soup = BeautifulSoup(html_cont, 'lxml', from_encoding='utf-8')
            username = soup.find("h1").get_text().strip()[:-6]  # user name
            uid = number  # number is the uid
            get_gz_uid = GetFollowUid(number)
            gzsuid, gznumber = get_gz_uid.get_uids()  # followed user ids and count
            saveData(uid, username, gznumber, gzsuid)  # save the data
    except Exception:
        print("get page error")
        return getSoup(number + 1, stop + 1)
def getSoup(start, stop):
    try:
        for number in range(start, stop + 1):
            # http://space.bilibili.com/15989779/#!/
            url = 'http://space.bilibili.com/' + str(number) + '/#!/'
            response = request.urlopen(url)
            # print(response.getcode())
            html_cont = response.read()
            soup = BeautifulSoup(html_cont, 'lxml', from_encoding='utf-8')
            username = soup.find("h1").get_text().strip()[:-6]  # user name
            uid = number  # number is the uid
            get_fans_uid = GetFansUid(number)
            fansuid, fansnumber = get_fans_uid.get_uids()  # fan ids and count
            saveData(uid, username, fansnumber, fansuid)  # save the data
    except Exception:
        print("get page error")
        return getSoup(number + 1, stop + 1)