def handle(handler_spec, input_events):
r"""
Return a list of event dictionaries collected by handling input_events
using handler_spec.
>>> from abusehelper.core.events import Event
>>> from abusehelper.core.transformation import Handler
>>>
>>> class MyHandler(Handler):
... @idiokit.stream
... def transform(self):
... while True:
... event = yield idiokit.next()
... event.add("a", "b")
... yield idiokit.send(event)
...
>>> handle(MyHandler, [{}])
[{u'a': [u'b']}]
Note that to simplify testing the output is a list of dictionaries
instead of abusehelper.core.events.Event objects.
"""
handler_type = handlers.load_handler(handler_spec)
log = logging.getLogger("Null")
log_handler = _NullHandler()
log.addHandler(log_handler)
try:
handler = handler_type(log=log)
return idiokit.main_loop(idiokit.pipe(
_feed(itertools.imap(events.Event, input_events)),
handler.transform(),
_collect_events()
))
finally:
log.removeHandler(log_handler)
评论列表
文章目录