def parse_rsc_html(htmlstring):
    """Messy RSC HTML needs this special parser to fix problems before creating selector.

    Detects the encoding with UnicodeDammit, parses with a recovering lxml
    HTMLParser, then wraps orphan tail text (text dangling after block
    elements) in ``<p class="otherpara">`` elements.

    :param htmlstring: raw HTML, bytes or str.
    :returns: the parsed lxml root element.
    :raises UnicodeDecodeError: if the character encoding cannot be detected.
    """
    converted = UnicodeDammit(htmlstring)
    if not converted.unicode_markup:
        # Build a *valid* UnicodeDecodeError: the five-argument form
        # (encoding, object, start, end, reason) is mandatory — the old
        # single-string call raised TypeError instead of the intended
        # exception, and its '%s' placeholder was never filled in.
        tried = ', '.join(str(enc) for enc in (converted.tried_encodings or []))
        data = htmlstring if isinstance(htmlstring, bytes) else htmlstring.encode('utf-8', 'replace')
        raise UnicodeDecodeError(
            'unknown', data, 0, max(len(data), 1),
            'Failed to detect encoding, tried [%s]' % tried)
    root = fromstring(htmlstring, parser=HTMLParser(recover=True, encoding=converted.original_encoding))
    # Add p.otherpara tags around orphan text
    newp = None
    for child in root.get_element_by_id('wrapper'):
        if newp is not None:
            # Close the synthetic paragraph at the next block element,
            # section anchor, or the last child of the wrapper.
            if child.tag in BLOCK_ELEMENTS or child.get('id', '').startswith('sect') or child.getnext() is None:
                child.addprevious(newp)
                newp = None
            else:
                newp.append(child)
        if newp is None and child.tag in BLOCK_ELEMENTS and child.tail and child.tail.strip():
            # Orphan tail text after a block element: start collecting it
            # into a new synthetic paragraph.
            newp = Element('p', **{'class': 'otherpara'})
            newp.text = child.tail
            child.tail = ''
    return root
# Python source-code examples using instances of the HTMLParser() class
def __call__(self, doc, encoding='UTF-8'):
    """Run every registered extractor over *doc* and return the first
    acceptable list of tags, or ``None`` when nothing usable is found.

    :param doc: markup as str/bytes, or an already-parsed HtmlElement.
    :param encoding: charset used to decode/parse textual input.
    :returns: list of short tag strings, or None.
    """
    if isinstance(doc, (str, bytes)):
        if isinstance(doc, str):
            # Encode explicitly: the previous bytearray(doc, encoding=...)
            # call raised TypeError whenever doc was already bytes, even
            # though the isinstance guard admitted bytes input.
            doc = doc.encode(encoding)
        doc = fromstring(doc, parser=HTMLParser(encoding=encoding))
    if not isinstance(doc, HtmlElement):
        return None
    for cls in self.EXTRACTORS:
        extract = cls()
        tags_ = extract(doc)
        if not tags_:
            continue
        tags = []
        for idx, tag in enumerate(tags_):
            if idx < 2 and len(tag) > 16:
                # An over-long tag among the first two means this extractor
                # produced garbage; abandon it (the for-else is skipped and
                # the next extractor gets a chance).
                break
            elif len(tag) < 16:
                tags.append(tag)
        else:
            # No break: the extractor's output looked sane.
            if tags:
                logger.info('TagExtractor got tags %s',
                            tags)
                return tags
def from_text(cls, text, base_url=None, parser=HTMLParser, translator=CssHTMLTranslator, fmt='html', namespaces=None, encoding=None):
    """Parse *text* into a tree and wrap it in a selector instance.

    The encoding is resolved via ``cls._get_encoding`` and the parser is
    run in recovery mode. When *base_url* is supplied and the resulting
    root supports it, relative links are rewritten to absolute ones.
    """
    log.debug('Parsing {} with {}'.format(fmt, parser))
    resolved_encoding = cls._get_encoding(text, encoding)
    tree = fromstring(
        text,
        parser=parser(recover=True, encoding=resolved_encoding),
        base_url=base_url,
    )
    if base_url and hasattr(tree, 'make_links_absolute'):
        tree.make_links_absolute()
    return cls(tree, translator=translator, fmt=fmt, namespaces=namespaces)
