def __iter__(self):
    """Yield walker tokens for every event in ``self.tree``.

    Events are handled one step behind the loop so that each call to
    ``self.tokens`` receives both the event being emitted and the event
    that follows it (``None`` for the final event).  Whenever an
    ``EmptyTag`` token is produced, the current depth is stored in
    ``ignore_until`` and later events are skipped until the depth check
    clears it again.

    :raises ValueError: if the stream ends while events are still being
        skipped after an ``EmptyTag`` token
    """
    depth = 0
    ignore_until = None
    previous = None
    for event in self.tree:
        if previous is not None:
            if previous[0] == START:
                depth += 1
            # Only compare when a depth is actually recorded: ordering
            # ``None`` against an int raises TypeError on Python 3 (on
            # Python 2 the comparison was true and the reset a no-op).
            if ignore_until is not None and ignore_until <= depth:
                ignore_until = None
            if ignore_until is None:
                for token in self.tokens(previous, event):
                    yield token
                    if token["type"] == "EmptyTag":
                        ignore_until = depth
            if previous[0] == END:
                depth -= 1
        previous = event
    # The last event has no successor, so flush it with ``next`` = None.
    if previous is not None:
        if ignore_until is None or ignore_until <= depth:
            for token in self.tokens(previous, None):
                yield token
        else:
            # Reaching here means ignore_until is set and still above depth.
            raise ValueError("Illformed DOM event stream: void element without END_ELEMENT")
# Source-code examples for the Python class START
def _interpolate_attrs(self, stream):
    """Yield the stream with attribute values of START events interpolated.

    Every non-empty attribute value of a START event is passed through
    ``interpolate`` (with this object's ``filepath`` and ``lookup`` and the
    event's line/column from ``pos``).  When the result is a single TEXT
    part, the attribute is set to that part's data; otherwise it is set to
    the list of parts.  All other events are yielded unchanged.
    """
    for event in stream:
        kind, data, pos = event
        if kind is not START:
            yield kind, data, pos
            continue

        tag, attrs = data
        rebuilt = []
        for name, value in attrs:
            if value:
                parts = list(interpolate(value, self.filepath, pos[1],
                                         pos[2], lookup=self.lookup))
                # A lone TEXT part collapses back to its plain data.
                is_plain_text = len(parts) == 1 and parts[0][0] is TEXT
                value = parts[0][1] if is_plain_text else parts
            rebuilt.append((name, value))
        yield kind, (tag, Attrs(rebuilt)), pos
def _simplify(stream, with_attrs=False):
    """Reduce a marked stream to a list of ``(mark, kind, data)`` triples.

    The position of every event is dropped.  START data becomes the tag
    name as ``unicode`` (or a ``(name, attribute dict)`` pair when
    `with_attrs` is true), END data becomes ``unicode``, and ATTR data
    becomes a dict built from ``data[1]``.  Data of any other kind is kept
    as it is.
    """
    simplified = []
    for mark, (kind, data, pos) in stream:
        if kind is START:
            tag = unicode(data[0])
            if with_attrs:
                data = (tag, dict((unicode(k), v) for k, v in data[1]))
            else:
                data = tag
        elif kind is END:
            data = unicode(data)
        elif kind is ATTR:
            data = dict((unicode(k), v) for k, v in data[1])
        simplified.append((mark, kind, data))
    return simplified
def test_wrap_adjacent_elements(self):
    # Wrapping 'foo|bar' must give each of the two adjacent elements its
    # own 'wrap' START/END pair rather than one pair around both.
    actual = _transform(FOOBAR, Transformer('foo|bar').wrap('wrap'))
    expected = [
        (None, START, u'root'),
        (None, TEXT, u'ROOT'),
        (None, START, u'wrap'),
        (ENTER, START, u'foo'),
        (INSIDE, TEXT, u'FOO'),
        (EXIT, END, u'foo'),
        (None, END, u'wrap'),
        (None, START, u'wrap'),
        (ENTER, START, u'bar'),
        (INSIDE, TEXT, u'BAR'),
        (EXIT, END, u'bar'),
        (None, END, u'wrap'),
        (None, END, u'root'),
    ]
    self.assertEqual(actual, expected)
def test_before_adjacent_elements(self):
    # Selecting '*' must put one 'CONTENT n' text event directly ahead of
    # each marked element (foo first, then bar).  self._apply is a helper
    # of the test case that is defined outside this method.
    actual = self._apply('*')
    expected = [
        (None, START, u'root'),
        (None, TEXT, u'ROOT'),
        (None, TEXT, u'CONTENT 1'),
        (ENTER, START, u'foo'),
        (INSIDE, TEXT, u'FOO'),
        (EXIT, END, u'foo'),
        (None, TEXT, u'CONTENT 2'),
        (ENTER, START, u'bar'),
        (INSIDE, TEXT, u'BAR'),
        (EXIT, END, u'bar'),
        (None, END, u'root'),
    ]
    self.assertEqual(actual, expected)
