def save_session(self):
    """Persist the session's cookies to ``self.session_file``.

    The cookie jar held by ``self._session`` is converted to a plain dict
    with ``utils.dict_from_cookiejar`` and pickled into the file, replacing
    any previous content.
    """
    # pickle emits bytes, so the file has to be opened in binary mode;
    # text mode ('w') raises TypeError on Python 3.
    with open(self.session_file, 'wb') as handle:
        pickle.dump(
            utils.dict_from_cookiejar(self._session.cookies),
            handle)
# Example source code using Python's dict_from_cookiejar()
def process_response(self, request, response, spider):
    """Keep the shared cookie store in sync and re-login on redirects.

    * Status 200: if the ``Set-Cookie`` headers pass
      ``CookieService.checkWholeCookieFromSetCookies``, they are loaded
      into ``CookieService``; the response is returned unchanged.
    * Status 302: fresh cookies are fetched from the search index page,
      login is attempted until it reports success, and the request is
      retried with the new cookies.
    * Any other status: the response is returned unchanged.
    """
    status = response.status

    if status == 302:
        # NOTE(review): presumably a 302 means the session is no longer
        # logged in -- confirm against the target site's behaviour.
        index_resp = requests.get('http://www.pss-system.gov.cn/sipopublicsearch/patentsearch/tableSearch-showTableSearchIndex.shtml')
        CookieService.cookies = dict_from_cookiejar(index_resp.cookies)
        print(CookieService.cookies)
        # Retry the login until it reports success (no upper bound).
        while True:
            if LoginService(CookieService.cookies).startLogin() is True:
                break
        request.cookies = CookieService.cookies
        return self._retry(request, 'unlogin', spider)

    if status == 200:
        set_cookie_headers = response.headers.getlist('Set-Cookie')
        if CookieService.checkWholeCookieFromSetCookies(set_cookie_headers) is True:
            CookieService.readCookiesFromList(set_cookie_headers)

    return response
def test_app_secure_cookies():
    """Secure cookies set on the view can be read back after a round trip.

    The cookies written to the response are copied into a ``CookieJar``,
    fed back in as the request's cookies, and then decoded again.
    """
    expected = (
        ('test', '????'),
        ('test2', {'value': '????'}),
    )

    for cookie_name, cookie_value in expected:
        cookies_view.set_secure_cookie(cookie_name, cookie_value)
    cookies_view.finish(RETCODE.SUCCESS)

    # Turn the response cookies into the dict form the request side uses.
    jar = CookieJar()
    for _key, morsel in cookies_view.response.cookies.items():
        jar.set_cookie(morsel_to_cookie(morsel))
    cookies_view._request.cookies = dict_from_cookiejar(jar)

    for cookie_name, cookie_value in expected:
        assert cookies_view.get_secure_cookie(cookie_name) == cookie_value
def test_dest_url(ip, is_http, dest_infos, redis=None):
    """Fetch a destination URL through a proxy and report success and latency.

    Args:
        ip: Proxy address, used as the value of the ``proxies`` mapping.
        is_http: Index/key into ``TYPES`` selecting the proxy scheme.
        dest_infos: Mapping with the keys ``name``, ``url``,
            ``store_cookies`` and ``use_default_cookies``;
            ``default_cookies`` is read only when default cookies are
            enabled and no stored cookie is used.
        redis: Optional client exposing ``get``/``set``. When given and
            ``store_cookies`` is truthy, cookies are cached under the key
            ``"<name>:<ip>"``.

    Returns:
        ``(flag, elapsed_ms)``: ``flag`` is True when the response was
        ``ok``; ``elapsed_ms`` is the response time in whole milliseconds
        (0 if the request never completed).

    Any exception is logged at debug level and swallowed on purpose: a
    failing proxy is an expected outcome, reported through ``flag``.
    """
    name = dest_infos["name"]
    url = dest_infos["url"]
    store_cookies = dest_infos["store_cookies"]
    use_default_cookies = dest_infos["use_default_cookies"]
    pro = {TYPES[is_http]: ip}
    elapsed_ms = 0
    flag = False
    try:
        use_store = bool(store_cookies) and redis is not None
        r_cookies_key = "%s:%s" % (name, ip)
        cookie_old = redis.get(r_cookies_key) if use_store else None

        request_kwargs = {"proxies": pro, "timeout": SOKCET_TIMEOUT}
        if cookie_old is not None and cookie_old not in ("None", "{}"):
            # A previously stored cookie takes precedence over the defaults.
            log.debug("PID:%d IP:%s use old cookies:%s " % (os.getpid(), ip, cookie_old))
            request_kwargs["cookies"] = cookiejar_from_dict(json.loads(cookie_old))
        elif use_default_cookies:
            rand_cookies = dest_infos["default_cookies"]
            log.debug("PID:%d IP:%s use random cookies:%s " % (os.getpid(), ip, str(rand_cookies)))
            request_kwargs["cookies"] = cookiejar_from_dict(rand_cookies)
        r = requests.get(url, **request_kwargs)

        # timedelta.microseconds is only the sub-second part; use
        # total_seconds() so responses slower than 1s are measured correctly.
        elapsed_ms = int(r.elapsed.total_seconds() * 1000)
        log.debug("PID:%d dest url:%s proxy ip:%s result:%d time:%d type:%s" % (os.getpid(), url, ip, r.status_code, elapsed_ms, TYPES[is_http]))
        if r.ok:
            flag = True
            if use_store and r.cookies is not None:
                cookie = json.dumps(dict_from_cookiejar(r.cookies))
                # Only write back non-empty cookies that actually changed.
                if cookie != "{}" and cookie_old != cookie:
                    log.debug("PID:%d IP:%s new cookies:%s old cookies:%s" % (os.getpid(), ip, cookie, cookie_old))
                    redis.set(r_cookies_key, cookie)
    except Exception as e:
        # Exceptions have no .message attribute on Python 3; format the
        # exception itself instead.
        log.debug("PID:%d error:%s" % (os.getpid(), e))
    return flag, elapsed_ms