def from_html_text(cls, text, base_url=None, namespaces=None, encoding=None):
    """Shortcut for :meth:`from_text` preconfigured for HTML input."""
    return cls.from_text(
        text,
        base_url=base_url,
        parser=HTMLParser,
        translator=CssHTMLTranslator,
        fmt='html',
        namespaces=namespaces,
        encoding=encoding,
    )
def from_response(cls, response, parser=HTMLParser, translator=CssHTMLTranslator, fmt='html', namespaces=None):
    """Build a selector from an HTTP *response* object.

    Uses the response body as text, its URL as base URL, and its declared
    encoding for parsing.
    """
    return cls.from_text(
        response.content,
        base_url=response.url,
        parser=parser,
        translator=translator,
        fmt=fmt,
        namespaces=namespaces,
        encoding=response.encoding,
    )
def from_html(cls, response, namespaces=None):
    """Shortcut for :meth:`from_response` preconfigured for HTML input."""
    return cls.from_response(
        response,
        parser=HTMLParser,
        translator=CssHTMLTranslator,
        fmt='html',
        namespaces=namespaces,
    )
def to_xml(content, **kwargs):
    """Parse *content* as UTF-8 HTML and return the lxml root element.

    Extra keyword arguments are forwarded to ``html.fromstring``.
    """
    utf8_parser = html.HTMLParser(encoding='utf-8')
    return html.fromstring(html=content, parser=utf8_parser, **kwargs)
def get_media_requests(self, item, info):
    """Collect download Requests for every ``<img>`` in the item content.

    Parses the content if it is still raw markup, resolves relative and
    protocol-relative image URLs against the item link, and skips
    ``data:`` URIs.
    """
    doc = item['content']
    if isinstance(doc, (str, bytes)):
        doc = fromstring(doc,
                         parser=HTMLParser(encoding=item['encoding']))
    item['content'] = doc
    # Spiders may override which attribute carries the image URL.
    try:
        attr = self.spiderinfo.spider.image_url_attr
    except AttributeError:
        attr = 'src'
    candidates = []
    for img in doc.xpath('//img'):
        if attr not in img.attrib:
            continue
        url = img.get(attr).strip(' \t\n')
        if url.startswith('/'):
            # Relative (or protocol-relative) path: resolve against the link.
            url = urljoin(item['link'].strip(), url)
        if url.startswith('//'):
            # Still protocol-relative after joining: assume plain http.
            url = 'http:' + url
        candidates.append((url, img))
    reqs = []
    for url, img in candidates:
        if url.startswith('data'):
            continue
        try:
            req = Request(url,
                          meta={'img': img})
        except ValueError:
            logger.error((
                'Error in pipeline image create Request[{}]'
            ).format(url))
        else:
            reqs.append(req)
    return reqs
def _pretty_arch(self, arch):
    """Return *arch* serialized as pretty-printed unicode XML.

    remove_blank_string does not seem to work on HTMLParser, and
    pretty-printing with lxml more or less requires stripping whitespace:
    http://lxml.de/FAQ.html#why-doesn-t-the-pretty-print-option-reformat-my-xml-output
    So: serialize to XML, re-parse as XML while dropping blank text, then
    serialize again with pretty-printing enabled.
    """
    serialized = etree.tostring(arch, encoding='utf-8')
    stripping_parser = etree.XMLParser(encoding='utf-8', remove_blank_text=True)
    arch_no_whitespace = etree.fromstring(serialized, parser=stripping_parser)
    return etree.tostring(
        arch_no_whitespace, encoding='unicode', pretty_print=True)
