def getListProxies(self):
    """Scrape the xicidaili "nn" listing and return its entries as proxy dicts.

    ``self.getRandomUserAgent()`` is called first (presumably it refreshes
    ``self.headers`` -- confirm against its definition), then the page is
    requested with ``self.headers``.

    Returns:
        list: one ``{'https': 'ip:port'}`` dict for every table row that
        carries both an address cell and a port cell.
    """
    self.getRandomUserAgent()
    # The session is only needed for this single request; close it afterwards.
    with requests.session() as session:
        page = session.get("http://www.xicidaili.com/nn", headers=self.headers)
    soup = BeautifulSoup(page.text, 'lxml')
    proxyList = []
    # The pattern has an empty alternative, so it accepts any class value.
    taglist = soup.find_all('tr', attrs={'class': re.compile("(odd)|()")})
    for trtag in taglist:
        tdlist = trtag.find_all('td')
        # Header rows (<th> only) or malformed rows lack the ip/port cells.
        if len(tdlist) < 3:
            continue
        ip, port = tdlist[1].string, tdlist[2].string
        # .string is None when a cell is empty or holds nested markup.
        if ip is None or port is None:
            continue
        proxyList.append({'https': ip + ':' + port})
    return proxyList
#????????????????????
# Example source code for the Python Session() class
def __init__(self, key=None, code=None, user_agent='', cache=None):
    """
    Set up the base object used to access the XML API. Attributes
    can be called on it to build a path to an endpoint.

    Args:
        key (str) - optional authentication keyID
        code (str) - optional authentication vCode
        user_agent (str) - optional (recommended) User-Agent
            to use when making web requests
        cache (preston.xmlapi.cache.Cache) - page cache; a new Cache
            is created when a falsy value is given

    Returns:
        None
    """
    self.cache = cache if cache else Cache()
    self.user_agent = user_agent if user_agent else 'Preston (github.com/Celeo/Preston)'
    # Every request made through this session carries the chosen User-Agent.
    http = requests.session()
    http.headers.update({'User-Agent': self.user_agent})
    self.session = http
    self.key, self.code = key, code
def __init__(self, api_key, api_secret, redirect_uri, token=None):
    """Configure the Weibo OAuth2 endpoints, credentials and HTTP session.

    When ``token`` is truthy it is passed to ``self.set_token`` so the
    client is activated immediately.
    """
    # Endpoint constants, all derived from the API root.
    site = 'https://api.weibo.com/'
    self.site = site
    self.authorization_url = site + 'oauth2/authorize'
    self.token_url = site + 'oauth2/access_token'
    self.api_url = site + '2/'

    # Application credentials and callback address.
    self.client_id, self.client_secret = api_key, api_secret
    self.redirect_uri = redirect_uri

    self.session = requests.session()

    # A supplied token activates the client right away.
    if token:
        self.set_token(token)
def __init__(self, api_key, api_secret, redirect_uri, token=None):
    """Configure the Weibo OAuth2 endpoints, credentials and HTTP session.

    When ``token`` is truthy it is passed to ``self.set_token`` so the
    client is activated immediately.
    """
    # Endpoint constants, all derived from the API root.
    site = 'https://api.weibo.com/'
    self.site = site
    self.authorization_url = site + 'oauth2/authorize'
    self.token_url = site + 'oauth2/access_token'
    self.api_url = site + '2/'

    # Application credentials and callback address.
    self.client_id, self.client_secret = api_key, api_secret
    self.redirect_uri = redirect_uri

    self.session = requests.session()

    # A supplied token activates the client right away.
    if token:
        self.set_token(token)
def __init__(self, api_key, api_secret, redirect_uri, token=None):
    """Configure the Weibo OAuth2 endpoints, credentials and HTTP session.

    When ``token`` is truthy it is passed to ``self.set_token`` so the
    client is activated immediately.
    """
    # Endpoint constants, all derived from the API root.
    site = 'https://api.weibo.com/'
    self.site = site
    self.authorization_url = site + 'oauth2/authorize'
    self.token_url = site + 'oauth2/access_token'
    self.api_url = site + '2/'

    # Application credentials and callback address.
    self.client_id, self.client_secret = api_key, api_secret
    self.redirect_uri = redirect_uri

    self.session = requests.session()

    # A supplied token activates the client right away.
    if token:
        self.set_token(token)
