python类Session()的实例源码

lfisuite.py 文件源码 项目:LFISuite 作者: D35m0nd142 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def send(self, *a, **kw):
    """Send a request after restoring literal ``<``, space and ``>`` in its URL.

    The first positional argument must expose a writable ``url`` attribute
    (presumably a prepared request -- confirm against callers). The
    percent-encoded form of each of the three characters is replaced by the
    literal character, then the call is delegated to
    ``requests.Session.send`` with the same arguments.
    """
    prepared = a[0]
    # Undo the percent-encoding one character at a time.
    for literal in ("<", " ", ">"):
        prepared.url = prepared.url.replace(urllib.quote(literal), literal)
    return requests.Session.send(self, *a, **kw)
app.py 文件源码 项目:dashboard-duty 作者: jrasell 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def index():
    """Render ``index.html`` with the service, its incidents and the on-call.

    The API key and service key are read from the ``DASHBOARD_DUTY_KEY`` and
    ``DASHBOARD_DUTY_SERVICE`` environment variables; the process exits with
    status 1 when either is missing, or when the first escalation target is
    neither a schedule reference nor a user reference.
    """
    try:
        api_key = os.environ['DASHBOARD_DUTY_KEY']
        service_key = os.environ['DASHBOARD_DUTY_SERVICE']
    except KeyError:
        logging.error('Missing Environment Variable(s)')
        exit(1)

    d_session = dashboard_duty.Core(requests.Session(), api_key, service_key)

    service = d_session.service()
    incidents = d_session.incident(service['id'])

    # Only the first target of the first escalation rule is inspected.
    first_target = service['escalation_policy']['escalation_rules'][0]['targets'][0]
    target_type = first_target['type']

    if target_type == 'schedule_reference':
        oncall = d_session.oncall_schedule_policy(first_target['id'])
    elif target_type == 'user_reference':
        oncall = d_session.oncall_user_policy(first_target['summary'])
    else:
        logging.error('Unable to handle oncall policy for %s' % service_key)
        exit(1)

    return render_template('index.html', service=service, incidents=incidents, oncall=oncall)
RJ_3.py 文件源码 项目:Radiojavan 作者: nimasaj 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def list_DL(List):
    """Resolve a download entry for every other URL in ``List`` after the first.

    The first element of ``List`` is dropped and then every second remaining
    element is fetched. Each page is searched for one of two start markers and
    one of two end markers; the text in between is either appended as-is, or
    (when it contains a single quote past position 0) cut at that quote,
    suffixed with ``.mp3`` and passed through ``check_host``.

    Returns the list of collected entries, one per fetched page.
    """
    page_urls = List[1:][::2]
    pending_ids = []
    downloads = []
    resolved = 0  # how many entries of pending_ids went through check_host

    for page_url in page_urls:
        # A fresh session per page; url0 is requested first on each session.
        web = requests.Session()
        web.get(url0)
        html = web.get(page_url, headers=headers).text

        start_marker = '<a href="javascript:void(0)" link="'
        start = html.find(start_marker)
        if start < 0:
            start_marker = "RJ.currentMP3Url = 'mp3/"
            start = html.find(start_marker)

        end = html.find('" target="_blank" class="mp3_download_link">')
        if end < 0:
            end = html.find("RJ.currentMP3 = '")

        candidate = html[start + len(start_marker):end]
        quote_at = candidate.find("'")
        if quote_at > 0:
            pending_ids.append(candidate[:quote_at] + '.mp3')
            # Resolve every id that has not been handed to check_host yet.
            while resolved < len(pending_ids):
                downloads.append(check_host(l_mp3[:4], pending_ids[resolved])[0])
                resolved += 1
        else:
            downloads.append(candidate)
    return downloads
