def send(self, *a, **kw):
a[0].url = a[0].url.replace(urllib.quote("<"), "<")
a[0].url = a[0].url.replace(urllib.quote(" "), " ")
a[0].url = a[0].url.replace(urllib.quote(">"), ">")
return requests.Session.send(self, *a, **kw)
python类Session()的实例源码
def index():
try:
api_key = os.environ['DASHBOARD_DUTY_KEY']
service_key = os.environ['DASHBOARD_DUTY_SERVICE']
except KeyError:
logging.error('Missing Environment Variable(s)')
exit(1)
session = requests.Session()
d_session = dashboard_duty.Core(session, api_key, service_key)
service = d_session.service()
incidents = d_session.incident(service['id'])
if service['escalation_policy']['escalation_rules'][0]['targets'][0]['type'] == 'schedule_reference':
service_id = service['escalation_policy']['escalation_rules'][0]['targets'][0]['id']
oncall = d_session.oncall_schedule_policy(service_id)
elif service['escalation_policy']['escalation_rules'][0]['targets'][0]['type'] == 'user_reference':
username = service['escalation_policy']['escalation_rules'][0]['targets'][0]['summary']
oncall = d_session.oncall_user_policy(username)
else:
logging.error('Unable to handle oncall policy for %s' % service_key)
exit(1)
return render_template('index.html', service=service, incidents=incidents, oncall=oncall)
def list_DL(List):
List=List[1:]
List2=[]
ID_list=[]
List_dl=[]
k=0
j=0
p=0
while p*2<len(List):
List2.append(List[p*2])
p+=1
for i in List2:
#--------------------
s1=requests.Session()
s1.get(url0)
url_list=s1.get(List2[j], headers=headers)
#--------------------
#a1=requests.get(List2[j])
#html=a1.text
html=url_list.text
a2=html.find('<a href="javascript:void(0)" link="')
a2_len=len('<a href="javascript:void(0)" link="')
if a2<0:
a2=html.find("RJ.currentMP3Url = 'mp3/")
a2_len=len("RJ.currentMP3Url = 'mp3/")
a3=html.find('" target="_blank" class="mp3_download_link">')
if a3<0:
a3=html.find("RJ.currentMP3 = '")
a4=html[a2+a2_len:a3]
if a4.find("'")>0:
a4=a4[:a4.find("'")]
ID_list.append(a4+'.mp3')
while k < len(ID_list):
List_dl.append(check_host(l_mp3[:4],ID_list[k])[0])
k+=1
else:
List_dl.append(a4)
j+=1
return(List_dl)
def __init__(self, service_urls=None, user_agent=DEFAULT_USER_AGENT):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': user_agent,
})
self.service_urls = service_urls or ['translate.google.com']
self.token_acquirer = TokenAcquirer(session=self.session, host=self.service_urls[0])
# Use HTTP2 Adapter if hyper is installed
try: # pragma: nocover
from hyper.contrib import HTTP20Adapter
self.session.mount(urls.BASE, HTTP20Adapter())
except ImportError: # pragma: nocover
pass
def __init__(self, tkk='0', session=None, host='translate.google.com'):
self.session = session or requests.Session()
self.tkk = tkk
self.host = host if 'http' in host else 'https://' + host
def AkamaiEdgeGridSession_Setup(AkamaiEdgeGridConfig):
session = requests.Session()
session.auth = EdgeGridAuth(
client_token=AkamaiEdgeGridConfig['client_token'],
client_secret=AkamaiEdgeGridConfig['client_secret'],
access_token=AkamaiEdgeGridConfig['access_token']
)
return session
#Actually call akamai and return the result.
def __init__(self, host="127.0.0.1", port=46657):
# Tendermint endpoint
self.uri = "http://{}:{}".format(host, port)
# Keep a session
self.session = requests.Session()
# Request counter for json-rpc
self.request_counter = itertools.count()
# request headers
self.headers = {
'user-agent': AGENT,
'Content-Type': 'application/json'
}
def __init__(self, api_key=None, endpoint=None):
if api_key is None or endpoint is None:
try:
from pymatgen import SETTINGS
except ImportError:
warnings.warn('MPResterBase: not using pymatgen SETTINGS!')
SETTINGS = {}
if api_key is not None:
self.api_key = api_key
else:
self.api_key = SETTINGS.get("PMG_MAPI_KEY", "")
if endpoint is not None:
self.preamble = endpoint
else:
self.preamble = SETTINGS.get(
"PMG_MAPI_ENDPOINT", "https://www.materialsproject.org/rest/v2"
)
if not self.api_key:
raise ValueError('API key not set. Run `pmg config --add PMG_MAPI_KEY <USER_API_KEY>`.')
self.session = requests.Session()
self.session.headers = {"x-api-key": self.api_key}
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None):
super().__init__()
self._endpoint = broker
self._encoding = "utf-8"
if enc_key == None or enc_iv == None:
self._transport_enc = False
self._transport_enc_key = None
self._transport_enc_iv = None
self._cipher = None
else:
self._transport_enc = True
self._transport_enc_key = enc_key
self._transport_enc_iv = enc_iv
backend = default_backend()
self._cipher = Cipher(algorithms.AES(
enc_key), modes.CBC(enc_iv), backend=backend)
self._session = requests.Session()
self._event_dict = {'logon': self.on_login, 'logoff': self.on_logout, 'ping': self.on_ping,
'query_data': self.on_query_data, 'send_order': self.on_insert_order,
'cancel_order': self.on_cancel_order_event, 'get_quote': self.on_get_quote}
self.client_id = ''
self.account_id = ''
def test_hearthhead(self):
with requests.Session() as s:
self.assertEqual(scrape.getHearthHeadId('Quick Shot', 'Spell', s),
'quick-shot')
self.assertEqual(scrape.getHearthHeadId('Undercity Valiant',
'Minion', s), 'undercity-valiant')
self.assertEqual(scrape.getHearthHeadId('Gorehowl', 'Weapon', s),
'gorehowl')
self.assertEqual(scrape.getHearthHeadId('V-07-TR-0N',
'Minion', s), 'v-07-tr-0n')
self.assertEqual(scrape.getHearthHeadId("Al'Akir the Windlord",
'Minion', s), 'alakir-the-windlord')
def test_Hearthpwn(self):
with requests.Session() as s:
self.assertEqual(scrape.getHearthpwnIdAndUrl('Quick Shot',
'Blackrock Mountain', 'Spell', False, s),
(14459, 'https://media-hearth.cursecdn.com/avatars/328/302/14459.png'))
self.assertEqual(scrape.getHearthpwnIdAndUrl('Upgrade!',
'Classic', 'Spell', False, s),
(638, 'https://media-hearth.cursecdn.com/avatars/330/899/638.png'))
def loadTokens(tokens = {}, wantedTokens = {}):
resultCards = {}
with requests.Session() as session:
for name, ids in wantedTokens.items():
card = None
if 'id' in ids:
card = tokens[ids['id']]
if name != card['name']:
log.warning('loadTokens() names do not match: %s - %s', name, tokens[ids['id']]['name'])
if 'id' not in ids:
for token in tokens.values():
if name == token['name']:
if card:
log.warning('loadTokens() found token again: %s', name)
card = token
if not card:
log.warning('loadTokens() could not find: %s', name)
exit()
r = session.get('http://www.hearthpwn.com/cards/{}'.format(ids['hpwn']))
r.raise_for_status()
image = fromstring(r.text).xpath('//img[@class="hscard-static"]')[0].get('src')
if not image:
image = 'https://media-hearth.cursecdn.com/avatars/148/738/687.png'
card['cdn'] = image.replace('http://', 'https://').lower()
card['hpwn'] = ids['hpwn']
card['head'] = getHearthHeadId(card['name'], "ignored", "ignored")
# since jade golem: overwrite scraped stats with prepared ones
card['atk'] = ids.get('atk', card['atk'])
card['cost'] = ids.get('cost', card['cost'])
card['hp'] = ids.get('hp', card['hp'])
resultCards[card['name']] = card
print('.', end='')
return resultCards
def download_file_from_google_drive(id, destination):
URL = "https://docs.google.com/uc?export=download"
session = requests.Session()
response = session.get(URL, params={ 'id': id }, stream=True)
token = get_confirm_token(response)
if token:
params = { 'id' : id, 'confirm' : token }
response = session.get(URL, params=params, stream=True)
save_response_content(response, destination)
def __init__(self, config):
self.dbFilename = "downloads.db"
self.showsFilename = "SHOWS"
self.sourceIP = ""
self.transmissionHostRemote = ""
self.userpass = ""
self.downloadQuality = [720, 1080]
self.speakDownload = True
# We don't retain 'proxs' or 'requestsFromSource' in this
# instance since they get stored/retained inside TorrentApiController
proxs = {}
requestsFromSource = requests.Session()
self.establishConfiguration(config, requestsFromSource, proxs)
self.torrentController = TorrentApiController(requestsFromSource, proxs)
self.establishDatabase()
def _reset_session(self):
''' Get a new request session and try logging into the current
CVP node. If the login succeeded None will be returned and
self.session will be valid. If the login failed then an
exception error will be returned and self.session will
be set to None.
'''
self.session = requests.Session()
error = None
try:
self._login()
except (ConnectionError, CvpApiError, CvpRequestError,
CvpSessionLogOutError, HTTPError, ReadTimeout, Timeout,
TooManyRedirects) as error:
self.log.error(error)
# Any error that occurs during login is a good reason not to use
# this CVP node.
self.session = None
return error
def getSession(self):
if self._session is None:
self._session = requests.Session()
return self._session
def __init__(self, user_mode):
self.user_mode = user_mode
self.interface = None
self.quota = None
self.is_online = None
self.new_api = None
self.json_decoder = json.JSONDecoder()
self.session = requests.Session()
self.csrf_token = None
self.resolver = dns.resolver.Resolver()
self.resolver.nameservers = ['172.16.0.1']
self.api_host_ip = self.resolve_url(self.api_host)
self.api_host_new_ip = self.resolve_url(self.api_host_new)
def gethtml(link):
user_agent = "Mozilla/5.0 (Windows NT 6.1; rv:38.0) Gecko/20100101 Firefox/38.0"
headers={'user-agent':user_agent}
s = requests.Session()
try:
res = s.get(link,headers=headers).text
except requests.exceptions.InvalidSchema:
req=urllib2.Request(link,None,headers)
r = urllib2.urlopen(req)
res = r.read().decode('utf8')
return res
def main():
# Start a session so we can have persistant cookies
# Session() >> http://docs.python-requests.org/en/latest/api/#request-sessions
session = requests.Session()
# This is the form data that the page sends when logging in
login_data = {
'username': RegisterNumber,
'password': DateofBirth,
'submit': 'id',
}
print login_data
# Authenticate
r = session.post(URL, data = login_data)
# Try accessing a page that requires you to be logged in
r = session.get('http://sdpdr.nic.in/annauniv/result')
web = QWebView()
web.load(QUrl("http://sdpdr.nic.in/annauniv/result"))
#web.show()
printer = QPrinter()
printer.setPageSize(QPrinter.A4)
printer.setOutputFormat(QPrinter.PdfFormat)
printer.setOutputFileName("result.pdf")
# convertion of page to pdf format
imagenet_baidu_image_search.py 文件源码
项目:imageDownloader
作者: whcacademy
项目源码
文件源码
阅读 34
收藏 0
点赞 0
评论 0
def __init__(self, word, dirpath=None, processNum=30):
#if " " in word:
#raise AttributeError("This Script Only Support Single Keyword!")
self.word = word
self.char_table = {ord(key): ord(value)
for key, value in BaiduImgDownloader.char_table.items()}
if not dirpath:
dirpath = os.path.join(sys.path[0], 'results')
self.dirpath = dirpath
self.jsonUrlFile = os.path.join(sys.path[0], 'jsonUrl.txt')
self.logFile = os.path.join(sys.path[0], 'logInfo.txt')
self.errorFile = os.path.join(sys.path[0], 'errorUrl.txt')
if os.path.exists(self.errorFile):
os.remove(self.errorFile)
if not os.path.exists(self.dirpath):
os.mkdir(self.dirpath)
self.pool = Pool(30)
self.session = requests.Session()
self.session.headers = BaiduImgDownloader.headers
self.queue = Queue()
self.messageQueue = Queue()
self.index = 0
self.promptNum = 10
self.lock = threading.Lock()
self.delay = 1.5
self.QUIT = "QUIT"
self.printPrefix = "**"
self.processNum = processNum
def get_captcha():
import time
t = str(int(time.time()*1000))
captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
t = session.get(captcha_url, headers=header)
with open("captcha.jpg","wb") as f:
f.write(t.content)
f.close()
from PIL import Image
try:
im = Image.open('captcha.jpg')
im.show()
im.close()
except:
pass
captcha = input("?????\n>")
return captcha
def new_session(account):
if account.get('session',None) is None:
session = requests.session()
session.verify = True
session.headers.update({'User-Agent': 'Niantic App'}) # session.headers.update({'User-Agent': 'niantic'})
if not account['proxy'] is None:
session.proxies.update(account['proxy'])
account['session'] = session
else:
account['session'].close()
account['session'].cookies.clear()
account['session_time'] = get_time()
account['session_hash'] = os.urandom(32)
account['api_url'] = API_URL
account['auth_ticket'] = None
def get_upload_url(self,session):
"""Summary
Args:
session (TYPE): Description
Returns:
TYPE: Description
"""
r = session.get ( DataManagement.__APPSPOT_URL )
if r.text.startswith ( '\n<!DOCTYPE html>' ):
self.logger.debug ( 'Incorrect credentials. Probably. If you are sure the credentials are OK, '
'refresh the authentication token. If it did not work report a problem. '
'They might have changed something in the Matrix.' )
sys.exit ( 1 )
elif r.text.startswith ( '<HTML>' ):
self.logger.debug ( 'Redirecting to upload URL' )
r = session.get ( DataManagement.__APPSPOT_URL )
d = ast.literal_eval ( r.text )
return d['url']
def __init__(self, cache: bool = False, future: bool = True):
if cache:
redis_conn = redis.StrictRedis(host='redis')
self.session = requests_cache.core.CachedSession(
cache_name='api_cache',
backend='redis', expire_after=60 * 60 * 24 * 30,
allowable_codes=(200,),
allowable_methods=('GET',),
old_data_on_error=False,
connection=redis_conn,
)
else:
self.session = session()
if future:
self.future_session = FuturesSession(max_workers=10, session=self.session)
self.url = self.url_template.format(resource='', token=self.token)
def get_token(self, symbols, wlist):
ip = Helper.get_ip()
url = 'https://current.sina.com.cn/auth/api/jsonp.php/' + \
'var%20KKE_auth_{}=/'.format(Helper.random_string(9)) + \
'AuthSign_Service.getSignCode?' + \
'query=hq_pjb&ip={}&list={}&kick=1'.format(ip, wlist)
resp = self.session.get(url)
pat = re.compile('result:"([^"]+)",timeout:(\d+)')
m = pat.search(resp.text)
if m:
token, timeout = m.groups()
timeout = int(timeout)
return token
else:
log.error('token error: {}'.format(resp.text))
return self.get_token(symbols, wlist)
def login(self, name, password):
"""Authenticates user.
:param str name: account name
:param str password: account password
.. note::
Throws :class:`exceptions.RequestFailed` (20, 'badCreadentials') if bad credentials are passed.
"""
self.session = requests.session()
remember = True
data = self._request('login', [name, password, int(remember)], hmethod='POST')
return LoggedUser(fw=self, uid=data[3], name=data[0], img=common.img_path_to_relative(data[1]), sex=data[4], birth_date=data[5])
# This method is one big TODO.
def first_get(self):
global _session
_session=requests.session()
main_url='https://www.instagram.com'
_session.get(main_url,proxies=self.use_proxy,verify=True)
self.save_cookies()
if os.path.exists('cookiefile'):#print('have cookies')
self.csrf=self.read_cookies()
self.data=self.create_ajax()
print(self.data)
self.ins()
time.sleep(5)#wait for 5 seconds
self.my_selfie=get_pic.get_pic()#???——??-??
self.my_selfie.get_selfie()#download random selfie picture to local folder
self.upload()#upload the selfie
else:
pass
imagetypersapi.py 文件源码
项目:Imagetyperz-Python-API-examples
作者: zphanjakidze
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def __init__(self, username, password, timeout = 120, ref_id = 0):
self._username = username
self._password = password
self._ref_id = '{}'.format(ref_id) # save as str
self._timeout = timeout
self._session = session() # init a new session
self._normal_captcha = None # save last solved captcha
self._recaptcha = None
self._error = None # keep track of last error
self._headers = { # use this user agent
'User-Agent' : USER_AGENT
}
# solve normal captcha
def _get_log_schema(self):
"""
Get the log schema for this SMC version.
:return: dict
"""
if self.session and self.session_id:
schema = '{}/{}/monitoring/log/schemas'.format(self.url, self.api_version)
response = self.session.get(
url=schema,
headers={'cookie': self.session_id,
'content-type': 'application/json'})
if response.status_code in (200, 201):
return response.json()
def available_api_versions(base_url, timeout=10, verify=True):
"""
Get all available API versions for this SMC
:return version numbers
:rtype: list
"""
try:
r = requests.get('%s/api' % base_url, timeout=timeout,
verify=verify) # no session required
if r.status_code == 200:
j = json.loads(r.text)
versions = []
for version in j['version']:
versions.append(version['rel'])
#versions = [float(i) for i in versions]
return versions
raise SMCConnectionError(
'Invalid status received while getting entry points from SMC. '
'Status code received %s. Reason: %s' % (r.status_code, r.reason))
except requests.exceptions.RequestException as e:
raise SMCConnectionError(e)