def download(self, url, retry_count=3, headers=None, proxy=None, data=None):
if url is None:
return None
try:
        req = request.Request(url, headers=headers or {}, data=data)
        cookie = cookiejar.CookieJar()
        cookie_process = request.HTTPCookieProcessor(cookie)
        opener = request.build_opener(cookie_process)
        if proxy:
            proxies = {urlparse(url).scheme: proxy}
            opener.add_handler(request.ProxyHandler(proxies))
content = opener.open(req).read()
except error.URLError as e:
print('HtmlDownLoader download error:', e.reason)
content = None
if retry_count > 0:
if hasattr(e, 'code') and 500 <= e.code < 600:
                # If the HTTPError carries a 5xx status code the failure is server-side, so retry the download
return self.download(url, retry_count-1, headers, proxy, data)
return content
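The except branch retries only when the caught error exposes a 5xx status code, i.e. a server-side failure that may be transient; other errors just return None. A minimal standalone sketch of the same retry-on-5xx idea, independent of the surrounding class (the URL and retry count are placeholders):

from urllib import request, error

def fetch_with_retry(url, retries=3):
    # Retry only on 5xx responses; other failures are treated as permanent.
    try:
        return request.urlopen(url).read()
    except error.URLError as e:
        if retries > 0 and hasattr(e, 'code') and 500 <= e.code < 600:
            return fetch_with_retry(url, retries - 1)
        return None

# html = fetch_with_retry('http://example.com/page.html')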
Example source code for Python's Request() class
def get_cpi():
"""
    Fetch Consumer Price Index (CPI) data
Return
--------
DataFrame
        month : reporting month
        cpi : consumer price index
"""
rdint = vs.random()
request = Request(vs.MACRO_URL%(vs.P_TYPE['http'], vs.DOMAINS['sina'],
rdint, vs.MACRO_TYPE[1], 0, 600,
rdint))
text = urlopen(request,timeout=10).read()
text = text.decode('gbk') if ct.PY3 else text
regSym = re.compile(r'\,count:(.*?)\}')
datastr = regSym.findall(text)
datastr = datastr[0]
datastr = datastr.split('data:')[1]
js = json.loads(datastr)
df = pd.DataFrame(js, columns=vs.CPI_COLS)
df['cpi'] = df['cpi'].astype(float)
return df
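The endpoint behind vs.MACRO_URL returns a JavaScript snippet rather than JSON, so get_cpi() first grabs everything between ',count:' and the closing brace and then splits on 'data:' to isolate the array before json.loads. A toy run of that extraction on a made-up response string (the real field layout is defined by vs.CPI_COLS):

import re
import json

sample = 'var data = {pages:1,count:12,data:[["2023.10",100.2],["2023.9",100.1]]}'
datastr = re.compile(r'\,count:(.*?)\}').findall(sample)[0]  # '12,data:[[...]]'
datastr = datastr.split('data:')[1]                          # '[["2023.10",100.2],...]'
rows = json.loads(datastr)
print(rows)  # [['2023.10', 100.2], ['2023.9', 100.1]]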
def workthread(item, user_agent,path):
strurl = 'http://yxpjw.club'+item[0]
picname = item[1]
    print('Downloading %s...........................\n' % (picname))
req = request.Request(strurl)
req.add_header('User-Agent',user_agent)
response = request.urlopen(req)
content = response.read().decode('gbk')
strurl2 = re.search(r'^(.*)/',strurl).group(0)
print('https headers...............%s'%(strurl2))
#destname = os.path.join(path,picname+'.txt')
#with open(destname, 'w',encoding='gbk') as file:
#file.write(content)
destdir = os.path.join(path,picname)
os.makedirs(destdir)
page = 1
while(1):
content = getpagedata(content,destdir,page,strurl2)
if not content:
break
page = page + 1
    print('%s finished downloading\n' % (picname))
def get_loan_rate():
"""
    Fetch loan interest rate data
Return
--------
DataFrame
        date : effective date
        loan_type : loan type
        rate : interest rate (%)
"""
rdint = vs.random()
request = Request(vs.MACRO_URL%(vs.P_TYPE['http'], vs.DOMAINS['sina'],
rdint, vs.MACRO_TYPE[2], 3, 800,
rdint))
text = urlopen(request, timeout=10).read()
text = text.decode('gbk')
regSym = re.compile(r'\,count:(.*?)\}')
datastr = regSym.findall(text)
datastr = datastr[0]
datastr = datastr.split('data:')[1]
js = json.loads(datastr)
df = pd.DataFrame(js, columns=vs.LOAN_COLS)
for i in df.columns:
df[i] = df[i].apply(lambda x:np.where(x is None, '--', x))
return df
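The per-column apply() with np.where only turns None entries into the placeholder string '--'. Assuming ordinary object/float columns, pandas' fillna does much the same in one call; the frame below is made up:

import pandas as pd

df = pd.DataFrame({'date': ['1991-04-21', None], 'rate': [8.64, None]})
print(df.fillna('--'))  # both missing cells become '--'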
def requestData(url, user_agent):
try:
req = request.Request(url)
req.add_header('User-Agent', user_agent)
response = request.urlopen(req,timeout = 8)
        # decode the bytes response into a str
content = response.read().decode('gbk')
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
        content = None
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        content = None
    return content
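A short usage sketch for this helper; the URL and User-Agent string below are placeholders:

UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
html = requestData('http://example.com/list.html', UA)
if html:
    print('%d characters downloaded' % len(html))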
def requestData(self,url, user_agent):
try:
req = request.Request(url)
req.add_header('User-Agent', user_agent)
response = request.urlopen(req,timeout = 8)
        # decode the bytes response into a str
content = response.read().decode('utf-8')
return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def requestData(self,url, user_agent):
try:
req = request.Request(url)
req.add_header('User-Agent', user_agent)
response = request.urlopen(req,timeout = 3)
        # decode the bytes response into a str
content = response.read().decode('gbk')
return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def getAbstractInfo(self):
try:
req = request.Request(self.url)
req.add_header('User-Agent', self.user_agent)
response = request.urlopen(req)
        # decode the bytes response into a str
content = response.read().decode('gbk')
self.getDetailList(content)
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def test_download_and_verify_ok(self, mock_urlopen):
mock_extract_tarball = self.mock_patch_object(
self.glance.utils, 'extract_tarball')
mock_md5 = mock.Mock()
mock_md5.hexdigest.return_value = 'expect_cksum'
mock_md5_new = self.mock_patch_object(
self.glance.md5, 'new', mock_md5)
mock_info = mock.Mock()
mock_info.getheader.return_value = 'expect_cksum'
mock_urlopen.return_value.info.return_value = mock_info
fake_request = urllib2.Request('http://fakeurl.com')
self.glance._download_tarball_and_verify(
fake_request, 'fake_staging_path')
mock_urlopen.assert_called_with(fake_request)
mock_extract_tarball.assert_called_once()
mock_md5_new.assert_called_once()
mock_info.getheader.assert_called_once()
mock_md5_new.return_value.hexdigest.assert_called_once()
def test_download_ok_verify_failed(self, mock_urlopen):
mock_extract_tarball = self.mock_patch_object(
self.glance.utils, 'extract_tarball')
mock_md5 = mock.Mock()
mock_md5.hexdigest.return_value = 'unexpect_cksum'
mock_md5_new = self.mock_patch_object(
self.glance.md5, 'new', mock_md5)
mock_info = mock.Mock()
mock_info.getheader.return_value = 'expect_cksum'
mock_urlopen.return_value.info.return_value = mock_info
fake_request = urllib2.Request('http://fakeurl.com')
self.assertRaises(self.glance.RetryableError,
self.glance._download_tarball_and_verify,
fake_request, 'fake_staging_path'
)
mock_urlopen.assert_called_with(fake_request)
mock_extract_tarball.assert_called_once()
mock_md5_new.assert_called_once()
mock_md5_new.return_value.hexdigest.assert_called_once()
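Both tests drive _download_tarball_and_verify, which is not shown here; the mocks describe its contract: read the response, extract the tarball, and compare an MD5 digest against the checksum advertised in the response headers. A rough sketch of that verification pattern using hashlib (names, including the header key, are illustrative and not the OpenStack implementation):

import hashlib
from urllib import request

def download_and_verify(req):
    # Read the body, compare its MD5 digest with the checksum the server
    # advertised, and fail on mismatch.
    resp = request.urlopen(req)
    data = resp.read()
    expected = resp.info().get('x-image-meta-checksum')  # header name assumed
    actual = hashlib.md5(data).hexdigest()
    if expected is not None and expected != actual:
        raise ValueError('checksum mismatch: %s != %s' % (expected, actual))
    return data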
def execute(self):
if hasattr(Context.g_module, 'publish'):
Context.Context.execute(self)
mod = Context.g_module
rfile = getattr(self, 'rfile', send_package_name())
if not os.path.isfile(rfile):
self.fatal('Create the release file with "waf release" first! %r' % rfile)
fdata = Utils.readf(rfile, m='rb')
data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
req = Request(get_upload_url(), data)
response = urlopen(req, timeout=TIMEOUT)
data = response.read().strip()
if sys.hexversion>0x300000f:
data = data.decode('utf-8')
if data != 'ok':
self.fatal('Could not publish the package %r' % data)
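Because a data argument is supplied, Request issues an HTTP POST rather than a GET; safe_urlencode appears to be waf's small compatibility wrapper that url-encodes the field list for both Python 2 and 3. A generic sketch of the same form-encoded POST with the standard library alone (URL and fields are placeholders, Python 3 names):

from urllib.parse import urlencode
from urllib.request import Request, urlopen

payload = urlencode([('pkgname', 'demo'), ('pkgver', '1.0')]).encode('utf-8')
req = Request('http://example.com/upload', payload)  # data is not None, so this is a POST
reply = urlopen(req, timeout=20).read().decode('utf-8')
print(reply)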
def compute_dependencies(self, filename=REQUIRES):
text = Utils.readf(filename)
data = safe_urlencode([('text', text)])
if '--offline' in sys.argv:
self.constraints = self.local_resolve(text)
else:
req = Request(get_resolve_url(), data)
try:
response = urlopen(req, timeout=TIMEOUT)
except URLError as e:
Logs.warn('The package server is down! %r' % e)
self.constraints = self.local_resolve(text)
else:
ret = response.read()
try:
ret = ret.decode('utf-8')
except Exception:
pass
self.trace(ret)
self.constraints = parse_constraints(ret)
self.check_errors()
def list_archive_timestamps(url, min_date, max_date, user_agent):
"""
List the available archive between min_date and max_date for the given URL
"""
logger.info('Listing the archives for the url {url}'.format(url=url))
# Construct the URL used to download the memento list
parameters = {'url': url,
'output': 'json',
'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)}
cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=urlencode(parameters))
req = Request(cdx_url, None, {'User-Agent': user_agent})
with urlopen(req) as cdx:
memento_json = cdx.read().decode("utf-8")
timestamps = []
# Ignore the first line which contains column names
for url_key, timestamp, original, mime_type, status_code, digest, length in json.loads(memento_json)[1:]:
# Ignore archives with a status code != OK
if status_code == '200':
timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))
return timestamps
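A usage sketch for list_archive_timestamps; WEB_ARCHIVE_CDX_TEMPLATE and WEB_ARCHIVE_TIMESTAMP_FORMAT are module-level constants in the original, so the values hinted at below are plausible stand-ins rather than the project's actual definitions:

from datetime import datetime

# Plausible shapes of the module constants (assumptions):
# WEB_ARCHIVE_TIMESTAMP_FORMAT = '%Y%m%d%H%M%S'
# WEB_ARCHIVE_CDX_TEMPLATE = 'http://web.archive.org/cdx/search/cdx?{params}'

stamps = list_archive_timestamps('example.com',
                                 datetime(2020, 1, 1),
                                 datetime(2020, 12, 31),
                                 user_agent='archive-checker/0.1')
print('%d snapshots returned HTTP 200' % len(stamps))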
def fetch_file(self, url, filename):
# if not os.path.exists(filename):
# os.makedirs(filename)
try:
req = request.Request(url, headers=self.__headers)
data = request.urlopen(req).read()
with open(filename, 'wb') as f:
f.write(data)
f.flush()
f.close()
self.__url_manager.set_url_status(url, 2)
except Exception as e:
self.__url_manager.set_url_status(url, -1)
raise e
finally:
time.sleep(config['basic']['sleep'])
def retrieve_csv(self,url):
'''
Retrieve data from the Veneer service, at the given url path, in CSV format.
url: Path to required resource, relative to the root of the Veneer service.
NOTE: CSV responses are currently only available for time series results
'''
if PRINT_URLS:
print("*** %s ***" % (url))
req = Request(self.base_url + quote(url+self.data_ext),headers={"Accept":"text/csv"})
text = urlopen(req).read().decode('utf-8')
result = utils.read_veneer_csv(text)
if PRINT_ALL:
print(result)
print("")
return result
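What retrieve_csv assembles under the hood is an ordinary GET with an Accept: text/csv header. A hedged sketch of that request, assuming the service runs on localhost:9876 and that data_ext is '.csv' (both are assumptions, not the client's actual defaults):

from urllib.request import Request, urlopen
from urllib.parse import quote

base_url = 'http://localhost:9876'   # assumed Veneer endpoint
path = '/runs/latest/location/Outlet/element/Downstream Flow'
req = Request(base_url + quote(path + '.csv'), headers={'Accept': 'text/csv'})
# text = urlopen(req).read().decode('utf-8')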
reachability-monitor.py (project: securedrop-reachability-monitor, author: freedomofpress)
def read_directory(self, directory_url):
"""Parses the SecureDrop directory into a dictionary of instance
details."""
# CloudFlare will block us if we don't set user-agent
dir_req = Request(directory_url)
dir_req.add_header("User-Agent",
"Mozilla/5.0 (Windows NT 6.1; rv:45.0) "
"Gecko/20100101 Firefox/45.0")
directory = urlopen(dir_req).read().decode()
instances = []
for line in directory.splitlines()[1:-1]:
fields = line.split("\t")
instances.append(dict(organization=fields[0],
landing_page=fields[1],
ths_address=fields[2]))
return instances
def respond_to_checkpoint(self, response_code):
headers = {
'User-Agent': self.USER_AGENT,
'Origin': 'https://i.instagram.com',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip',
'Referer': self.endpoint,
'Cookie': self.cookie,
}
req = Request(self.endpoint, headers=headers)
data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)
if res.info().get('Content-Encoding') == 'gzip':
buf = BytesIO(res.read())
content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
else:
content = res.read().decode('utf-8')
return res.code, content
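The gzip branch is needed because the request advertises gzip in Accept-Encoding and urllib does not transparently decompress responses. The same decode step isolated as a helper (a sketch; res is any urlopen response object):

import gzip
from io import BytesIO

def read_body(res):
    # Decompress only when the server actually answered with gzip.
    raw = res.read()
    if res.info().get('Content-Encoding') == 'gzip':
        raw = gzip.GzipFile(fileobj=BytesIO(raw)).read()
    return raw.decode('utf-8')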
def add_uri(self) -> None:
user, passwd = '', ''
if len(self.rpc_username) > 0 and len(self.rpc_password) > 0:
user = self.rpc_username
passwd = self.rpc_password
elif len(self.rpc_secret) > 0:
user = 'token'
passwd = self.rpc_secret
aria2_endpoint = '%s:%s/jsonrpc' % (self.rpc_host, self.rpc_port)
headers = {'Content-Type': 'application/json'}
payload = json.dumps({'jsonrpc': '2.0', 'id': 1, 'method': 'aria2.addUri',
'params': ['%s:%s' % (user, passwd), [self.link_url]]},
sort_keys=False).encode('utf-8')
try:
req = Request(aria2_endpoint, headers=headers, data=payload)
res = urlopen(req).read().decode('utf-8')
jsonres = json.loads(res)
# res = requests.post(aria2_endpoint, headers=headers, data=payload)
# jsonres = res.json()
self.aria2Confirmation.emit('result' in jsonres.keys())
except HTTPError:
        print(sys.exc_info())
        QMessageBox.critical(self, 'ERROR NOTIFICATION', str(sys.exc_info()[1]), QMessageBox.Ok)
self.aria2Confirmation.emit(False)
# self.exit()
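The params list follows aria2's JSON-RPC convention: when an RPC secret is configured, its first element is the literal string 'token:<secret>', which is exactly what the '%s:%s' % (user, passwd) formatting produces here. A minimal standalone sketch against a local aria2 daemon (host, port and secret are placeholders):

import json
from urllib.request import Request, urlopen

def aria2_add_uri(link, secret='mysecret', host='http://localhost', port=6800):
    payload = json.dumps({
        'jsonrpc': '2.0', 'id': 1, 'method': 'aria2.addUri',
        'params': ['token:%s' % secret, [link]],
    }).encode('utf-8')
    req = Request('%s:%s/jsonrpc' % (host, port), data=payload,
                  headers={'Content-Type': 'application/json'})
    return json.loads(urlopen(req).read().decode('utf-8'))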
def __init__(self, url=None):
self.url = url
self.html = None
self.links = []
self.soup = None
self.text = None
self.title = None
req = Request(self.url, headers={'User-Agent': "Magic Browser"})
try:
self.html = urlopen(req)
except URLError as e:
if hasattr(e, 'reason'):
print('We failed to reach a server.')
print('Reason: ', e.reason)
elif hasattr(e, 'code'):
print('The server couldn\'t fulfill the request.')
print('Error code: ', e.code)
def _day_cinema(date=None, pNo=1, retry_count=3, pause=0.001):
ct._write_console()
for _ in range(retry_count):
time.sleep(pause)
try:
request = Request(ct.BOXOFFICE_CBD%(ct.P_TYPE['http'], ct.DOMAINS['mbox'],
ct.BOX, pNo, date))
lines = urlopen(request, timeout = 10).read()
if len(lines) < 15: #no data
return None
except Exception as e:
print(e)
else:
js = json.loads(lines.decode('utf-8') if ct.PY3 else lines)
df = pd.DataFrame(js['data1'])
df = df.drop(['CinemaID'], axis=1)
return df
def _get_detail(tag, retry_count=3, pause=0.001):
for _ in range(retry_count):
time.sleep(pause)
try:
ct._write_console()
request = Request(ct.SINA_DATA_DETAIL_URL%(ct.P_TYPE['http'],
ct.DOMAINS['vsf'], ct.PAGES['jv'],
tag))
text = urlopen(request, timeout=10).read()
text = text.decode('gbk')
except _network_error_classes:
pass
else:
reg = re.compile(r'\,(.*?)\:')
text = reg.sub(r',"\1":', text)
text = text.replace('"{symbol', '{"symbol')
text = text.replace('{symbol', '{"symbol"')
jstr = json.dumps(text)
js = json.loads(jstr)
df = pd.DataFrame(pd.read_json(js, dtype={'code':object}), columns=ct.THE_FIELDS)
df = df[ct.FOR_CLASSIFY_B_COLS]
return df
raise IOError(ct.NETWORK_URL_ERROR_MSG)
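The re.sub in _get_detail exists because the Sina endpoint returns JavaScript object literals with unquoted keys; the substitution wraps every key that follows a comma in double quotes, and the replace() calls patch up the leading symbol key, so the payload becomes valid JSON. A toy repair on a made-up fragment:

import re

raw = '{symbol:"sh600000",name:"PuFa Bank",trade:7.61}'
fixed = re.compile(r'\,(.*?)\:').sub(r',"\1":', raw)
fixed = fixed.replace('{symbol', '{"symbol"')
print(fixed)  # {"symbol":"sh600000","name":"PuFa Bank","trade":7.61}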
def _sz_hz(date='', retry_count=3, pause=0.001):
for _ in range(retry_count):
time.sleep(pause)
ct._write_console()
try:
request = Request(rv.MAR_SZ_HZ_URL%(ct.P_TYPE['http'], ct.DOMAINS['szse'],
ct.PAGES['szsefc'], date))
lines = urlopen(request, timeout = 10).read()
if len(lines) <= 200:
return pd.DataFrame()
df = pd.read_html(lines, skiprows=[0])[0]
df.columns = rv.MAR_SZ_HZ_COLS
df['opDate'] = date
except Exception as e:
print(e)
else:
return df
raise IOError(ct.NETWORK_URL_ERROR_MSG)
def _parase_fq_factor(code, start, end):
symbol = _code_to_symbol(code)
request = Request(ct.HIST_FQ_FACTOR_URL%(ct.P_TYPE['http'],
ct.DOMAINS['vsf'], symbol))
text = urlopen(request, timeout=10).read()
text = text[1:len(text)-1]
text = text.decode('utf-8') if ct.PY3 else text
text = text.replace('{_', '{"')
text = text.replace('total', '"total"')
text = text.replace('data', '"data"')
text = text.replace(':"', '":"')
text = text.replace('",_', '","')
text = text.replace('_', '-')
text = json.loads(text)
df = pd.DataFrame({'date':list(text['data'].keys()), 'factor':list(text['data'].values())})
df['date'] = df['date'].map(_fun_except) # for null case
if df['date'].dtypes == np.object:
df['date'] = df['date'].astype(np.datetime64)
df = df.drop_duplicates('date')
df['factor'] = df['factor'].astype(float)
return df