client.py 文件源码 项目:googletranslate.popclipext 作者: wizyoung 项目源码 文件源码 阅读 88 收藏 0 点赞 0 评论 0
def __init__(self, service_urls=None, user_agent=DEFAULT_USER_AGENT):
    """Create the HTTP session and the token acquirer for this client.

    :param service_urls: sequence of hosts; ``['translate.google.com']`` is
        used when a falsy value is given. The first entry is handed to the
        token acquirer as its host.
    :param user_agent: value installed as the session's ``User-Agent`` header.
    """
    http = requests.Session()
    http.headers.update({'User-Agent': user_agent})
    self.session = http

    if not service_urls:
        service_urls = ['translate.google.com']
    self.service_urls = service_urls
    self.token_acquirer = TokenAcquirer(session=http, host=service_urls[0])

    # Mount the HTTP/2 adapter only when the optional ``hyper`` package
    # can be imported; otherwise keep the default transport.
    try:  # pragma: nocover
        from hyper.contrib import HTTP20Adapter
        http.mount(urls.BASE, HTTP20Adapter())
    except ImportError:  # pragma: nocover
        pass
gtoken.py 文件源码 项目:googletranslate.popclipext 作者: wizyoung 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, tkk='0', session=None, host='translate.google.com'):
    """Store the token seed, the HTTP session and the normalised host URL.

    :param tkk: initial token key value, stored unchanged on ``self.tkk``.
    :param session: object used for HTTP calls; a new ``requests.Session``
        is created when a falsy value is passed.
    :param host: host name or full URL. A value that does not start with
        ``http://`` or ``https://`` is prefixed with ``https://``.
    """
    self.session = session or requests.Session()
    self.tkk = tkk
    # Test for a real scheme prefix rather than the substring "http", so a
    # bare host such as "httpbin.org" still receives the "https://" prefix.
    has_scheme = host.startswith(('http://', 'https://'))
    self.host = host if has_scheme else 'https://' + host
ccu.py 文件源码 项目:ccu_and_eccu_publish 作者: gaofubin 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def AkamaiEdgeGridSession_Setup(AkamaiEdgeGridConfig):
    """Return a ``requests`` session whose ``auth`` is an ``EdgeGridAuth``.

    ``AkamaiEdgeGridConfig`` must provide the keys ``client_token``,
    ``client_secret`` and ``access_token``; a missing key raises ``KeyError``.
    """
    credentials = {
        key: AkamaiEdgeGridConfig[key]
        for key in ('client_token', 'client_secret', 'access_token')
    }
    session = requests.Session()
    session.auth = EdgeGridAuth(**credentials)
    return session

#Actually call akamai and return the result.
client.py 文件源码 项目:py-tendermint 作者: davebryson 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, host="127.0.0.1", port=46657):
    """Prepare the endpoint URI, HTTP session, request counter and headers.

    :param host: host part of the endpoint, default ``127.0.0.1``.
    :param port: port part of the endpoint, default ``46657``.
    """
    # Counter that hands out ids for json-rpc requests, starting at 0.
    self.request_counter = itertools.count()

    # One session is kept for the lifetime of the client.
    self.session = requests.Session()

    # Endpoint all requests are sent to.
    self.uri = "http://{}:{}".format(host, port)

    # Headers attached to requests.
    self.headers = {'user-agent': AGENT, 'Content-Type': 'application/json'}