def save(self, cr, uid, res_id, value, xpath=None, context=None):
    """ Update a view section. The view section may embed fields to write

    Parses ``value`` as HTML, persists any embedded fields it contains,
    then (unless ``xpath`` is None) splices the section back into the
    view's arch and writes it.

    :param int res_id: id of the view to update
    :param str value: HTML fragment holding the edited section
    :param str xpath: valid xpath to the tag to replace
    """
    res_id = int(res_id)
    # Parse leniently as HTML — the submitted value is a fragment, not a document.
    arch_section = html.fromstring(
        value, parser=html.HTMLParser(encoding='utf-8'))
    if xpath is None:
        # value is an embedded field on its own, not a view section
        self.save_embedded_field(cr, uid, arch_section, context=context)
        return
    for el in self.extract_embedded_fields(cr, uid, arch_section, context=context):
        self.save_embedded_field(cr, uid, el, context=context)
        # transform embedded field back to t-field
        el.getparent().replace(el, self.to_field_ref(cr, uid, el, context=context))
    arch = self.replace_arch_section(cr, uid, res_id, xpath, arch_section, context=context)
    self.write(cr, uid, res_id, {
        'arch': self._pretty_arch(arch)
    }, context=context)
    # NOTE(review): browsed as SUPERUSER_ID — presumably so noupdate can be
    # set even when the editing user lacks rights on ir.model.data; confirm.
    view = self.browse(cr, SUPERUSER_ID, res_id, context=context)
    if view.model_data_id:
        # Flag the view so module upgrades do not overwrite the manual edit.
        view.model_data_id.write({'noupdate': True})
def field_rountrip_result(self, field, value, expected):
    """Round-trip *value* through qweb rendering and html conversion.

    Creates a test record with ``field`` set to ``value``, renders it as a
    ``t-field`` span, converts the rendered HTML back via the registered
    converter, and asserts the recovered value equals ``expected``.
    (Name typo "rountrip" kept — callers depend on it.)
    """
    model = 'website.converter.test'
    Model = self.registry(model)
    # NOTE: `id` shadows the builtin; left untouched in this doc pass.
    id = Model.create(
        self.cr, self.uid, {
            field: value
        })
    [record] = Model.browse(self.cr, self.uid, [id])
    e = etree.Element('span')
    field_value = 'record.%s' % field
    e.set('t-field', field_value)
    # Render with branding enabled so data-oe-* attributes are emitted,
    # which get_converter_for below relies on.
    rendered = self.registry('website.qweb').render_tag_field(
        e, {'field': field_value}, '', ir_qweb.QWebContext(self.cr, self.uid, {
            'record': record,
        }, context={'inherit_branding': True}))
    element = html.fromstring(
        rendered, parser=html.HTMLParser(encoding='utf-8'))
    converter = self.registry('website.qweb').get_converter_for(
        element.get('data-oe-type'))
    value_back = converter.from_html(
        self.cr, self.uid, model, Model._fields[field], element)
    if isinstance(expected, str):
        # NOTE(review): str.decode exists only on Python 2 — this code
        # appears to target py2; on py3 this line would raise AttributeError.
        expected = expected.decode('utf-8')
    self.assertEqual(value_back, expected)
def fix_parsel_parser(new_type='html_html', base_type='html'):
    """Fix a custom parser for parsel using lxml.html.HTMLParser.

    The main reason is to have handy helpers as make_links_absolute method.
    Registers a copy of the *base_type* parser group under *new_type* with
    the lxml.html parser substituted in, and returns the new type name.
    """
    group = selector._ctgroup[base_type].copy()
    group['_parser'] = lxml_html.HTMLParser
    selector._ctgroup[new_type] = group
    return new_type
def process_item(self, item, spider):
    """Normalize an item's content for display.

    Ensures ``item['content']`` is a parsed HtmlElement, strips elements
    the *spider* marked for removal (by class or xpath), cleans the HTML,
    and rewrites links to absolute form.

    :raises ContentException: when the content is neither markup nor an
        HtmlElement.
    """
    item['title'] = self.format_title(item['title'])
    doc = item['content']
    if not isinstance(doc, HtmlElement):
        if isinstance(doc, (str, bytes)):
            if isinstance(doc, str):
                # Encode explicitly: bytearray(doc, encoding=...) raised
                # TypeError whenever doc was already bytes, although the
                # isinstance guard admitted bytes input.
                doc = doc.encode(item['encoding'])
            doc = fromstring(doc,
                             parser=HTMLParser(encoding=item['encoding']))
        else:
            raise ContentException((
                'Error in content pipeline unsupported doc type[{}]'
            ).format(doc.__class__.__name__))
    # remove element with class name for clean display
    removed_classes = getattr(spider,
                              self.REMOVED_CLASSES_NAME,
                              None)
    if removed_classes is not None:
        doc = self.remove_element_with_class(doc,
                                             removed_classes)
    # remove element with xpath for clean display
    removed_xpath_nodes = getattr(spider,
                                  self.REMOVED_XPATH_NODES_NAME,
                                  None)
    if removed_xpath_nodes is not None:
        doc = self.remove_element_with_xpath(doc,
                                             removed_xpath_nodes)
    allow_classes = getattr(spider,
                            self.ALLOW_CLASSES_NAME,
                            None)
    safe_attrs = getattr(spider,
                         self.SAFE_ATTRS_NAME,
                         None)
    doc = self.clean_html(doc,
                          allow_classes=allow_classes,
                          safe_attrs=safe_attrs)
    doc = self.make_abs_link(doc,
                             item['link'])
    item['content'] = doc
    return item
