def get_file_urls(mainUrl,extension):
uniFileUrls = []
if not mainUrl.lower().startswith('http://') and not mainUrl.lower().startswith('https://'):
mainUrl = 'http://%s'%mainUrl
print('Downloading from %s...'%mainUrl)
if extension.startswith('*'):
extension = extension[1:]
if not extension.startswith('.'):
extension = '.' + extension
req = urllib.request.Request(
mainUrl,
data=None,
headers={
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
}
)
urlContent = urllib.request.urlopen(req).read().decode('utf-8')
html = lxml.html.fromstring(urlContent)
urls = html.xpath('//a/@href')
for url in urls:
if url.endswith(extension):
url = urljoin(mainUrl,url)
if url not in uniFileUrls:
uniFileUrls.append(url)
return uniFileUrls
python类xpath()的实例源码
def get_file_urls(mainUrl,extension):
uniFileUrls = []
if not mainUrl.lower().startswith('http://') and not mainUrl.lower().startswith('https://'):
mainUrl = 'http://%s'%mainUrl
print('Downloading from %s...'%mainUrl)
if extension.startswith('*'):
extension = extension[1:]
if not extension.startswith('.'):
extension = '.' + extension
req = urllib.request.Request(
mainUrl,
data=None,
headers={
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
}
)
urlContent = urllib.request.urlopen(req).read().decode('utf-8')
html = lxml.html.fromstring(urlContent)
urls = html.xpath('//a/@href')
for url in urls:
if url.endswith(extension):
url = urljoin(mainUrl,url)
if url not in uniFileUrls:
uniFileUrls.append(url)
return uniFileUrls
def latest_content(url):
'''
??????????
Parameter
--------
url:????
Return
--------
string:?????????
'''
try:
html = lxml.html.parse(url)
res = html.xpath('//div[@id=\"artibody\"]/p')
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr).replace(' ', '')#.replace('\n\n', '\n').
html_content = lxml.html.fromstring(sarr)
content = html_content.text_content()
return content
except Exception as er:
print(str(er))
def _guba_content(url):
try:
html = lxml.html.parse(url)
res = html.xpath('//div[@class=\"ilt_p\"]/p')
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr).replace(' ', '')#.replace('\n\n', '\n').
html_content = lxml.html.fromstring(sarr)
content = html_content.text_content()
ptime = html.xpath('//div[@class=\"fl_left iltp_time\"]/span/text()')[0]
rcounts = html.xpath('//div[@class=\"fl_right iltp_span\"]/span[2]/text()')[0]
reg = re.compile(r'\((.*?)\)')
rcounts = reg.findall(rcounts)[0]
return [content, ptime, rcounts]
except Exception:
return ['', '', '0']
def _today_ticks(symbol, tdate, pageNo, retry_count, pause):
ct._write_console()
for _ in range(retry_count):
time.sleep(pause)
try:
html = lxml.html.parse(ct.TODAY_TICKS_URL % (ct.P_TYPE['http'],
ct.DOMAINS['vsf'], ct.PAGES['t_ticks'],
symbol, tdate, pageNo
))
res = html.xpath('//table[@id=\"datatbl\"]/tbody/tr')
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
sarr = sarr.replace('--', '0')
df = pd.read_html(StringIO(sarr), parse_dates=False)[0]
df.columns = ct.TODAY_TICK_COLUMNS
df['pchange'] = df['pchange'].map(lambda x : x.replace('%', ''))
except Exception as e:
print(e)
else:
return df
raise IOError(ct.NETWORK_URL_ERROR_MSG)
def get_releasenote(html_source):
html = lxml.html.fromstring(html_source)
versions_dates = html.xpath("//*[contains(@class, 'app-version-block')]//h5")
releasenotes = []
for version_date in versions_dates:
try:
date = datetime.datetime.strptime(re.search('\((.+?)\)', version_date.text).group(1), '%b %d, %Y')
version = re.search(r'Version (.+?) \(', version_date.text).group(1)
try:
note = version_date.getnext()
note = re.sub(b"[\r\n]+", b".", etree.tostring(note, pretty_print=True))
note = re.sub(b"<br />", b".", note)
note = re.sub(b"<br/>", b".", note)
note = b'. '.join(re.findall(b'<p>(.+?)</p>', note))
except:
note = ''
releasenotes.append({'date': date, 'version': version, 'note': note})
except:
pass
# notes = html.xpath("//*[contains(@class, 'app-version-note')]")
return releasenotes
belvaParseXML.py 文件源码
项目:Basic-Expression-Lexicon-Variation-Algorithms-BELVA
作者: kenb123
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def parseXMLxpathSearch(xml_source, xpathString):
#---------------------------------------------------------------------------------
return_values = []
try:
root = etree.XML(xml_source)
data_points = root.xpath(xpathString)
for data in data_points:
return_values.append(etree.tostring(data))
data.clear()
except:
pass
return return_values
#---------------------------------------------------------------------------------
# parse XML and return value asked (designed for errors via stdout)
belvaParseXML.py 文件源码
项目:Basic-Expression-Lexicon-Variation-Algorithms-BELVA
作者: kenb123
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def parseXMLxpathSearchSingle(xml_source, xpathString):
#---------------------------------------------------------------------------------
return_values = []
try:
root = etree.XML(xml_source)
data_points = root.xpath(xpathString)
for data in data_points:
return_values.append(data)
data.clear()
except:
pass
return return_values
#---------------------------------------------------------------------------------
# parse HTML and return value asked
belvaParseXML.py 文件源码
项目:Basic-Expression-Lexicon-Variation-Algorithms-BELVA
作者: kenb123
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def parseXMLxpathSearchAttribute(xml_source, xpathString):
#---------------------------------------------------------------------------------
return_values = []
try:
root = etree.XML(xml_source)
data_points = root.xpath(xpathString)
for data in data_points:
return_values.append(data)
data.clear()
except:
pass
return return_values
#---------------------------------------------------------------------------------
# parse HTML and return value asked
belvaParseXML.py 文件源码
项目:Basic-Expression-Lexicon-Variation-Algorithms-BELVA
作者: kenb123
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def parseHTMLxpathSearch(http_source, xpathString):
#---------------------------------------------------------------------------------
return_values = []
http_source= str(http_source).replace('\x00','')
try:
html = lxml.html.fromstring(http_source)
for data in html.xpath(xpathString):
return_values.append(etree.tostring(data.content))
data.clear()
except:
pass
return return_values
#---------------------------------------------------------------------------------
# parse HTML and return value asked
def __query_new_stocks(self):
DATA_URL = 'http://vip.stock.finance.sina.com.cn/corp/view/vRPD_NewStockIssue.php?page=1&cngem=0&orderBy=NetDate&orderType=desc'
html = lxml.html.parse(DATA_URL)
res = html.xpath('//table[@id=\"NewStockTable\"]/tr')
if six.PY2:
sarr = [etree.tostring(node) for node in res]
else:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
sarr = ''.join(sarr)
sarr = sarr.replace('<font color="red">*</font>', '')
sarr = '<table>%s</table>' % sarr
df = pd.read_html(StringIO(sarr), skiprows=[0, 1])[0]
df = df.select(lambda x: x in [0, 1, 2, 3, 7], axis=1)
df.columns = ['code', 'xcode', 'name', 'ipo_date', 'price']
df['code'] = df['code'].map(lambda x: str(x).zfill(6))
df['xcode'] = df['xcode'].map(lambda x: str(x).zfill(6))
return df
def latest_content(url):
'''
??????????
Parameter
--------
url:????
Return
--------
string:?????????
'''
try:
html = lxml.html.parse(url)
res = html.xpath('//div[@id=\"artibody\"]/p')
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr).replace(' ', '')#.replace('\n\n', '\n').
html_content = lxml.html.fromstring(sarr)
content = html_content.text_content()
return content
except Exception as er:
print(str(er))
def _guba_content(url):
try:
html = lxml.html.parse(url)
res = html.xpath('//div[@class=\"ilt_p\"]/p')
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr).replace(' ', '')#.replace('\n\n', '\n').
html_content = lxml.html.fromstring(sarr)
content = html_content.text_content()
ptime = html.xpath('//div[@class=\"fl_left iltp_time\"]/span/text()')[0]
rcounts = html.xpath('//div[@class=\"fl_right iltp_span\"]/span[2]/text()')[0]
reg = re.compile(r'\((.*?)\)')
rcounts = reg.findall(rcounts)[0]
return [content, ptime, rcounts]
except Exception:
return ['', '', '0']
def _today_ticks(symbol, tdate, pageNo, retry_count, pause):
ct._write_console()
for _ in range(retry_count):
time.sleep(pause)
try:
html = lxml.html.parse(ct.TODAY_TICKS_URL % (ct.P_TYPE['http'],
ct.DOMAINS['vsf'], ct.PAGES['t_ticks'],
symbol, tdate, pageNo
))
res = html.xpath('//table[@id=\"datatbl\"]/tbody/tr')
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
sarr = sarr.replace('--', '0')
df = pd.read_html(StringIO(sarr), parse_dates=False)[0]
df.columns = ct.TODAY_TICK_COLUMNS
df['pchange'] = df['pchange'].map(lambda x : x.replace('%', ''))
except Exception as e:
print(e)
else:
return df
raise IOError(ct.NETWORK_URL_ERROR_MSG)
def _dist_cotent(year, pageNo, retry_count, pause):
for _ in range(retry_count):
time.sleep(pause)
try:
if pageNo > 0:
ct._write_console()
html = lxml.html.parse(rv.DP_163_URL%(ct.P_TYPE['http'], ct.DOMAINS['163'],
ct.PAGES['163dp'], year, pageNo))
res = html.xpath('//div[@class=\"fn_rp_list\"]/table')
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
df = pd.read_html(sarr, skiprows=[0])[0]
df = df.drop(df.columns[0], axis=1)
df.columns = rv.DP_163_COLS
df['divi'] = df['plan'].map(_fun_divi)
df['shares'] = df['plan'].map(_fun_into)
df = df.drop('plan', axis=1)
df['code'] = df['code'].astype(object)
df['code'] = df['code'].map(lambda x : str(x).zfill(6))
pages = []
if pageNo == 0:
page = html.xpath('//div[@class=\"mod_pages\"]/a')
if len(page)>1:
asr = page[len(page)-2]
pages = asr.xpath('text()')
except Exception as e:
print(e)
else:
if pageNo == 0:
return df, pages[0] if len(pages)>0 else 0
else:
return df
raise IOError(ct.NETWORK_URL_ERROR_MSG)
def _get_forecast_data(year, quarter, pageNo, dataArr):
ct._write_console()
try:
html = lxml.html.parse(ct.FORECAST_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'],
ct.PAGES['fd'], year, quarter, pageNo,
ct.PAGE_NUM[1]))
res = html.xpath("//table[@class=\"list_table\"]/tr")
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = sarr.replace('--', '0')
sarr = '<table>%s</table>'%sarr
df = pd.read_html(sarr)[0]
df = df.drop([4, 5, 8], axis=1)
df.columns = ct.FORECAST_COLS
dataArr = dataArr.append(df, ignore_index=True)
nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
if len(nextPage)>0:
pageNo = re.findall(r'\d+',nextPage[0])[0]
return _get_forecast_data(year, quarter, pageNo, dataArr)
else:
return dataArr
except Exception as e:
print(e)
def _newstocks(data, pageNo, retry_count, pause):
for _ in range(retry_count):
time.sleep(pause)
ct._write_console()
try:
html = lxml.html.parse(rv.NEW_STOCKS_URL%(ct.P_TYPE['http'],ct.DOMAINS['vsf'],
ct.PAGES['newstock'], pageNo))
res = html.xpath('//table[@id=\"NewStockTable\"]/tr')
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = sarr.replace('<font color="red">*</font>', '')
sarr = '<table>%s</table>'%sarr
df = pd.read_html(StringIO(sarr), skiprows=[0, 1])[0]
df = df.drop([df.columns[idx] for idx in [1, 12, 13, 14]], axis=1)
df.columns = rv.NEW_STOCKS_COLS
df['code'] = df['code'].map(lambda x : str(x).zfill(6))
res = html.xpath('//table[@class=\"table2\"]/tr[1]/td[1]/a/text()')
tag = '???' if ct.PY3 else unicode('???', 'utf-8')
hasNext = True if tag in res else False
data = data.append(df, ignore_index=True)
pageNo += 1
if hasNext:
data = _newstocks(data, pageNo, retry_count, pause)
except Exception as ex:
print(ex)
else:
return data
def get_notices(code=None, date=None):
'''
??????
Parameters
--------
code:????
date:??????
Return
--------
DataFrame??????
title:????
type:????
date:????
url:????URL
'''
if code is None:
return None
symbol = 'sh' + code if code[:1] == '6' else 'sz' + code
url = nv.NOTICE_INFO_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'],
ct.PAGES['ntinfo'], symbol)
url = url if date is None else '%s&gg_date=%s'%(url, date)
html = lxml.html.parse(url)
res = html.xpath('//table[@class=\"body_table\"]/tbody/tr')
data = []
for td in res:
title = td.xpath('th/a/text()')[0]
type = td.xpath('td[1]/text()')[0]
date = td.xpath('td[2]/text()')[0]
url = '%s%s%s'%(ct.P_TYPE['http'], ct.DOMAINS['vsf'], td.xpath('th/a/@href')[0])
data.append([title, type, date, url])
df = pd.DataFrame(data, columns=nv.NOTICE_INFO_CLS)
return df
def _parse_fq_data(url, index, retry_count, pause):
for _ in range(retry_count):
time.sleep(pause)
try:
request = Request(url)
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
html = lxml.html.parse(StringIO(text))
res = html.xpath('//table[@id=\"FundHoldSharesTable\"]')
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
df = pd.read_html(sarr, skiprows = [0, 1])[0]
if len(df) == 0:
return pd.DataFrame()
if index:
df.columns = ct.HIST_FQ_COLS[0:7]
else:
df.columns = ct.HIST_FQ_COLS
if df['date'].dtypes == np.object:
df['date'] = df['date'].astype(np.datetime64)
df = df.drop_duplicates('date')
except Exception as e:
print(e)
else:
return df
raise IOError(ct.NETWORK_URL_ERROR_MSG)
def _cap_tops(last=5, pageNo=1, retry_count=3, pause=0.001, dataArr=pd.DataFrame()):
ct._write_console()
for _ in range(retry_count):
time.sleep(pause)
try:
request = Request(rv.LHB_SINA_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'], rv.LHB_KINDS[0],
ct.PAGES['fd'], last, pageNo))
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
html = lxml.html.parse(StringIO(text))
res = html.xpath("//table[@id=\"dataTable\"]/tr")
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
df = pd.read_html(sarr)[0]
df.columns = rv.LHB_GGTJ_COLS
dataArr = dataArr.append(df, ignore_index=True)
nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
if len(nextPage)>0:
pageNo = re.findall(r'\d+', nextPage[0])[0]
return _cap_tops(last, pageNo, retry_count, pause, dataArr)
else:
return dataArr
except Exception as e:
print(e)
def _inst_tops(last=5, pageNo=1, retry_count=3, pause=0.001, dataArr=pd.DataFrame()):
ct._write_console()
for _ in range(retry_count):
time.sleep(pause)
try:
request = Request(rv.LHB_SINA_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'], rv.LHB_KINDS[2],
ct.PAGES['fd'], last, pageNo))
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
html = lxml.html.parse(StringIO(text))
res = html.xpath("//table[@id=\"dataTable\"]/tr")
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
df = pd.read_html(sarr)[0]
df = df.drop([2,3], axis=1)
df.columns = rv.LHB_JGZZ_COLS
dataArr = dataArr.append(df, ignore_index=True)
nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
if len(nextPage)>0:
pageNo = re.findall(r'\d+', nextPage[0])[0]
return _inst_tops(last, pageNo, retry_count, pause, dataArr)
else:
return dataArr
except Exception as e:
print(e)
def _inst_detail(pageNo=1, retry_count=3, pause=0.001, dataArr=pd.DataFrame()):
ct._write_console()
for _ in range(retry_count):
time.sleep(pause)
try:
request = Request(rv.LHB_SINA_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'], rv.LHB_KINDS[3],
ct.PAGES['fd'], '', pageNo))
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
html = lxml.html.parse(StringIO(text))
res = html.xpath("//table[@id=\"dataTable\"]/tr")
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
df = pd.read_html(sarr)[0]
df.columns = rv.LHB_JGMX_COLS
dataArr = dataArr.append(df, ignore_index=True)
nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
if len(nextPage)>0:
pageNo = re.findall(r'\d+', nextPage[0])[0]
return _inst_detail(pageNo, retry_count, pause, dataArr)
else:
return dataArr
except Exception as e:
print(e)
def _get_report_data(year, quarter, pageNo, dataArr):
ct._write_console()
try:
request = Request(ct.REPORT_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'], ct.PAGES['fd'],
year, quarter, pageNo, ct.PAGE_NUM[1]))
print(ct.REPORT_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'], ct.PAGES['fd'],
year, quarter, pageNo, ct.PAGE_NUM[1]))
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
text = text.replace('--', '')
html = lxml.html.parse(StringIO(text))
res = html.xpath("//table[@class=\"list_table\"]/tr")
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
df = pd.read_html(sarr)[0]
df = df.drop(11, axis=1)
df.columns = ct.REPORT_COLS
dataArr = dataArr.append(df, ignore_index=True)
nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
if len(nextPage)>0:
pageNo = re.findall(r'\d+', nextPage[0])[0]
return _get_report_data(year, quarter, pageNo, dataArr)
else:
return dataArr
except Exception as e:
print(e)
def _get_profit_data(year, quarter, pageNo, dataArr):
ct._write_console()
try:
request = Request(ct.PROFIT_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'],
ct.PAGES['fd'], year,
quarter, pageNo, ct.PAGE_NUM[1]))
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
text = text.replace('--', '')
html = lxml.html.parse(StringIO(text))
res = html.xpath("//table[@class=\"list_table\"]/tr")
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
df = pd.read_html(sarr)[0]
df.columns=ct.PROFIT_COLS
dataArr = dataArr.append(df, ignore_index=True)
nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
if len(nextPage)>0:
pageNo = re.findall(r'\d+', nextPage[0])[0]
return _get_profit_data(year, quarter, pageNo, dataArr)
else:
return dataArr
except:
pass
def _get_operation_data(year, quarter, pageNo, dataArr):
ct._write_console()
try:
request = Request(ct.OPERATION_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'],
ct.PAGES['fd'], year,
quarter, pageNo, ct.PAGE_NUM[1]))
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
text = text.replace('--', '')
html = lxml.html.parse(StringIO(text))
res = html.xpath("//table[@class=\"list_table\"]/tr")
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
df = pd.read_html(sarr)[0]
df.columns=ct.OPERATION_COLS
dataArr = dataArr.append(df, ignore_index=True)
nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
if len(nextPage)>0:
pageNo = re.findall(r'\d+', nextPage[0])[0]
return _get_operation_data(year, quarter, pageNo, dataArr)
else:
return dataArr
except Exception as e:
print(e)
def _get_debtpaying_data(year, quarter, pageNo, dataArr):
ct._write_console()
try:
request = Request(ct.DEBTPAYING_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'],
ct.PAGES['fd'], year,
quarter, pageNo, ct.PAGE_NUM[1]))
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
html = lxml.html.parse(StringIO(text))
res = html.xpath("//table[@class=\"list_table\"]/tr")
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
df = pd.read_html(sarr)[0]
df.columns = ct.DEBTPAYING_COLS
dataArr = dataArr.append(df, ignore_index=True)
nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
if len(nextPage)>0:
pageNo = re.findall(r'\d+', nextPage[0])[0]
return _get_debtpaying_data(year, quarter, pageNo, dataArr)
else:
return dataArr
except Exception as e:
print(e)
def _get_cashflow_data(year, quarter, pageNo, dataArr):
ct._write_console()
try:
request = Request(ct.CASHFLOW_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'],
ct.PAGES['fd'], year,
quarter, pageNo, ct.PAGE_NUM[1]))
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
text = text.replace('--', '')
html = lxml.html.parse(StringIO(text))
res = html.xpath("//table[@class=\"list_table\"]/tr")
if ct.PY3:
sarr = [etree.tostring(node).decode('utf-8') for node in res]
else:
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
sarr = '<table>%s</table>'%sarr
df = pd.read_html(sarr)[0]
df.columns = ct.CASHFLOW_COLS
dataArr = dataArr.append(df, ignore_index=True)
nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
if len(nextPage)>0:
pageNo = re.findall(r'\d+', nextPage[0])[0]
return _get_cashflow_data(year, quarter, pageNo, dataArr)
else:
return dataArr
except Exception as e:
print(e)
def _parse_fq_data(url, index, retry_count, pause):
for _ in range(retry_count):
time.sleep(pause)
try:
request = Request(url)
text = urlopen(request, timeout=10).read()
text = text.decode('GBK')
html = lxml.html.parse(StringIO(text))
res = html.xpath('//table[@id=\"FundHoldSharesTable\"]')
sarr = [etree.tostring(node) for node in res]
sarr = ''.join(sarr)
if sarr == '':
return None
df = pd.read_html(sarr, skiprows = [0, 1])[0]
if len(df) == 0:
return pd.DataFrame()
if index:
df.columns = ct.HIST_FQ_COLS[0:7]
else:
df.columns = ct.HIST_FQ_COLS
if df['date'].dtypes == np.object:
df['date'] = df['date'].astype(np.datetime64)
df = df.drop_duplicates('date')
except ValueError as e:
# ????????????
return None
except Exception as e:
print(e)
else:
return df
raise IOError(ct.NETWORK_URL_ERROR_MSG)
def parse_venues(venue_types) :
methods = {'conf' : clean_conf,
'journals' : clean_journal}
venues = []
for venue_type in venue_types :
folder = config.DATA + ("venues/html/%s/" % venue_type)
print "\nProcessing folder '%s'" % folder
for file_name in os.listdir(folder) :
print " '%s'" % file_name
with open(os.path.join(folder, file_name), 'r') as file :
lines = file.readlines()
# Get the line of interest and parse it as an HTML
html = lxml.html.fromstring(lines[16])
for item in html.xpath("//div[@id='browse-%s-output']//li/a" % venue_type) :
process_method = methods[venue_type]
name = process_method(item.text_content())
venues.append((name, venue_type))
print "%d venues." % len(venues)
return venues
belvaParseXML.py 文件源码
项目:Basic-Expression-Lexicon-Variation-Algorithms-BELVA
作者: kenb123
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def parseCDATA(xml_source, xpathString):
#---------------------------------------------------------------------------------
return_values = []
print(xml_source)
root = etree.fromstring(xml_source)
for log in root.xpath(xpathString):
return_values.append(str(log.text))
return return_values