rester.py 文件源码 项目:webtzite 作者: materialsproject 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, api_key=None, endpoint=None):
    """Resolve the API key and endpoint, then open an authenticated session.

    :param api_key: API key; when ``None`` it is read from the pymatgen
        ``SETTINGS`` entry ``PMG_MAPI_KEY`` (empty string if absent).
    :param endpoint: REST preamble; when ``None`` it is read from
        ``PMG_MAPI_ENDPOINT`` with the public v2 URL as fallback.
    :raises ValueError: if the resolved API key is empty.
    """
    needs_settings = api_key is None or endpoint is None
    if needs_settings:
        # pymatgen is optional: without it the settings are simply empty.
        try:
            from pymatgen import SETTINGS
        except ImportError:
            warnings.warn('MPResterBase: not using pymatgen SETTINGS!')
            SETTINGS = {}

    self.api_key = (
        api_key if api_key is not None else SETTINGS.get("PMG_MAPI_KEY", "")
    )
    self.preamble = (
        endpoint if endpoint is not None else SETTINGS.get(
            "PMG_MAPI_ENDPOINT", "https://www.materialsproject.org/rest/v2"
        )
    )

    if not self.api_key:
        raise ValueError('API key not set. Run `pmg config --add PMG_MAPI_KEY <USER_API_KEY>`.')

    session = requests.Session()
    # The session's headers are replaced wholesale by the API-key header.
    session.headers = {"x-api-key": self.api_key}
    self.session = session
__init__.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None):
    """Configure the broker endpoint, optional transport cipher and handlers.

    :param broker: URL of the broker API, stored on ``self._endpoint``.
    :param encoding: text encoding stored on ``self._encoding``.
    :param enc_key: AES key; transport encryption is enabled only when both
        ``enc_key`` and ``enc_iv`` are given.
    :param enc_iv: CBC initialisation vector paired with ``enc_key``.
    """
    super().__init__()

    self._endpoint = broker
    # Use the caller-supplied encoding; the default keeps "utf-8".
    self._encoding = encoding
    if enc_key is None or enc_iv is None:
        # No key material: run without transport encryption.
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(algorithms.AES(
            enc_key), modes.CBC(enc_iv), backend=backend)

    self._session = requests.Session()
    # Maps an event name to the bound method that handles it.
    self._event_dict = {'logon': self.on_login, 'logoff': self.on_logout, 'ping': self.on_ping,
                        'query_data': self.on_query_data, 'send_order': self.on_insert_order,
                        'cancel_order': self.on_cancel_order_event, 'get_quote': self.on_get_quote}
    self.client_id = ''
    self.account_id = ''
test.py 文件源码 项目:hearthscan-bot 作者: d-schmidt 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def test_hearthhead(self):
    """Check ``scrape.getHearthHeadId`` against known card name/type pairs."""
    # (card name, card type, expected id)
    cases = (
        ('Quick Shot', 'Spell', 'quick-shot'),
        ('Undercity Valiant', 'Minion', 'undercity-valiant'),
        ('Gorehowl', 'Weapon', 'gorehowl'),
        ('V-07-TR-0N', 'Minion', 'v-07-tr-0n'),
        ("Al'Akir the Windlord", 'Minion', 'alakir-the-windlord'),
    )
    with requests.Session() as s:
        for card_name, card_type, expected in cases:
            self.assertEqual(
                scrape.getHearthHeadId(card_name, card_type, s), expected)
test.py 文件源码 项目:hearthscan-bot 作者: d-schmidt 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_Hearthpwn(self):
    """Check ``scrape.getHearthpwnIdAndUrl`` against two known cards."""
    # ((name, set, type), (expected id, expected image url))
    cases = (
        (('Quick Shot', 'Blackrock Mountain', 'Spell'),
         (14459, 'https://media-hearth.cursecdn.com/avatars/328/302/14459.png')),
        (('Upgrade!', 'Classic', 'Spell'),
         (638, 'https://media-hearth.cursecdn.com/avatars/330/899/638.png')),
    )
    with requests.Session() as s:
        for (card_name, card_set, card_type), expected in cases:
            self.assertEqual(
                scrape.getHearthpwnIdAndUrl(card_name, card_set, card_type, False, s),
                expected)
