python类Request()的实例源码

html_downloader.py 文件源码 项目:SmallReptileTraining 作者: yanbober 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def download(self, url, retry_count=3, headers=None, proxy=None, data=None):
        """Fetch *url* and return the raw response bytes, or None on failure.

        retry_count: retries attempted when the server answers with HTTP 5xx.
        headers: optional dict of extra request headers.
        proxy: optional proxy address, applied to the url's scheme.
        data: optional request body (turns the request into a POST).
        """
        if url is None:
            return None
        try:
            # Request() requires a mapping for headers; guard against None.
            req = request.Request(url, headers=headers or {}, data=data)
            cookie = cookiejar.CookieJar()
            cookie_process = request.HTTPCookieProcessor(cookie)
            # Bug fix: the cookie processor was created but never installed
            # in the opener, so cookies were silently dropped.
            opener = request.build_opener(cookie_process)
            if proxy:
                proxies = {urlparse(url).scheme: proxy}
                opener.add_handler(request.ProxyHandler(proxies))
            content = opener.open(req).read()
        except error.URLError as e:
            print('HtmlDownLoader download error:', e.reason)
            content = None
            if retry_count > 0:
                # Retry only for server-side errors (HTTP CODE 5xx).
                if hasattr(e, 'code') and 500 <= e.code < 600:
                    return self.download(url, retry_count-1, headers, proxy, data)
        return content
macro.py 文件源码 项目:TuShare 作者: andyzsf 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_cpi():
    """Fetch monthly CPI readings from Sina's macro-data endpoint.

    Return
    --------
    DataFrame
        month : month of the reading
        cpi : consumer price index value
    """
    rdint = vs.random()
    url = vs.MACRO_URL % (vs.P_TYPE['http'], vs.DOMAINS['sina'],
                          rdint, vs.MACRO_TYPE[1], 0, 600, rdint)
    raw = urlopen(Request(url), timeout=10).read()
    if ct.PY3:
        raw = raw.decode('gbk')
    # The payload is JS, not JSON: pull out the array after "data:".
    payload = re.compile(r'\,count:(.*?)\}').findall(raw)[0]
    payload = payload.split('data:')[1]
    df = pd.DataFrame(json.loads(payload), columns=vs.CPI_COLS)
    df['cpi'] = df['cpi'].astype(float)
    return df
output.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def workthread(item, user_agent, path):
    """Download one album page and walk its pagination via getpagedata().

    item: (relative_url, name) pair scraped from the listing page.
    user_agent: User-Agent header value to send.
    path: base directory; a per-item subdirectory is created under it.
    """
    strurl = 'http://yxpjw.club'+item[0]
    picname = item[1]
    print('????%s...........................\n' %(picname))
    req = request.Request(strurl)
    req.add_header('User-Agent',user_agent)
    response = request.urlopen(req)
    content = response.read().decode('gbk')
    # Base URL (everything up to the last '/') used for follow-up pages.
    strurl2 = re.search(r'^(.*)/',strurl).group(0)
    print('https headers...............%s'%(strurl2))
    destdir = os.path.join(path,picname)
    # exist_ok avoids crashing when the album was partially fetched before.
    os.makedirs(destdir, exist_ok=True)
    page = 1
    while True:
        content = getpagedata(content,destdir,page,strurl2)
        if not content:
            break
        page += 1
    print('%s?????????\n'%(picname))
macro.py 文件源码 项目:TuShare 作者: andyzsf 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_loan_rate():
    """Fetch benchmark loan interest rates from Sina's macro-data endpoint.

    Return
    --------
    DataFrame
        date : effective date
        loan_type : category of loan
        rate : interest rate (%)
    """
    rdint = vs.random()
    url = vs.MACRO_URL % (vs.P_TYPE['http'], vs.DOMAINS['sina'],
                          rdint, vs.MACRO_TYPE[2], 3, 800, rdint)
    raw = urlopen(Request(url), timeout=10).read()
    raw = raw.decode('gbk')
    # The payload is JS, not JSON: pull out the array after "data:".
    payload = re.compile(r'\,count:(.*?)\}').findall(raw)[0]
    payload = payload.split('data:')[1]
    df = pd.DataFrame(json.loads(payload), columns=vs.LOAN_COLS)
    # Replace missing (None) cells with the '--' placeholder, column by column.
    for col in df.columns:
        df[col] = df[col].apply(lambda x: np.where(x is None, '--', x))
    return df
