nicodown_async.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:niconico-tools 作者: mo-san 项目源码 文件源码
async def _retrieve_info(self, video_id: str) -> Dict:
    """Fetch the watch page for *video_id* and return the parsed result.

    Sends a GET request to ``URL.URL_Watch + video_id`` through
    ``self.session``. On a 200 response the body is read as text and handed
    to ``self._junction``; that call's return value is returned unchanged.

    Responses with a 5xx status are retried: the request is made at most
    ``max(0, self.retries) + 1`` times in total. Between attempts the
    coroutine waits ``self.interval`` seconds (split in two halves around a
    console notice), and the wait is multiplied by ``self.backoff`` after
    each failed attempt.

    :param video_id: identifier appended to the watch URL.
    :return: whatever ``self._junction`` returns for the response text.
    :raises aiohttp.errors.HttpProcessingError: when every attempt ended
        with a 5xx status, or when a status outside 200/4xx/5xx was
        received; ``code`` carries the last status seen.
        NOTE(review): presumably requires an aiohttp release that still
        ships ``aiohttp.errors`` -- TODO confirm against the pinned version.
    :raises Exception: whatever ``response.raise_for_status()`` raises for a
        4xx status (the concrete type depends on the aiohttp version).
    """
    interval = self.interval
    backoff = self.backoff
    attempt = max(0, self.retries) + 1
    url = URL.URL_Watch + video_id
    self.logger.debug("_retrieve_info: %s", locals())

    # NOTE(review): this semaphore is created anew on every call, so only
    # this coroutine ever acquires it and it cannot throttle concurrent
    # calls. A semaphore shared across calls would be needed for that;
    # left as-is here because the shared state lives outside this method.
    async with asyncio.Semaphore(self.__parallel_limit):
        st = 0
        while attempt > 0:
            attempt -= 1
            async with self.session.get(url) as response:  # type: aiohttp.ClientResponse
                self.logger.debug("Video ID: %s, Status Code: %s", video_id, response.status)
                if response.status == 200:
                    info_data = await response.text()
                    return self._junction(info_data)
                # 4xx: a client-side error; let aiohttp raise for it
                # instead of retrying a request that cannot succeed.
                elif 400 <= response.status < 500:
                    response.raise_for_status()
                elif 500 <= response.status < 600:
                    st = response.status
                    # Only wait when another attempt will follow; sleeping
                    # after the final failure would just delay the error.
                    if attempt > 0:
                        await asyncio.sleep(interval / 2)
                        print(Err.waiting_for_permission)
                        await asyncio.sleep(interval / 2)
                        interval *= backoff
                else:
                    # Any other status (e.g. 1xx/3xx) is not retried.
                    st = response.status
                    break
        raise aiohttp.errors.HttpProcessingError(
            code=st, message=Err.connection_timeout.format(video_id))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号