def post(self, _data=None, _headers=None, **kwargs):
self.count += 1
if _data is not None:
with patch.object(oauth.aiohttp.payload, 'BytesPayload',
side_effect=dummy_func):
assert _data._gen_form_urlencoded() == b"access_token=abc"
if _headers is not None:
key = "1234567890:0987654321"
auth = base64.b64encode(key.encode('utf-8')).decode('utf-8')
assert _headers['Authorization'] == 'Basic ' + auth
# This is needed to run `test_oauth2_concurrent_refreshes`
# without that, refresh tasks would be executed sequentially
# In a sense it is a simulation of a request being fetched
await asyncio.sleep(0.001)
return {'access_token': "abc"}
评论列表
文章目录