test_node.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:maas 作者: maas 项目源码 文件源码
def test_integrity_error_is_retried(self):
        # This doesn't simulate the actual way the failure is caused, but it
        # does simulate it to cause the same effect.
        #
        # The `calls` below is used to hold the context inside of the retry
        # loop. The first call will fail as `_clear_events` will be called,
        # the first retry will not call `_clear_events` which will allow the
        # transaction to be committed.
        calls = []

        def _clear_events(collector):
            for idx, objs in enumerate(collector.fast_deletes):
                if len(objs) > 0 and isinstance(objs[0], Event):
                    collector.fast_deletes[idx] = Event.objects.none()

        @transactional
        def _in_database():
            node = factory.make_Node()
            for _ in range(10):
                factory.make_Event(node=node)

            # Use the collector directly instead of calling `delete`.
            collector = Collector(using="default")
            collector.collect([node])
            if calls == 0:
                _clear_events(collector)
            calls.append(None)
            collector.delete()

        # Test is that no exception is raised. If this doesn't work then a
        # `django.db.utils.IntegrityError` will be raised.
        yield deferToDatabase(_in_database)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号