scrape.py 文件源码 项目:hearthscan-bot 作者: d-schmidt 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def loadTokens(tokens = {}, wantedTokens = {}):
    """Pick the wanted token cards out of ``tokens`` and enrich them.

    For every ``name -> ids`` entry of ``wantedTokens`` the matching card is
    looked up in ``tokens`` (by ``ids['id']`` when present, otherwise by
    name), its image URL is scraped from the hearthpwn page ``ids['hpwn']``,
    and ``atk``/``cost``/``hp`` are overwritten by values from ``ids`` when
    given. The process exits when a wanted token cannot be found.

    Returns a dict mapping card name to the (mutated) card dict.
    """
    resultCards = {}
    with requests.Session() as session:
        for name, ids in wantedTokens.items():
            if 'id' in ids:
                # Direct lookup; a name mismatch is only reported.
                card = tokens[ids['id']]
                if name != card['name']:
                    log.warning('loadTokens() names do not match: %s - %s', name, tokens[ids['id']]['name'])
            else:
                # Search by name; the last match wins, duplicates are reported.
                card = None
                for token in tokens.values():
                    if name != token['name']:
                        continue
                    if card:
                        log.warning('loadTokens() found token again: %s', name)
                    card = token

            if not card:
                log.warning('loadTokens() could not find: %s', name)
                exit()

            hpwn_id = ids['hpwn']
            r = session.get('http://www.hearthpwn.com/cards/{}'.format(hpwn_id))
            r.raise_for_status()
            image = fromstring(r.text).xpath('//img[@class="hscard-static"]')[0].get('src')
            if not image:
                # Fallback image when the page's <img> carries no src.
                image = 'https://media-hearth.cursecdn.com/avatars/148/738/687.png'

            card['cdn'] = image.replace('http://', 'https://').lower()
            card['hpwn'] = hpwn_id
            card['head'] = getHearthHeadId(card['name'], "ignored", "ignored")

            # since jade golem: overwrite scraped stats with prepared ones
            for stat in ('atk', 'cost', 'hp'):
                card[stat] = ids.get(stat, card[stat])

            resultCards[card['name']] = card
            print('.', end='')

    return resultCards
download.py 文件源码 项目:CausalGAN 作者: mkocaoglu 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def download_file_from_google_drive(id, destination):
    """Download the Google Drive file ``id`` and save it to ``destination``.

    A first streamed request is made; when ``get_confirm_token`` extracts a
    token from that response, the request is repeated with the token as the
    ``confirm`` parameter. The final response is written out through
    ``save_response_content``.
    """
    URL = "https://docs.google.com/uc?export=download"
    query = {'id': id}

    session = requests.Session()
    response = session.get(URL, params=query, stream=True)

    token = get_confirm_token(response)
    if token:
        # Retry with the confirmation token added to the query.
        query = dict(query, confirm=token)
        response = session.get(URL, params=query, stream=True)

    save_response_content(response, destination)
getTV.py 文件源码 项目:getTV 作者: mattsta 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, config):
    """Set default options, apply ``config`` and build the collaborators.

    Defaults are assigned first, then ``establishConfiguration`` is called
    with ``config``, the HTTP session and the proxy dict, after which the
    torrent controller and the database are set up.
    """
    defaults = (
        ("dbFilename", "downloads.db"),
        ("showsFilename", "SHOWS"),
        ("sourceIP", ""),
        ("transmissionHostRemote", ""),
        ("userpass", ""),
        ("downloadQuality", [720, 1080]),
        ("speakDownload", True),
    )
    for attribute, value in defaults:
        setattr(self, attribute, value)

    # Neither the proxy dict nor the session is stored on this instance;
    # both are handed to TorrentApiController below.
    proxs = {}
    requestsFromSource = requests.Session()

    self.establishConfiguration(config, requestsFromSource, proxs)
    self.torrentController = TorrentApiController(requestsFromSource, proxs)

    self.establishDatabase()