def get(self, uri, **kwargs):
    """
    Issue a GET for ``<api_url><uri>.json`` with ``kwargs`` as query parameters.

    Responses with status 200, 400 or 403 have their JSON body passed to
    ``self._assert_error``; the response object itself is returned.
    """
    endpoint = "{0}{1}.json".format(self.api_url, uri)
    response = self.session.get(endpoint, params=kwargs)
    # 403: invalid access token or rate limit; 400: expired-token details.
    # Any other server error code is left for the calling code to handle.
    checked_codes = (200, 400, 403)
    if response.status_code in checked_codes:
        self._assert_error(response.json())
    return response
def create_session_proxy(self, proxy_info):
    """Build a proxied session from ``proxy_info`` and put it into the pool.

    Args:
        proxy_info: mapping with ``'ip'``, ``'port'`` and ``'protocol'`` keys.
    """
    protocol = proxy_info['protocol'].lower()
    # Format instead of concatenating so an integer port does not raise.
    proxy = {protocol: '{0}:{1}'.format(proxy_info['ip'], proxy_info['port'])}

    session = requests.session()
    # Give each session its own copy so a header change on one session
    # cannot leak into the shared module-level requestHeader mapping.
    session.headers = dict(requestHeader)
    session.cookies.update(self.account_manager.get_auth_token())
    session.proxies = proxy

    # Publish the session and update both pool counters.
    self.session_pool.put(session)
    self.created_session_num_change(1)
    self.available_session_num_change(1)
# ????????? session
def fetch_proxy_data(self, page):
    """Download one page of the proxy listing, retrying on failure.

    Args:
        page: page number appended to ``requestUrl``.

    Returns:
        The response body text, or ``None`` once ``NETWORK_RETRY_TIMES``
        attempts have all failed.
    """
    self.session.headers = requestHeader
    url = requestUrl + str(page)
    retry_time = 0
    while retry_time < NETWORK_RETRY_TIMES:
        try:
            response = self.session.get(url, timeout=CONNECT_TIMEOUT)
            return response.text
        except Exception:
            # Best-effort fetch: any failure simply triggers another attempt.
            retry_time += 1
            # Skip the pause after the final attempt; nothing is left to wait for.
            if retry_time < NETWORK_RETRY_TIMES:
                time.sleep(NETWORK_RECONNECT_INTERVAL)
    return None
def __init__(self, url, name=None, requests_session=None, timeout=30):
    """Probe ``url`` with a ranged HEAD request and initialise the stream state.

    Args:
        url: address of the remote resource.
        name: explicit file name; when ``None`` the ``filename`` parameter of
            the ``Content-Disposition`` header is used if the server sent one,
            otherwise ``self.name`` stays ``None``.
        requests_session: session to reuse; a new one is created when ``None``.
        timeout: timeout for the HEAD request, also stored on the instance.

    Raises:
        KeyError: if the response has no ``Content-Length`` header.
    """
    IOBase.__init__(self)
    self.url = url
    self.sess = requests_session if requests_session is not None else requests.session()
    self._seekable = False
    self.timeout = timeout
    # Always define the attribute so later reads cannot hit AttributeError
    # when no name is given and the server does not supply one either.
    self.name = name
    f = self.sess.head(url, headers={'Range': 'bytes=0-'}, timeout=timeout)
    try:
        # A 206 reply carrying Content-Range shows range requests are honoured.
        if f.status_code == 206 and 'Content-Range' in f.headers:
            self._seekable = True
        self.len = int(f.headers["Content-Length"])
        if name is None and "Content-Disposition" in f.headers:
            _, params = cgi.parse_header(f.headers["Content-Disposition"])
            if "filename" in params:
                self.name = params["filename"]
    finally:
        # Release the connection even when a header is missing or malformed.
        f.close()
    self._pos = 0
    self._r = None