def test_before_all(self):
    # With '*|text()' the ROOT text is marked OUTSIDE as well, so three
    # 'CONTENT n' events are expected: ahead of ROOT, foo and bar.
    actual = self._apply('*|text()')
    expected = [
        (None, START, u'root'),
        (None, TEXT, u'CONTENT 1'),
        (OUTSIDE, TEXT, u'ROOT'),
        (None, TEXT, u'CONTENT 2'),
        (ENTER, START, u'foo'),
        (INSIDE, TEXT, u'FOO'),
        (EXIT, END, u'foo'),
        (None, TEXT, u'CONTENT 3'),
        (ENTER, START, u'bar'),
        (INSIDE, TEXT, u'BAR'),
        (EXIT, END, u'bar'),
        (None, END, u'root'),
    ]
    self.assertEqual(actual, expected)
def test_before_with_callback(self):
    # One-element list so the nested generator can update the counter.
    calls = [0]

    def content():
        calls[0] += 1
        yield '%2i.' % calls[0]

    # The callback's output (u' 1.') must appear once, directly ahead of
    # the text inside foo; bar stays unmarked and untouched.
    actual = self._apply('foo/text()', content)
    expected = [
        (None, 'START', u'root'),
        (None, 'TEXT', u'ROOT'),
        (None, 'START', u'foo'),
        (None, 'TEXT', u' 1.'),
        ('OUTSIDE', 'TEXT', u'FOO'),
        (None, 'END', u'foo'),
        (None, 'START', u'bar'),
        (None, 'TEXT', u'BAR'),
        (None, 'END', u'bar'),
        (None, 'END', u'root'),
    ]
    self.assertEqual(actual, expected)
def test_after_adjacent_elements(self):
    # Selecting '*' must put one 'CONTENT n' text event directly behind
    # the END of each marked element (foo first, then bar).
    actual = self._apply('*')
    expected = [
        (None, START, u'root'),
        (None, TEXT, u'ROOT'),
        (ENTER, START, u'foo'),
        (INSIDE, TEXT, u'FOO'),
        (EXIT, END, u'foo'),
        (None, TEXT, u'CONTENT 1'),
        (ENTER, START, u'bar'),
        (INSIDE, TEXT, u'BAR'),
        (EXIT, END, u'bar'),
        (None, TEXT, u'CONTENT 2'),
        (None, END, u'root'),
    ]
    self.assertEqual(actual, expected)
def test_after_all(self):
    # With '*|text()' the ROOT text is marked OUTSIDE as well, so three
    # 'CONTENT n' events are expected: behind ROOT, foo and bar.
    actual = self._apply('*|text()')
    expected = [
        (None, START, u'root'),
        (OUTSIDE, TEXT, u'ROOT'),
        (None, TEXT, u'CONTENT 1'),
        (ENTER, START, u'foo'),
        (INSIDE, TEXT, u'FOO'),
        (EXIT, END, u'foo'),
        (None, TEXT, u'CONTENT 2'),
        (ENTER, START, u'bar'),
        (INSIDE, TEXT, u'BAR'),
        (EXIT, END, u'bar'),
        (None, TEXT, u'CONTENT 3'),
        (None, END, u'root'),
    ]
    self.assertEqual(actual, expected)
def test_after_with_callback(self):
    # One-element list so the nested generator can update the counter.
    calls = [0]

    def content():
        calls[0] += 1
        yield '%2i.' % calls[0]

    # The callback's output (u' 1.') must appear once, directly behind
    # the text inside foo; bar stays unmarked and untouched.
    actual = self._apply('foo/text()', content)
    expected = [
        (None, 'START', u'root'),
        (None, 'TEXT', u'ROOT'),
        (None, 'START', u'foo'),
        ('OUTSIDE', 'TEXT', u'FOO'),
        (None, 'TEXT', u' 1.'),
        (None, 'END', u'foo'),
        (None, 'START', u'bar'),
        (None, 'TEXT', u'BAR'),
        (None, 'END', u'bar'),
        (None, 'END', u'root'),
    ]
    self.assertEqual(actual, expected)
