def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()
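For comparison, here is a minimal sketch (Python 3 only, with a hypothetical function name) of the same whole-payload download written with context managers instead of the manual close() calls:

def download_file_insecure_py3(url, target):
    # Python 3 only: context managers close the connection and the file for us.
    from urllib.request import urlopen
    with urlopen(url) as src:
        data = src.read()      # read everything first ...
    with open(target, "wb") as dst:
        dst.write(data)        # ... then write in one go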
Example source code for Python's urlopen()
def workthread(item, user_agent, path):
    strurl = 'http://yxpjw.club' + item[0]
    picname = item[1]
    print('Downloading %s...........................\n' % (picname))
    req = request.Request(strurl)
    req.add_header('User-Agent', user_agent)
    response = request.urlopen(req)
    content = response.read().decode('gbk')
    strurl2 = re.search(r'^(.*)/', strurl).group(0)
    print('https headers...............%s' % (strurl2))
    #destname = os.path.join(path, picname + '.txt')
    #with open(destname, 'w', encoding='gbk') as file:
    #    file.write(content)
    destdir = os.path.join(path, picname)
    os.makedirs(destdir)
    page = 1
    while True:
        content = getpagedata(content, destdir, page, strurl2)
        if not content:
            break
        page = page + 1
    print('%s finished.\n' % (picname))
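workthread depends on getpagedata, request, re and os from its original module; a hedged sketch of how such workers might be dispatched (the (href, title) item tuples and the threading setup are assumptions for illustration, not part of the original project):

import threading

def start_workers(items, user_agent, path):
    # Hypothetical dispatcher: one thread per (href, title) item tuple.
    threads = [threading.Thread(target=workthread, args=(item, user_agent, path))
               for item in items]
    for t in threads:
        t.start()
    for t in threads:
        t.join()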
def requestData(url, user_agent):
    content = None
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # decode the bytes response to a str
        content = response.read().decode('gbk')
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
    return content
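These requestData variants assume module-level imports along the lines of the sketch below; the example URL and User-Agent value are placeholders, not from the original project:

# Assumed imports for the snippets above and below (placement is illustrative).
from urllib import request, error

ua = 'Mozilla/5.0 (X11; Linux x86_64)'            # placeholder User-Agent
html = requestData('http://example.com/', ua)      # returns None on failure
if html is not None:
    print(len(html))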
def requestData(self, url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # decode the bytes response to a str
        content = response.read().decode('utf-8')
        return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def requestData(self, url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=3)
        # decode the bytes response to a str
        content = response.read().decode('gbk')
        return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def getAbstractInfo(self):
    try:
        req = request.Request(self.url)
        req.add_header('User-Agent', self.user_agent)
        response = request.urlopen(req)
        # decode the bytes response to a str
        content = response.read().decode('gbk')
        self.getDetailList(content)
    except error.HTTPError:
        # HTTPError is a subclass of URLError, so it must be caught first
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def paste(self):
    """Create a paste and return the paste id."""
    data = json.dumps({
        'description': 'Werkzeug Internal Server Error',
        'public': False,
        'files': {
            'traceback.txt': {
                'content': self.plaintext
            }
        }
    }).encode('utf-8')
    try:
        from urllib2 import urlopen
    except ImportError:
        from urllib.request import urlopen
    rv = urlopen('https://api.github.com/gists', data=data)
    resp = json.loads(rv.read().decode('utf-8'))
    rv.close()
    return {
        'url': resp['html_url'],
        'id': resp['id']
    }
def execute(self):
    if hasattr(Context.g_module, 'publish'):
        Context.Context.execute(self)
    mod = Context.g_module
    rfile = getattr(self, 'rfile', send_package_name())
    if not os.path.isfile(rfile):
        self.fatal('Create the release file with "waf release" first! %r' % rfile)
    fdata = Utils.readf(rfile, m='rb')
    data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
    req = Request(get_upload_url(), data)
    response = urlopen(req, timeout=TIMEOUT)
    data = response.read().strip()
    if sys.hexversion > 0x300000f:
        data = data.decode('utf-8')
    if data != 'ok':
        self.fatal('Could not publish the package %r' % data)
def compute_dependencies(self, filename=REQUIRES):
    text = Utils.readf(filename)
    data = safe_urlencode([('text', text)])
    if '--offline' in sys.argv:
        self.constraints = self.local_resolve(text)
    else:
        req = Request(get_resolve_url(), data)
        try:
            response = urlopen(req, timeout=TIMEOUT)
        except URLError as e:
            Logs.warn('The package server is down! %r' % e)
            self.constraints = self.local_resolve(text)
        else:
            ret = response.read()
            try:
                ret = ret.decode('utf-8')
            except Exception:
                pass
            self.trace(ret)
            self.constraints = parse_constraints(ret)
    self.check_errors()
def download_archive(self, src, dst):
    for x in self.env.PACKAGE_REPO:
        url = '/'.join((x, src))
        try:
            web = urlopen(url)
            try:
                if web.getcode() != 200:
                    continue
            except AttributeError:
                pass
        except Exception:
            # on python3 urlopen throws an exception
            # python 2.3 does not have getcode and throws an exception to fail
            continue
        else:
            tmp = self.root.make_node(dst)
            tmp.write(web.read())
            Logs.warn('Downloaded %s from %s' % (tmp.abspath(), url))
            break
    else:
        self.fatal('Could not get the package %s' % src)
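download_archive relies on Python's for...else: the else clause runs only when the loop finishes without hitting break, which is what turns "no repository worked" into a fatal error. A tiny self-contained illustration of the same control flow:

def first_even(numbers):
    for n in numbers:
        if n % 2 == 0:
            print('found', n)
            break
    else:
        # reached only if the loop never executed `break`
        print('no even number found')

first_even([1, 3, 5])   # prints: no even number found
first_even([1, 4, 5])   # prints: found 4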
def get_assembly_report(self, taxid):
    if self.ass_sum is None:
        self.get_assembly_summaries()
    df = self.ass_sum.query("taxid == {} & refseq_category == 'reference genome'".format(taxid))
    if len(df) == 0:
        # try "representative genome" (needed for mouse and rat)
        df = self.ass_sum.query("taxid == {} & refseq_category == 'representative genome'".format(taxid))
    if len(df) != 1:
        raise ValueError("unknown reference: {}".format(df))
    print(df)
    ftp_path = list(df.ftp_path)[0]
    assembly = os.path.split(ftp_path)[1]
    url = os.path.join(ftp_path, assembly + "_assembly_report.txt")
    print(url)
    # read the column names from the file
    table = request.urlopen(request.Request(url)).read().decode()
    names = [x for x in table.split("\n") if x.startswith("#")][-1].strip().replace("# ", "").split("\t")
    self.chr_df[taxid] = pd.read_csv(StringIO(table), sep="\t", names=names, comment='#')
    self.chr_df[taxid] = self.chr_df[taxid].rename(columns={
        'Sequence-Name': 'SequenceName', 'Sequence-Role': 'SequenceRole',
        'Assigned-Molecule': 'AssignedMolecule',
        'Assigned-Molecule-Location/Type': 'AssignedMoleculeLocationType',
        'GenBank-Accn': 'GenBankAccn', 'RefSeq-Accn': 'RefSeqAccn',
        'UCSC-style-name': 'UCSCstylename'})
    #print(self.chr_df[taxid].query("SequenceRole == 'assembled-molecule'"))
def _get_business_props(movie_code):
    cur_business_url = _BUSINESS_URL.format(code=movie_code)
    busi_page = bs(request.urlopen(cur_business_url), "html.parser")
    busi_str = str(busi_page)
    weekend_contents = re.findall(_WEEKEND_CONTENT_REGEX, busi_str)[0]
    num_screens_list = [
        int(match.replace(',', ''))
        for match in re.findall(_US_OPEN_WEEKEND_REGEX, weekend_contents)]
    busi_props = {}
    busi_props['screens_by_weekend'] = [
        val for val in reversed(num_screens_list)]
    busi_props['opening_weekend_screens'] = busi_props['screens_by_weekend'][0]
    busi_props['max_screens'] = max(num_screens_list)
    busi_props['total_screens'] = sum(num_screens_list)
    busi_props['avg_screens'] = sum(num_screens_list) / len(num_screens_list)
    busi_props['num_weekends'] = len(num_screens_list)
    return busi_props
# ==== crawling the release page ====
def _get_release_props(movie_code):
    cur_release_url = _RELEASE_URL.format(code=movie_code)
    release_page = bs(urllib.request.urlopen(cur_release_url), "html.parser")
    release_table = release_page.find_all("table", {"id": "release_dates"})[0]
    us_rows = []
    for row in release_table.find_all("tr")[1:]:
        row_str = str(row)
        if 'USA' in row_str:
            us_rows.append(row_str)
    release_props = {}
    release_props['release_day'] = None
    release_props['release_month'] = None
    release_props['release_year'] = None
    for row in us_rows:
        if re.match(_USA_ROW_REGEX, row):
            release = re.findall(_USA_ROW_REGEX, row)[0]
            release_props['release_day'] = int(release[0])
            release_props['release_month'] = release[1]
            release_props['release_year'] = int(release[2])
    return release_props
# ==== crawling the user reviews page ====
def _get_reviews_props(movie_code):
    cur_reviews_url = _REVIEWS_URL.format(code=movie_code)
    reviews_page = bs(urllib.request.urlopen(cur_reviews_url), "html.parser")
    reviews = reviews_page.find_all("td", {"class": "comment-summary"})
    user_reviews = []
    for review in reviews:
        try:
            rating = int(re.findall(_USER_REVIEW_RATING_REGEX, str(review))[0])
            date_str = re.findall(
                r"on (\d{1,2} [a-zA-Z]+ \d{4})", str(review))[0]
            date = datetime.strptime(date_str, "%d %B %Y").date()
            contents = review.find_all(
                'a', href=re.compile(r'reviews.+?'))[0].contents[0]
            user = review.find_all(
                'a', href=re.compile(r'/user/.+?'))[1].contents[0]
            user_reviews.append({
                'score': rating, 'review_date': date,
                'contents': contents, 'user': user
            })
        except Exception:  # pylint: disable=W0703
            pass
    return {'imdb_user_reviews': user_reviews}
# ==== crawling a movie profile ====
def run(self):
    request = self.request
    try:
        if ((timeit.default_timer() - self.starttime) <= self.timeout and
                not SHUTDOWN_EVENT.isSet()):
            try:
                f = urlopen(request)
            except TypeError:
                # PY24 expects a string or buffer
                # This also causes issues with Ctrl-C, but we will concede
                # for the moment that Ctrl-C on PY24 isn't immediate
                request = build_request(self.request.get_full_url(),
                                        data=request.data.read(self.size))
                f = urlopen(request)
            f.read(11)
            f.close()
            self.result = sum(self.request.data.total)
        else:
            self.result = 0
    except (IOError, SpeedtestUploadTimeout):
        self.result = sum(self.request.data.total)
def list_archive_timestamps(url, min_date, max_date, user_agent):
    """
    List the available archive between min_date and max_date for the given URL
    """
    logger.info('Listing the archives for the url {url}'.format(url=url))
    # Construct the URL used to download the memento list
    parameters = {'url': url,
                  'output': 'json',
                  'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
                  'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)}
    cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=urlencode(parameters))
    req = Request(cdx_url, None, {'User-Agent': user_agent})
    with urlopen(req) as cdx:
        memento_json = cdx.read().decode("utf-8")
    timestamps = []
    # Ignore the first line which contains column names
    for url_key, timestamp, original, mime_type, status_code, digest, length in json.loads(memento_json)[1:]:
        # Ignore archives with a status code != OK
        if status_code == '200':
            timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))
    return timestamps
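A hedged usage sketch for list_archive_timestamps; the constants below are assumed stand-ins for the module-level WEB_ARCHIVE_* values, logger and urllib imports referenced above, not the original definitions:

from datetime import datetime

# Assumed stand-ins for the module-level constants used by the function above.
WEB_ARCHIVE_TIMESTAMP_FORMAT = '%Y%m%d%H%M%S'
WEB_ARCHIVE_CDX_TEMPLATE = 'http://web.archive.org/cdx/search/cdx?{params}'

snapshots = list_archive_timestamps(
    'http://example.com/',
    min_date=datetime(2015, 1, 1),
    max_date=datetime(2016, 1, 1),
    user_agent='archive-lister/0.1 (placeholder)')
print(len(snapshots), 'snapshots returned HTTP 200')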
def fetch_file(self, url, filename):
    # if not os.path.exists(filename):
    #     os.makedirs(filename)
    try:
        req = request.Request(url, headers=self.__headers)
        data = request.urlopen(req).read()
        with open(filename, 'wb') as f:
            f.write(data)
            f.flush()
            f.close()
        self.__url_manager.set_url_status(url, 2)
    except Exception as e:
        self.__url_manager.set_url_status(url, -1)
        raise e
    finally:
        time.sleep(config['basic']['sleep'])
def retrieve_json(self, url):
    '''
    Retrieve data from the Veneer service at the given url path.
    url: Path to required resource, relative to the root of the Veneer service.
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))
    if self.protocol == 'file':
        text = open(self.prefix + url + self.data_ext).read()
    else:
        conn = hc.HTTPConnection(self.host, port=self.port)
        conn.request('GET', quote(url + self.data_ext))
        resp = conn.getresponse()
        text = resp.read().decode('utf-8')
        #text = urlopen(self.base_url + quote(url+self.data_ext)).read().decode('utf-8')
    text = self._replace_inf(text)
    if PRINT_ALL:
        print(json.loads(text))
        print("")
    return json.loads(text)
def retrieve_csv(self, url):
    '''
    Retrieve data from the Veneer service, at the given url path, in CSV format.
    url: Path to required resource, relative to the root of the Veneer service.
    NOTE: CSV responses are currently only available for time series results
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))
    req = Request(self.base_url + quote(url + self.data_ext), headers={"Accept": "text/csv"})
    text = urlopen(req).read().decode('utf-8')
    result = utils.read_veneer_csv(text)
    if PRINT_ALL:
        print(result)
        print("")
    return result
def retrieve_json(self, url, **kwargs):
    if self.print_urls:
        print("*** %s ***" % (url))
    try:
        text = urlopen(self.base_url + quote(url)).read().decode('utf-8')
    except:
        self.log("Couldn't retrieve %s" % url)
        return None
    self.save_data(url[1:], bytes(text, 'utf-8'), "json")
    if self.print_all:
        print(json.loads(text))
        print("")
    return json.loads(text)