Python urlopen() usage examples

The snippets below are collected from open-source projects and show common ways to call urlopen() from urllib.request (with urllib2 fallbacks on Python 2).

ez_setup.py (project: Adafruit_Python_PCA9685, author: adafruit)
def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()
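
A minimal usage sketch (the URL and target filename below are hypothetical). Because this helper skips certificate validation, it is only meant as a last-resort fallback:

# Hypothetical call: fetch an archive over HTTPS without verifying the certificate.
download_file_insecure('https://example.com/setuptools.tar.gz', 'setuptools.tar.gz')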

output.py (project: python-, author: secondtonone1)
def workthread(item, user_agent,path):
    strurl = 'http://yxpjw.club'+item[0]
    picname = item[1]
    print('Downloading %s ...\n' % picname)
    req = request.Request(strurl)
    req.add_header('User-Agent',user_agent)
    response = request.urlopen(req)
    content = response.read().decode('gbk')
    strurl2 = re.search(r'^(.*)/',strurl).group(0)
    print('base url...............%s' % strurl2)
    #destname = os.path.join(path,picname+'.txt')
    #with open(destname, 'w',encoding='gbk') as file:
        #file.write(content)
    destdir = os.path.join(path,picname)
    os.makedirs(destdir)
    page = 1
    while True:
        content = getpagedata(content,destdir,page,strurl2)
        if not content:
            break
        page = page + 1
    print('%s download finished\n' % picname)
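
workthread is shaped to run one gallery per thread. A hedged sketch of a driver (the item format, user agent, and output path are assumptions):

import threading

items = [('/gallery/123.html', 'sample-gallery')]  # assumed (href, title) pairs
user_agent = 'Mozilla/5.0'
threads = [threading.Thread(target=workthread, args=(item, user_agent, './downloads'))
           for item in items]
for t in threads:
    t.start()
for t in threads:
    t.join()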

ez_setup.py (project: Adafruit_Python_MCP4725, author: adafruit)
def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()

zhaifuliall.py (project: python-, author: secondtonone1)
def requestData(url, user_agent):
    content = None
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # decode the response bytes into a str
        content = response.read().decode('gbk')
    except error.HTTPError as e:
        # HTTPError subclasses URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
    return content
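
The ordering fixed above matters because urllib.error.HTTPError subclasses URLError: listed after URLError, the HTTPError branch can never run. A minimal demonstration (the URL is a placeholder):

from urllib import error, request

try:
    request.urlopen('http://example.com/missing', timeout=8)  # placeholder URL
except error.HTTPError as e:   # must precede the URLError clause
    print('HTTP error:', e.code)
except error.URLError as e:    # reached for DNS or connection failures
    print('URL error:', e.reason)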

nanrenzhuang.py (project: python-, author: secondtonone1)
def requestData(self, url, user_agent):
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=8)
            # decode the response bytes into a str
            content = response.read().decode('utf-8')
            return content
        except error.HTTPError as e:
            # catch the HTTPError subclass before its URLError base class
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)

zhaifulione.py (project: python-, author: secondtonone1)
def requestData(self, url, user_agent):
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=3)
            # decode the response bytes into a str
            content = response.read().decode('gbk')
            return content
        except error.HTTPError as e:
            # catch the HTTPError subclass before its URLError base class
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)

output.py (project: python-, author: secondtonone1)
def getAbstractInfo(self):

        try:
            req = request.Request(self.url)
            req.add_header('User-Agent', self.user_agent)
            response = request.urlopen(req)
            # decode the response bytes into a str
            content = response.read().decode('gbk')
            self.getDetailList(content)

        except error.HTTPError as e:
            # HTTPError subclasses URLError; catching it first keeps this branch reachable
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)

ez_setup.py (project: py_find_1st, author: roebel)
def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()

ez_setup.py (project: Adafruit_Python_ADS1x15, author: adafruit)
def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()

tbtools.py (project: Flask_Blog, author: sugarguo)
def paste(self):
        """Create a paste and return the paste id."""
        data = json.dumps({
            'description': 'Werkzeug Internal Server Error',
            'public': False,
            'files': {
                'traceback.txt': {
                    'content': self.plaintext
                }
            }
        }).encode('utf-8')
        try:
            from urllib2 import urlopen
        except ImportError:
            from urllib.request import urlopen
        rv = urlopen('https://api.github.com/gists', data=data)
        resp = json.loads(rv.read().decode('utf-8'))
        rv.close()
        return {
            'url': resp['html_url'],
            'id': resp['id']
        }
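
The payload above matches the shape of the GitHub gists API. A self-contained sketch of the same POST outside Werkzeug (the content is a placeholder; GitHub has since restricted unauthenticated gist creation, so this call may be rejected today):

import json
from urllib.request import urlopen

def create_gist(text):
    # POST placeholder content as a private gist and return its URL.
    data = json.dumps({
        'description': 'example paste',
        'public': False,
        'files': {'traceback.txt': {'content': text}},
    }).encode('utf-8')
    rv = urlopen('https://api.github.com/gists', data=data)
    try:
        return json.loads(rv.read().decode('utf-8'))['html_url']
    finally:
        rv.close()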

distnet.py (project: SoCFoundationFlow, author: mattaw)
def execute(self):
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        rfile = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(rfile):
            self.fatal('Create the release file with "waf release" first! %r' % rfile)

        fdata = Utils.readf(rfile, m='rb')
        data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])

        req = Request(get_upload_url(), data)
        response = urlopen(req, timeout=TIMEOUT)
        data = response.read().strip()

        if sys.hexversion>0x300000f:
            data = data.decode('utf-8')

        if data != 'ok':
            self.fatal('Could not publish the package %r' % data)

distnet.py (project: SoCFoundationFlow, author: mattaw)
def compute_dependencies(self, filename=REQUIRES):
        text = Utils.readf(filename)
        data = safe_urlencode([('text', text)])

        if '--offline' in sys.argv:
            self.constraints = self.local_resolve(text)
        else:
            req = Request(get_resolve_url(), data)
            try:
                response = urlopen(req, timeout=TIMEOUT)
            except URLError as e:
                Logs.warn('The package server is down! %r' % e)
                self.constraints = self.local_resolve(text)
            else:
                ret = response.read()
                try:
                    ret = ret.decode('utf-8')
                except Exception:
                    pass
                self.trace(ret)
                self.constraints = parse_constraints(ret)
        self.check_errors()

package.py (project: SoCFoundationFlow, author: mattaw)
def download_archive(self, src, dst):
    for x in self.env.PACKAGE_REPO:
        url = '/'.join((x, src))
        try:
            web = urlopen(url)
            try:
                if web.getcode() != 200:
                    continue
            except AttributeError:
                pass
        except Exception:
            # on python3 urlopen throws an exception
            # python 2.3 does not have getcode and throws an exception to fail
            continue
        else:
            tmp = self.root.make_node(dst)
            tmp.write(web.read())
            Logs.warn('Downloaded %s from %s' % (tmp.abspath(), url))
            break
    else:
        self.fatal('Could not get the package %s' % src)
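
The for ... else here is the key construct: the else block runs only when the loop finishes without break, i.e. when no mirror yielded the package. A standalone illustration:

mirrors = ['http://mirror-a.example', 'http://mirror-b.example']  # hypothetical
for m in mirrors:
    if m.endswith('-b.example'):  # pretend this mirror succeeds
        print('downloaded from', m)
        break
else:
    raise RuntimeError('no mirror had the package')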

ChromosomeBot.py (project: scheduled-bots, author: SuLab)
def get_assembly_report(self, taxid):
        if self.ass_sum is None:
            self.get_assembly_summaries()
        df = self.ass_sum.query("taxid == {} & refseq_category == 'reference genome'".format(taxid))
        if len(df) == 0:
            # try "representative genome" (needed for mouse and rat)
            df = self.ass_sum.query("taxid == {} & refseq_category == 'representative genome'".format(taxid))
        if len(df) != 1:
            raise ValueError("unknown reference: {}".format(df))
        print(df)
        ftp_path = list(df.ftp_path)[0]
        assembly = os.path.split(ftp_path)[1]
        url = os.path.join(ftp_path, assembly + "_assembly_report.txt")
        print(url)
        # read the column names from the file
        table = request.urlopen(request.Request(url)).read().decode()
        names = [x for x in table.split("\n") if x.startswith("#")][-1].strip().replace("# ", "").split("\t")
        self.chr_df[taxid] = pd.read_csv(StringIO(table), sep="\t", names=names, comment='#')
        self.chr_df[taxid] = self.chr_df[taxid].rename(columns={'Sequence-Name': 'SequenceName', 'Sequence-Role': 'SequenceRole',
                                                                'Assigned-Molecule': 'AssignedMolecule',
                                                                'Assigned-Molecule-Location/Type': 'AssignedMoleculeLocationType',
                                                                'GenBank-Accn': 'GenBankAccn', 'RefSeq-Accn': 'RefSeqAccn',
                                                                'UCSC-style-name': 'UCSCstylename'})
        #print(self.chr_df[taxid].query("SequenceRole == 'assembled-molecule'"))
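
The header-recovery step above (taking the last '#'-prefixed line as the tab-separated column names) can be isolated into a helper; a minimal sketch assuming the NCBI assembly-report layout:

from io import StringIO
import pandas as pd

def parse_assembly_report(text):
    # NCBI assembly reports prefix metadata lines with '#'; the last such
    # line holds the tab-separated column names for the data that follows.
    header = [x for x in text.split('\n') if x.startswith('#')][-1]
    names = header.strip().replace('# ', '').split('\t')
    return pd.read_csv(StringIO(text), sep='\t', names=names, comment='#')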

imdb_crawl.py (project: holcrawl, author: shaypal5)
def _get_business_props(movie_code):
    cur_business_url = _BUSINESS_URL.format(code=movie_code)
    busi_page = bs(request.urlopen(cur_business_url), "html.parser")
    busi_str = str(busi_page)
    weekend_contents = re.findall(_WEEKEND_CONTENT_REGEX, busi_str)[0]
    num_screens_list = [
        int(match.replace(',', ''))
        for match in re.findall(_US_OPEN_WEEKEND_REGEX, weekend_contents)]
    busi_props = {}
    busi_props['screens_by_weekend'] = [
        val for val in reversed(num_screens_list)]
    busi_props['opening_weekend_screens'] = busi_props['screens_by_weekend'][0]
    busi_props['max_screens'] = max(num_screens_list)
    busi_props['total_screens'] = sum(num_screens_list)
    busi_props['avg_screens'] = sum(num_screens_list) / len(num_screens_list)
    busi_props['num_weekends'] = len(num_screens_list)
    return busi_props
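
A hedged usage sketch (the movie code is a placeholder; the URL template and regexes are module-level constants defined elsewhere in imdb_crawl.py):

props = _get_business_props('tt0000000')  # placeholder IMDb movie code
print(props['opening_weekend_screens'], props['max_screens'], props['num_weekends'])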


# ==== crawling the release page ====

imdb_crawl.py (project: holcrawl, author: shaypal5)
def _get_release_props(movie_code):
    cur_release_url = _RELEASE_URL.format(code=movie_code)
    release_page = bs(urllib.request.urlopen(cur_release_url), "html.parser")
    release_table = release_page.find_all("table", {"id": "release_dates"})[0]
    us_rows = []
    for row in release_table.find_all("tr")[1:]:
        row_str = str(row)
        if 'USA' in row_str:
            us_rows.append(row_str)
    release_props = {}
    release_props['release_day'] = None
    release_props['release_month'] = None
    release_props['release_year'] = None
    for row in us_rows:
        if re.match(_USA_ROW_REGEX, row):
            release = re.findall(_USA_ROW_REGEX, row)[0]
            release_props['release_day'] = int(release[0])
            release_props['release_month'] = release[1]
            release_props['release_year'] = int(release[2])
    return release_props


# ==== crawling the user reviews page ====

imdb_crawl.py (project: holcrawl, author: shaypal5)
def _get_reviews_props(movie_code):
    cur_reviews_url = _REVIEWS_URL.format(code=movie_code)
    reviews_page = bs(urllib.request.urlopen(cur_reviews_url), "html.parser")
    reviews = reviews_page.find_all("td", {"class": "comment-summary"})
    user_reviews = []
    for review in reviews:
        try:
            rating = int(re.findall(_USER_REVIEW_RATING_REGEX, str(review))[0])
            date_str = re.findall(
                r"on (\d{1,2} [a-zA-Z]+ \d{4})", str(review))[0]
            date = datetime.strptime(date_str, "%d %B %Y").date()
            contents = review.find_all(
                'a', href=re.compile(r'reviews.+?'))[0].contents[0]
            user = review.find_all(
                'a', href=re.compile(r'/user/.+?'))[1].contents[0]
            user_reviews.append({
                'score': rating, 'review_date': date,
                'contents': contents, 'user': user
            })
        except Exception:  # pylint: disable=W0703
            pass
    return {'imdb_user_reviews': user_reviews}


# ==== crawling a movie profile ====

speedtest.py (project: SmartSocks, author: waylybaye)
def run(self):
        request = self.request
        try:
            if ((timeit.default_timer() - self.starttime) <= self.timeout and
                    not SHUTDOWN_EVENT.isSet()):
                try:
                    f = urlopen(request)
                except TypeError:
                    # PY24 expects a string or buffer
                    # This also causes issues with Ctrl-C, but we will concede
                    # for the moment that Ctrl-C on PY24 isn't immediate
                    request = build_request(self.request.get_full_url(),
                                            data=request.data.read(self.size))
                    f = urlopen(request)
                f.read(11)
                f.close()
                self.result = sum(self.request.data.total)
            else:
                self.result = 0
        except (IOError, SpeedtestUploadTimeout):
            self.result = sum(self.request.data.total)

tbtools.py (project: swjtu-pyscraper, author: Desgard)
def paste(self):
        """Create a paste and return the paste id."""
        data = json.dumps({
            'description': 'Werkzeug Internal Server Error',
            'public': False,
            'files': {
                'traceback.txt': {
                    'content': self.plaintext
                }
            }
        }).encode('utf-8')
        try:
            from urllib2 import urlopen
        except ImportError:
            from urllib.request import urlopen
        rv = urlopen('https://api.github.com/gists', data=data)
        resp = json.loads(rv.read().decode('utf-8'))
        rv.close()
        return {
            'url': resp['html_url'],
            'id': resp['id']
        }

wayback.py (project: waybackscraper, author: abrenaut)
def list_archive_timestamps(url, min_date, max_date, user_agent):
    """
    List the available archive between min_date and max_date for the given URL
    """
    logger.info('Listing the archives for the url {url}'.format(url=url))

    # Construct the URL used to download the memento list
    parameters = {'url': url,
                  'output': 'json',
                  'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
                  'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)}
    cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=urlencode(parameters))

    req = Request(cdx_url, None, {'User-Agent': user_agent})
    with urlopen(req) as cdx:
        memento_json = cdx.read().decode("utf-8")

        timestamps = []
        # Ignore the first line which contains column names
        for url_key, timestamp, original, mime_type, status_code, digest, length in json.loads(memento_json)[1:]:
            # Ignore archives with a status code != OK
            if status_code == '200':
                timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))

    return timestamps
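
A hedged usage sketch (assumes the constants and imports used above are in scope):

from datetime import datetime

# Hypothetical call: list Wayback Machine captures of example.com during 2016.
stamps = list_archive_timestamps('http://example.com',
                                 min_date=datetime(2016, 1, 1),
                                 max_date=datetime(2016, 12, 31),
                                 user_agent='waybackscraper/0.1')  # assumed UA string
print(len(stamps), 'captures returned HTTP 200')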

fetch.py (project: CMSpider, author: chengyu2333)
def fetch_file(self, url, filename):
        # if not os.path.exists(filename):
        #     os.makedirs(filename)
        try:
            req = request.Request(url, headers=self.__headers)
            data = request.urlopen(req).read()
            with open(filename, 'wb') as f:
                f.write(data)
            # the with-block flushes and closes the file automatically
            self.__url_manager.set_url_status(url, 2)
        except Exception as e:
            self.__url_manager.set_url_status(url, -1)
            raise e
        finally:
            time.sleep(config['basic']['sleep'])

general.py (project: veneer-py, author: flowmatters)
def retrieve_json(self,url):
        '''
        Retrieve data from the Veneer service at the given url path.

        url: Path to required resource, relative to the root of the Veneer service.
        '''
        if PRINT_URLS:
            print("*** %s ***" % (url))

        if self.protocol=='file':
            text = open(self.prefix+url+self.data_ext).read()
        else:
            conn = hc.HTTPConnection(self.host,port=self.port)
            conn.request('GET',quote(url+self.data_ext))
            resp = conn.getresponse()
            text = resp.read().decode('utf-8')
            #text = urlopen(self.base_url + quote(url+self.data_ext)).read().decode('utf-8')

        text = self._replace_inf(text)
        if PRINT_ALL:
            print(json.loads(text))
            print("")
        return json.loads(text)
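
A hedged usage sketch (the Veneer class name, port, and endpoint path are assumptions about the surrounding veneer-py module):

# Hypothetical: query a running Veneer service for a resource as parsed JSON.
v = Veneer(port=9876)                  # assumed constructor
network = v.retrieve_json('/network')  # assumed endpoint path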

general.py (project: veneer-py, author: flowmatters)
def retrieve_csv(self,url):
        '''
        Retrieve data from the Veneer service, at the given url path, in CSV format.

        url: Path to required resource, relative to the root of the Veneer service.

        NOTE: CSV responses are currently only available for time series results
        '''
        if PRINT_URLS:
            print("*** %s ***" % (url))

        req = Request(self.base_url + quote(url+self.data_ext),headers={"Accept":"text/csv"})
        text = urlopen(req).read().decode('utf-8')

        result = utils.read_veneer_csv(text)
        if PRINT_ALL:
            print(result)
            print("")
        return result

bulk.py (project: veneer-py, author: flowmatters)
def retrieve_json(self,url,**kwargs):
        if self.print_urls:
            print("*** %s ***" % (url))

        try:
            text = urlopen(self.base_url + quote(url)).read().decode('utf-8')
        except Exception:
            self.log("Couldn't retrieve %s"%url)
            return None

        self.save_data(url[1:],bytes(text,'utf-8'),"json")

        if self.print_all:
            print(json.loads(text))
            print("")
        return json.loads(text)
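
A narrower variant of the error handling, as a sketch: catching only the failures urlopen and decode actually raise keeps programming errors visible instead of logging them away:

from urllib.error import URLError
from urllib.parse import quote
from urllib.request import urlopen

def fetch_json_text(base_url, path):
    # Swallow only network and decoding failures; let everything else propagate.
    try:
        return urlopen(base_url + quote(path)).read().decode('utf-8')
    except (URLError, UnicodeDecodeError) as e:
        print("Couldn't retrieve %s: %s" % (path, e))
        return None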