def test_url(ip, is_http, redis=None):
    """Fetch ``TEST_URL`` through a proxy and report success and latency.

    Args:
        ip: Proxy address, used as the value of the ``proxies`` mapping
            and as the cookie cache key.
        is_http: Index/key into ``TYPES`` selecting the proxy scheme.
        redis: Optional client exposing ``get``/``set``. When given and
            ``STORE_COOKIE`` is truthy, cookies are cached under ``ip``.

    Returns:
        ``(flag, elapsed_ms)``: ``flag`` is True when the response was
        ``ok``; ``elapsed_ms`` is the response time in whole milliseconds
        (0 if the request never completed).

    Any exception is logged at debug level and swallowed on purpose: a
    failing proxy is an expected outcome, reported through ``flag``.
    """
    pro = {TYPES[is_http]: ip}
    elapsed_ms = 0
    flag = False
    try:
        use_store = bool(STORE_COOKIE) and redis is not None
        cookie_old = redis.get(ip) if use_store else None

        request_kwargs = {"proxies": pro, "timeout": SOKCET_TIMEOUT}
        if cookie_old is not None and cookie_old not in ("None", "{}"):
            # A previously stored cookie takes precedence over a random one.
            log.debug("PID:%d IP:%s use old cookies:%s " % (os.getpid(), ip, cookie_old))
            request_kwargs["cookies"] = cookiejar_from_dict(json.loads(cookie_old))
        elif USE_DEFAULT_COOKIE:
            rand_cookies = {"bid": random_str()}
            log.debug("PID:%d IP:%s use random cookies:%s " % (os.getpid(), ip, str(rand_cookies)))
            request_kwargs["cookies"] = cookiejar_from_dict(rand_cookies)
        r = requests.get(TEST_URL, **request_kwargs)

        # timedelta.microseconds is only the sub-second part; use
        # total_seconds() so responses slower than 1s are measured correctly.
        elapsed_ms = int(r.elapsed.total_seconds() * 1000)
        log.debug("PID:%d Test IP:%s result:%d time:%d type:%s" % (os.getpid(), ip, r.status_code, elapsed_ms, TYPES[is_http]))
        if r.ok:
            flag = True
            if use_store and r.cookies is not None:
                cookie = json.dumps(dict_from_cookiejar(r.cookies))
                # Only write back non-empty cookies that actually changed.
                if cookie != "{}" and cookie_old != cookie:
                    log.debug("PID:%d IP:%s new cookies:%s old cookies:%s" % (os.getpid(), ip, cookie, cookie_old))
                    redis.set(ip, cookie)
    except Exception as e:
        # Exceptions have no .message attribute on Python 3; format the
        # exception itself instead.
        log.debug("PID:%d error:%s" % (os.getpid(), e))
    return flag, elapsed_ms
def login(usr, pwd):
    """Log in to the intel map with ``usr``/``pwd`` and return a cookie string.

    The login link is scraped from ``intel_url``, the hidden fields of the
    login form are copied into the POST data, and the e-mail and password
    are submitted in two separate requests. The ``SACSID`` and
    ``csrftoken`` cookies obtained afterwards are combined with fixed
    ``ingress.intelmap.*`` settings into one ``;``-separated string.

    Side effect: the module-level ``ingress_headers`` gets its ``cookie``
    entry set to the new ``SACSID``.

    Raises:
        IndexError: if no login link is found on the intel page.
        KeyError: if the ``SACSID`` or ``csrftoken`` cookie is missing.
    """
    # Form inputs whose values are forwarded unchanged with the credentials.
    forwarded_fields = (
        'Page', 'service', 'ltmpl', 'continue', 'gxf',
        'GALX', 'shdf', '_utf8', 'bgresponse',
    )

    session = requests.Session()
    landing = session.get(intel_url)
    login_url = re.findall(login_re, landing.text)[0]
    login_page = session.get(login_url)
    html = bs(login_page.text, 'lxml')

    data = {'Email': usr}
    for field in html.form.select('input'):
        try:
            field_name = field['name']
            if field_name in forwarded_fields:
                data[field_name] = field['value']
        except KeyError:
            # Inputs lacking a name or value attribute are skipped.
            pass

    # Step 1: submit the e-mail address.
    session.post(user_xhr_url, data=data, headers=google_headers)

    # Step 2: submit the password on the follow-up page.
    data.update({
        'Page': 'PasswordSeparationSignIn',
        'identifiertoken': '',
        'identifiertoken_audio': '',
        'identifier-captcha-input': '',
        'Passwd': pwd,
        'PersistentCookie': 'yes',
    })
    session.post(password_url, data=data, headers=google_headers)

    sacsid = dict_from_cookiejar(session.cookies)['SACSID']
    ingress_headers.update({'cookie': 'SACSID={}'.format(sacsid)})
    # Revisiting the intel page with SACSID set yields the csrftoken cookie.
    session.get(intel_url, headers=ingress_headers)
    csrftoken = dict_from_cookiejar(session.cookies)['csrftoken']

    return ';'.join((
        'SACSID={}'.format(sacsid),
        'csrftoken={}'.format(csrftoken),
        'ingress.intelmap.shflt=viz',
        'ingress.intelmap.lat=0',
        'ingress.intelmap.lng=0',
        'ingress.intelmap.zoom=16',
    ))