cvp_client.py 文件源码 项目:cvprac 作者: aristanetworks 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _reset_session(self):
    ''' Get a new request session and try logging into the current
        CVP node. If the login succeeded None will be returned and
        self.session will be valid. If the login failed then the
        exception that was raised will be returned and self.session
        will be set to None.
    '''
    self.session = requests.Session()
    error = None
    try:
        self._login()
    except (ConnectionError, CvpApiError, CvpRequestError,
            CvpSessionLogOutError, HTTPError, ReadTimeout, Timeout,
            TooManyRedirects) as err:
        # Python 3 unbinds the ``as`` name when the handler exits, so the
        # exception is copied into a separate local that is still bound
        # at the ``return`` below.
        error = err
        self.log.error(error)
        # Any error that occurs during login is a good reason not to use
        # this CVP node.
        self.session = None
    return error
sessionfactory.py 文件源码 项目:skymod 作者: DelusionalLogic 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def getSession(self):
    """Return the cached session, creating it on first use."""
    session = self._session
    if session is None:
        session = self._session = requests.Session()
    return session
manager.py 文件源码 项目:db_wlan_manager 作者: sistason 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, user_mode):
    """Initialise state, the HTTP session and the DNS resolver.

    :param user_mode: stored unchanged on ``self.user_mode``.

    The resolver is pinned to the name server ``172.16.0.1`` and is set up
    before ``resolve_url`` is called for ``self.api_host`` and
    ``self.api_host_new``.
    """
    self.user_mode = user_mode
    self.interface = self.quota = None

    # Not known until probed.
    self.is_online = self.new_api = None

    self.json_decoder = json.JSONDecoder()
    self.session = requests.Session()
    self.csrf_token = None

    resolver = dns.resolver.Resolver()
    resolver.nameservers = ['172.16.0.1']
    self.resolver = resolver

    self.api_host_ip = self.resolve_url(self.api_host)
    self.api_host_new_ip = self.resolve_url(self.api_host_new)
recipe-579107.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def gethtml(link):
    """Return the body of ``link`` as text, sending a Firefox user agent.

    The page is fetched through a ``requests`` session; when ``requests``
    rejects the URL scheme (``InvalidSchema``) the fetch is retried through
    ``urllib2`` and the bytes are decoded as UTF-8.
    """
    headers = {
        'user-agent': "Mozilla/5.0 (Windows NT 6.1; rv:38.0) Gecko/20100101 Firefox/38.0",
    }
    s = requests.Session()
    try:
        return s.get(link, headers=headers).text
    except requests.exceptions.InvalidSchema:
        request = urllib2.Request(link, None, headers)
        return urllib2.urlopen(request).read().decode('utf8')
recipe-579081.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def main():
    """Log in with the register number / date of birth and set up a PDF printer.

    Posts the login form to ``URL`` through a ``requests`` session, requests
    the result page, loads the same page in a ``QWebView`` and configures a
    ``QPrinter`` that targets ``result.pdf`` in A4 PDF format.

    NOTE(review): no call that actually renders the web view to the printer
    is visible in this excerpt -- the function may continue beyond it.
    """
    # Start a session so we can have persistent cookies

    # Session() >> http://docs.python-requests.org/en/latest/api/#request-sessions
    session = requests.Session()

    # This is the form data that the page sends when logging in

    login_data = {
    'username': RegisterNumber,
    'password': DateofBirth,
    'submit': 'id',
    }
    # Python 2 print statement: echoes the credentials to stdout.
    print login_data

    # Authenticate
    r = session.post(URL, data = login_data)

    # Try accessing a page that requires you to be logged in
    r = session.get('http://sdpdr.nic.in/annauniv/result')

    # NOTE(review): the web view issues its own request for the page;
    # presumably it does not share the cookies of ``session`` -- confirm.
    web = QWebView()
    web.load(QUrl("http://sdpdr.nic.in/annauniv/result"))
    #web.show()

    # Printer configured to write an A4 PDF named result.pdf.
    printer = QPrinter()
    printer.setPageSize(QPrinter.A4)
    printer.setOutputFormat(QPrinter.PdfFormat)
    printer.setOutputFileName("result.pdf")