def test_prepend_adjacent_elements(self):
    # For each marked element, a 'CONTENT n' text event is expected right
    # after its START event, ahead of the element's own text.
    actual = self._apply('*')
    expected = [
        (None, START, u'root'),
        (None, TEXT, u'ROOT'),
        (ENTER, START, u'foo'),
        (None, TEXT, u'CONTENT 1'),
        (INSIDE, TEXT, u'FOO'),
        (EXIT, END, u'foo'),
        (ENTER, START, u'bar'),
        (None, TEXT, u'CONTENT 2'),
        (INSIDE, TEXT, u'BAR'),
        (EXIT, END, u'bar'),
        (None, END, u'root'),
    ]
    self.assertEqual(actual, expected)
def test_prepend_with_callback(self):
    # One-element list so the nested generator can update the counter.
    calls = [0]

    def content():
        calls[0] += 1
        yield '%2i.' % calls[0]

    # The callback's output (u' 1.') must appear once, right after the
    # START of foo; bar stays unmarked and untouched.
    actual = self._apply('foo', content)
    expected = [
        (None, 'START', u'root'),
        (None, 'TEXT', u'ROOT'),
        (ENTER, 'START', u'foo'),
        (None, 'TEXT', u' 1.'),
        (INSIDE, 'TEXT', u'FOO'),
        (EXIT, 'END', u'foo'),
        (None, 'START', u'bar'),
        (None, 'TEXT', u'BAR'),
        (None, 'END', u'bar'),
        (None, 'END', u'root'),
    ]
    self.assertEqual(actual, expected)
def test_append_adjacent_elements(self):
    # For each marked element, a 'CONTENT n' text event is expected right
    # before its END event, behind the element's own text.
    actual = self._apply('*')
    expected = [
        (None, START, u'root'),
        (None, TEXT, u'ROOT'),
        (ENTER, START, u'foo'),
        (INSIDE, TEXT, u'FOO'),
        (None, TEXT, u'CONTENT 1'),
        (EXIT, END, u'foo'),
        (ENTER, START, u'bar'),
        (INSIDE, TEXT, u'BAR'),
        (None, TEXT, u'CONTENT 2'),
        (EXIT, END, u'bar'),
        (None, END, u'root'),
    ]
    self.assertEqual(actual, expected)
def test_append_with_callback(self):
    # One-element list so the nested generator can update the counter.
    calls = [0]

    def content():
        calls[0] += 1
        yield '%2i.' % calls[0]

    # The callback's output (u' 1.') must appear once, right before the
    # END of foo; bar stays unmarked and untouched.
    actual = self._apply('foo', content)
    expected = [
        (None, 'START', u'root'),
        (None, 'TEXT', u'ROOT'),
        (ENTER, 'START', u'foo'),
        (INSIDE, 'TEXT', u'FOO'),
        (None, 'TEXT', u' 1.'),
        (EXIT, 'END', u'foo'),
        (None, 'START', u'bar'),
        (None, 'TEXT', u'BAR'),
        (None, 'END', u'bar'),
        (None, 'END', u'root'),
    ]
    self.assertEqual(actual, expected)
def test_attr_from_function(self):
    def upper_name(name, event):
        # The callback must be handed the attribute name being set, and it
        # derives the new value from the event's current 'name' attribute.
        self.assertEqual(name, 'name')
        current = event[1][1].get('name')
        return current.upper()

    # Both selected elements get an upper-cased 'name'; foo's 'size'
    # attribute is expected to be left alone.
    actual = self._attr('foo|bar', 'name', upper_name)
    expected = [
        (None, START, (u'root', {})),
        (None, TEXT, u'ROOT'),
        (ENTER, START, (u'foo', {u'name': 'FOO', u'size': '100'})),
        (INSIDE, TEXT, u'FOO'),
        (EXIT, END, u'foo'),
        (ENTER, START, (u'bar', {u'name': 'BAR'})),
        (INSIDE, TEXT, u'BAR'),
        (EXIT, END, u'bar'),
        (None, END, u'root'),
    ]
    self.assertEqual(actual, expected)
def test_remove_attr_with_function(self):
    def drop(name, event):
        # A callback that returns None for every attribute it is asked about.
        return None

    # With the callback returning None, foo is expected to lose its 'name'
    # attribute while keeping 'size'; unselected bar keeps its 'name'.
    actual = self._attr('foo', 'name', drop)
    expected = [
        (None, START, (u'root', {})),
        (None, TEXT, u'ROOT'),
        (ENTER, START, (u'foo', {u'size': '100'})),
        (INSIDE, TEXT, u'FOO'),
        (EXIT, END, u'foo'),
        (None, START, (u'bar', {u'name': u'bar'})),
        (None, TEXT, u'BAR'),
        (None, END, u'bar'),
        (None, END, u'root'),
    ]
    self.assertEqual(actual, expected)
