test_aws.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:aws-lambda-fsm-workflows 作者: Workiva 项目源码 文件源码
def test_aquire_lease_redis_leased_expired_lose(
        self,
        mock_settings,
        mock_time,
        mock_get_primary_cache_source,
        mock_get_connection):
        """acquire_lease returns a falsy value when the Redis transaction fails.

        The mocked pipeline reports an existing lease value of '99:99:0:99'
        while the mocked clock reads 999.0, and ``execute`` raises
        ``redis.WatchError``. The test checks that the lease key was watched
        and read, that a replacement value was queued with ``setex``, and
        that the overall result is falsy.

        NOTE(review): judging by the test name, the stored lease is presumably
        considered expired and the WatchError models losing the race to
        another writer -- confirm against the acquire_lease implementation.
        """
        lease_key = 'lease-a'

        # Configuration: no endpoint overrides, Redis-flavoured ElastiCache.
        mock_settings.ENDPOINTS = {}
        mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
        mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)

        # Frozen clock.
        mock_time.time.return_value = 999.

        # Walk down to the object yielded by ``with connection.pipeline() as pipe``.
        connection = mock_get_connection.return_value
        pipeline_context = connection.pipeline.return_value
        pipe = pipeline_context.__enter__.return_value

        # Existing lease value, and a transaction that fails on execute.
        pipe.get.return_value = '99:99:0:99'
        pipe.execute.side_effect = redis.WatchError

        acquired = acquire_lease('a', 1, 1)

        self.assertFalse(acquired)
        pipe.watch.assert_called_with(lease_key)
        pipe.get.assert_called_with(lease_key)
        pipe.multi.assert_called_with()
        # presumably '1:1:1299:100' encodes the new owner, an expiry derived
        # from the mocked time, and an incremented counter -- verify the format.
        pipe.setex.assert_called_with(lease_key, 86400, '1:1:1299:100')
        pipe.execute.assert_called_with()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号