def extract_news(news_url):
    """Fetch ``news_url`` and return the text selected by ``GET_CNN_NEWS_XPATH``.

    Returns:
        The XPath matches joined into one string, or an empty dict when
        parsing or extraction raised (kept as-is for existing callers).
    """
    # Fetch html; the session is closed as soon as the page is downloaded.
    with requests.session() as session_requests:
        response = session_requests.get(news_url, headers=getHeaders())
    try:
        # Parse html and extract the matching fragments.
        tree = html.fromstring(response.content)
        fragments = tree.xpath(GET_CNN_NEWS_XPATH)
        return ''.join(fragments)
    except Exception as e:
        # Report the failure instead of discarding it silently.
        print(e)
        return {}
def logout(self):
    '''
    Log out by resetting the session cookies and requesting ``logout_url``.

    :returns: True when the response matches the logout marker pattern
              (login state and stored cookies are then cleared), else False
    '''
    self.session.cookies = cookielib.CookieJar()
    body = self.get_response(logout_url)
    # NOTE(review): the marker pattern below looks mis-encoded; as written it
    # is not a valid regular expression -- restore the original text.
    if not re.findall('????', body):
        return False
    self.logined = False
    self.remove_cookies()
    return True
def save_cookies(self):
    '''
    Write the session cookies to ``cookie.list`` as JSON.

    :returns: True on success, False when the file could not be written
    '''
    try:
        cookie_list = requests.utils.dict_from_cookiejar(self.session.cookies)
        # json.dumps returns text, so the file must be opened in text mode
        # ("wb+" raises TypeError on Python 3). Mode "w" truncates any
        # previous file, and the with-block closes the handle on error too.
        with open('cookie.list', 'w') as fd:
            fd.write(json.dumps(cookie_list))
        return True
    except Exception:
        # format_exc() returns the traceback text; print_exc() returns None.
        utils.show_msg(traceback.format_exc())
        utils.show_msg('???Can\'t save cookie list file.')
        return False
def get_all_pic_url(question_num, answer_offset,answer_limit):
    """Collect image URLs from one page of answers to a Zhihu question.

    Args:
        question_num: question id substituted into the API URL.
        answer_offset: value of the ``offset`` query parameter.
        answer_limit: value of the ``limit`` query parameter.

    Returns:
        list: ``https:`` image URLs, each ending in the extension that was
        actually matched (``jpg`` or ``png``).
    """
    url = 'https://www.zhihu.com/api/v4/questions/{qnum}/answers?include=data%5B*%5D.is_normal%2Cis_collapsed%2Cannotation_action%' \
          '2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%' \
          '2Cmark_infos%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B*%5D.author.follower_count%2Cbadge%5B%3F(type%3Dbest_answerer)%5D.topics&' \
          'offset={offset}&limit={limit}&sort_by=default'
    response = session.get(url.format(qnum=question_num,offset=answer_offset,limit=answer_limit), headers=headers, allow_redirects=False)
    print('json_response', response)
    json_response = response.json()
    answer = json_response['data']
    pattern = re.compile(r'data-original=\"https\:(.*?)\.(jpg|png)"')
    urls = []
    for per_answer_dict in answer:
        match = pattern.findall(per_answer_dict['content'])
        # Only every second match is kept (presumably each image appears twice
        # in the markup -- confirm). Rebuild the URL with the extension that
        # was matched rather than always assuming ".jpg".
        urls.extend("https:" + img_path + "." + ext for img_path, ext in match[1::2])
    return urls
def login(self, username, password, require_ownership=False):
    """POST the credentials to ``self.login_url`` and return the first JSON item.

    Args:
        username: account name sent in the form body.
        password: account password sent in the form body.
        require_ownership: sent as the integer query parameter
            ``require_game_ownership``.

    Returns:
        Element ``0`` of the decoded JSON response.

    Raises:
        OwnershipError: HTTP error whose JSON message is
            ``'Insufficient membership'``.
        AuthError: HTTP error with any other JSON message.
        requests.HTTPError: HTTP error without a JSON ``message`` field.
    """
    resp = self.session.post(
        self.login_url,
        params=dict(require_game_ownership=int(require_ownership)),
        data=dict(username=username, password=password)
    )
    try:
        payload = resp.json()
    except ValueError:
        # Body is not JSON; JSON decode errors are ValueError subclasses.
        payload = None
    try:
        resp.raise_for_status()
    except requests.HTTPError:
        if isinstance(payload, dict) and 'message' in payload:
            if payload['message'] == 'Insufficient membership':
                raise OwnershipError(payload['message'])
            raise AuthError(payload['message'])
        raise
    return payload[0]