# convertion of page to pdf format
imagenet_baidu_image_search.py 文件源码 项目:imageDownloader 作者: whcacademy 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, word, dirpath=None, processNum=30):
    """Prepare output locations, the worker pool, session and queues.

    :param word: search keyword, stored on ``self.word``.
    :param dirpath: output directory; defaults to ``results`` under
        ``sys.path[0]``. It is created when it does not exist.
    :param processNum: stored on ``self.processNum``.

    An existing ``errorUrl.txt`` under ``sys.path[0]`` is deleted.
    """
    base_dir = sys.path[0]

    self.word = word
    # Translation table keyed by code point, built from the class-level table.
    self.char_table = dict(
        (ord(source), ord(target))
        for source, target in BaiduImgDownloader.char_table.items()
    )

    self.dirpath = dirpath if dirpath else os.path.join(base_dir, 'results')
    self.jsonUrlFile = os.path.join(base_dir, 'jsonUrl.txt')
    self.logFile = os.path.join(base_dir, 'logInfo.txt')
    self.errorFile = os.path.join(base_dir, 'errorUrl.txt')

    # Start from a clean error file and make sure the output directory exists.
    if os.path.exists(self.errorFile):
        os.remove(self.errorFile)
    if not os.path.exists(self.dirpath):
        os.mkdir(self.dirpath)

    self.pool = Pool(30)

    http = requests.Session()
    http.headers = BaiduImgDownloader.headers
    self.session = http

    self.queue = Queue()
    self.messageQueue = Queue()
    self.index = 0
    self.promptNum = 10
    self.lock = threading.Lock()
    self.delay = 1.5
    self.QUIT = "QUIT"
    self.printPrefix = "**"
    self.processNum = processNum
zhihu_login_requests.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 83 收藏 0 点赞 0 评论 0
def get_captcha():
    """Download the login captcha, try to display it, and return the answer.

    The image is fetched with the module-level ``session`` and ``header``,
    written to ``captcha.jpg`` in the working directory, opened with PIL on
    a best-effort basis, and the text typed by the user is returned.
    """
    import time

    # Millisecond timestamp used as the ``r`` query parameter.
    timestamp = str(int(time.time() * 1000))
    captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(timestamp)
    response = session.get(captcha_url, headers=header)
    # The ``with`` block closes the file; no explicit close() is required.
    with open("captcha.jpg", "wb") as f:
        f.write(response.content)

    from PIL import Image
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except Exception:
        # Displaying the image is best effort: the user can still open
        # captcha.jpg by hand. Only ordinary errors are ignored, so
        # Ctrl-C and SystemExit still propagate.
        pass

    # NOTE(review): the prompt text looks mis-encoded in this copy of the
    # file -- confirm the intended wording against the upstream source.
    captcha = input("?????\n>")
    return captcha
main0.py 文件源码 项目:PGO-mapscan-opt 作者: seikur0 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def new_session(account):
    """Give ``account`` a usable HTTP session and reset its session fields.

    When the account has no session yet, one is created (certificate
    verification on, ``Niantic App`` user agent, the account's proxy applied
    when it is not ``None``). Otherwise the existing session is closed and
    its cookies cleared. In both cases the session time, a random 32-byte
    session hash, the API URL and an empty auth ticket are stored.
    """
    current = account.get('session', None)
    if current is not None:
        current.close()
        current.cookies.clear()
    else:
        fresh = requests.session()
        fresh.verify = True
        fresh.headers.update({'User-Agent': 'Niantic App'})  # alternative: 'niantic'
        proxy = account['proxy']
        if proxy is not None:
            fresh.proxies.update(proxy)
        account['session'] = fresh

    account['session_time'] = get_time()
    account['session_hash'] = os.urandom(32)

    account['api_url'] = API_URL
    account['auth_ticket'] = None