zhaifuliall.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def requestData(url, user_agent):
    """GET *url* with the given User-Agent and return the body decoded as GBK.

    Returns None when the request fails; errors are printed, not raised.
    """
    content = None  # bug fix: was unbound (NameError at return) on failure
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # The response body is bytes; the site serves GBK-encoded text.
        content = response.read().decode('gbk')
    except error.HTTPError as e:
        # Must be caught before URLError: HTTPError subclasses URLError, so
        # the original ordering made this branch unreachable.
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)

    return content
nanrenzhuang.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def requestData(self,url, user_agent):
        """GET *url* with the given User-Agent and return the body as UTF-8 text.

        Returns None (implicitly) when the request fails; errors are printed.
        """
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=8)
            # The response body is bytes; decode before returning.
            return response.read().decode('utf-8')
        except error.HTTPError as e:
            # Caught before URLError: HTTPError subclasses URLError, so the
            # original ordering made this branch unreachable.
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
zhaifulione.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def requestData(self,url, user_agent):
        """GET *url* with the given User-Agent and return the body as GBK text.

        Returns None (implicitly) when the request fails; errors are printed.
        """
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=3)
            # The response body is bytes; the site serves GBK-encoded text.
            return response.read().decode('gbk')
        except error.HTTPError as e:
            # Caught before URLError: HTTPError subclasses URLError, so the
            # original ordering made this branch unreachable.
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
output.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getAbstractInfo(self):
        """Fetch self.url (GBK-encoded page) and pass the text to getDetailList.

        Errors are printed and swallowed; the method always returns None.
        """
        try:
            req = request.Request(self.url)
            req.add_header('User-Agent', self.user_agent)
            response = request.urlopen(req)
            # The response body is bytes; the site serves GBK-encoded text.
            content = response.read().decode('gbk')
            self.getDetailList(content)
        except error.HTTPError:
            # Caught before URLError (its superclass); the original ordering
            # made this branch unreachable.
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
nanrenzhuang.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def requestData(self,url, user_agent):
        """GET *url* with the given User-Agent and return the body as UTF-8 text.

        Returns None (implicitly) when the request fails; errors are printed.
        """
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=8)
            # The response body is bytes; decode before returning.
            return response.read().decode('utf-8')
        except error.HTTPError as e:
            # Caught before URLError: HTTPError subclasses URLError, so the
            # original ordering made this branch unreachable.
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
zhaifuliall.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def requestData(url, user_agent):
    """GET *url* with the given User-Agent and return the body decoded as GBK.

    Returns None when the request fails; errors are printed, not raised.
    """
    content = None  # bug fix: was unbound (NameError at return) on failure
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # The response body is bytes; the site serves GBK-encoded text.
        content = response.read().decode('gbk')
    except error.HTTPError as e:
        # Must be caught before URLError: HTTPError subclasses URLError, so
        # the original ordering made this branch unreachable.
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)

    return content
