test_cluster.py 文件源码

python
阅读 38 收藏 0 点赞 0 评论 0

项目:kubernetes-ec2-autoscaler 作者: openai 项目源码 文件源码
def test_reap_dead_node(self):
        node = copy.deepcopy(self.dummy_node)
        TestInstance = collections.namedtuple('TestInstance', ['launch_time'])
        instance = TestInstance(datetime.now(pytz.utc))

        ready_condition = None
        for condition in node['status']['conditions']:
            if condition['type'] == 'Ready':
                ready_condition = condition
                break
        ready_condition['status'] = 'Unknown'

        ready_condition['lastHeartbeatTime'] = datetime.isoformat(datetime.now(pytz.utc) - timedelta(minutes=30))
        kube_node = KubeNode(pykube.Node(self.api, node))
        kube_node.delete = mock.Mock(return_value="mocked stuff")
        self.cluster.maintain([kube_node], {kube_node.instance_id: instance}, {}, [], [])
        kube_node.delete.assert_not_called()

        ready_condition['lastHeartbeatTime'] = datetime.isoformat(datetime.now(pytz.utc) - timedelta(hours=2))
        kube_node = KubeNode(pykube.Node(self.api, node))
        kube_node.delete = mock.Mock(return_value="mocked stuff")
        self.cluster.maintain([kube_node], {kube_node.instance_id: instance}, {}, [], [])
        kube_node.delete.assert_called_once_with()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号