def select(self, path):
    """Apply a selection that marks the events matching an XPath expression.

    The expression is matched within the current selection.

    >>> html = HTML('<body>Some <em>test</em> text</body>', encoding='utf-8')
    >>> print(html | Transformer().select('.//em').trace())
    (None, ('START', (QName('body'), Attrs()), (None, 1, 0)))
    (None, ('TEXT', u'Some ', (None, 1, 6)))
    ('ENTER', ('START', (QName('em'), Attrs()), (None, 1, 11)))
    ('INSIDE', ('TEXT', u'test', (None, 1, 15)))
    ('EXIT', ('END', QName('em'), (None, 1, 19)))
    (None, ('TEXT', u' text', (None, 1, 24)))
    (None, ('END', QName('body'), (None, 1, 29)))
    <body>Some <em>test</em> text</body>

    :param path: an XPath expression (as string) or a `Path` instance
    :return: the stream augmented by transformation marks
    :rtype: `Transformer`
    """
    selection = SelectTransformation(path)
    return self.apply(selection)
def invert(self):
    """Flip the selection: marked events lose their mark and unmarked
    events gain one.

    Specifically, every existing mark is replaced by the null mark, and
    every null mark is replaced by an OUTSIDE mark.

    >>> html = HTML('<body>Some <em>test</em> text</body>', encoding='utf-8')
    >>> print(html | Transformer('//em').invert().trace())
    ('OUTSIDE', ('START', (QName('body'), Attrs()), (None, 1, 0)))
    ('OUTSIDE', ('TEXT', u'Some ', (None, 1, 6)))
    (None, ('START', (QName('em'), Attrs()), (None, 1, 11)))
    (None, ('TEXT', u'test', (None, 1, 15)))
    (None, ('END', QName('em'), (None, 1, 19)))
    ('OUTSIDE', ('TEXT', u' text', (None, 1, 24)))
    ('OUTSIDE', ('END', QName('body'), (None, 1, 29)))
    <body>Some <em>test</em> text</body>

    :rtype: `Transformer`
    """
    inversion = InvertTransformation()
    return self.apply(inversion)
def end(self):
    """Close the current selection so that every event is selected again.

    Example:

    >>> html = HTML('<body>Some <em>test</em> text</body>', encoding='utf-8')
    >>> print(html | Transformer('//em').end().trace())
    ('OUTSIDE', ('START', (QName('body'), Attrs()), (None, 1, 0)))
    ('OUTSIDE', ('TEXT', u'Some ', (None, 1, 6)))
    ('OUTSIDE', ('START', (QName('em'), Attrs()), (None, 1, 11)))
    ('OUTSIDE', ('TEXT', u'test', (None, 1, 15)))
    ('OUTSIDE', ('END', QName('em'), (None, 1, 19)))
    ('OUTSIDE', ('TEXT', u' text', (None, 1, 24)))
    ('OUTSIDE', ('END', QName('body'), (None, 1, 29)))
    <body>Some <em>test</em> text</body>

    :return: the stream augmented by transformation marks
    :rtype: `Transformer`
    """
    terminator = EndTransformation()
    return self.apply(terminator)
#{ Deletion operations
def trace(self, prefix='', fileobj=None):
    """Print each event as it flows through the transform.

    >>> html = HTML('<body>Some <em>test</em> text</body>', encoding='utf-8')
    >>> print(html | Transformer('em').trace())
    (None, ('START', (QName('body'), Attrs()), (None, 1, 0)))
    (None, ('TEXT', u'Some ', (None, 1, 6)))
    ('ENTER', ('START', (QName('em'), Attrs()), (None, 1, 11)))
    ('INSIDE', ('TEXT', u'test', (None, 1, 15)))
    ('EXIT', ('END', QName('em'), (None, 1, 19)))
    (None, ('TEXT', u' text', (None, 1, 24)))
    (None, ('END', QName('body'), (None, 1, 29)))
    <body>Some <em>test</em> text</body>

    :param prefix: a string to prefix each event with in the output
    :param fileobj: the writable file-like object to write to; defaults to
                    the standard output stream
    :rtype: `Transformer`
    """
    tracer = TraceTransformation(prefix, fileobj=fileobj)
    return self.apply(tracer)