zhaifulione.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def requestData(self,url, user_agent):
        """GET *url* with the given User-Agent and return the body as GBK text.

        Returns None (implicitly) when the request fails; errors are printed.
        """
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=3)
            # The response body is bytes; the site serves GBK-encoded text.
            return response.read().decode('gbk')
        except error.HTTPError as e:
            # Caught before URLError: HTTPError subclasses URLError, so the
            # original ordering made this branch unreachable.
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
output1.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getAbstractInfo(self):
        """Fetch self.url (GBK-encoded page) and pass the text to getDetailList.

        Errors are printed and swallowed; the method always returns None.
        """
        try:
            req = request.Request(self.url)
            req.add_header('User-Agent', self.user_agent)
            response = request.urlopen(req)
            # The response body is bytes; the site serves GBK-encoded text.
            content = response.read().decode('gbk')
            self.getDetailList(content)
        except error.HTTPError:
            # Caught before URLError (its superclass); the original ordering
            # made this branch unreachable.
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
test_glance.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_download_and_verify_ok(self, mock_urlopen):
        """Happy path: header checksum matches the md5 digest, no error raised."""
        extract_tarball = self.mock_patch_object(
            self.glance.utils, 'extract_tarball')
        fake_md5 = mock.Mock()
        fake_md5.hexdigest.return_value = 'expect_cksum'
        md5_new = self.mock_patch_object(self.glance.md5, 'new', fake_md5)
        fake_info = mock.Mock()
        fake_info.getheader.return_value = 'expect_cksum'
        mock_urlopen.return_value.info.return_value = fake_info
        fake_request = urllib2.Request('http://fakeurl.com')

        self.glance._download_tarball_and_verify(
            fake_request, 'fake_staging_path')

        mock_urlopen.assert_called_with(fake_request)
        extract_tarball.assert_called_once()
        md5_new.assert_called_once()
        fake_info.getheader.assert_called_once()
        md5_new.return_value.hexdigest.assert_called_once()
test_glance.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_download_ok_verify_failed(self, mock_urlopen):
        """Checksum mismatch between header and digest must raise RetryableError."""
        extract_tarball = self.mock_patch_object(
            self.glance.utils, 'extract_tarball')
        fake_md5 = mock.Mock()
        fake_md5.hexdigest.return_value = 'unexpect_cksum'
        md5_new = self.mock_patch_object(self.glance.md5, 'new', fake_md5)
        fake_info = mock.Mock()
        fake_info.getheader.return_value = 'expect_cksum'
        mock_urlopen.return_value.info.return_value = fake_info
        fake_request = urllib2.Request('http://fakeurl.com')

        self.assertRaises(self.glance.RetryableError,
                          self.glance._download_tarball_and_verify,
                          fake_request, 'fake_staging_path')

        mock_urlopen.assert_called_with(fake_request)
        extract_tarball.assert_called_once()
        md5_new.assert_called_once()
        md5_new.return_value.hexdigest.assert_called_once()
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def execute(self):
        """Upload the package archive produced by "waf release" to the server."""
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        rfile = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(rfile):
            self.fatal('Create the release file with "waf release" first! %r' % rfile)

        fdata = Utils.readf(rfile, m='rb')
        data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
        response = urlopen(Request(get_upload_url(), data), timeout=TIMEOUT)
        reply = response.read().strip()

        # On Python 3 the server reply arrives as bytes.
        if sys.hexversion > 0x300000f:
            reply = reply.decode('utf-8')

        if reply != 'ok':
            self.fatal('Could not publish the package %r' % reply)
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def compute_dependencies(self, filename=REQUIRES):
        """Resolve package constraints via the server, or locally with --offline."""
        text = Utils.readf(filename)
        data = safe_urlencode([('text', text)])

        if '--offline' in sys.argv:
            self.constraints = self.local_resolve(text)
        else:
            req = Request(get_resolve_url(), data)
            try:
                response = urlopen(req, timeout=TIMEOUT)
            except URLError as e:
                # Fall back to local resolution when the server is unreachable.
                Logs.warn('The package server is down! %r' % e)
                self.constraints = self.local_resolve(text)
            else:
                reply = response.read()
                try:
                    reply = reply.decode('utf-8')
                except Exception:
                    pass
                self.trace(reply)
                self.constraints = parse_constraints(reply)
        self.check_errors()
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def execute(self):
        """Upload the package archive produced by "waf release" to the server."""
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        rfile = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(rfile):
            self.fatal('Create the release file with "waf release" first! %r' % rfile)

        fdata = Utils.readf(rfile, m='rb')
        data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
        response = urlopen(Request(get_upload_url(), data), timeout=TIMEOUT)
        reply = response.read().strip()

        # On Python 3 the server reply arrives as bytes.
        if sys.hexversion > 0x300000f:
            reply = reply.decode('utf-8')

        if reply != 'ok':
            self.fatal('Could not publish the package %r' % reply)
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def compute_dependencies(self, filename=REQUIRES):
        """Resolve package constraints via the server, or locally with --offline."""
        text = Utils.readf(filename)
        data = safe_urlencode([('text', text)])

        if '--offline' in sys.argv:
            self.constraints = self.local_resolve(text)
        else:
            req = Request(get_resolve_url(), data)
            try:
                response = urlopen(req, timeout=TIMEOUT)
            except URLError as e:
                # Fall back to local resolution when the server is unreachable.
                Logs.warn('The package server is down! %r' % e)
                self.constraints = self.local_resolve(text)
            else:
                reply = response.read()
                try:
                    reply = reply.decode('utf-8')
                except Exception:
                    pass
                self.trace(reply)
                self.constraints = parse_constraints(reply)
        self.check_errors()
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def execute(self):
        """Upload the package archive produced by "waf release" to the server."""
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        rfile = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(rfile):
            self.fatal('Create the release file with "waf release" first! %r' % rfile)

        fdata = Utils.readf(rfile, m='rb')
        data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
        response = urlopen(Request(get_upload_url(), data), timeout=TIMEOUT)
        reply = response.read().strip()

        # On Python 3 the server reply arrives as bytes.
        if sys.hexversion > 0x300000f:
            reply = reply.decode('utf-8')

        if reply != 'ok':
            self.fatal('Could not publish the package %r' % reply)
