test_executions.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:qinling 作者: openstack 项目源码 文件源码
def test_execution_concurrency_no_scale(self):
        self._create_function(name='test_python_sleep.py')

        def _create_execution():
            resp, body = self.client.create_execution(self.function_id)
            return resp, body

        futs = []
        with futurist.ThreadPoolExecutor(max_workers=10) as executor:
            for _ in range(3):
                fut = executor.submit(_create_execution)
                futs.append(fut)
            for f in futures.as_completed(futs):
                # Wait until we get the response
                resp, body = f.result()

                self.assertEqual(201, resp.status)
                self.addCleanup(self.client.delete_resource, 'executions',
                                body['id'], ignore_notfound=True)
                self.assertEqual('success', body['status'])

        resp, body = self.admin_client.get_function_workers(self.function_id)

        self.assertEqual(200, resp.status)
        self.assertEqual(1, len(body['workers']))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号