def login(self, username, password):
    """Log in via ``self.endpoints['login']`` and store the returned token.

    The credentials are remembered on the instance. On success the token is
    saved as ``self.auth_token`` and added to ``self.headers`` as an
    ``Authorization: Token ...`` entry.

    Returns:
        bool: True when the JSON response contains a ``'token'`` key,
        False otherwise.
    """
    self.username = username
    self.password = password
    # Pass a mapping so requests form-encodes the values; a hand-built
    # "a=%s&b=%s" string is corrupted by '&', '=' or '%' in the credentials.
    data = {'password': self.password, 'username': self.username}
    res = self.session.post(self.endpoints['login'], data=data)
    res = res.json()
    try:
        self.auth_token = res['token']
    except KeyError:
        return False
    self.headers['Authorization'] = 'Token ' + self.auth_token
    return True
##############################
#GET DATA
##############################
def run(self):
    """POST the stored credentials to ``http://<self.url>/Home/Save``.

    The URL, the form data, the response status and the response body are
    printed; any exception is printed instead of being raised.
    """
    try:
        url = 'http://%s/Home/Save' % (self.url)
        data = { 'u' : self.username, 'p' : self.password }
        print(url)
        print(data)
        # Close the session once the single request is done.
        with requests.session() as s:
            r = s.post(url, data, verify=False)
            print(r.status_code, r.content )
        print('-----------------------------------------------------')
    except Exception as e:
        print(e)
#self.parent and self.parent.on_thread_finished(self, 'done')
# top-level domains
def __init__(self, constants, util, settings):
    """
    Stores the injected instances, resolves the session file path and the
    SSL verification flag, then loads the initial session

    :param constants: Constants instance
    :type constants: resources.lib.Constants
    :param util: Utils instance
    :type util: resources.lib.Utils
    :param settings: Settings instance
    :type settings: resources.lib.Settings
    """
    self.constants, self.utils, self.settings = constants, util, settings
    # The addon setting is stored as text; only the literal 'True' enables it.
    ssl_setting = self.utils.get_addon().getSetting('verifyssl')
    self.session_file = self.utils.get_addon_data().get('cookie_path')
    self.verify_ssl = ssl_setting == 'True'
    self._session = self.load_session()
def load_session(self):
    """
    Builds the session object and, when the cookie file exists,
    loads & deserializes its cookies into the session

    :returns: requests.session -- Session object
    """
    _session = session()
    _session.headers.update({
        'User-Agent': self.utils.get_user_agent(),
        'Accept-Encoding': 'gzip'
    })
    if path.isfile(self.session_file):
        # Pickle data is binary, so the file must be opened in 'rb' mode
        # (text mode fails on Python 3).
        # NOTE(review): pickle.load can execute code embedded in the file;
        # this is only safe while the cookie file is written by this program.
        with open(self.session_file, 'rb') as handle:
            try:
                _cookies = utils.cookiejar_from_dict(pickle.load(handle))
            except EOFError:
                # An empty or truncated file is treated as "no cookies".
                _cookies = utils.cookiejar_from_dict({})
        _session.cookies = _cookies
    return _session