# Internal methods
def to_genshi(walker):
    """Turn tree-walker tokens into ``(kind, data, pos)`` event tuples.

    Consecutive "Characters"/"SpaceCharacters" tokens are merged into one
    TEXT event.  "StartTag" yields START, "EndTag" yields END, "EmptyTag"
    yields START immediately followed by END, and "Comment"/"Doctype"
    yield COMMENT/DOCTYPE.  Tokens of any other type are dropped.  Every
    event carries the placeholder position ``(None, -1, -1)``.

    :param walker: an iterable of token mappings that have a "type" key
    """
    nowhere = (None, -1, -1)
    buffered = []
    for token in walker:
        kind = token["type"]
        if kind in ("Characters", "SpaceCharacters"):
            buffered.append(token["data"])
            continue

        # Any non-text token ends the current run of character data.
        if buffered:
            yield TEXT, "".join(buffered), nowhere
            buffered = []

        if kind in ("StartTag", "EmptyTag", "EndTag"):
            namespace = token["namespace"]
            if namespace:
                tag = "{%s}%s" % (namespace, token["name"])
            else:
                tag = token["name"]
            if kind != "EndTag":
                pairs = []
                for key, value in token["data"].items():
                    # key is indexed as (namespace, local name).
                    if key[0] is not None:
                        label = "{%s}%s" % key
                    else:
                        label = key[1]
                    pairs.append((QName(label), value))
                yield (START, (QName(tag), Attrs(pairs)), nowhere)
            if kind != "StartTag":
                yield END, QName(tag), nowhere
        elif kind == "Comment":
            yield COMMENT, token["data"], nowhere
        elif kind == "Doctype":
            doctype = (token["name"], token["publicId"], token["systemId"])
            yield DOCTYPE, doctype, nowhere
        # FIXME: What to do with any other token type?

    if buffered:
        yield TEXT, "".join(buffered), nowhere
def to_genshi(walker):
    """Turn tree-walker tokens into ``(kind, data, pos)`` event tuples.

    Consecutive "Characters"/"SpaceCharacters" tokens are merged into one
    TEXT event.  "StartTag" yields START, "EndTag" yields END, "EmptyTag"
    yields START immediately followed by END, and "Comment"/"Doctype"
    yield COMMENT/DOCTYPE.  Tokens of any other type are dropped.  Every
    event carries the placeholder position ``(None, -1, -1)``.

    :param walker: an iterable of token mappings that have a "type" key
    """
    nowhere = (None, -1, -1)
    buffered = []
    for token in walker:
        kind = token["type"]
        if kind in ("Characters", "SpaceCharacters"):
            buffered.append(token["data"])
            continue

        # Any non-text token ends the current run of character data.
        if buffered:
            yield TEXT, "".join(buffered), nowhere
            buffered = []

        if kind in ("StartTag", "EmptyTag", "EndTag"):
            namespace = token["namespace"]
            if namespace:
                tag = "{%s}%s" % (namespace, token["name"])
            else:
                tag = token["name"]
            if kind != "EndTag":
                pairs = []
                for key, value in token["data"].items():
                    # key is indexed as (namespace, local name).
                    if key[0] is not None:
                        label = "{%s}%s" % key
                    else:
                        label = key[1]
                    pairs.append((QName(label), value))
                yield (START, (QName(tag), Attrs(pairs)), nowhere)
            if kind != "StartTag":
                yield END, QName(tag), nowhere
        elif kind == "Comment":
            yield COMMENT, token["data"], nowhere
        elif kind == "Doctype":
            doctype = (token["name"], token["publicId"], token["systemId"])
            yield DOCTYPE, doctype, nowhere
        # FIXME: What to do with any other token type?

    if buffered:
        yield TEXT, "".join(buffered), nowhere
def to_genshi(walker):
    """Turn tree-walker tokens into ``(kind, data, pos)`` event tuples.

    Consecutive "Characters"/"SpaceCharacters" tokens are merged into one
    TEXT event.  "StartTag" yields START, "EndTag" yields END, "EmptyTag"
    yields START immediately followed by END, and "Comment"/"Doctype"
    yield COMMENT/DOCTYPE.  Tokens of any other type are dropped.  Every
    event carries the placeholder position ``(None, -1, -1)``.

    :param walker: an iterable of token mappings that have a "type" key
    """
    nowhere = (None, -1, -1)
    buffered = []
    for token in walker:
        kind = token["type"]
        if kind in ("Characters", "SpaceCharacters"):
            buffered.append(token["data"])
            continue

        # Any non-text token ends the current run of character data.
        if buffered:
            yield TEXT, "".join(buffered), nowhere
            buffered = []

        if kind in ("StartTag", "EmptyTag", "EndTag"):
            namespace = token["namespace"]
            if namespace:
                tag = "{%s}%s" % (namespace, token["name"])
            else:
                tag = token["name"]
            if kind != "EndTag":
                pairs = []
                for key, value in token["data"].items():
                    # key is indexed as (namespace, local name).
                    if key[0] is not None:
                        label = "{%s}%s" % key
                    else:
                        label = key[1]
                    pairs.append((QName(label), value))
                yield (START, (QName(tag), Attrs(pairs)), nowhere)
            if kind != "StartTag":
                yield END, QName(tag), nowhere
        elif kind == "Comment":
            yield COMMENT, token["data"], nowhere
        elif kind == "Doctype":
            doctype = (token["name"], token["publicId"], token["systemId"])
            yield DOCTYPE, doctype, nowhere
        # FIXME: What to do with any other token type?

    if buffered:
        yield TEXT, "".join(buffered), nowhere
