def _sow(self, watch, pattern, since, handler, impl):
    """Publish state of the world.

    Gathers records from the filesystem (via ``self._get_fs_sow``) and,
    when ``impl`` has a truthy ``sow`` attribute, from every database file
    found under ``os.path.join(self.root, impl.sow)``. All record streams
    are merged with ``heapq.merge`` and each record is turned into a
    message by ``impl.on_event`` and written to ``handler``.

    :param watch: passed unchanged to ``_get_fs_sow`` and ``_db_records``.
    :param pattern: passed unchanged to ``_get_fs_sow`` and
        ``_db_records``.
    :param since: passed to ``_get_fs_sow`` and ``_db_records``; ``None``
        is treated as ``0``.
    :param handler: object providing ``write_message(payload)`` and
        ``send_error_msg(text)``.
    :param impl: object providing ``on_event(path, operation, content)``
        and, optionally, ``sow`` (directory name under ``self.root``) and
        ``sow_table`` (defaults to ``'sow'``).
    :raises Exception: whatever a connection's ``close()`` raises; it is
        re-raised only after every connection had a chance to close.
    """
    if since is None:
        since = 0

    def _publish(item):
        """Convert one record to a payload and send it to the handler."""
        when, path, content = item
        try:
            payload = impl.on_event(str(path), None, content)
            # on_event may return None to indicate nothing to publish.
            if payload is not None:
                payload['when'] = when
                handler.write_message(payload)
        except Exception as err:  # pylint: disable=W0703
            # Best effort: report the failure to the client and carry on
            # with the remaining records.
            handler.send_error_msg(str(err))

    db_connections = []
    fs_records = self._get_fs_sow(watch, pattern, since)

    sow = getattr(impl, 'sow', None)
    sow_table = getattr(impl, 'sow_table', 'sow')
    try:
        records = []
        if sow:
            db_files = sorted(glob.glob(os.path.join(self.root, sow, '*')))
            for db_file in db_files:
                # Skip hidden files.
                if os.path.basename(db_file).startswith('.'):
                    continue

                conn, db_cursor = self._db_records(
                    db_file, sow_table, watch, pattern, since
                )
                records.append(db_cursor)
                db_connections.append(conn)

        records.append(fs_records)

        # Merge db and fs records, removing duplicates: a record is
        # skipped when its path equals the path of the record just before
        # it in the merged stream.
        # NOTE(review): heapq.merge assumes each input is already sorted;
        # presumably records are (when, path, content) tuples ordered by
        # the producers - confirm against _db_records/_get_fs_sow.
        prev_path = None
        for item in heapq.merge(*records):
            _when, path, _content = item
            if path == prev_path:
                continue
            prev_path = path
            _publish(item)
    finally:
        # Try to close every connection; a failing close() must not leave
        # the remaining connections open. The first failure is re-raised
        # once all of them have been attempted.
        close_error = None
        for open_conn in db_connections:
            if not open_conn:
                continue
            try:
                open_conn.close()
            except Exception as err:  # pylint: disable=W0703
                if close_error is None:
                    close_error = err
        if close_error is not None:
            raise close_error
评论列表
文章目录