def getCoupon():
    """Wait until the scheduled minute, then request the coupon URL once.

    The cookies returned by ``get_cookie()`` are installed on the shared
    ``session`` right before the request, and the response text is logged.
    """
    import time  # local import: only needed for the polling pause

    # Scheduled time, compared at minute resolution.
    sched_Timer="2017-06-14 20:00"
    # Coupon request URL.
    couPonUrl="https://api.m.jd.com/client.action?functionId=newBabelAwardCollection&body=%7B%22activityId%22%3A%223tPzkSJZdNRuhgmowhPn7917dcq1%22%2C%22scene%22%3A%221%22%2C%22args%22%3A%22key%3D898c3948b1a44f36b032c8619e2514eb%2CroleId%3D6983488%2Cto%3Dpro.m.jd.com%2Fmall%2Factive%2F3tPzkSJZdNRuhgmowhPn7917dcq1%2Findex.html%22%2C%22mitemAddrId%22%3A%22%22%2C%22geo%22%3A%7B%22lng%22%3A%22%22%2C%22lat%22%3A%22%22%7D%7D&client=wh5&clientVersion=1.0.0&sid=dce17971eb6cbfcc2275dded296bcb58&uuid=1506710045&area=&_=1497422307569&callback=jsonp5"
    # Referer header sent with the coupon request.
    referer="https://pro.m.jd.com/mall/active/3tPzkSJZdNRuhgmowhPn7917dcq1/index.html"
    while True:
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
        if now == sched_Timer:
            cj = requests.utils.cookiejar_from_dict(get_cookie())
            session.cookies = cj
            resp = session.get(
                url=couPonUrl,
                headers={
                    'Referer': referer,
                }
            )
            logger.info(resp.text)
            break
        # Pause briefly between checks instead of spinning a CPU core.
        time.sleep(0.01)
def getCaptcha(self):
    """Download a captcha image, open it in a viewer and read the user's answer.

    Returns:
        The text typed by the user.
    """
    captchaImgUrl = 'https://passport.lagou.com/vcode/create?from=register&refresh=%s' % time.time()
    # Download first, then write inside a with-block so the file handle is
    # closed even when something fails.
    image_bytes = self.session.get(captchaImgUrl, headers=HEADERS).content
    with open(CaptchaImagePath, 'wb') as f:
        f.write(image_bytes)
    # Open the image with the platform's default viewer.
    if sys.platform.find('darwin') >= 0:
        subprocess.call(['open', CaptchaImagePath])
    elif sys.platform.find('linux') >= 0:
        subprocess.call(['xdg-open', CaptchaImagePath])
    else:
        os.startfile(CaptchaImagePath)
    # Ask the user to type what the image shows.
    captcha = input("???????(% s)????: " % CaptchaImagePath)
    print('????????:% s' % captcha)
    return captcha
def get_vdcode():
    """Download the login captcha to ``captcha.jpg`` and read the user's answer.

    The image is shown with ``Image`` when possible; if that fails, the
    path of the saved file is printed instead.

    Returns:
        The text typed by the user.
    """
    t = str(int(time.time()*1000))
    captcha_url = 'https://passport.bilibili.com/captcha.gif?r=' + t + "&type=login"
    r = session.get(captcha_url)
    # The with-block closes the file; no explicit close() is needed.
    with open('captcha.jpg', 'wb') as f:
        f.write(r.content)
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except Exception:
        # Fall back to telling the user where the file is. Catching Exception
        # rather than everything lets Ctrl-C still interrupt the program.
        print(u'?? %s ????captcha.jpg ????' % os.path.abspath('captcha.jpg'))
    captcha = input("please input the captcha\n>")
    return captcha
def __init__(self, timeout=60, cache=False, max_retries=None, retry_interval=None):
    """The constructor.

    Args:
        timeout (float): The default global timeout(seconds).
        cache: When truthy, the session is wrapped in CacheControl.
        max_retries: Total retries; used only if retry_interval is also truthy.
        retry_interval: Backoff factor; used only if max_retries is also truthy.
    """
    self.timeout = timeout
    http = requests.session()
    if max_retries and retry_interval:
        policy = Retry(total=max_retries, backoff_factor=retry_interval)
        # Each scheme gets its own adapter sharing the same retry policy.
        for scheme in ('http://', 'https://'):
            http.mount(scheme, HTTPAdapter(max_retries=policy))
    self.session = CacheControl(http) if cache else http