def tokens(self, event, next):
    """Yield the walker token(s) that correspond to one stream event.

    START events become an empty-tag or start-tag token, END events an
    end-tag token (none for void elements), and COMMENT, TEXT and DOCTYPE
    events their respective tokens.  Namespace, CDATA and PI events
    produce nothing; any other kind is reported through ``self.unknown``.

    :param event: a ``(kind, data, pos)`` event tuple
    :param next: the event following ``event`` in the stream, or a false
        value when there is none; only consulted for void elements
    """
    kind, data, pos = event
    if kind == START:
        tag, attrib = data
        name = tag.localname
        namespace = tag.namespace
        # Test the local name, exactly as the END branch below does.
        # Testing the full tag made a namespaced void element emit a
        # start tag here while its end tag was suppressed below.
        if name in voidElements:
            # True unless the very next event is this element's END.
            # NOTE(review): presumably emptyTag's has-children flag - confirm.
            lacks_immediate_end = (not next or next[0] != END
                                   or next[1] != tag)
            for token in self.emptyTag(namespace, name, list(attrib),
                                       lacks_immediate_end):
                yield token
        else:
            yield self.startTag(namespace, name, list(attrib))
    elif kind == END:
        name = data.localname
        namespace = data.namespace
        if name not in voidElements:
            yield self.endTag(namespace, name)
    elif kind == COMMENT:
        yield self.comment(data)
    elif kind == TEXT:
        for token in self.text(data):
            yield token
    elif kind == DOCTYPE:
        yield self.doctype(*data)
    elif kind in (XML_NAMESPACE, START_NS, END_NS,
                  START_CDATA, END_CDATA, PI):
        # These kinds have no token equivalent.  DOCTYPE is handled by the
        # branch above, so it is not repeated in this tuple.
        pass
    else:
        yield self.unknown(kind)
def to_genshi(walker):
    """Turn tree-walker tokens into ``(kind, data, pos)`` event tuples.

    Consecutive "Characters"/"SpaceCharacters" tokens are merged into one
    TEXT event.  "StartTag" yields START, "EndTag" yields END, "EmptyTag"
    yields START immediately followed by END, and "Comment"/"Doctype"
    yield COMMENT/DOCTYPE.  Tokens of any other type are dropped.  Every
    event carries the placeholder position ``(None, -1, -1)``.

    :param walker: an iterable of token mappings that have a "type" key
    """
    nowhere = (None, -1, -1)
    buffered = []
    for token in walker:
        kind = token["type"]
        if kind in ("Characters", "SpaceCharacters"):
            buffered.append(token["data"])
            continue

        # Any non-text token ends the current run of character data.
        if buffered:
            yield TEXT, "".join(buffered), nowhere
            buffered = []

        if kind in ("StartTag", "EmptyTag", "EndTag"):
            namespace = token["namespace"]
            if namespace:
                tag = "{%s}%s" % (namespace, token["name"])
            else:
                tag = token["name"]
            if kind != "EndTag":
                pairs = []
                for key, value in token["data"].items():
                    # key is indexed as (namespace, local name).
                    if key[0] is not None:
                        label = "{%s}%s" % key
                    else:
                        label = key[1]
                    pairs.append((QName(label), value))
                yield (START, (QName(tag), Attrs(pairs)), nowhere)
            if kind != "StartTag":
                yield END, QName(tag), nowhere
        elif kind == "Comment":
            yield COMMENT, token["data"], nowhere
        elif kind == "Doctype":
            doctype = (token["name"], token["publicId"], token["systemId"])
            yield DOCTYPE, doctype, nowhere
        # FIXME: What to do with any other token type?

    if buffered:
        yield TEXT, "".join(buffered), nowhere
