def get_session(domain_or_url):
    """
    Check out a ``requests.Session`` for a domain, reusing a pooled one if any.

    The pool key is the ``netloc`` part of ``domain_or_url`` when
    ``urlsplit`` finds one (i.e. a full URL was given); otherwise the
    argument itself is used as the key (a bare domain name).

    The session's bookkeeping dict is removed from ``pool[domain]`` (or newly
    created when no idle one exists), stamped with the current time under
    ``"active"``, and appended to ``locked_session.sessdicts``.  The original
    comments mention ``release_lock()`` in connection with these checked-out
    sessions -- presumably that is what hands them back to the pool; confirm
    against the rest of the module.

    As a side effect, ``clear()`` is invoked under ``cleaning_lock`` when
    ``_gc_checkpoint`` is more than ``SESSION_TTL`` seconds in the past.

    :param domain_or_url: a full URL or a bare domain name
    :type domain_or_url: str
    :rtype: requests.Session
    """
    domain = urllib_parse.urlsplit(domain_or_url).netloc or domain_or_url

    # Bind the per-domain list once.  Re-indexing pool[domain] on every step
    # would raise KeyError if the entry were dropped in between (e.g. by
    # clear() running elsewhere).
    idle_sessions = pool.setdefault(domain, [])

    if not hasattr(locked_session, "sessdicts"):
        # First checkout seen through this `locked_session` object: create
        # the list that tracks sessions currently taken out of the pool.
        # NOTE(review): looks like `locked_session` is per-thread storage
        # (e.g. threading.local) -- confirm.
        locked_session.sessdicts = []

    try:
        # Take the most recently returned session.  pop() on an empty list
        # raises IndexError, so "check emptiness, then pop" collapses into a
        # single operation with no window between the test and the removal.
        sessdict = idle_sessions.pop()
    except IndexError:
        # Nothing idle for this domain: start a fresh session.
        sessdict = {
            "domain": domain,
            "sessobj": requests.Session(),
        }

    now = time.time()
    sessdict["active"] = now
    locked_session.sessdicts.append(sessdict)

    # Trigger a cleanup pass when the last checkpoint is older than the TTL.
    if _gc_checkpoint < now - SESSION_TTL:
        with cleaning_lock:
            clear()

    return sessdict["sessobj"]  # type: requests.Session
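# NOTE: the two labels below appear to be web-page navigation text captured
# together with the code; they are not part of the program.
# 评论列表 (comment list)
# 文章目录 (article table of contents)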