def myopen_http(method, url, values):
if not url:
raise ValueError("cannot submit, no URL provided")
## FIXME: should test that it's not a relative URL or something
try:
from urllib import urlencode, urlopen
except ImportError: # Python 3
from urllib.request import urlopen
from urllib.parse import urlencode
if method == 'GET':
if '?' in url:
url += '&'
else:
url += '?'
url += urlencode(values)
data = None
else:
data = urlencode(values).encode('utf-8')
return urlopen(url, data)
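# Minimal usage sketch for myopen_http above (not part of the original
# snippet): for GET the values end up in the query string, anything else
# sends them as a urlencoded POST body. httpbin.org is just an assumed
# test endpoint.
if __name__ == '__main__':
    resp = myopen_http('GET', 'https://httpbin.org/get', {'q': 'lxml'})
    print(resp.getcode())   # 200 if the endpoint is reachable
    resp = myopen_http('POST', 'https://httpbin.org/post', {'q': 'lxml'})
    print(resp.getcode())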
def items(self):
"""
    Request URL and parse response. Yield a ``Torrent`` for every torrent
    on the page. In multipage mode, torrents from subsequent pages are
    automatically chained.
"""
if self._multipage:
while True:
            # Poll for more torrents
items = super(Paginated, self).items()
# Stop if no more torrents
first = next(items, None)
if first is None:
                return  # PEP 479: end the generator with return, not StopIteration
# Yield them if not
else:
yield first
for item in items:
yield item
# Go to the next page
self.next()
else:
for item in super(Paginated, self).items():
yield item
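# The multipage branch above reduces to the pattern sketched below: peek at the
# first item of each page to detect an empty page, then drain the rest.
# chain_pages is a made-up standalone helper, shown only to illustrate the idea.
def chain_pages(pages):
    for page in pages:
        items = iter(page)
        first = next(items, None)
        if first is None:
            return          # PEP 479: plain return, not StopIteration
        yield first
        for item in items:
            yield item

# list(chain_pages([[1, 2], [3], []])) -> [1, 2, 3]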
def created(self):
"""
Attempt to parse the human readable torrent creation datetime.
"""
timestamp, current = self._created
if timestamp.endswith('ago'):
quantity, kind, ago = timestamp.split()
quantity = int(quantity)
if 'sec' in kind:
current -= quantity
elif 'min' in kind:
current -= quantity * 60
elif 'hour' in kind:
current -= quantity * 60 * 60
return datetime.datetime.fromtimestamp(current)
current = datetime.datetime.fromtimestamp(current)
timestamp = timestamp.replace('Y-day', str(current.date() - datetime.timedelta(days=1)))
timestamp = timestamp.replace('Today', current.date().isoformat())
try:
return dateutil.parser.parse(timestamp)
    except (ValueError, OverflowError):
return current
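# Standalone sketch of the "<N> <unit> ago" branch above; parse_relative is a
# made-up helper (not part of the original class) for checking the arithmetic
# in isolation.
import datetime
import time

def parse_relative(timestamp, current=None):
    current = time.time() if current is None else current
    quantity, kind, _ago = timestamp.split()
    quantity = int(quantity)
    if 'sec' in kind:
        current -= quantity
    elif 'min' in kind:
        current -= quantity * 60
    elif 'hour' in kind:
        current -= quantity * 60 * 60
    return datetime.datetime.fromtimestamp(current)

# parse_relative('3 mins ago') -> datetime roughly three minutes in the past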
def _file_to_tree(_data_format, _reference):
"""Reads a file and chooses the right parser to make it an lxml element tree"""
print("format_to_tree : " + _data_format)
if _data_format == 'HTML':
from lxml import html
return html.parse(_reference)
    elif _data_format == 'XML':
        from lxml import etree
        return etree.parse(_reference)
    elif _data_format == 'JSON':
        import json
        from lxml import etree
        from json_lxml import element
        with open(_reference, "r") as _f:
            _top_element = json.load(_f)
        return etree.ElementTree(element("top", _top_element))
else:
raise Exception("_file_to_tree: " + _data_format + " is not supported")
# data.py — project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia
def _parse_url(self, url):
"""
Downloads and parses a URL, returns xml root.
"""
try:
from lxml.html import parse
except ImportError:
raise ImportError("Please install lxml if you want to use the "
"{0!r} class".format(self.__class__.__name__))
try:
doc = parse(url)
except _network_error_classes:
raise RemoteDataError("Unable to parse URL "
"{0!r}".format(url))
else:
root = doc.getroot()
if root is None:
raise RemoteDataError("Parsed URL {0!r} has no root"
"element".format(url))
return root
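# _parse_url relies on two module-level names that the snippet does not show:
# RemoteDataError and _network_error_classes. A plausible sketch of what they
# look like (an assumption modelled on pandas' remote-data helpers, not the
# verbatim upstream definitions):
class RemoteDataError(IOError):
    """Raised when a remote data source cannot be fetched or parsed."""

_network_error_classes = (IOError, OSError)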
def _parse_url(self, url):
"""
Downloads and parses a URL, returns xml root.
"""
try:
from lxml.html import parse
except ImportError: # pragma: no cover
raise ImportError("Please install lxml if you want to use the "
"{0!r} class".format(self.__class__.__name__))
doc = parse(self._read_url_as_StringIO(url))
root = doc.getroot()
if root is None: # pragma: no cover
raise RemoteDataError("Parsed URL {0!r} has no root"
"element".format(url))
return root
def get_available_datasets():
"""
Get the list of datasets available from the Fama/French data library.
Returns
-------
A list of valid inputs for get_data_famafrench.
"""
try:
from lxml.html import parse
except ImportError:
raise ImportError("Please install lxml if you want to use the "
"get_datasets_famafrench function")
root = parse(_URL + 'data_library.html')
l = filter(lambda x: x.startswith(_URL_PREFIX) and x.endswith(_URL_SUFFIX),
[e.attrib['href'] for e in root.findall('.//a') if 'href' in e.attrib])
return lmap(lambda x: x[len(_URL_PREFIX):-len(_URL_SUFFIX)], l)
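# get_available_datasets also leans on module-level names defined elsewhere.
# The values below are assumptions for illustration (modelled on the
# pandas/pandas-datareader Fama-French reader), not the verbatim upstream ones.
_URL = 'http://mba.tuck.dartmouth.edu/pages/faculty/ken.french/'
_URL_PREFIX = 'ftp/'
_URL_SUFFIX = '_CSV.zip'

def lmap(f, seq):
    # list-returning map, so the result behaves the same on Python 2 and 3
    return list(map(f, seq))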
def parse(filename, options=None):
"""
Parse Selenium IDE - Test Results Plugin output files.
"""
options = options or {}
try:
parsed_html = html.parse(filename)
except html.HTMLSyntaxError:
raise importer.ParserError('TEST invalid XML syntax')
suite = parsed_html.find("//table[@id='suiteSummaryTable']/thead/tr/td")
if suite is None:
raise importer.ParserError('Test Suite not found')
suite = suite.text
if not suite.startswith(_SUITE_HEADER):
raise importer.ParserError('invalid test results')
# get suite name from 'Test Suite: <testname>'
suitename = suite[len(_SUITE_HEADER) + 1:].strip()
root = parsed_html.getroot()
suitetbls = root.find_class('test_case')
if suitetbls is None:
raise importer.ParserError('no test cases found')
return [_parse_test(tbl, suitename) for tbl in suitetbls]
def get_article_info(url):
"""
Returns a dictionary with the article info.
The dictionary contains the following fields:
- date
- title
- tags (list of tags at the end of the article)
- url
"""
content = urllib2.urlopen(url)
tree = html.parse(content)
content.close()
title = tree.xpath('//h1[@id="articulo-titulo"]/text()')[0]
date = tree.xpath('//time//a/text()')[0].strip()
tags = tree.xpath('//li[@itemprop="keywords"]/a/text()')
    result = {'date': date, 'title': title, 'tags': tags, 'url': url}
    return result
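# Hypothetical call of get_article_info (the snippet assumes module-level
# imports of urllib2 and lxml.html as `html`; the URL below is a placeholder,
# not a real article).
article = get_article_info('https://elpais.com/some-article.html')
print(article['title'])
print(article['date'], ', '.join(article['tags']))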
def parse_rss(url=None, **kwargs):
try:
f = fetch(decode(url), **kwargs)
except (ValueError, URLError):
parsed = rssparser.parse(url)
else:
content = f.read() if speedparser else f
try:
parsed = rssparser.parse(content)
finally:
f.close()
return parsed
def xml2etree(f, xml=True, html5=False):
if xml:
element_tree = etree.parse(f)
elif html5 and html5parser:
element_tree = html5parser.parse(f)
elif html5parser:
element_tree = html.parse(f)
else:
# html5lib's parser returns an Element, so we must convert it into an
# ElementTree
element_tree = ElementTree(html.parse(f))
return element_tree
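# Quick check of the xml=True branch above (assuming `etree` here is
# lxml.etree, as the html/html5parser references suggest); illustrative only.
from io import BytesIO

_tree = xml2etree(BytesIO(b'<root><item>1</item></root>'), xml=True)
print(_tree.getroot().tag)   # -> 'root'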
#
# main_domain_stat='file:///Users/Zharkov/Downloads/test2.htm'
#
# page=html.parse(main_domain_stat)
#
# e = page.getroot().\
# find_class('cl_hr').\
# pop()
#
# t=e.getchildren().pop()
#
# print(e, t)
def items(self):
"""
Request URL and parse response. Yield a ``Torrent`` for every torrent
    on the page.
"""
os.system("curl %s -o /tmp.html -s" % str(self.url))
request = urlopen("file:///tmp.html")
document = html.parse(request)
root = document.getroot()
items = [self._build_torrent(row) for row in
self._get_torrent_rows(root)]
for item in items:
yield item
def info(self):
if self._info is None:
os.system("curl %s -o /tmp.html -s" % str(self.url))
request = urlopen("file:///tmp.html")
document = html.parse(request)
root = document.getroot()
if root.cssselect('#details > .nfo > pre') != []:
info = root.cssselect('#details > .nfo > pre')[0].text_content()
else:
info = None
self._info = info
return self._info
def files(self):
if not self._files:
path = '/ajax_details_filelist.php?id={id}'.format(id=self.id)
url = self.url.path(path)
os.system("curl %s -o /tmp.html -s" % str(self.url))
request = urlopen("file:///tmp.html")
document = html.parse(request)
root = document.getroot()
rows = root.findall('.//tr')
for row in rows:
name, size = [unicode(v.text_content())
for v in row.findall('.//td')]
self._files[name] = size.replace('\xa0', ' ')
return self._files
def info(self):
if self._info is None:
request = urlopen(str(self.url))
document = html.parse(request)
root = document.getroot()
if root.cssselect('#details > .nfo > pre') != []:
info = root.cssselect('#details > .nfo > pre')[0].text_content()
else:
info = None
self._info = info
return self._info
def files(self):
if not self._files:
path = '/ajax_details_filelist.php?id={id}'.format(id=self.id)
url = self.url.path(path)
        request = urlopen(str(url))  # fetch the filelist URL built above
document = html.parse(request)
root = document.getroot()
rows = root.findall('.//tr')
for row in rows:
name, size = [unicode(v.text_content())
for v in row.findall('.//td')]
self._files[name] = size.replace('\xa0', ' ')
return self._files
def parse():
"""Parse the command line """
parser = argparse.ArgumentParser(description='Query Leo',
usage='%(prog)s [OPTIONS] QUERYSTRING')
parser.add_argument( '-D', '--with-defs',
action="store_true",
default=False,
help="Include any definitions in the result (default: %(default)s)",
)
parser.add_argument( '-E', '--with-examples',
action="store_true",
default=False,
help="Include examples in the result (default: %(default)s)",
)
parser.add_argument( '-P', '--with-phrases',
action="store_true",
default=False,
help="Include phrases in the result (default: %(default)s)",
)
#parser.add_argument( '-F', '--with-forums',
# action="store_true",
# default=False,
# help="Include forums in the result (default: %(default)s)",
# )
parser.add_argument('query',
metavar="QUERYSTRING",
help="Query string",
)
return parser.parse_args()
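# Typical use of the argument parser above (the script name is hypothetical):
#   $ python leo_query.py -D -E Hund
# argparse exposes '--with-defs' etc. as underscore attribute names.
args = parse()
print(args.query, args.with_defs, args.with_examples, args.with_phrases)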
def getLeoPage(url):
"""Return root node of Leo's result HTML page
"""
    doc = htmlparser.parse(url)
    html = doc.getroot()
return html
def read_html(self, infile):
"""Parse a HTML file."""
with open(infile, encoding='utf-8', mode='r') as input:
return html.parse(input)
def add_root_attributes(self, root, tree, infile):
root.attrib['id'] = os.path.splitext(os.path.basename(infile))[0]
root.attrib['lang'] = self.language.lower()
date_string = re.match(
r'^(.+?,? \d.+?) - (.+)$',
tree.xpath('//td[@class="doc_title" and @align="left" and @valign="top"]')[0].text)
date = dateparser.parse(date_string.group(1)).date()
place = date_string.group(2)
root.attrib['date'] = str(date)
root.attrib['place'] = place
root.attrib['edition'] = tree.xpath('//td[@class="doc_title" and @align="right" and @valign="top"]')[0].text
# test_html.py — project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia
def get_lxml_elements(url, element):
_skip_if_no('lxml')
from lxml.html import parse
doc = parse(url)
return doc.xpath('.//{0}'.format(element))
# html.py — project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia
def _parse_tables(self, doc, match, attrs):
"""Return all tables from the parsed DOM.
Parameters
----------
doc : tree-like
The DOM from which to parse the table element.
match : str or regular expression
The text to search for in the DOM tree.
attrs : dict
A dictionary of table attributes that can be used to disambiguate
        multiple tables on a page.
Raises
------
ValueError
* If `match` does not match any text in the document.
Returns
-------
tables : list of node-like
A list of <table> elements to be parsed into raw data.
"""
raise AbstractMethodError(self)
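# _parse_tables is abstract here (it raises AbstractMethodError). A rough
# sketch of what an lxml-backed subclass might do, assuming `doc` is an
# lxml.html tree and `match` is a string or compiled regex; this is
# illustrative, not pandas' actual implementation.
import re

def _parse_tables_lxml_sketch(doc, match, attrs):
    pattern = match if hasattr(match, 'search') else re.compile(match)
    tables = []
    for table in doc.xpath('.//table'):
        if attrs and any(table.get(k) != v for k, v in attrs.items()):
            continue        # required attributes not present on this table
        if pattern.search(table.text_content() or ''):
            tables.append(table)
    if not tables:
        raise ValueError('No tables found matching {!r}'.format(pattern.pattern))
    return tables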
def mark_contribs(html_file, marked_html_file) :
h = html.parse(html_file)
# text = "".join([ p.text_content() for p in h.xpath("//p") ])
pars = h.xpath("//p")
for par in pars :
# Get the paragraph's text fixing the hyphenation
text = par.text_content().replace("-\n", "")
sentences = tokenizer.tokenize(text.strip())
        scores = list(map(calc_score, sentences))  # materialise so it can be indexed below
intervals = max_subarray(scores, 1.0)
mask = positive_ones(len(sentences), intervals)
par.clear()
texts = []
# text = ''
# marked_sentences = []
for i, s in enumerate(sentences) :
if mask[i] :
marked = etree.Element("font", style="background-color:yellow", score=str(scores[i]))
marked.text = s
marked.tail = ''
par.append(marked)
else :
if len(par):
marked = par[-1]
marked.tail += ' ' + s
else:
texts.append(s)
par.text = ' '.join(texts)
h.write(marked_html_file, pretty_print=True, method="html")
def get_section(html_file, section_name, possible_next_sections):
h = html.parse(html_file)
pars = h.xpath("//p")
begin = end = -1
for i, par in enumerate(pars) :
if (begin>0) and (end>0) :
break
par_text = par.text_content().lower()
if begin<0 and (par_text.find(section_name, 0, 20) >= 0) :
begin = i
if begin>=0 :
for next_section in possible_next_sections :
if (par_text.find(next_section, 0, 20) >= 0) :
end = i
text = ""
if (begin<0) or (end<0) :
raise SectionNotFound("Section %s not found."%section_name)
text = "".join([par.text_content() for par in pars[begin:end]])
return text
def totxt(self, paperid):
'''
        Converts HTML to pure text by extracting all text elements from the HTML.
'''
infile = config.HTML_PATH % paperid
outfile = config.TXT_PATH % paperid
h = html.parse(infile)
pars = h.xpath("//p")
text = ''.join([par.text_content() for par in pars])
text = text.replace("-\n", "")
with open(outfile, 'w') as f :
f.write(text.encode("UTF-8"))
def get_section(self, html_file, possible_section_names, possible_next_sections):
# Open and parse HTML, then extract all textual content from each paragraph
h = html.parse(html_file) #, parser=etree.XMLParser(encoding="utf-8"))
pars = [paragraph.text_content().lower().encode("UTF-8") for paragraph in h.xpath("//p")] # .encode("utf-8")
# First we go backwards trying to find the latest occurrence of
# one of the possible names of the section of interest
begin = None
for i in reversed(xrange(len(pars))) :
if match_any(pars[i], possible_section_names) :
begin = i
break
# If the start wasn't found, just halt right away
if (begin is None) :
return ""
# Otherwise we can look for the end of the section starting from the start
# of the found section.
end = None
for j in xrange(begin+1, len(pars)) :
if match_any(pars[j], possible_next_sections) :
end = j
break
# End of section not found, so it's not safe to keep this content,
# so we return an empty string.
if (end is None) :
return ""
# Otherwise join all paragraphs inside the section found
return unicode("".join([fix_hyphens(p) for p in pars[begin:end]]), "UTF-8")