def to_genshi(walker):
    """Turn tree-walker tokens into ``(kind, data, pos)`` event tuples.

    Consecutive "Characters"/"SpaceCharacters" tokens are merged into one
    TEXT event.  "StartTag" yields START, "EndTag" yields END, "EmptyTag"
    yields START immediately followed by END, and "Comment"/"Doctype"
    yield COMMENT/DOCTYPE.  Tokens of any other type are dropped.  Every
    event carries the placeholder position ``(None, -1, -1)``.

    :param walker: an iterable of token mappings that have a "type" key
    """
    nowhere = (None, -1, -1)
    buffered = []
    for token in walker:
        kind = token["type"]
        if kind in ("Characters", "SpaceCharacters"):
            buffered.append(token["data"])
            continue

        # Any non-text token ends the current run of character data.
        if buffered:
            yield TEXT, "".join(buffered), nowhere
            buffered = []

        if kind in ("StartTag", "EmptyTag", "EndTag"):
            namespace = token["namespace"]
            if namespace:
                tag = "{%s}%s" % (namespace, token["name"])
            else:
                tag = token["name"]
            if kind != "EndTag":
                pairs = []
                for key, value in token["data"].items():
                    # key is indexed as (namespace, local name).
                    if key[0] is not None:
                        label = "{%s}%s" % key
                    else:
                        label = key[1]
                    pairs.append((QName(label), value))
                yield (START, (QName(tag), Attrs(pairs)), nowhere)
            if kind != "StartTag":
                yield END, QName(tag), nowhere
        elif kind == "Comment":
            yield COMMENT, token["data"], nowhere
        elif kind == "Doctype":
            doctype = (token["name"], token["publicId"], token["systemId"])
            yield DOCTYPE, doctype, nowhere
        # FIXME: What to do with any other token type?

    if buffered:
        yield TEXT, "".join(buffered), nowhere
def to_genshi(walker):
    """Turn tree-walker tokens into ``(kind, data, pos)`` event tuples.

    Consecutive "Characters"/"SpaceCharacters" tokens are merged into one
    TEXT event.  "StartTag" yields START, "EndTag" yields END, "EmptyTag"
    yields START immediately followed by END, and "Comment"/"Doctype"
    yield COMMENT/DOCTYPE.  Tokens of any other type are dropped.  Every
    event carries the placeholder position ``(None, -1, -1)``.

    :param walker: an iterable of token mappings that have a "type" key
    """
    nowhere = (None, -1, -1)
    buffered = []
    for token in walker:
        kind = token["type"]
        if kind in ("Characters", "SpaceCharacters"):
            buffered.append(token["data"])
            continue

        # Any non-text token ends the current run of character data.
        if buffered:
            yield TEXT, "".join(buffered), nowhere
            buffered = []

        if kind in ("StartTag", "EmptyTag", "EndTag"):
            namespace = token["namespace"]
            if namespace:
                tag = "{%s}%s" % (namespace, token["name"])
            else:
                tag = token["name"]
            if kind != "EndTag":
                pairs = []
                for key, value in token["data"].items():
                    # key is indexed as (namespace, local name).
                    if key[0] is not None:
                        label = "{%s}%s" % key
                    else:
                        label = key[1]
                    pairs.append((QName(label), value))
                yield (START, (QName(tag), Attrs(pairs)), nowhere)
            if kind != "StartTag":
                yield END, QName(tag), nowhere
        elif kind == "Comment":
            yield COMMENT, token["data"], nowhere
        elif kind == "Doctype":
            doctype = (token["name"], token["publicId"], token["systemId"])
            yield DOCTYPE, doctype, nowhere
        # FIXME: What to do with any other token type?

    if buffered:
        yield TEXT, "".join(buffered), nowhere
def to_genshi(walker):
    """Turn tree-walker tokens into ``(kind, data, pos)`` event tuples.

    Consecutive "Characters"/"SpaceCharacters" tokens are merged into one
    TEXT event.  "StartTag" yields START, "EndTag" yields END, "EmptyTag"
    yields START immediately followed by END, and "Comment"/"Doctype"
    yield COMMENT/DOCTYPE.  Tokens of any other type are dropped.  Every
    event carries the placeholder position ``(None, -1, -1)``.

    :param walker: an iterable of token mappings that have a "type" key
    """
    nowhere = (None, -1, -1)
    buffered = []
    for token in walker:
        kind = token["type"]
        if kind in ("Characters", "SpaceCharacters"):
            buffered.append(token["data"])
            continue

        # Any non-text token ends the current run of character data.
        if buffered:
            yield TEXT, "".join(buffered), nowhere
            buffered = []

        if kind in ("StartTag", "EmptyTag", "EndTag"):
            namespace = token["namespace"]
            if namespace:
                tag = "{%s}%s" % (namespace, token["name"])
            else:
                tag = token["name"]
            if kind != "EndTag":
                pairs = []
                for key, value in token["data"].items():
                    # key is indexed as (namespace, local name).
                    if key[0] is not None:
                        label = "{%s}%s" % key
                    else:
                        label = key[1]
                    pairs.append((QName(label), value))
                yield (START, (QName(tag), Attrs(pairs)), nowhere)
            if kind != "StartTag":
                yield END, QName(tag), nowhere
        elif kind == "Comment":
            yield COMMENT, token["data"], nowhere
        elif kind == "Doctype":
            doctype = (token["name"], token["publicId"], token["systemId"])
            yield DOCTYPE, doctype, nowhere
        # FIXME: What to do with any other token type?

    if buffered:
        yield TEXT, "".join(buffered), nowhere