def execute_as_string(self, request):
    """Send the given HttpRequest and convert the reply in non-binary mode.

    Args:
        request (HttpRequest): The given HttpRequest to execute.

    Returns:
        HttpResponse: The response of the HttpRequest.
    """
    verb = HttpMethodEnum.to_string(request.http_method)
    options = dict(
        headers=request.headers,
        params=request.query_parameters,
        data=request.parameters,
        files=request.files,
        timeout=self.timeout,
    )
    raw = self.session.request(verb, request.query_url, **options)
    return self.convert_response(raw, False)
def execute_as_binary(self, request):
    """Send the given HttpRequest and convert the reply in binary mode.

    Args:
        request (HttpRequest): The given HttpRequest to execute.

    Returns:
        HttpResponse: The response of the HttpRequest.
    """
    verb = HttpMethodEnum.to_string(request.http_method)
    options = dict(
        headers=request.headers,
        params=request.query_parameters,
        data=request.parameters,
        files=request.files,
        timeout=self.timeout,
    )
    raw = self.session.request(verb, request.query_url, **options)
    return self.convert_response(raw, True)
def is_login():
    """Return True when a fixed Zhihu answer page responds with status 200.

    Redirects are not followed, so a redirect response counts as False.
    """
    inbox_url = "https://www.zhihu.com/question/56250357/answer/148534773"
    reply = session.get(inbox_url, headers=header, allow_redirects=False)
    return reply.status_code == 200
def get_xsrf():
    """Fetch the Zhihu home page and return its ``_xsrf`` value.

    Returns an empty string when the page does not contain the field.
    """
    page_html = session.get("https://www.zhihu.com", headers=header).text
    found = re.match('.*name="_xsrf" value="(.*?)"', page_html, re.DOTALL)
    return found.group(1) if found else ''
def get_index():
    """Save the Zhihu home page to ``index_page.html`` (UTF-8) and print "ok"."""
    home_page = session.get("https://www.zhihu.com", headers=header)
    with open("index_page.html", "wb") as out:
        out.write(home_page.text.encode("utf-8"))
    print ("ok")
def zhihu_login(account, password):
    """Log in to Zhihu with a phone number or an e-mail address.

    An account starting with ``1`` followed by ten digits uses the phone
    login endpoint (with a captcha); an account containing ``@`` uses the
    e-mail endpoint. After the POST the session cookies are saved.

    Raises:
        ValueError: if ``account`` is neither a phone number nor an e-mail.
    """
    if re.match(r"^1\d{10}", account):
        print ("??????")
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": get_xsrf(),
            "phone_num": account,
            "password": password,
            "captcha":get_captcha()
        }
    elif "@" in account:
        print("??????")
        post_url = "https://www.zhihu.com/login/email"
        post_data = {
            "_xsrf": get_xsrf(),
            "email": account,
            "password": password
        }
    else:
        # Previously this case fell through and failed with UnboundLocalError.
        raise ValueError("account must be a phone number or an e-mail address")
    session.post(post_url, data=post_data, headers=header)
    session.cookies.save()
# get_index()
# is_login()
# get_captcha()
def get_http_client():
    """Return an HTTP session, routed through the local Tor SOCKS5 proxy
    when ``config['use_tor_proxy']`` is truthy."""
    if not config['use_tor_proxy']:
        return requests.session()
    client = requesocks.session()
    # Both schemes go through the same local SOCKS5 port.
    tor_proxy = 'socks5://127.0.0.1:%d' % config['tor_proxy_port']
    client.proxies = {'http': tor_proxy, 'https': tor_proxy}
    return client
# ??get??
def __init__(self, username=None, password=None,
             auto_login=False, get_devices=False, get_automations=False):
    """Init Abode object.

    Stores the credentials, creates the event controller and a requests
    session, then optionally logs in and fetches devices/automations
    depending on the flags.
    """
    self._username, self._password = username, password
    self._session = self._token = self._panel = self._user = None
    self._event_controller = AbodeEventController(self)
    self._default_alarm_mode = CONST.MODE_AWAY
    self._devices = self._automations = None

    # Create a requests session to persist the cookies
    self._session = requests.session()

    # Log in automatically only when both credentials were supplied.
    have_credentials = (self._username is not None and
                        self._password is not None)
    if have_credentials and auto_login:
        self.login()

    if get_devices:
        self.get_devices()

    if get_automations:
        self.get_automations()