Python urlopen() usage examples from open-source projects

modis_util.py (project: scikit-dataaccess, author: MITHaystack)
def getFileURLs(file_ids):
    '''
    Retrieve the ftp location for a list of file IDs

    @param file_ids: List of file IDs
    @return List of ftp locations
    '''

    info_url='http://modwebsrv.modaps.eosdis.nasa.gov/axis2/services/MODAPSservices/getFileUrls?fileIds='

    for file_id in file_ids:
        info_url += str(file_id) + ','

    info_url = info_url[:-1]

    url = urlopen(info_url)
    tree = ET.fromstring(url.read().decode())
    url.close()

    return [ child.text for child in tree ]
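
A minimal usage sketch, assuming the module-level imports the original file relies on (urlopen and xml.etree.ElementTree); the file IDs are hypothetical, and real ones come from getFileIDs() further down this page:

from urllib.request import urlopen
import xml.etree.ElementTree as ET

# Hypothetical IDs for illustration only.
for ftp_url in getFileURLs([123456789, 123456790]):
    print(ftp_url)
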
pdb.py (project: ssbio, author: SBRG)
def download_sifts_xml(pdb_id, outdir='', outfile=''):
    """Download the SIFTS file for a PDB ID.

    Args:
        pdb_id: PDB ID to download the SIFTS file for
        outdir: Directory to save the downloaded file to
        outfile: Optional file name to save as; defaults to ``<pdb_id>.sifts.xml``

    Returns:
        str: Path to the downloaded SIFTS XML file

    """
    baseURL = 'ftp://ftp.ebi.ac.uk/pub/databases/msd/sifts/xml/'
    filename = '{}.xml.gz'.format(pdb_id)

    if outfile:
        outfile = op.join(outdir, outfile)
    else:
        outfile = op.join(outdir, filename.split('.')[0] + '.sifts.xml')

    if not op.exists(outfile):
        response = urlopen(baseURL + filename)
        with open(outfile, 'wb') as f:
            f.write(gzip.decompress(response.read()))

    return outfile
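
A usage sketch, assuming the imports the original module provides (gzip, urlopen, and os.path as op); the PDB ID is illustrative:

import gzip
import os.path as op
from urllib.request import urlopen

# Fetches <pdb_id>.xml.gz from the EBI FTP server and writes 1kf6.sifts.xml.
sifts_file = download_sifts_xml('1kf6', outdir='/tmp')
print(sifts_file)
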
heat.py (project: mos-horizon, author: Mirantis)
def get_template_files(template_data=None, template_url=None):
    if template_data:
        tpl = template_data
    elif template_url:
        with contextlib.closing(request.urlopen(template_url)) as u:
            tpl = u.read()
    else:
        return {}, None
    if not tpl:
        return {}, None
    if isinstance(tpl, six.binary_type):
        tpl = tpl.decode('utf-8')
    template = template_format.parse(tpl)
    files = {}
    _get_file_contents(template, files)
    return files, template
data_utils.py (project: keras, author: GeekLiB)
def urlretrieve(url, filename, reporthook=None, data=None):
        def chunk_read(response, chunk_size=8192, reporthook=None):
            total_size = response.info().get('Content-Length').strip()
            total_size = int(total_size)
            count = 0
            while 1:
                chunk = response.read(chunk_size)
                count += 1
                if not chunk:
                    # reporthook may be None; only call it when provided
                    if reporthook:
                        reporthook(count, total_size, total_size)
                    break
                if reporthook:
                    reporthook(count, chunk_size, total_size)
                yield chunk

        response = urlopen(url, data)
        with open(filename, 'wb') as fd:
            for chunk in chunk_read(response, reporthook=reporthook):
                fd.write(chunk)
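
A hedged example of a reporthook to pair with this helper; the URL and destination path are placeholders:

import sys

def print_progress(count, block_size, total_size):
    # count and block_size describe the chunks read so far; total_size
    # comes from the Content-Length header.
    done = min(count * block_size, total_size)
    sys.stdout.write('\r{}/{} bytes'.format(done, total_size))
    sys.stdout.flush()

urlretrieve('https://example.com/data.bin', 'data.bin',
            reporthook=print_progress)
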
utils.py (project: python-kingbirdclient, author: openstack)
def get_contents_if_file(contents_or_file_name):
    """Get the contents of a file.

    If the value passed in is a file name or file URI, return the
    contents. If not, or there is an error reading the file contents,
    return the value passed in as the contents.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the
    actual workflow definition itself is passed in.
    """
    try:
        if parse.urlparse(contents_or_file_name).scheme:
            definition_url = contents_or_file_name
        else:
            path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:',
                request.pathname2url(path)
            )
        return request.urlopen(definition_url).read().decode('utf8')
    except Exception:
        return contents_or_file_name
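
A usage sketch (the file name is hypothetical); anything that is neither a readable path nor a URI is returned unchanged:

import os
from urllib import parse, request

print(get_contents_if_file('definitions/workflow.yaml'))  # read from disk
print(get_contents_if_file('version: "2.0"'))             # passed through as-is
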
test_tools.py (project: deb-python-oauth2client, author: openstack)
def test_ClientRedirectServer(self):
        # create a ClientRedirectServer and run it in a thread to listen
        # for a mock GET request with the access token
        # the server should return a 200 message and store the token
        httpd = tools.ClientRedirectServer(('localhost', 0),
                                           tools.ClientRedirectHandler)
        code = 'foo'
        url = 'http://localhost:{0}?code={1}'.format(
            httpd.server_address[1], code)
        t = threading.Thread(target=httpd.handle_request)
        t.setDaemon(True)
        t.start()
        f = request.urlopen(url)
        self.assertTrue(f.read())
        t.join()
        httpd.server_close()
        self.assertEqual(httpd.query_params.get('code'), code)
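
The same request/response round-trip can be reproduced with only the standard library; a minimal sketch (the handler and response body are illustrative, not part of oauth2client):

import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import urlopen

class OneShotHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()
        self.wfile.write(b'ok')

    def log_message(self, fmt, *args):
        pass  # keep test output quiet

httpd = HTTPServer(('localhost', 0), OneShotHandler)
t = threading.Thread(target=httpd.handle_request)
t.daemon = True
t.start()
body = urlopen('http://localhost:{}/?code=foo'.format(httpd.server_address[1])).read()
t.join()
httpd.server_close()
assert body == b'ok'
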
test_tools.py (project: REMAP, author: REMAPApp)
def test_ClientRedirectServer(self):
        # create a ClientRedirectServer and run it in a thread to listen
        # for a mock GET request with the access token
        # the server should return a 200 message and store the token
        httpd = tools.ClientRedirectServer(('localhost', 0),
                                           tools.ClientRedirectHandler)
        code = 'foo'
        url = 'http://localhost:{0}?code={1}'.format(
            httpd.server_address[1], code)
        t = threading.Thread(target=httpd.handle_request)
        t.setDaemon(True)
        t.start()
        f = request.urlopen(url)
        self.assertTrue(f.read())
        t.join()
        httpd.server_close()
        self.assertEqual(httpd.query_params.get('code'), code)
tf-keras-skeleton.py (project: LIE, author: EmbraceLife)
def import_libs():
            # NOTE: the original skeleton placed `from __future__ import
            # absolute_import / division / print_function` here, but
            # __future__ imports are only legal at the top of a module;
            # inside a function they raise SyntaxError.

            import hashlib
            import os
            import shutil
            import sys
            import tarfile
            import zipfile

            import six
            from six.moves.urllib.error import HTTPError
            from six.moves.urllib.error import URLError
            from six.moves.urllib.request import urlopen

            from tensorflow.contrib.keras.python.keras.utils.generic_utils import Progbar
data_utils.py (project: keras-customized, author: ambrite)
def urlretrieve(url, filename, reporthook=None, data=None):
        def chunk_read(response, chunk_size=8192, reporthook=None):
            total_size = response.info().get('Content-Length').strip()
            total_size = int(total_size)
            count = 0
            while 1:
                chunk = response.read(chunk_size)
                count += 1
                if not chunk:
                    # reporthook may be None; only call it when provided
                    if reporthook:
                        reporthook(count, total_size, total_size)
                    break
                if reporthook:
                    reporthook(count, chunk_size, total_size)
                yield chunk

        response = urlopen(url, data)
        with open(filename, 'wb') as fd:
            for chunk in chunk_read(response, reporthook=reporthook):
                fd.write(chunk)
tidepooler.py (project: flask-ask, author: johnwheeler)
def _make_tide_request(city, date):
    station = STATIONS.get(city.lower())
    noaa_api_params = {
        'station': station,
        'product': 'predictions',
        'datum': 'MLLW',
        'units': 'english',
        'time_zone': 'lst_ldt',
        'format': 'json'
    }
    if date == datetime.date.today():
        noaa_api_params['date'] = 'today'
    else:
        noaa_api_params['begin_date'] = date.strftime('%Y%m%d')
        noaa_api_params['range'] = 24
    url = ENDPOINT + "?" + urlencode(noaa_api_params)
    resp_body = urlopen(url).read()
    if len(resp_body) == 0:
        statement_text = render_template('noaa_problem')
    else:
        noaa_response_obj = json.loads(resp_body)
        predictions = noaa_response_obj['predictions']
        tideinfo = _find_tide_info(predictions)
        statement_text = render_template('tide_info', date=date, city=city, tideinfo=tideinfo)
    return statement(statement_text).simple_card("Tide Pooler", statement_text)
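
The query-string construction above is plain urlencode + urlopen; a standalone sketch (the ENDPOINT value is an assumption based on NOAA's public CO-OPS API, and the station ID is illustrative):

import json
from urllib.parse import urlencode
from urllib.request import urlopen

ENDPOINT = 'https://api.tidesandcurrents.noaa.gov/api/prod/datagetter'
params = {
    'station': '9447130',        # illustrative station ID
    'product': 'predictions',
    'datum': 'MLLW',
    'units': 'english',
    'time_zone': 'lst_ldt',
    'format': 'json',
    'date': 'today',
}
resp_body = urlopen(ENDPOINT + '?' + urlencode(params)).read()
predictions = json.loads(resp_body)['predictions']
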
__init__.py (project: odin, author: imito)
def urlretrieve(url, filename, reporthook=None, data=None):
    '''
    This function is adapted from: https://github.com/fchollet/keras
    Original work Copyright (c) 2014-2015 keras contributors
    '''
    def chunk_read(response, chunk_size=8192, reporthook=None):
      total_size = response.info().get('Content-Length').strip()
      total_size = int(total_size)
      count = 0
      while 1:
        chunk = response.read(chunk_size)
        if not chunk:
          break
        count += 1
        if reporthook:
          reporthook(count, chunk_size, total_size)
        yield chunk

    response = urlopen(url, data)
    with open(filename, 'wb') as fd:
      for chunk in chunk_read(response, reporthook=reporthook):
        fd.write(chunk)
update.py (project: ovpn-speed-connect, author: N2ITN)
def download_and_extract_nord_zipfile(ovpn_dir_path):
    """
    This function retrieves NordVPN's zipfile, de-serializes & saves it.
    Then it extracts the contents.

    :param ovpn_dir_path: efault is the users {$HOME}/OVPN/ (%HOME%\OVPN\ on windows)

    :return: NUTHIN'
    """
    zip_data = urlopen('https://downloads.nordcdn.com/configs/archives/servers/ovpn.zip').read()
    zipfile_path = os.path.join(ovpn_dir_path, 'zipfile.zip')
    with open(zipfile_path, 'wb+') as nord_zipfile:
        nord_zipfile.write(zip_data)
    # sanity check
    assert os.path.exists(zipfile_path)
    # TODO: extracting an untrusted archive without validating member paths
    # is unsafe; add checks before calling extractall() (a sketch follows
    # this snippet).
    zipfile.ZipFile(zipfile_path).extractall(ovpn_dir_path)
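
One way to address the TODO above is to validate member paths before extraction (zip-slip protection); a minimal sketch with an illustrative helper name:

import os
import zipfile

def safe_extractall(zipfile_path, dest_dir):
    """Extract an archive, rejecting members that would escape dest_dir."""
    real_dest = os.path.realpath(dest_dir)
    with zipfile.ZipFile(zipfile_path) as zf:
        for member in zf.namelist():
            target = os.path.realpath(os.path.join(real_dest, member))
            if not target.startswith(real_dest + os.sep):
                raise ValueError('unsafe path in archive: {}'.format(member))
        zf.extractall(real_dest)
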
cmd2.py (project: minihydra, author: VillanCh)
def read_file_or_url(self, fname):
        # TODO: not working on localhost
        if os.path.isfile(fname):
            result = open(fname, 'r')
        else:
            match = self.urlre.match(fname)
            if match:
                result = urlopen(match.group(1))
            else:
                fname = os.path.expanduser(fname)
                try:
                    result = open(os.path.expanduser(fname), 'r')
                except IOError:
                    result = open('%s.%s' % (os.path.expanduser(fname),
                                             self.defaultExtension), 'r')
        return result
pagerduty.py (project: spacel-provision, author: pebble)
def _pd_api(self, url, data=None, method='GET'):
        url = '%s/%s' % (PD_API_BASE, url)
        request_args = {
            'headers': dict(self._pd_headers)
        }
        if six.PY3:  # pragma: no cover
            request_args['method'] = method

        if data is not None:
            request_args['data'] = json.dumps(data).encode('utf-8')
            request_args['headers']['Content-Type'] = APPLICATION_JSON

        request = Request(url, **request_args)
        if six.PY2:  # pragma: no cover
            request.get_method = lambda: method

        try:
            response = urlopen(request)
            return json.loads(response.read().decode('utf-8'))
        except HTTPError as e:
            response = e.read().decode('utf-8')
            logger.warning("API error: %s", response)
            if method == 'GET' and e.code == 404:
                return None
            else:
                raise e
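
On Python 2, Request has no method argument, hence the get_method override above; on Python 3 the same call is direct. A sketch of the Python 3 form (the URL and payload are placeholders):

import json
from urllib.request import Request, urlopen

req = Request('https://api.pagerduty.com/services',   # placeholder URL
              data=json.dumps({'name': 'demo'}).encode('utf-8'),
              headers={'Content-Type': 'application/json'},
              method='POST')
response = json.loads(urlopen(req).read().decode('utf-8'))
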
utils.py (project: open-mic, author: cosmir)
def check_connection(default='http://google.com', timeout=1):
    """Test the internet connection.

    Parameters
    ----------
    default : str
        URL to test; defaults to Google's homepage.

    timeout : number
        Time in seconds to wait before giving up.

    Returns
    -------
    success : bool
        True if the host appears to be online, else False.
    """
    success = True
    try:
        surl = urlparse.quote(default, safe=':./')
        urlrequest.urlopen(surl, timeout=timeout)
    except urlerror.URLError as derp:
        success = False
        logger.debug("Network unreachable: {}".format(derp))
    return success
data_utils.py (project: Benchmarks, author: ECP-CANDLE)
def urlretrieve(url, filename, reporthook=None, data=None):
        def chunk_read(response, chunk_size=8192, reporthook=None):
            total_size = response.info().get('Content-Length').strip()
            total_size = int(total_size)
            count = 0
            while 1:
                chunk = response.read(chunk_size)
                count += 1
                if not chunk:
                    # reporthook may be None; only call it when provided
                    if reporthook:
                        reporthook(count, total_size, total_size)
                    break
                if reporthook:
                    reporthook(count, chunk_size, total_size)
                yield chunk

        response = urlopen(url, data)
        with open(filename, 'wb') as fd:
            for chunk in chunk_read(response, reporthook=reporthook):
                fd.write(chunk)
peering_db.py (project: arouteserver, author: pierky)
def _read_from_url(url):
        try:
            response = urlopen(url)
        except HTTPError as e:
            if e.code == 404:
                return "{}"
            else:
                raise PeeringDBError(
                    "HTTP error while retrieving info from PeeringDB: "
                    "code: {}, reason: {} - {}".format(
                        e.code, e.reason, str(e)
                    )
                )
        except Exception as e:
            raise PeeringDBError(
                "Error while retrieving info from PeeringDB: {}".format(
                    str(e)
                )
            )

        return response.read().decode("utf-8")
utilities.py (project: file-metadata, author: pywikibot-catfiles)
def download(url, filename, overwrite=False, timeout=None):
    """
    Download the given URL to the given filename. If the file exists,
    it won't be downloaded unless asked to overwrite. Both, text data
    like html, txt, etc. or binary data like images, audio, etc. are
    acceptable.

    :param url:       A URL to download.
    :param filename:  The file to store the downloaded file to.
    :param overwrite: Set to True if the file should be downloaded even if it
                      already exists.
    """
    if not os.path.exists(filename) or overwrite:
        if timeout is None:
            response = urlopen(url)
        else:
            response = urlopen(url, timeout=timeout)
        with open(filename, 'wb') as out_file:
            copyfileobj(response, out_file)
utils.py (project: DQN, author: jjakimoto)
def get_sap_symbols(name='sap500'):
    """Get ticker symbols constituting S&P

    Args:
        name(str): should be 'sap500' or 'sap100'
    """
    if name == 'sap500':
        site = 'http://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    elif name == 'sap100':
        site = 'https://en.wikipedia.org/wiki/S%26P_100'
    else:
        raise NameError('invalid input: name should be "sap500" or "sap100"')
    # fetch the constituents table from Wikipedia
    page = urlopen(site)
    soup = BeautifulSoup(page, 'html.parser')
    table = soup.find('table', {'class': 'wikitable sortable'})
    symbols = []
    for row in table.findAll('tr'):
        col = row.findAll('td')
        if len(col) > 0:
            symbol = col[0].string.replace('.', '-')
            symbols.append(str(symbol))
    return symbols
data_utils.py (project: InnerOuterRNN, author: Chemoinformatics)
def urlretrieve(url, filename, reporthook=None, data=None):
        def chunk_read(response, chunk_size=8192, reporthook=None):
            total_size = response.info().get('Content-Length').strip()
            total_size = int(total_size)
            count = 0
            while 1:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                count += 1
                if reporthook:
                    reporthook(count, chunk_size, total_size)
                yield chunk

        response = urlopen(url, data)
        with open(filename, 'wb') as fd:
            for chunk in chunk_read(response, reporthook=reporthook):
                fd.write(chunk)
depotcontroller.py (project: solaris-ips, author: oracle)
def __network_ping(self):
                try:
                        repourl = urljoin(self.get_depot_url(),
                            "versions/0")
                        # Disable SSL peer verification, we just want to check
                        # if the depot is running.
                        url = urlopen(repourl,
                            context=ssl._create_unverified_context())
                        url.close()
                except HTTPError as e:
                        # Server returns NOT_MODIFIED if catalog is up
                        # to date
                        if e.code == http_client.NOT_MODIFIED:
                                return True
                        else:
                                return False
                except URLError as e:
                        return False
                return True
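
The unverified-context trick above is how the snippet skips certificate checks for a local liveness probe; a standalone sketch mirroring it (the depot URL is a placeholder):

import ssl
from urllib.request import urlopen

# Verification is disabled deliberately: this only checks that the local
# depot process answers, not that its certificate is trustworthy.
ctx = ssl._create_unverified_context()
urlopen('https://localhost:10000/versions/0', context=ctx).close()
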
modis_util.py (project: scikit-dataaccess, author: MITHaystack)
def getFileIDs(modis_identifier, start_date, end_date, lat, lon, daynightboth):
    '''
    Retrieve file IDs for images matching search parameters

    @param modis_identifier: Product identifier (e.g. MOD09)
    @param start_date: Starting date
    @param end_date: Ending date
    @param lat: Latitude 
    @param lon: Longitude
    @param daynightboth: Get daytime images ('D'), nighttime images ('N') or both ('B')

    @return list of file IDs
    '''

    lat_str = str(lat)
    lon_str = str(lon)

    info_url = ('http://modwebsrv.modaps.eosdis.nasa.gov/axis2/services/MODAPSservices/searchForFiles'
                    + '?product=' + modis_identifier + '&collection=6&start=' + start_date
                    + '&stop=' + end_date + '&north=' + lat_str + '&south=' + lat_str + '&west='
                    + lon_str + '&east=' + lon_str + '&coordsOrTiles=coords&dayNightBoth=' + daynightboth)

    url = urlopen(info_url)
    tree = ET.fromstring(url.read().decode())
    url.close()

    return [ int(child.text) for child in tree ]
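
Chained with getFileURLs() from the top of this page, an end-to-end sketch (the product, dates, and coordinates are illustrative):

file_ids = getFileIDs('MOD09', '2015-01-01', '2015-01-02', 42.36, -71.06, 'D')
file_urls = getFileURLs(file_ids)
print(file_urls)
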
pdb.py (project: ssbio, author: SBRG)
def download_biological_assemblies(pdb_id, outdir):
    """Downloads biological assembly file from:
    `ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/`

    Args:
        pdb_id (str): PDB ID of the biological assembly to download
        outdir (str): Output directory of the decompressed assembly

    """

    # TODO: not tested yet
    if not op.exists(outdir):
        raise ValueError('{}: output directory does not exist'.format(outdir))

    folder = pdb_id[1:3]
    server = 'ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/{}/'.format(folder)
    html_folder = urlopen(server).readlines()
    for line in html_folder:
        if pdb_id in str(line).strip():
            file_name = '%s' % (pdb_id + str(line).strip().split(pdb_id)[1].split('\r\n')[0])
            outfile_name = file_name.replace('.', '_')
            outfile_name = outfile_name.replace('_gz', '.pdb')
            f = urlopen(op.join(server, file_name))
            decompressed_data = zlib.decompress(f.read(), 16 + zlib.MAX_WBITS)
            with open(op.join(outdir, outfile_name), 'wb') as f:
                f.write(decompressed_data)
            log.debug('{}: downloaded biological assembly'.format(pdb_id))
            return op.join(outdir, outfile_name)
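
The 16 + zlib.MAX_WBITS argument is what makes zlib accept a gzip header, so bytes fetched with urlopen() can be decompressed without writing a temporary file; a minimal helper (the name is illustrative):

import zlib

def gunzip_bytes(data):
    # A wbits value offset by +16 selects gzip (header + trailer) decoding.
    return zlib.decompress(data, 16 + zlib.MAX_WBITS)
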
utils.py (project: riko, author: nerevu)
def open(self, url, **params):
        if url.startswith('http') and params:
            r = requests.get(url, params=params, stream=True)
            r.raw.decode_content = self.decode
            response = r.text if self.cache_type else r.raw
        else:
            try:
                r = urlopen(url, context=self.context, timeout=self.timeout)
            except TypeError:
                r = urlopen(url, timeout=self.timeout)

            text = r.read() if self.cache_type else None

            if self.decode:
                encoding = get_response_encoding(r, self.def_encoding)

                if text:
                    response = decode(text, encoding)
                else:
                    response = reencode(r.fp, encoding, decode=True)
            else:
                response = text or r

        content_type = get_response_content_type(r)

        if 'xml' in content_type:
            self.ext = 'xml'
        elif 'json' in content_type:
            self.ext = 'json'
        else:
            self.ext = content_type.split('/')[1].split(';')[0]

        self.r = r
        return response
auth0backend.py (project: auth0-django-web-app, author: auth0-samples)
def get_user_details(self, response):
        # Obtain JWT and the keys to validate the signature
        idToken = response.get('id_token')
        jwks = request.urlopen("https://" + self.setting('DOMAIN') + "/.well-known/jwks.json")
        issuer = "https://" + self.setting('DOMAIN') + "/"
        audience = self.setting('KEY') #CLIENT_ID
        payload = jwt.decode(idToken, jwks.read(), algorithms=['RS256'], audience=audience, issuer=issuer)

        return {'username': payload['nickname'],
                'first_name': payload['name'],
                'picture': payload['picture'],
                'user_id': payload['sub']}
pdb.py (project: kripodb, author: 3D-e-Chem)
def fetch(self):
        """Fetch report from PDB website

        Yields:
            dict: Dictionary with keys same as ['structureId', 'chainID'] + self.fields

        """
        response = urlopen(self.url)
        return parse_csv_file(response)
photo.py (project: fanfou-py, author: akgnah)
def open_image(filename):
    if re.match('http[s]?:.*', filename):
        image = request.urlopen(filename)
    else:
        image = open(filename, 'rb')
    return image.read()
data_utils.py (project: tflearn, author: tflearn)
def load_image(in_image):
    """ Load an image, returns PIL.Image. """
    # if the path appears to be an URL
    if urlparse(in_image).scheme in ('http', 'https',):
        # set up the byte stream
        img_stream = BytesIO(request.urlopen(in_image).read())
        # and read in as PIL image
        img = Image.open(img_stream)
    else:
        # else use it as local file path
        img = Image.open(in_image)
    return img
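
A usage sketch (the URL is a placeholder); a local path takes the else branch:

from io import BytesIO
from urllib.parse import urlparse
from urllib import request
from PIL import Image

img = load_image('https://example.com/photo.jpg')
print(img.size)
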
data_utils.py (project: deep-learning-keras-projects, author: jasmeetsb)
def urlretrieve(url, filename, reporthook=None, data=None):
        """Replacement for `urlretrive` for Python 2.

        Under Python 2, `urlretrieve` relies on `FancyURLopener` from legacy
        `urllib` module, known to have issues with proxy management.

        # Arguments
            url: url to retrieve.
            filename: where to store the retrieved data locally.
            reporthook: a hook function that will be called once
                on establishment of the network connection and once
                after each block read thereafter.
                The hook will be passed three arguments;
                a count of blocks transferred so far,
                a block size in bytes, and the total size of the file.
            data: `data` argument passed to `urlopen`.
        """
        def chunk_read(response, chunk_size=8192, reporthook=None):
            total_size = response.info().get('Content-Length').strip()
            total_size = int(total_size)
            count = 0
            while 1:
                chunk = response.read(chunk_size)
                count += 1
                if not chunk:
                    # reporthook may be None; only call it when provided
                    if reporthook:
                        reporthook(count, total_size, total_size)
                    break
                if reporthook:
                    reporthook(count, chunk_size, total_size)
                yield chunk

        response = urlopen(url, data)
        with open(filename, 'wb') as fd:
            for chunk in chunk_read(response, reporthook=reporthook):
                fd.write(chunk)
template_utils.py (project: python-zunclient, author: openstack)
def get_template_contents(template_file=None, template_url=None,
                          files=None):

    # Transform a bare file path to a file:// URL.
    if template_file:  # nosec
        template_url = utils.normalise_file_path_to_url(template_file)

    if template_url:
        tpl = request.urlopen(template_url).read()
    else:
        raise exceptions.CommandErrorException(_('Need to specify exactly '
                                                 'one of %(arg1)s or '
                                                 '%(arg2)s') %
                                               {'arg1': '--template-file',
                                                'arg2': '--template-url'})

    if not tpl:
        raise exceptions.CommandErrorException(_('Could not fetch '
                                                 'template from %s') %
                                               template_url)

    try:
        if isinstance(tpl, six.binary_type):
            tpl = tpl.decode('utf-8')
        template = template_format.parse(tpl)
    except ValueError as e:
        raise exceptions.CommandErrorException(_('Error parsing template '
                                                 '%(url)s %(error)s') %
                                               {'url': template_url,
                                                'error': e})
    return template