wayback.py 文件源码 项目:waybackscraper 作者: abrenaut 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def list_archive_timestamps(url, min_date, max_date, user_agent):
    """
    List the available archive between min_date and max_date for the given URL
    """
    logger.info('Listing the archives for the url {url}'.format(url=url))

    # Build the CDX query URL for the memento list.
    query = urlencode({'url': url,
                       'output': 'json',
                       'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
                       'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)})
    cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=query)

    req = Request(cdx_url, None, {'User-Agent': user_agent})
    timestamps = []
    with urlopen(req) as cdx:
        memento_json = cdx.read().decode("utf-8")
        # The first row only holds column names; skip it.
        for (url_key, timestamp, original, mime_type,
             status_code, digest, length) in json.loads(memento_json)[1:]:
            # Keep only archives captured with an HTTP 200 status.
            if status_code == '200':
                timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))

    return timestamps
fetch.py 文件源码 项目:CMSpider 作者: chengyu2333 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def fetch_file(self, url, filename):
        """Download *url* into *filename*, recording success/failure status.

        Status written via the url manager: 2 = fetched, -1 = failed.
        Re-raises the original exception after marking the failure.
        """
        try:
            req = request.Request(url, headers=self.__headers)
            data = request.urlopen(req).read()
            # 'with' flushes and closes the file; the original's explicit
            # flush()/close() calls inside the context manager were redundant.
            with open(filename, 'wb') as f:
                f.write(data)
            self.__url_manager.set_url_status(url, 2)
        except Exception:
            self.__url_manager.set_url_status(url, -1)
            # Bare raise preserves the original traceback ('raise e' rewrote it).
            raise
        finally:
            # Throttle between requests per configuration.
            time.sleep(config['basic']['sleep'])
general.py 文件源码 项目:veneer-py 作者: flowmatters 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def retrieve_csv(self,url):
        '''
        Retrieve data from the Veneer service, at the given url path, in CSV format.

        url: Path to required resource, relative to the root of the Veneer service.

        NOTE: CSV responses are currently only available for time series results
        '''
        if PRINT_URLS:
            print("*** %s ***" % (url))

        # Ask the service explicitly for CSV via the Accept header.
        full_url = self.base_url + quote(url + self.data_ext)
        req = Request(full_url, headers={"Accept": "text/csv"})
        text = urlopen(req).read().decode('utf-8')

        result = utils.read_veneer_csv(text)
        if PRINT_ALL:
            print(result)
            print("")
        return result
reachability-monitor.py 文件源码 项目:securedrop-reachability-monitor 作者: freedomofpress 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read_directory(self, directory_url):
        """Parses the SecureDrop directory into a list of per-instance
        detail dicts (organization, landing_page, ths_address)."""
        # CloudFlare will block us if we don't set user-agent
        dir_req = Request(directory_url)
        dir_req.add_header("User-Agent",
                           "Mozilla/5.0 (Windows NT 6.1; rv:45.0) "
                           "Gecko/20100101 Firefox/45.0")
        directory = urlopen(dir_req).read().decode()

        # Skip the header line and the trailing line; fields are tab-separated.
        instances = []
        for line in directory.splitlines()[1:-1]:
            fields = line.split("\t")
            instances.append({'organization': fields[0],
                              'landing_page': fields[1],
                              'ths_address': fields[2]})

        return instances
