def getFileURLs(file_ids):
    '''
    Retrieve the ftp location for a list of file IDs
    @param file_ids: List of file IDs
    @return List of ftp locations
    '''
    info_url = 'http://modwebsrv.modaps.eosdis.nasa.gov/axis2/services/MODAPSservices/getFileUrls?fileIds='
    for file_id in file_ids:
        info_url += str(file_id) + ','
    info_url = info_url[:-1]
    url = urlopen(info_url)
    tree = ET.fromstring(url.read().decode())
    url.close()
    return [child.text for child in tree]
def download_sifts_xml(pdb_id, outdir='', outfile=''):
    """Download the SIFTS file for a PDB ID.

    Args:
        pdb_id: PDB ID of the structure to fetch the SIFTS mapping for.
        outdir: Directory in which to save the file.
        outfile: Optional custom output file name.

    Returns:
        str: Path to the downloaded (decompressed) SIFTS XML file.

    """
    baseURL = 'ftp://ftp.ebi.ac.uk/pub/databases/msd/sifts/xml/'
    filename = '{}.xml.gz'.format(pdb_id)

    if outfile:
        outfile = op.join(outdir, outfile)
    else:
        outfile = op.join(outdir, filename.split('.')[0] + '.sifts.xml')

    if not op.exists(outfile):
        response = urlopen(baseURL + filename)
        with open(outfile, 'wb') as f:
            f.write(gzip.decompress(response.read()))

    return outfile
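# Usage sketch (illustrative only, not from the original source): assumes
# `gzip`, `os.path as op` and `urlopen` (urllib.request) are imported as the
# helper above expects; the PDB ID is just an example.
sifts_path = download_sifts_xml('1a2b', outdir='.')
print(sifts_path)  # e.g. ./1a2b.sifts.xml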
def get_template_files(template_data=None, template_url=None):
    if template_data:
        tpl = template_data
    elif template_url:
        with contextlib.closing(request.urlopen(template_url)) as u:
            tpl = u.read()
    else:
        return {}, None

    if not tpl:
        return {}, None

    if isinstance(tpl, six.binary_type):
        tpl = tpl.decode('utf-8')

    template = template_format.parse(tpl)
    files = {}
    _get_file_contents(template, files)
    return files, template
def urlretrieve(url, filename, reporthook=None, data=None):
    def chunk_read(response, chunk_size=8192, reporthook=None):
        total_size = response.info().get('Content-Length').strip()
        total_size = int(total_size)
        count = 0
        while 1:
            chunk = response.read(chunk_size)
            count += 1
            if not chunk:
                # Report completion only if a hook was supplied.
                if reporthook:
                    reporthook(count, total_size, total_size)
                break
            if reporthook:
                reporthook(count, chunk_size, total_size)
            yield chunk

    response = urlopen(url, data)
    with open(filename, 'wb') as fd:
        for chunk in chunk_read(response, reporthook=reporthook):
            fd.write(chunk)
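# Usage sketch (illustrative only): the URL and file name are placeholders,
# and `urlopen` is assumed to be imported (e.g. from
# `six.moves.urllib.request`) for the helper above.
def print_progress(count, block_size, total_size):
    # The hook receives (block count, block size, total size), matching the
    # calls made inside chunk_read above; total_size comes from Content-Length.
    downloaded = min(count * block_size, total_size)
    print('downloaded {} of {} bytes'.format(downloaded, total_size))

urlretrieve('https://example.com/archive.tar.gz', 'archive.tar.gz',
            reporthook=print_progress)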
def get_contents_if_file(contents_or_file_name):
    """Get the contents of a file.

    If the value passed in is a file name or file URI, return the
    contents. If not, or there is an error reading the file contents,
    return the value passed in as the contents.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the
    actual workflow definition itself is passed in.
    """
    try:
        if parse.urlparse(contents_or_file_name).scheme:
            definition_url = contents_or_file_name
        else:
            path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:',
                request.pathname2url(path)
            )
        return request.urlopen(definition_url).read().decode('utf8')
    except Exception:
        return contents_or_file_name
def test_ClientRedirectServer(self):
    # create a ClientRedirectServer and run it in a thread to listen
    # for a mock GET request with the access token
    # the server should return a 200 message and store the token
    httpd = tools.ClientRedirectServer(('localhost', 0),
                                       tools.ClientRedirectHandler)
    code = 'foo'
    url = 'http://localhost:{0}?code={1}'.format(
        httpd.server_address[1], code)
    t = threading.Thread(target=httpd.handle_request)
    t.setDaemon(True)
    t.start()
    f = request.urlopen(url)
    self.assertTrue(f.read())
    t.join()
    httpd.server_close()
    self.assertEqual(httpd.query_params.get('code'), code)
# Module-level imports for the urlretrieve helper below ("from __future__"
# imports are only valid at the top of a module, so they cannot appear inside
# a function body).
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import hashlib
import os
import shutil
import sys
import tarfile
import zipfile

import six
from six.moves.urllib.error import HTTPError
from six.moves.urllib.error import URLError
from six.moves.urllib.request import urlopen

from tensorflow.contrib.keras.python.keras.utils.generic_utils import Progbar
def urlretrieve(url, filename, reporthook=None, data=None):
    def chunk_read(response, chunk_size=8192, reporthook=None):
        total_size = response.info().get('Content-Length').strip()
        total_size = int(total_size)
        count = 0
        while 1:
            chunk = response.read(chunk_size)
            count += 1
            if not chunk:
                # Report completion only if a hook was supplied.
                if reporthook:
                    reporthook(count, total_size, total_size)
                break
            if reporthook:
                reporthook(count, chunk_size, total_size)
            yield chunk

    response = urlopen(url, data)
    with open(filename, 'wb') as fd:
        for chunk in chunk_read(response, reporthook=reporthook):
            fd.write(chunk)
def _make_tide_request(city, date):
    station = STATIONS.get(city.lower())
    noaa_api_params = {
        'station': station,
        'product': 'predictions',
        'datum': 'MLLW',
        'units': 'english',
        'time_zone': 'lst_ldt',
        'format': 'json'
    }
    if date == datetime.date.today():
        noaa_api_params['date'] = 'today'
    else:
        noaa_api_params['begin_date'] = date.strftime('%Y%m%d')
        noaa_api_params['range'] = 24
    url = ENDPOINT + "?" + urlencode(noaa_api_params)
    resp_body = urlopen(url).read()
    if len(resp_body) == 0:
        statement_text = render_template('noaa_problem')
    else:
        noaa_response_obj = json.loads(resp_body)
        predictions = noaa_response_obj['predictions']
        tideinfo = _find_tide_info(predictions)
        statement_text = render_template('tide_info', date=date, city=city, tideinfo=tideinfo)
    return statement(statement_text).simple_card("Tide Pooler", statement_text)
def urlretrieve(url, filename, reporthook=None, data=None):
    '''
    This function is adapted from: https://github.com/fchollet/keras
    Original work Copyright (c) 2014-2015 keras contributors
    '''
    def chunk_read(response, chunk_size=8192, reporthook=None):
        total_size = response.info().get('Content-Length').strip()
        total_size = int(total_size)
        count = 0
        while 1:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            count += 1
            if reporthook:
                reporthook(count, chunk_size, total_size)
            yield chunk

    response = urlopen(url, data)
    with open(filename, 'wb') as fd:
        for chunk in chunk_read(response, reporthook=reporthook):
            fd.write(chunk)
def download_and_extract_nord_zipfile(ovpn_dir_path):
    """
    Retrieve NordVPN's zipfile of OpenVPN configs, save it to disk and
    extract its contents.

    :param ovpn_dir_path: Default is the user's ${HOME}/OVPN/ (%HOME%\OVPN\ on Windows)
    :return: None
    """
    zip_data = urlopen('https://downloads.nordcdn.com/configs/archives/servers/ovpn.zip').read()
    zipfile_path = os.path.join(ovpn_dir_path, 'zipfile.zip')
    with open(zipfile_path, 'wb+') as nord_zipfile:
        nord_zipfile.write(zip_data)

    # sanity check
    assert os.path.exists(zipfile_path)

    # TODO: extracting an untrusted archive can be dangerous; validate the
    # contents before extraction.
    zipfile.ZipFile(zipfile_path).extractall(ovpn_dir_path)
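# Usage sketch (illustrative only): create the target directory first, since
# the helper above assumes it already exists (`zipfile` and `urlopen` are
# imported elsewhere in the original module).
import os

ovpn_dir = os.path.expanduser('~/OVPN')
os.makedirs(ovpn_dir, exist_ok=True)
download_and_extract_nord_zipfile(ovpn_dir)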
def read_file_or_url(self, fname):
    # TODO: not working on localhost
    if os.path.isfile(fname):
        result = open(fname, 'r')
    else:
        match = self.urlre.match(fname)
        if match:
            result = urlopen(match.group(1))
        else:
            fname = os.path.expanduser(fname)
            try:
                result = open(os.path.expanduser(fname), 'r')
            except IOError:
                result = open('%s.%s' % (os.path.expanduser(fname),
                                         self.defaultExtension), 'r')
    return result
def _pd_api(self, url, data=None, method='GET'):
    url = '%s/%s' % (PD_API_BASE, url)
    request_args = {
        'headers': dict(self._pd_headers)
    }
    if six.PY3:  # pragma: no cover
        request_args['method'] = method
    if data is not None:
        request_args['data'] = json.dumps(data).encode('utf-8')
        request_args['headers']['Content-Type'] = APPLICATION_JSON
    request = Request(url, **request_args)
    if six.PY2:  # pragma: no cover
        request.get_method = lambda: method
    try:
        response = urlopen(request)
        return json.loads(response.read().decode('utf-8'))
    except HTTPError as e:
        response = e.read().decode('utf-8')
        logger.warning("API error: %s", response)
        if method == 'GET' and e.code == 404:
            return None
        else:
            raise e
def check_connection(default='http://google.com', timeout=1):
    """Test the internet connection.

    Parameters
    ----------
    default : str
        URL to test; defaults to Google's homepage.
    timeout : number
        Time in seconds to wait before giving up.

    Returns
    -------
    success : bool
        True if we appear to be online, else False.
    """
    success = True
    try:
        surl = urlparse.quote(default, safe=':./')
        urlrequest.urlopen(surl, timeout=timeout)
    except urlerror.URLError as derp:
        success = False
        logger.debug("Network unreachable: {}".format(derp))
    return success
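# Usage sketch (illustrative only): guard a network-dependent code path.
# check_connection() itself relies on module-level names (urlparse,
# urlrequest, urlerror, logger) that are not shown in the snippet above.
if check_connection(timeout=2):
    print('online: fetching remote data')
else:
    print('offline: using cached data')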
def _read_from_url(url):
    try:
        response = urlopen(url)
    except HTTPError as e:
        if e.code == 404:
            return "{}"
        else:
            raise PeeringDBError(
                "HTTP error while retrieving info from PeeringDB: "
                "code: {}, reason: {} - {}".format(
                    e.code, e.reason, str(e)
                )
            )
    except Exception as e:
        raise PeeringDBError(
            "Error while retrieving info from PeeringDB: {}".format(
                str(e)
            )
        )
    return response.read().decode("utf-8")
def download(url, filename, overwrite=False, timeout=None):
    """
    Download the given URL to the given filename. If the file exists,
    it won't be downloaded unless asked to overwrite. Both text data,
    such as html and txt, and binary data, such as images and audio,
    are acceptable.

    :param url: A URL to download.
    :param filename: The file to store the downloaded file to.
    :param overwrite: Set to True if the file should be downloaded even if it
                      already exists.
    :param timeout: Optional timeout in seconds for the request.
    """
    if not os.path.exists(filename) or overwrite:
        if timeout is None:
            response = urlopen(url)
        else:
            response = urlopen(url, timeout=timeout)
        with open(filename, 'wb') as out_file:
            copyfileobj(response, out_file)
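# Usage sketch (illustrative only): the URL and file name are placeholders,
# and `os`, `shutil.copyfileobj` and `urlopen` are assumed to be imported as
# the helper above expects.
download('https://example.com/data.csv', 'data.csv', timeout=10)
# Re-download even if data.csv already exists on disk.
download('https://example.com/data.csv', 'data.csv', overwrite=True, timeout=10)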
def get_sap_symbols(name='sap500'):
    """Get ticker symbols constituting the S&P index.

    Args:
        name(str): should be 'sap500' or 'sap100'
    """
    if name == 'sap500':
        site = 'http://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    elif name == 'sap100':
        site = 'https://en.wikipedia.org/wiki/S%26P_100'
    else:
        raise NameError('invalid input: name should be "sap500" or "sap100"')

    # fetch the constituents table from Wikipedia
    page = urlopen(site)
    soup = BeautifulSoup(page, 'html.parser')
    table = soup.find('table', {'class': 'wikitable sortable'})

    symbols = []
    for row in table.findAll('tr'):
        col = row.findAll('td')
        if len(col) > 0:
            symbol = col[0].string.replace('.', '-')
            symbols.append(str(symbol))
    return symbols
def urlretrieve(url, filename, reporthook=None, data=None):
    def chunk_read(response, chunk_size=8192, reporthook=None):
        total_size = response.info().get('Content-Length').strip()
        total_size = int(total_size)
        count = 0
        while 1:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            count += 1
            if reporthook:
                reporthook(count, chunk_size, total_size)
            yield chunk

    response = urlopen(url, data)
    with open(filename, 'wb') as fd:
        for chunk in chunk_read(response, reporthook=reporthook):
            fd.write(chunk)
def __network_ping(self):
    try:
        repourl = urljoin(self.get_depot_url(), "versions/0")
        # Disable SSL peer verification, we just want to check
        # if the depot is running.
        url = urlopen(repourl,
                      context=ssl._create_unverified_context())
        url.close()
    except HTTPError as e:
        # Server returns NOT_MODIFIED if catalog is up to date.
        if e.code == http_client.NOT_MODIFIED:
            return True
        else:
            return False
    except URLError:
        return False
    return True
def getFileIDs(modis_identifier, start_date, end_date, lat, lon, daynightboth):
    '''
    Retrieve file IDs for images matching search parameters
    @param modis_identifier: Product identifier (e.g. MOD09)
    @param start_date: Starting date
    @param end_date: Ending date
    @param lat: Latitude
    @param lon: Longitude
    @param daynightboth: Get daytime images ('D'), nighttime images ('N') or both ('B')
    @return list of file IDs
    '''
    lat_str = str(lat)
    lon_str = str(lon)
    info_url = ('http://modwebsrv.modaps.eosdis.nasa.gov/axis2/services/MODAPSservices/searchForFiles'
                + '?product=' + modis_identifier + '&collection=6&start=' + start_date
                + '&stop=' + end_date + '&north=' + lat_str + '&south=' + lat_str + '&west='
                + lon_str + '&east=' + lon_str + '&coordsOrTiles=coords&dayNightBoth=' + daynightboth)
    url = urlopen(info_url)
    tree = ET.fromstring(url.read().decode())
    url.close()
    return [int(child.text) for child in tree]
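# Usage sketch (illustrative only): getFileIDs() and getFileURLs() are meant
# to be chained; the first finds matching granule IDs, the second resolves
# their download locations. Assumes `urlopen` (urllib.request) and
# `xml.etree.ElementTree as ET` are imported; the search parameters below are
# just an example.
file_ids = getFileIDs('MOD09', '2016-01-01', '2016-01-07',
                      lat=40.0, lon=-105.0, daynightboth='D')
for location in getFileURLs(file_ids):
    print(location)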
def download_biological_assemblies(pdb_id, outdir):
    """Downloads biological assembly file from:
    `ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/`

    Args:
        pdb_id (str): PDB ID of the structure
        outdir (str): Output directory of the decompressed assembly

    """
    # TODO: not tested yet
    if not op.exists(outdir):
        raise ValueError('{}: output directory does not exist'.format(outdir))

    folder = pdb_id[1:3]
    server = 'ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/{}/'.format(folder)
    html_folder = urlopen(server).readlines()
    for line in html_folder:
        if pdb_id in str(line).strip():
            file_name = '%s' % (pdb_id + str(line).strip().split(pdb_id)[1].split('\r\n')[0])
            outfile_name = file_name.replace('.', '_')
            outfile_name = outfile_name.replace('_gz', '.pdb')
            f = urlopen(op.join(server, file_name))
            decompressed_data = zlib.decompress(f.read(), 16 + zlib.MAX_WBITS)
            with open(op.join(outdir, outfile_name), 'wb') as out:
                out.write(decompressed_data)
            f.close()
            log.debug('{}: downloaded biological assembly'.format(pdb_id))
            return op.join(outdir, outfile_name)
def open(self, url, **params):
    if url.startswith('http') and params:
        r = requests.get(url, params=params, stream=True)
        r.raw.decode_content = self.decode
        response = r.text if self.cache_type else r.raw
    else:
        try:
            r = urlopen(url, context=self.context, timeout=self.timeout)
        except TypeError:
            r = urlopen(url, timeout=self.timeout)

        text = r.read() if self.cache_type else None

        if self.decode:
            encoding = get_response_encoding(r, self.def_encoding)

            if text:
                response = decode(text, encoding)
            else:
                response = reencode(r.fp, encoding, decode=True)
        else:
            response = text or r

    content_type = get_response_content_type(r)

    if 'xml' in content_type:
        self.ext = 'xml'
    elif 'json' in content_type:
        self.ext = 'json'
    else:
        self.ext = content_type.split('/')[1].split(';')[0]

    self.r = r
    return response
def get_user_details(self, response):
    # Obtain JWT and the keys to validate the signature
    idToken = response.get('id_token')
    jwks = request.urlopen("https://" + self.setting('DOMAIN') + "/.well-known/jwks.json")
    issuer = "https://" + self.setting('DOMAIN') + "/"
    audience = self.setting('KEY')  # CLIENT_ID
    payload = jwt.decode(idToken, jwks.read(), algorithms=['RS256'],
                         audience=audience, issuer=issuer)
    return {'username': payload['nickname'],
            'first_name': payload['name'],
            'picture': payload['picture'],
            'user_id': payload['sub']}
def fetch(self):
    """Fetch report from PDB website.

    Yields:
        dict: Dictionary with keys same as ['structureId', 'chainID'] + self.fields

    """
    response = urlopen(self.url)
    return parse_csv_file(response)
def open_image(filename):
    if re.match('http[s]?:.*', filename):
        image = request.urlopen(filename)
    else:
        image = open(filename, 'rb')
    return image.read()
def load_image(in_image):
    """ Load an image, returns PIL.Image. """
    # if the path appears to be a URL
    if urlparse(in_image).scheme in ('http', 'https',):
        # set up the byte stream
        img_stream = BytesIO(request.urlopen(in_image).read())
        # and read in as PIL image
        img = Image.open(img_stream)
    else:
        # else use it as local file path
        img = Image.open(in_image)
    return img
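# Usage sketch (illustrative only): the URL and file path are placeholders.
# load_image() relies on `urlparse` (urllib.parse), `request` (urllib),
# `BytesIO` (io) and PIL's `Image`, imported elsewhere in the original module.
remote_img = load_image('https://example.com/sample.png')
local_img = load_image('sample.png')
print(remote_img.size, local_img.size)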
def urlretrieve(url, filename, reporthook=None, data=None):
    """Replacement for `urlretrieve` for Python 2.

    Under Python 2, `urlretrieve` relies on `FancyURLopener` from legacy
    `urllib` module, known to have issues with proxy management.

    # Arguments
        url: url to retrieve.
        filename: where to store the retrieved data locally.
        reporthook: a hook function that will be called once
            on establishment of the network connection and once
            after each block read thereafter.
            The hook will be passed three arguments:
            a count of blocks transferred so far,
            a block size in bytes, and the total size of the file.
        data: `data` argument passed to `urlopen`.
    """
    def chunk_read(response, chunk_size=8192, reporthook=None):
        total_size = response.info().get('Content-Length').strip()
        total_size = int(total_size)
        count = 0
        while 1:
            chunk = response.read(chunk_size)
            count += 1
            if not chunk:
                # Report completion only if a hook was supplied.
                if reporthook:
                    reporthook(count, total_size, total_size)
                break
            if reporthook:
                reporthook(count, chunk_size, total_size)
            yield chunk

    response = urlopen(url, data)
    with open(filename, 'wb') as fd:
        for chunk in chunk_read(response, reporthook=reporthook):
            fd.write(chunk)
def get_template_contents(template_file=None, template_url=None,
                          files=None):
    # Transform a bare file path to a file:// URL.
    if template_file:  # nosec
        template_url = utils.normalise_file_path_to_url(template_file)
        tpl = request.urlopen(template_url).read()
    else:
        raise exceptions.CommandErrorException(_('Need to specify exactly '
                                                 'one of %(arg1)s, %(arg2)s '
                                                 'or %(arg3)s') %
                                               {'arg1': '--template-file',
                                                'arg2': '--template-url'})

    if not tpl:
        raise exceptions.CommandErrorException(_('Could not fetch '
                                                 'template from %s') %
                                               template_url)

    try:
        if isinstance(tpl, six.binary_type):
            tpl = tpl.decode('utf-8')
        template = template_format.parse(tpl)
    except ValueError as e:
        raise exceptions.CommandErrorException(_('Error parsing template '
                                                 '%(url)s %(error)s') %
                                               {'url': template_url,
                                                'error': e})
    return template