async def check_proxy(self, addr):
    """Check whether the proxy at ``addr`` relays an HTTP request correctly.

    A GET request carrying a random ``seed`` query parameter is sent to
    httpbin.org through the proxy. The proxy passes the check only if the
    JSON response contains that same seed under ``args``.

    This has to be a coroutine function: the body uses ``async with`` and
    ``await``, which are a syntax error inside a plain ``def``.

    Args:
        addr: Proxy address. ``http://`` is prepended when it is missing.

    Returns:
        True if the seed came back unchanged. False if the response is
        missing the seed, the seed differs, or any exception is raised
        while making the request or parsing the response.
    """
    proxy = addr if addr.startswith("http://") else "http://{0}".format(addr)

    # A fresh random seed per call means only a response produced for this
    # exact request can satisfy the comparison below.
    seed = str(random.randint(0, 99999999))
    url = "http://httpbin.org/get?seed={}".format(seed)

    try:
        async with aiohttp.ClientSession(loop=self._loop) as session:
            # NOTE(review): newer async_timeout releases appear to require
            # ``async with`` here and no longer accept ``loop=`` - confirm
            # against the version this project pins before changing it.
            with async_timeout.timeout(self._timeout, loop=self._loop):
                async with session.request("GET", url, proxy=proxy) as resp:
                    body = await resp.read()

        data = json.loads(body.decode('utf-8'))
        if "args" not in data:
            return False
        args = data["args"]
        if "seed" not in args or args["seed"] != seed:
            return False
    except Exception:
        # Deliberately broad: any failure (connection error, timeout,
        # non-JSON body, unexpected payload shape) means the proxy is unusable.
        return False

    log.debug("Proxy {} is OK".format(addr))
    return True
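# NOTE(review): the two labels below look like web-page navigation text, not code.
# Comment list
# Article table of contents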