test_storage.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:gnocchi 作者: gnocchixyz 项目源码 文件源码
def test_add_measures_update_subset_split(self):
        """Verify a late measure rewrites only one one-minute 'mean' split.

        Two hours of measures (one every two minutes) are stored and
        processed for a metric created with the 'medium' policy. A single
        extra measure is then added at 01:58:01, and the calls made to
        ``_store_metric_measures`` during the second processing run are
        inspected: exactly one of them must target this metric with the
        'mean' aggregation and a key whose sampling is one minute.
        """
        metric, metric_sql = self._create_metric('medium')

        # Seed data: hours 0 and 1, even minutes only, every value is 100.
        seed = []
        for hour in six.moves.range(2):
            for minute in six.moves.range(0, 60, 2):
                seed.append(incoming.Measure(
                    datetime64(2014, 1, 6, hour, minute, 0), 100))
        self.incoming.add_measures(metric.id, seed)
        self.trigger_processing([str(metric.id)])

        # Append one measure at the end, inside the same aggregate period
        # as the last seeded point.
        late_point = incoming.Measure(datetime64(2014, 1, 6, 1, 58, 1), 100)
        self.incoming.add_measures(metric.id, [late_point])

        # Only the last aggregate should be resampled by this run.
        with mock.patch.object(
                self.storage, '_store_metric_measures') as store_mock:
            self.trigger_processing([str(metric.id)])

        one_minute = numpy.timedelta64(1, 'm')

        def writes_minute_mean(recorded):
            # Index 1 of a recorded mock call holds its positional arguments.
            positional = recorded[1]
            return (positional[0] == metric_sql
                    and positional[2] == 'mean'
                    and positional[1].sampling == one_minute)

        # Original note: policy is 60 points and split is 48, so only the
        # second half should be updated.
        matching = sum(
            1 for recorded in store_mock.mock_calls
            if writes_minute_mean(recorded))
        self.assertEqual(1, matching)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号