wpDataManagement.py 文件源码 项目:gee-bridge 作者: francbartoli 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_upload_url(self,session):
    """Ask the appspot endpoint for an upload URL.

    Args:
        session: object whose ``get(url)`` returns a response exposing
            ``text``.

    Returns:
        The value stored under ``'url'`` in the Python-literal dict that
        the endpoint's response text evaluates to.

    The process exits with status 1 when the response looks like an HTML
    document (treated as an authentication failure). A response starting
    with ``<HTML>`` is treated as a redirect page and the request is
    repeated once.
    """
    r = session.get ( DataManagement.__APPSPOT_URL )
    if r.text.startswith ( '\n<!DOCTYPE html>' ):
        self.logger.debug ( 'Incorrect credentials. Probably. If you are sure the credentials are OK, '
                            'refresh the authentication token. If it did not work report a problem. '
                            'They might have changed something in the Matrix.' )
        sys.exit ( 1 )

    if r.text.startswith ( '<HTML>' ):
        self.logger.debug ( 'Redirecting to upload URL' )
        r = session.get ( DataManagement.__APPSPOT_URL )

    # Parse whichever response we ended up with, so that a first response
    # that is already the literal dict does not leave ``d`` unassigned.
    d = ast.literal_eval ( r.text )
    return d['url']
api_tools.py 文件源码 项目:bitrader 作者: jr-minnaar 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, cache: bool = False, future: bool = True):
    """Create the HTTP session(s) and the base URL.

    :param cache: when true, use a Redis-backed ``CachedSession`` (GET
        requests with status 200, 30-day expiry); otherwise a plain session.
    :param future: when true, also create ``self.future_session``, a
        ``FuturesSession`` with 10 workers wrapping ``self.session``.
    """
    if not cache:
        self.session = session()
    else:
        redis_connection = redis.StrictRedis(host='redis')
        thirty_days = 60 * 60 * 24 * 30  # seconds
        self.session = requests_cache.core.CachedSession(
            cache_name='api_cache',
            backend='redis',
            expire_after=thirty_days,
            allowable_codes=(200,),
            allowable_methods=('GET',),
            old_data_on_error=False,
            connection=redis_connection,
        )

    if future:
        self.future_session = FuturesSession(max_workers=10, session=self.session)

    self.url = self.url_template.format(resource='', token=self.token)
sinal2.py 文件源码 项目:sinal2 作者: observerss 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_token(self, symbols, wlist):
    """Request a sign code for ``wlist`` and return it.

    :param symbols: not used here except to be passed on when retrying.
    :param wlist: value placed in the ``list`` query parameter.
    :return: the ``result`` string extracted from the response.

    When the response does not contain a result, the response text is
    logged and the request is retried by calling this method again; there
    is no retry limit.
    """
    ip = Helper.get_ip()
    url = 'https://current.sina.com.cn/auth/api/jsonp.php/' + \
        'var%20KKE_auth_{}=/'.format(Helper.random_string(9)) + \
        'AuthSign_Service.getSignCode?' + \
        'query=hq_pjb&ip={}&list={}&kick=1'.format(ip, wlist)
    resp = self.session.get(url)
    # Raw string so that ``\d`` reaches the regex engine without relying on
    # Python's handling of unknown escape sequences in normal strings.
    m = re.search(r'result:"([^"]+)",timeout:(\d+)', resp.text)
    if m:
        # Group 2 (the timeout) is matched but not needed by callers.
        return m.group(1)

    log.error('token error: {}'.format(resp.text))
    return self.get_token(symbols, wlist)