def test_m2o(self):
    """ the M2O field conversion (from html) is markedly different from
    others as it directly writes into the m2o and returns nothing at all.
    """
    model = 'website.converter.test'
    field = 'many2one'
    # Set up a target record the m2o field points at.
    Sub = self.registry('website.converter.test.sub')
    sub_id = Sub.create(self.cr, self.uid, {'name': "Foo"})
    Model = self.registry(model)
    id = Model.create(self.cr, self.uid, {field: sub_id})
    [record] = Model.browse(self.cr, self.uid, [id])
    e = etree.Element('span')
    field_value = 'record.%s' % field
    e.set('t-field', field_value)
    # Render with branding so data-oe-type is present for get_converter_for.
    rendered = self.registry('website.qweb').render_tag_field(
        e, {'field': field_value}, '', ir_qweb.QWebContext(self.cr, self.uid, {
            'record': record,
        }, context={'inherit_branding': True}))
    element = html.fromstring(rendered, parser=html.HTMLParser(encoding='utf-8'))
    # emulate edition
    element.text = "New content"
    converter = self.registry('website.qweb').get_converter_for(
        element.get('data-oe-type'))
    value_back = converter.from_html(
        self.cr, self.uid, model, Model._fields[field], element)
    self.assertIsNone(
        value_back, "the m2o converter should return None to avoid spurious"
        " or useless writes on the parent record")
    self.assertEqual(
        Sub.browse(self.cr, self.uid, sub_id).name,
        "New content",
        "element edition should have been written directly to the m2o record"
    )
# Source: html.py — project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda,
# author: SignalMedia (views: 29, favorites: 0, likes: 0, comments: 0)
def _build_doc(self):
    """
    Parse ``self.io`` (file path, URL, or raw markup) into an lxml tree.

    Raises
    ------
    ValueError
        * If a URL that lxml cannot parse is passed.
    Exception
        * Any other ``Exception`` thrown. For example, trying to parse a
        URL that is syntactically correct on a machine with no internet
        connection will fail.
    See Also
    --------
    pandas.io.html._HtmlFrameParser._build_doc
    """
    from lxml.html import parse, fromstring, HTMLParser
    from lxml.etree import XMLSyntaxError
    # recover=False so malformed documents raise rather than half-parse.
    parser = HTMLParser(recover=False, encoding=self.encoding)
    try:
        # try to parse the input in the simplest way: as a path/URL/file-like
        r = parse(self.io, parser=parser)
        try:
            # trees expose getroot(); bare elements do not
            r = r.getroot()
        except AttributeError:
            pass
    except (UnicodeDecodeError, IOError):
        # if the input is a blob of html goop
        if not _is_url(self.io):
            r = fromstring(self.io, parser=parser)
            try:
                r = r.getroot()
            except AttributeError:
                pass
        else:
            # input *is* a url, so the failure must come from the url itself
            scheme = parse_url(self.io).scheme
            if scheme not in _valid_schemes:
                # lxml can't parse it
                msg = ('%r is not a valid url scheme, valid schemes are '
                       '%s') % (scheme, _valid_schemes)
                raise ValueError(msg)
            else:
                # something else happened: maybe a faulty connection
                raise
    else:
        # Parsed on the happy path: sanity-check that text was extracted.
        if not hasattr(r, 'text_content'):
            raise XMLSyntaxError("no text parsed from document", 0, 0, 0)
    return r