def test_upload_journal_logs(self):
''' test upload_journal_logs() '''
tskey = '__REALTIME_TIMESTAMP'
journal = systemd.journal.Reader(path=os.getcwd())
with patch('systemd.journal.Reader', return_value=journal) as journal:
with patch('main.JournaldClient', MagicMock(autospec=True)) as reader:
reader.return_value.__iter__.return_value = [sentinel.msg1, sentinel.msg2, sentinel.msg3, sentinel.msg4]
with patch.multiple(self.client, retain_message=DEFAULT, group_messages=DEFAULT):
log_group1 = Mock()
log_group2 = Mock()
self.client.retain_message.side_effect = [True, False, True, True]
self.client.group_messages.return_value = [
((log_group1, 'stream1'), [sentinel.msg1]),
((log_group2, 'stream2'), [sentinel.msg3, sentinel.msg4]),
]
self.client.upload_journal_logs(os.getcwd())
# creates reader
reader.assert_called_once_with(journal.return_value, self.CURSOR_CONTENT)
# uploads log messages
log_group1.log_messages.assert_called_once_with('stream1', [sentinel.msg1])
log_group2.log_messages.assert_called_once_with('stream2', [sentinel.msg3, sentinel.msg4])
评论列表
文章目录