filmweb.py 文件源码 项目:pyfilmweb 作者: lopezloo 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def login(self, name, password):
    """Authenticates user.

    :param str name: account name
    :param str password: account password

    .. note::
       Throws :class:`exceptions.RequestFailed` (20, 'badCreadentials') if bad credentials are passed.
    """
    # Every login starts from a brand-new session.
    self.session = requests.session()

    remember = True
    payload = [name, password, int(remember)]
    data = self._request('login', payload, hmethod='POST')

    # Positions used from the response: 0 name, 1 image path, 3 uid,
    # 4 sex, 5 birth date.
    return LoggedUser(
        fw=self,
        uid=data[3],
        name=data[0],
        img=common.img_path_to_relative(data[1]),
        sex=data[4],
        birth_date=data[5],
    )

   # This method is one big TODO.
ins.py 文件源码 项目:ins-account 作者: E0han 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def first_get(self):
    """Open a session on the main page and, if cookies were saved, post a selfie.

    A new module-level ``_session`` is created and used to request the main
    page; the cookies are saved, and when the file ``cookiefile`` exists the
    CSRF value and ajax data are prepared, ``ins`` is run, and after a
    5-second pause a selfie is downloaded and uploaded. Nothing else happens
    when the cookie file is missing.
    """
    global _session
    _session = requests.session()
    _session.get('https://www.instagram.com', proxies=self.use_proxy, verify=True)
    self.save_cookies()

    if not os.path.exists('cookiefile'):
        return

    self.csrf = self.read_cookies()
    self.data = self.create_ajax()
    print(self.data)
    self.ins()
    time.sleep(5)  # wait for 5 seconds
    self.my_selfie = get_pic.get_pic()
    self.my_selfie.get_selfie()  # download random selfie picture to local folder
    self.upload()  # upload the selfie
imagetypersapi.py 文件源码 项目:Imagetyperz-Python-API-examples 作者: zphanjakidze 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, username, password, timeout = 120, ref_id = 0):
    """Store credentials and request settings and open a new session.

    :param username: account user name.
    :param password: account password.
    :param timeout: stored on ``self._timeout`` (default 120).
    :param ref_id: stored on ``self._ref_id`` formatted as a string.
    """
    self._username, self._password = username, password

    self._timeout = timeout
    self._ref_id = '{}'.format(ref_id)      # kept as str
    self._session = session()               # new session per instance

    # Last solved captchas and last error; nothing solved or failed yet.
    self._normal_captcha = self._recaptcha = None
    self._error = None

    # Headers carrying the module's user agent.
    self._headers = {'User-Agent' : USER_AGENT}

    # solve normal captcha
session.py 文件源码 项目:smc-python 作者: gabstopper 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _get_log_schema(self):
    """
    Get the log schema for this SMC version.

    Returns ``None`` when there is no session or session id, or when the
    response status is neither 200 nor 201.

    :return: dict
    """
    if not (self.session and self.session_id):
        return None

    schema_url = '{}/{}/monitoring/log/schemas'.format(self.url, self.api_version)
    request_headers = {
        'cookie': self.session_id,
        'content-type': 'application/json',
    }
    response = self.session.get(url=schema_url, headers=request_headers)

    if response.status_code not in (200, 201):
        return None
    return response.json()
session.py 文件源码 项目:smc-python 作者: gabstopper 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def available_api_versions(base_url, timeout=10, verify=True):
    """
    Get all available API versions for this SMC

    Requests ``<base_url>/api`` without a session and collects the ``rel``
    value of every entry under ``version`` in the JSON body.

    :raises SMCConnectionError: on a non-200 status or any ``requests``
        request exception.
    :return version numbers
    :rtype: list
    """
    try:
        # no session required
        r = requests.get('%s/api' % base_url, timeout=timeout, verify=verify)

        if r.status_code != 200:
            raise SMCConnectionError(
                'Invalid status received while getting entry points from SMC. '
                'Status code received %s. Reason: %s' % (r.status_code, r.reason))

        payload = json.loads(r.text)
        # Versions are returned exactly as given, without numeric conversion.
        return [entry['rel'] for entry in payload['version']]

    except requests.exceptions.RequestException as e:
        raise SMCConnectionError(e)


问题


面经


文章

微信
公众号

扫码关注公众号