checkpoint.py 文件源码 项目:instagram_private_api 作者: ping 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def respond_to_checkpoint(self, response_code):
        """POST the checkpoint response code and return (status code, body text)."""
        headers = {
            'User-Agent': self.USER_AGENT,
            'Origin': 'https://i.instagram.com',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US',
            'Accept-Encoding': 'gzip',
            'Referer': self.endpoint,
            'Cookie': self.cookie,
        }

        payload = urlencode({'csrfmiddlewaretoken': self.csrftoken,
                             'response_code': response_code})
        req = Request(self.endpoint, headers=headers)
        res = urlopen(req, data=payload.encode('ascii'), timeout=self.timeout)

        # Transparently inflate gzip-compressed response bodies.
        if res.info().get('Content-Encoding') == 'gzip':
            content = gzip.GzipFile(fileobj=BytesIO(res.read())).read().decode('utf-8')
        else:
            content = res.read().decode('utf-8')

        return res.code, content
threads.py 文件源码 项目:tvlinker 作者: ozmartian 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def add_uri(self) -> None:
        """Submit self.link_url to the aria2 JSON-RPC endpoint and emit the outcome."""
        # Pick credentials: explicit user/password, else the RPC secret token.
        if len(self.rpc_username) > 0 and len(self.rpc_password) > 0:
            user, passwd = self.rpc_username, self.rpc_password
        elif len(self.rpc_secret) > 0:
            user, passwd = 'token', self.rpc_secret
        else:
            user, passwd = '', ''
        aria2_endpoint = '%s:%s/jsonrpc' % (self.rpc_host, self.rpc_port)
        headers = {'Content-Type': 'application/json'}
        payload = json.dumps({'jsonrpc': '2.0', 'id': 1, 'method': 'aria2.addUri',
                              'params': ['%s:%s' % (user, passwd), [self.link_url]]},
                             sort_keys=False).encode('utf-8')
        try:
            req = Request(aria2_endpoint, headers=headers, data=payload)
            jsonres = json.loads(urlopen(req).read().decode('utf-8'))
            self.aria2Confirmation.emit('result' in jsonres.keys())
        except HTTPError:
            print(sys.exc_info())
            # NOTE(review): sys.exc_info() is a tuple, not a string; confirm
            # QMessageBox renders it as intended.
            QMessageBox.critical(self, 'ERROR NOTIFICATION', sys.exc_info(), QMessageBox.Ok)
            self.aria2Confirmation.emit(False)
web.py 文件源码 项目:ask_data_science 作者: AngelaVC 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, url=None):
        """Fetch *url* immediately; on failure print the reason and leave
        self.html as None."""
        self.url = url
        self.links = []
        # Parsed artifacts are filled in later; start them all empty.
        for attr in ('html', 'soup', 'text', 'title'):
            setattr(self, attr, None)

        req = Request(self.url, headers={'User-Agent': "Magic Browser"})
        try:
            self.html = urlopen(req)
        except URLError as e:
            if hasattr(e, 'reason'):
                print('We failed to reach a server.')
                print('Reason: ', e.reason)
            elif hasattr(e, 'code'):
                print('The server couldn\'t fulfill the request.')
                print('Error code: ', e.code)
boxoffice.py 文件源码 项目:TuShare 作者: andyzsf 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _day_cinema(date=None, pNo=1, retry_count=3, pause=0.001):
    """Fetch one page of per-cinema daily box-office data as a DataFrame."""
    ct._write_console()
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            url = ct.BOXOFFICE_CBD % (ct.P_TYPE['http'], ct.DOMAINS['mbox'],
                                      ct.BOX, pNo, date)
            lines = urlopen(Request(url), timeout=10).read()
            if len(lines) < 15:  # response too short to contain data
                return None
        except Exception as e:
            print(e)
        else:
            payload = lines.decode('utf-8') if ct.PY3 else lines
            js = json.loads(payload)
            return pd.DataFrame(js['data1']).drop(['CinemaID'], axis=1)