def to_genshi(walker):
    """Turn tree-walker tokens into ``(kind, data, pos)`` event tuples.

    Consecutive "Characters"/"SpaceCharacters" tokens are merged into one
    TEXT event.  "StartTag" yields START, "EndTag" yields END, "EmptyTag"
    yields START immediately followed by END, and "Comment"/"Doctype"
    yield COMMENT/DOCTYPE.  Tokens of any other type are dropped.  Every
    event carries the placeholder position ``(None, -1, -1)``.

    :param walker: an iterable of token mappings that have a "type" key
    """
    nowhere = (None, -1, -1)
    buffered = []
    for token in walker:
        kind = token["type"]
        if kind in ("Characters", "SpaceCharacters"):
            buffered.append(token["data"])
            continue

        # Any non-text token ends the current run of character data.
        if buffered:
            yield TEXT, "".join(buffered), nowhere
            buffered = []

        if kind in ("StartTag", "EmptyTag", "EndTag"):
            namespace = token["namespace"]
            if namespace:
                tag = "{%s}%s" % (namespace, token["name"])
            else:
                tag = token["name"]
            if kind != "EndTag":
                pairs = []
                for key, value in token["data"].items():
                    # key is indexed as (namespace, local name).
                    if key[0] is not None:
                        label = "{%s}%s" % key
                    else:
                        label = key[1]
                    pairs.append((QName(label), value))
                yield (START, (QName(tag), Attrs(pairs)), nowhere)
            if kind != "StartTag":
                yield END, QName(tag), nowhere
        elif kind == "Comment":
            yield COMMENT, token["data"], nowhere
        elif kind == "Doctype":
            doctype = (token["name"], token["publicId"], token["systemId"])
            yield DOCTYPE, doctype, nowhere
        # FIXME: What to do with any other token type?

    if buffered:
        yield TEXT, "".join(buffered), nowhere
def to_genshi(walker):
    """Turn tree-walker tokens into ``(kind, data, pos)`` event tuples.

    Consecutive "Characters"/"SpaceCharacters" tokens are merged into one
    TEXT event.  "StartTag" yields START, "EndTag" yields END, "EmptyTag"
    yields START immediately followed by END, and "Comment"/"Doctype"
    yield COMMENT/DOCTYPE.  Tokens of any other type are dropped.  Every
    event carries the placeholder position ``(None, -1, -1)``.

    :param walker: an iterable of token mappings that have a "type" key
    """
    nowhere = (None, -1, -1)
    buffered = []
    for token in walker:
        kind = token["type"]
        if kind in ("Characters", "SpaceCharacters"):
            buffered.append(token["data"])
            continue

        # Any non-text token ends the current run of character data.
        if buffered:
            yield TEXT, "".join(buffered), nowhere
            buffered = []

        if kind in ("StartTag", "EmptyTag", "EndTag"):
            namespace = token["namespace"]
            if namespace:
                tag = "{%s}%s" % (namespace, token["name"])
            else:
                tag = token["name"]
            if kind != "EndTag":
                pairs = []
                for key, value in token["data"].items():
                    # key is indexed as (namespace, local name).
                    if key[0] is not None:
                        label = "{%s}%s" % key
                    else:
                        label = key[1]
                    pairs.append((QName(label), value))
                yield (START, (QName(tag), Attrs(pairs)), nowhere)
            if kind != "StartTag":
                yield END, QName(tag), nowhere
        elif kind == "Comment":
            yield COMMENT, token["data"], nowhere
        elif kind == "Doctype":
            doctype = (token["name"], token["publicId"], token["systemId"])
            yield DOCTYPE, doctype, nowhere
        # FIXME: What to do with any other token type?

    if buffered:
        yield TEXT, "".join(buffered), nowhere