classifying.py 文件源码 项目:TuShare 作者: andyzsf 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_detail(tag, retry_count=3, pause=0.001):
    """Fetch Sina's classification detail list for *tag* as a DataFrame,
    retrying on network errors.

    Raises IOError when every attempt fails.
    """
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            ct._write_console()
            request = Request(ct.SINA_DATA_DETAIL_URL%(ct.P_TYPE['http'],
                                                               ct.DOMAINS['vsf'], ct.PAGES['jv'],
                                                               tag))
            text = urlopen(request, timeout=10).read()
            text = text.decode('gbk')
        except _network_error_classes:
            pass
        else:
            # Quote the bare keys so the JS-style payload parses as JSON.
            reg = re.compile(r'\,(.*?)\:') 
            text = reg.sub(r',"\1":', text) 
            text = text.replace('"{symbol', '{"symbol')
            text = text.replace('{symbol', '{"symbol"')
            jstr = json.dumps(text)
            js = json.loads(jstr)
            df = pd.DataFrame(pd.read_json(js, dtype={'code':object}), columns=ct.THE_FIELDS)
            df = df[ct.FOR_CLASSIFY_B_COLS]
            return df
    # Bug fix: the raise was indented inside the loop, so a single network
    # error aborted immediately and the retries never ran (compare _sz_hz).
    raise IOError(ct.NETWORK_URL_ERROR_MSG)
reference.py 文件源码 项目:TuShare 作者: andyzsf 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _sz_hz(date='', retry_count=3, pause=0.001):
    """Fetch the SZSE margin-trading summary for *date*, retrying on errors."""
    for _ in range(retry_count):
        time.sleep(pause)
        ct._write_console()
        try:
            url = rv.MAR_SZ_HZ_URL % (ct.P_TYPE['http'], ct.DOMAINS['szse'],
                                      ct.PAGES['szsefc'], date)
            lines = urlopen(Request(url), timeout=10).read()
            if len(lines) <= 200:  # page too small: no data for that date
                return pd.DataFrame()
            df = pd.read_html(lines, skiprows=[0])[0]
            df.columns = rv.MAR_SZ_HZ_COLS
            df['opDate'] = date
        except Exception as e:
            print(e)
        else:
            return df
    raise IOError(ct.NETWORK_URL_ERROR_MSG)
trading.py 文件源码 项目:TuShare 作者: andyzsf 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _parase_fq_factor(code, start, end):
    """Fetch the adjusted-price (fuquan) factor series for *code* from Sina.

    start/end are accepted for interface compatibility; the underlying URL
    does not use them.
    """
    symbol = _code_to_symbol(code)
    request = Request(ct.HIST_FQ_FACTOR_URL%(ct.P_TYPE['http'],
                                             ct.DOMAINS['vsf'], symbol))
    text = urlopen(request, timeout=10).read()
    text = text[1:len(text)-1]  # strip the wrapping parentheses
    text = text.decode('utf-8') if ct.PY3 else text
    # Rewrite the JS-style payload into valid JSON before parsing.
    text = text.replace('{_', '{"')
    text = text.replace('total', '"total"')
    text = text.replace('data', '"data"')
    text = text.replace(':"', '":"')
    text = text.replace('",_', '","')
    text = text.replace('_', '-')
    text = json.loads(text)
    df = pd.DataFrame({'date':list(text['data'].keys()), 'factor':list(text['data'].values())})
    df['date'] = df['date'].map(_fun_except) # for null case
    # Bug fix: 'np.object' was removed in NumPy 1.24; it was always just an
    # alias for the builtin 'object', so the comparison is unchanged.
    if df['date'].dtypes == object:
        df['date'] = df['date'].astype(np.datetime64)
    df = df.drop_duplicates('date')
    df['factor'] = df['factor'].astype(float)
    return df


问题


面经


文章

微信
公众号

扫码关注公众号