python类Session()的实例源码

cookies.py 文件源码 项目:crepriceSpider 作者: zhousenbiao 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_cookie(account, password):
    """Log in with the given credentials and return the response cookies.

    Posts the login form to ``login_url`` without following redirects and
    serialises the cookies set on that single response.

    :param account: login name, sent as ``login_uid1``
    :param password: password, sent as ``login_pwd1``
    :return: JSON string of the cookies found on the login response
    """
    payload = {
        'login_uid1': account,
        'login_pwd1': password,
        'agreeRule': "1",
        'loginsubmit': "??",
        # 'redirect_to': "http://www.creprice.cn",
        # 'testcookie': "1"
    }
    # The session is only needed for this one request; leaving the ``with``
    # block closes it and releases its pooled connections.
    with requests.Session() as s:
        response = s.post(login_url, data=payload, allow_redirects=False)
        cookies = response.cookies.get_dict()
    # NOTE(review): this is logged unconditionally - the response status is
    # never inspected, so "success" only means the request did not raise.
    logger.warning("get cookie success!!!(account is:%s)", account)
    return json.dumps(cookies)

# 把Cookies存入Redis数据库 (store the Cookies in the Redis database)
numerapi.py 文件源码 项目:numerai 作者: gansanay 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def upload_prediction(self, file_path):
        """Upload a prediction file and register it as a submission.

        Steps: authorize the upload, look up the current competition, PUT
        the file contents to the signed URL, then POST the submission record
        to ``self._submissions_url``.

        :param file_path: path of the prediction file to upload
        :return: the HTTP status code of the first step that did not return
            200, otherwise the status code of the final submission request
        """
        filename, signedRequest, headers, status_code = self.authorize(file_path)
        if status_code != 200:
            return status_code

        dataset_id, comp_id, status_code = self.get_current_competition()
        if status_code != 200:
            return status_code

        # Read the payload first so the file is closed before any network I/O.
        with open(file_path, 'rb') as fp:
            payload = fp.read()

        # The request is prepared on its own rather than through the session.
        # NOTE(review): presumably so that no session default headers are
        # added to the signed request - confirm before simplifying to s.put().
        prepped = requests.Request('PUT', signedRequest, data=payload).prepare()
        with requests.Session() as s:
            resp = s.send(prepped)
        if resp.status_code != 200:
            return resp.status_code

        r = requests.post(self._submissions_url,
                          data={'competition_id': comp_id,
                                'dataset_id': dataset_id,
                                'filename': filename},
                          headers=headers)

        return r.status_code
qqbot.py 文件源码 项目:QQBot 作者: springhack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def prepareLogin(self):
        """Create the HTTP session and run the first steps of the login flow.

        Sets ``self.clientid`` and ``self.session``, fetches the ptlogin2
        login page through ``self.urlGet``, installs a fixed set of cookies,
        calls ``self.getAuthStatus()`` and finally drops the ``qrsig`` cookie
        again.
        """
        self.clientid = 53999199

        http = requests.Session()
        self.session = http
        http.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:27.0) Gecko/20100101 Firefox/27.0',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
        })

        # Adjacent literals are joined by the parser into one URL string.
        login_page = (
            'https://ui.ptlogin2.qq.com/cgi-bin/login?daid=164&target=self&style=16&mibao_css=m_webqq&'
            'appid=501004106&enable_qlogin=0&no_verifyimg=1&s_url=http%3A%2F%2Fw.qq.com%2Fproxy.html&'
            'f_url=loginerroralert&strong_login=1&login_state=10&t=20131024001'
        )
        self.urlGet(login_page)

        # Hard-coded cookie values copied into the jar before the status call.
        fixed_cookies = {
            'RK': 'OfeLBai4FB',
            'ptcz': 'ad3bf14f9da2738e09e498bfeb93dd9da7540dea2b7a71acfb97ed4d3da4e277',
            'pgv_pvi': '911366144',
            'pgv_info': 'ssid pgv_pvid=1051433466',
            'qrsig': 'hJ9GvNx*oIvLjP5I5dQ19KPa3zwxNI62eALLO*g2JLbKPYsZIRsnbJIxNe74NzQQ',
        }
        self.session.cookies.update(fixed_cookies)

        self.getAuthStatus()
        # qrsig is only kept in the jar for the getAuthStatus() call above.
        self.session.cookies.pop('qrsig')
howlongtobeat.py 文件源码 项目:sopel-modules 作者: phixion 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def hltb(bot,trigger):
    """Search howlongtobeat.com for a game and say the first hit's time.

    :param bot: object whose ``say`` method emits the reply
    :param trigger: match object; ``trigger.group(2)`` is the game name
    :return: whatever ``bot.say`` returns for the early-exit replies,
        otherwise ``None``
    """
    if not trigger.group(2):
        return bot.say("Enter a game name to search.")
    game = trigger.group(2)
    url = "http://howlongtobeat.com/search_main.php?page=1"
    payload = {"queryString":game,"t":"games","sorthead":"popular","sortd":"Normal Order","length_type":"main","detail":"0"}
    headers = {'Content-type':'application/x-www-form-urlencoded', 'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.97 Safari/537.36','origin':'https://howlongtobeat.com','referer':'https://howlongtobeat.com'}
    with requests.Session() as session:
        # The search is posted twice and the first response is discarded.
        # NOTE(review): presumably the first call primes session cookies -
        # confirm before removing it.
        session.post(url, headers=headers, data=payload)
        r = session.post(url, headers=headers, data=payload)
    if len(r.content) < 250:
        return bot.say("No results.")
    bs = BeautifulSoup(r.content)
    matches = bs.findAll("div", {"class":"search_list_details"})
    # A long response without the expected markup used to raise IndexError
    # (or AttributeError on a missing link); report it as an empty result.
    if not matches or matches[0].a is None:
        return bot.say("No results.")
    first = matches[0]
    cells = first.findAll('div')
    if len(cells) < 4:
        return bot.say("No results.")
    name = first.a.text
    duration = cells[3].text
    bot.say('{} - {}'.format(name, duration))
ccu.py 文件源码 项目:ccu_and_eccu_publish 作者: gaofubin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def AkamaiEdgeGridConfig_Setup(config_file, section):
    """Load one section of a credentials file into a dict.

    The ``host`` entry of the section is rewritten with an ``https://``
    prefix. The section is expected to provide ``client_token``,
    ``client_secret``, ``host`` and ``access_token``; only ``host`` is
    actually accessed here. If the file does not exist, a hint is printed
    and the process exits.

    :param config_file: path of the credentials file (``~`` is expanded)
    :param section: name of the section to read
    :return: dict of the section's options with ``host`` prefixed
    """
    config_file = os.path.expanduser(config_file)
    # Single-argument print(...) behaves the same on Python 2 and 3.
    if debug: print("DEBUG: config_file %s" % config_file)

    if not os.path.isfile(config_file):
        print("Missing configuration file.  Run python gen_creds.py to get your credentials file set up once you've provisioned credentials in LUNA.")
        exit()

    config = ConfigParser.ConfigParser()
    # Close the handle once parsed instead of leaking it.
    with open(config_file) as config_handle:
        config.readfp(config_handle)
    # ConfigParser lowercases option names
    EdgeGridConfig = dict(config.items(section))

    EdgeGridConfig['host'] = '%s://%s' % ('https', EdgeGridConfig['host'])

    if debug: print(EdgeGridConfig)
    return EdgeGridConfig

#Setup a EdgeGrid Session using the EdgeGridConfig previously loaded.
client.py 文件源码 项目:python-cachetclient 作者: dmsimard 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, **kwargs):
        """
        Configure the client from keyword arguments.

        ``endpoint`` is mandatory; ``api_token``, ``timeout`` and ``verify``
        default to ``None`` and ``pagination`` defaults to ``False``.

        :raises KeyError: when ``endpoint`` is not supplied
        """
        self.user_agent = 'python-cachetclient'
        try:
            endpoint = kwargs['endpoint']
        except KeyError:
            raise KeyError('Cachet API endpoint is required')
        self.endpoint = endpoint

        # Optional settings and the value used when one is not supplied.
        optional_settings = (
            ('api_token', None),
            ('timeout', None),
            ('verify', None),
            ('pagination', False),
        )
        for option, fallback in optional_settings:
            setattr(self, option, kwargs.get(option, fallback))

        self.http = requests.Session()
googleImageDownload.py 文件源码 项目:imageDownloader 作者: whcacademy 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def setupSession():
    """Create a ``requests.Session`` with browser-like default headers.

    :return: the configured session
    """
    session = requests.Session()
    # ``headers`` (plural) is the mapping requests actually sends; the
    # previous ``session.header`` assignment only created an unused
    # attribute, so these values never reached the server.
    # NOTE(review): "sdch" is advertised although nothing here decodes it -
    # confirm the target servers never answer with that encoding.
    session.headers.update({
        'User-Agent': "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:34.0) Gecko/20100101 Firefox/34.0",
        "Accept-Encoding": "gzip, deflate, sdch",
    })
    return session
api.py 文件源码 项目:pyTBA 作者: Thing342 项目源码 文件源码 阅读 63 收藏 0 点赞 0 评论 0
def __init__(self, name: str = None, description: str = None, version: str = None):
        """Build the cached HTTP session and optionally set the API key.

        The session is a ``requests.Session`` wrapped in ``CacheControl``
        with a ``LastModified`` heuristic; the (initially empty) app-id
        header is installed on it. ``set_api_key`` is only called when
        ``name`` is given.
        """
        self.app_id = {'X-TBA-App-Id': ""}

        cached_session = CacheControl(requests.Session(), heuristic=LastModified())
        self.session = cached_session
        self.session.headers.update(self.app_id)

        if name is None:
            return
        self.set_api_key(name, description, version)
emergingthreats_analyzer.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self):
        """Read the analyzer configuration and prepare an authorised session.

        ``config.service`` and ``config.key`` are fetched through
        ``get_param`` with their respective "is missing" messages; the key
        is installed as the session's ``Authorization`` header.
        """
        Analyzer.__init__(self)
        self.service = self.get_param(
            'config.service', None, 'EmergingThreats service is missing')
        self.apikey = self.get_param(
            'config.key', None, 'EmergingThreats apikey is missing')

        http = requests.Session()
        http.headers["Authorization"] = self.apikey
        self.session = http
safebrowsing.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, key, client_id, client_version='0.1'):
        """Store the client identity and build the threatMatches lookup URL.

        :param key: API key; kept as ``api_key`` and embedded in ``url``
        :param client_id: stored as ``client_id``
        :param client_version: stored as ``client_version``
        """
        self.client_id = client_id
        self.client_version = client_version
        self.api_key = key
        # The key travels as a query parameter of the endpoint URL.
        self.url = 'https://safebrowsing.googleapis.com/v4/threatMatches:find?key={}'.format(self.api_key)
        self.session = requests.Session()
download_hashes.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(path, quiet=False):
    """
    Downloads all available hash files to a given path.

    Files that already exist in ``path`` are skipped. If ``path`` is not a
    directory a message is printed and the process exits with status 1.

    :param path: Path to download directory
    :param quiet: If set to True, no progressbar is displayed
    """
    if not os.path.isdir(path):
        print('Given path is not a directory.')
        sys.exit(1)

    session = requests.Session()
    session.headers = {'User-agent': 'Mozilla/5.0 Chrome/57.0.2987.110'}

    # The index page carries the file numbers as ">NNN<"; the pattern only
    # matches three-digit numbers without a leading zero (100-999).
    index_page = session.get('https://virusshare.com/hashes.4n6').text
    max_num = max(int(number)
                  for number in re.findall(r'\>([1-9][0-9]{2})\<', index_page))

    p = None if quiet else progressbar.ProgressBar(max_value=max_num)
    # NOTE(review): range(max_num) stops at max_num - 1; if max_num is the
    # index of the newest file rather than a count, that file is never
    # fetched - confirm against the index page.
    for i in range(max_num):
        filename = str(i).zfill(3) + '.md5'
        target = os.path.join(path, filename)
        if os.path.exists(target):
            continue
        if p is not None:
            p.update(i)
        url = URL + filename
        if session.head(url).status_code != 200:
            continue
        body = session.get(url, stream=True)
        try:
            with io.open(target, mode='wb') as afile:
                for chunk in body.iter_content(chunk_size=1024):
                    afile.write(chunk)
        finally:
            # Release the streamed connection even if writing fails.
            body.close()
zoom.py 文件源码 项目:Zoom2Youtube 作者: Welltory 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def session(self, email, password):
        """Open a form-encoded session and post the sign-in credentials.

        :param email: account e-mail sent in the form body
        :param password: account password sent in the form body
        :return: ``(session, response)`` - the new session and the response
            to the sign-in POST
        """
        credentials = {'email': email, 'password': password}

        http = requests.Session()
        http.headers['content-type'] = 'application/x-www-form-urlencoded'

        signin_response = http.post(ZoomClient.SIGNIN_URL, data=credentials)
        return http, signin_response
glance_registry_local_check.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def check(auth_ref, args):
    """Probe the local glance registry and emit status/latency metrics.

    Issues ``GET /images`` against port 9191 on ``args.ip`` with the
    keystone token. Connection, HTTP and timeout errors mark the service as
    down; any other exception is passed to ``status_err``. The response
    time metrics are only emitted when the service is up.
    """
    # get_keystone_client is called here because it contains the logic that
    # fetches a new token when the previous one is bad.
    keystone = get_keystone_client(auth_ref)
    auth_token = keystone.auth_token
    registry_endpoint = 'http://{ip}:9191'.format(ip=args.ip)

    http = requests.Session()
    http.headers.update({
        'Content-type': 'application/json',
        'x-auth-token': auth_token,
    })

    try:
        # /images returns a list of public, non-deleted images
        response = http.get('%s/images' % registry_endpoint,
                            verify=False,
                            timeout=10)
        is_up = response.ok
    except (exc.ConnectionError, exc.HTTPError, exc.Timeout):
        is_up = False
    except Exception as e:
        status_err(str(e))

    status_ok()
    metric_bool('glance_registry_local_status', is_up)
    if not is_up:
        # latency figures are only meaningful for a reachable API
        return

    milliseconds = response.elapsed.total_seconds() * 1000
    formatted = '%.3f' % milliseconds
    metric('glance_registry_local_response_time', 'double', formatted, 'ms')
    metric_values = {'glance_registry_local_response_time': formatted}
    metric_influx(INFLUX_MEASUREMENT_NAME, metric_values)
nova_api_metadata_local_check.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def check(args):
    """Probe the local nova metadata API and emit status/latency metrics.

    Issues ``GET /`` against port 8775 on ``args.ip``. The service counts as
    up only when the response is OK and one of its lines is exactly
    ``1.0``. Connection, HTTP and timeout errors mark it as down; any other
    exception is passed to ``status_err``. The response time metrics are
    only emitted when the service is up.
    """
    metadata_endpoint = ('http://{ip}:8775'.format(ip=args.ip))
    is_up = True

    s = requests.Session()

    try:
        # looks like we can only get / (ec2 versions) without specifying
        # an instance ID and other headers
        versions = s.get('%s/' % metadata_endpoint,
                         verify=False,
                         timeout=10)
        milliseconds = versions.elapsed.total_seconds() * 1000
        # Compare against the decoded body: on Python 3 ``content`` is bytes,
        # so the str '1.0' could never equal one of its lines and the
        # service would always be reported as down.
        if not versions.ok or '1.0' not in versions.text.splitlines():
            is_up = False
    except (exc.ConnectionError, exc.HTTPError, exc.Timeout):
        is_up = False
    except Exception as e:
        status_err(str(e))

    metric_values = dict()

    status_ok()
    metric_bool('nova_api_metadata_local_status', is_up)
    # only want to send other metrics if api is up
    if is_up:
        metric('nova_api_metadata_local_response_time',
               'double',
               '%.3f' % milliseconds,
               'ms')

        metric_values['nova_api_metadata_local_response_time'] = ('%.3f' % milliseconds)
        metric_influx(INFLUX_MEASUREMENT_NAME, metric_values)
custom_app.py 文件源码 项目:service-fabric-cli 作者: Azure 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def upload(path, imagestore_string='fabric:ImageStore', show_progress=False):  # pylint: disable=too-many-locals,missing-docstring
    # Copies an application package either to a file share (connection
    # strings containing 'file:') or to the native image store
    # ('fabric:ImageStore'); any other connection string raises CLIError.
    # Documented with comments on purpose: the signature line suppresses
    # missing-docstring, and a docstring could surface as CLI help text.
    from sfctl.config import (client_endpoint, no_verify_setting, ca_cert_info,
                              cert_info)
    import requests

    abspath = validate_app_path(path)
    basename = os.path.basename(abspath)

    endpoint = client_endpoint()
    cert = cert_info()

    # Value for the session's ``verify``: False when verification is turned
    # off, the CA cert info when one is configured, otherwise True.
    if no_verify_setting():
        ca_cert = False
    elif ca_cert_info():
        ca_cert = ca_cert_info()
    else:
        ca_cert = True

    # The two settings are mutually exclusive.
    verification_disabled = no_verify_setting()
    configured_ca = ca_cert_info()
    if verification_disabled and configured_ca:
        raise CLIError('Cannot specify both CA cert info and no verify')

    # Upload to either to a folder, or native image store only
    if 'file:' in imagestore_string:
        dest_path = path_from_imagestore_string(imagestore_string)
        upload_to_fileshare(abspath, os.path.join(dest_path, basename),
                            show_progress)
        return

    if imagestore_string != 'fabric:ImageStore':
        raise CLIError('Unsupported image store connection string')

    with requests.Session() as sesh:
        sesh.verify = ca_cert
        sesh.cert = cert
        upload_to_native_imagestore(sesh, endpoint, abspath, basename,
                                    show_progress)
consul_check_postgres.py 文件源码 项目:consul-pg 作者: adamcstephens 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, service, role_source, configfile=DEFAULT_CONFIGFILE):
        """Record the service identity and prepare the local API session.

        :param service: service name; also part of the leader key URI
        :param role_source: stored as ``role_source``
        :param configfile: stored as ``configfile``
        """
        self.service = service
        self.role_source = role_source
        self.configfile = configfile

        # Local agent HTTP API and the key that holds the leader session.
        self.api_endpoint = 'http://127.0.0.1:8500/v1'
        self.leader_uri = self.api_endpoint + '/kv/session/' + self.service + '/leader'
        self.api_session = requests.Session()

        # Full host name plus the part before the first dot.
        self.hostname = gethostname()
        self.short_hostname = self.hostname.partition('.')[0]

        self.update_service = False
        self.valid_states = ['master', 'slave', 'fail']
consul_check_postgres.py 文件源码 项目:consul-pg 作者: adamcstephens 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def current_leader_session_id(self):
        """Return the ``Session`` value of the leader key, if it can be read.

        :return: the ``Session`` field of the first entry in the JSON reply
            when the GET on ``self.leader_uri`` answers 200, else ``None``
        """
        leader_response = self.api_session.get(self.leader_uri)
        if leader_response.status_code != 200:
            return None
        first_entry = leader_response.json()[0]
        return first_entry.get('Session')
bodegaREQ.py 文件源码 项目:OpenCouture-Dev 作者: 9-9-0 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self):
        """Set the shop URLs, the wanted size and a fresh HTTP session."""
        self.user_session = requests.Session()
        self.user_size = '8'

        # Fixed endpoints of the shop.
        self.URL_vendor = 'http://shop.bdgastore.com/'
        self.URL_cart = 'http://shop.bdgastore.com/cart'
        self.URL_addToCart = 'http://shop.bdgastore.com/cart/add.js'
        self.URL_product = 'http://shop.bdgastore.com/collections/footwear/products/y-3-pureboost-zg'
get_nodes.py 文件源码 项目:stalkerGKSU 作者: zense 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def get_data(username, no):
    """Scrape a user's followers or following list from github.com.

    Pages are fetched until a page has no pagination block or the page
    budget ``max_number/no_per_page`` is used up.

    :param username: GitHub login whose list is scraped
    :param no: ``0`` selects the followers tab, anything else the
        following tab
    :return: list of ``[profile_href, company, area, name]`` entries; a
        field that is missing on the page is reported as ``"xxxxx"``
    """
    z = 'followers' if no == 0 else 'following'

    def first_text(results, strip=True):
        # Text of the first match, or the placeholder when nothing matched.
        if not results:
            return "xxxxx"
        text = results[0].text
        return text.strip() if strip else text

    s = requests.Session()
    final = []
    x = 1
    pages = [""]
    data = []
    # Keep going while the last page still had a pagination block and the
    # page budget is not exhausted.
    while pages and x <= max_number/no_per_page:
        r = s.get('https://github.com/' + username + '?page=' + str(x) + '&tab=' + z)
        # Parser pinned to lxml, as scrape_org_general in this file does.
        soup = BeautifulSoup(r.text, "lxml")
        data.extend(soup.find_all("div", {"class" : "d-table col-12 width-full py-4 border-bottom border-gray-light"}))
        pages = soup.find_all("div", {"class" : "pagination"})
        x += 1

    # getting company and area.
    for entry in data:
        # Kept in its own name so the ``username`` argument is not clobbered.
        profile_href = entry.find_all("a")[0]['href']
        company = first_text(entry.find_all("span", {"class" : "mr-3"}))
        area = first_text(entry.find_all("p", {"class" : "text-gray text-small mb-0"}))
        # The entry is searched directly; re-parsing str(entry) is not needed.
        name = first_text(entry.find_all("span", {"class" : "f4 link-gray-dark"}), strip=False)
        final.append([profile_href, company, area, name])
    return final
get_nodes.py 文件源码 项目:stalkerGKSU 作者: zense 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def scrape_org(org,main_list,organisation):
    """Append ``[profile_href, name]`` for each listed member of an org.

    Fetches ``https://github.com/orgs/<org>/people`` and extends
    ``main_list`` in place; nothing is returned.

    :param org: organisation login used in the URL
    :param main_list: list that receives one ``[href, name]`` per member
    :param organisation: not used by this function
    """
    http = requests.Session()
    people_page = http.get('https://github.com/orgs/'+org+'/people')
    members = BeautifulSoup(people_page.text).find_all(
        "li", {"class" : "table-list-item member-list-item js-bulk-actions-item "})

    for member in members:
        # Each list item is re-parsed as its own small document.
        member_soup = BeautifulSoup(str(member))

        avatar_cells = member_soup.find_all(
            "div", {"class" : "table-list-cell py-3 pl-3 v-align-middle member-avatar-cell css-truncate pr-0"})
        profile_href = avatar_cells[0].find_all("a")[0]['href']

        info_cells = member_soup.find_all(
            "div", {"class" : "table-list-cell py-3 v-align-middle member-info css-truncate pl-3"})
        display_name = info_cells[0].find_all("a")[0].text.strip()

        main_list.append([profile_href, display_name])
get_nodes.py 文件源码 项目:stalkerGKSU 作者: zense 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def update_org_list(main_list,organisation):
    """Collect the organisation shown on each listed user's profile page.

    :param main_list: entries whose first element is the profile path that
        is appended to ``https://github.com/``
    :param organisation: list of organisation names; extended in place with
        names not yet present
    :return: the same ``organisation`` list
    """
    s = requests.Session()
    for i in main_list:
        r = s.get('https://github.com/'+i[0])
        # Parser pinned to lxml, as scrape_org_general in this file does.
        soup = BeautifulSoup(r.text, "lxml")
        data = soup.find_all("li",{"aria-label":"Organization"})
        # Profiles without an organisation entry are skipped explicitly; the
        # previous bare ``except`` also hid every unrelated error.
        if not data:
            continue
        org_name = data[0].text
        if org_name not in organisation:
            organisation.append(org_name)
    return organisation
get_nodes.py 文件源码 项目:stalkerGKSU 作者: zense 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def scrape_org_general(org,main_list,organisation):
    """Append ``[profile_href, name]`` for every user-search hit for ``org``.

    Walks the GitHub user search result pages until a page yields no hits
    and extends ``main_list`` in place; nothing is returned.

    :param org: search term; spaces are sent as ``+``
    :param main_list: list that receives one ``[href, name]`` per hit
    :param organisation: not used by this function
    """
    # str.replace returns a new string; the result used to be thrown away,
    # so terms containing spaces went into the URL unescaped.
    org = org.replace(" ","+")
    s = requests.Session()
    count = 1
    while True:
        k = "https://github.com/search?p="+str(count)+"&q="+org+"+type%3Auser&type=Users&utf8=%E2%9C%93"
        r = s.get(k)
        soup = BeautifulSoup(r.text,"lxml")
        data = soup.find_all("div",{"class":"user-list-info ml-2"})
        if not data:
            # an empty page marks the end of the results
            break
        for i in data:
            username = i.find_all("a")[0]['href']
            name = i.find_all("span",{"class":"f4 ml-1"})[0].text.strip()
            main_list.append([username,name])
        count += 1

# scraping the github pages
test.py 文件源码 项目:potatoygg 作者: Ripolin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setUp(self, conf='/test.cfg'):
        """Prepare the environment for a test and return a new ``YGG``.

        Loads settings from ``base_path + conf``, registers the settings, a
        ``requests.Session`` opener and a ``NoCache`` cache in ``Env``, and
        attaches ``handler`` to the YGG logger at DEBUG level.

        :param conf: settings file path, appended to ``base_path``
        :return: a new ``YGG`` instance
        """
        settings = Settings()
        settings.setFile(base_path + conf)

        environment = (
            ('settings', settings),
            ('http_opener', requests.Session()),
            ('cache', NoCache()),
        )
        for key, value in environment:
            Env.set(key, value)

        ygg_logger = YGG.log.logger
        ygg_logger.setLevel('DEBUG')
        ygg_logger.addHandler(handler)
        return YGG()
jav-download.py 文件源码 项目:javID-SearchTool 作者: VegetableCat 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_av_magnet(avcode):
    """Print the magnet links listed for the given code.

    Calls ``download_image(avcode)`` for the ``gid``/``uc`` values, requests
    the javbus AJAX endpoint with them and prints the link and size text of
    each table row with ``height="35px"``. Nothing is returned.

    Python 2 only: uses ``print`` statements and ``str(...).decode``.
    """




    # Extra request header sent with the AJAX call.
    Referer={
    "Referer": "123"
    }

    s = requests.Session()
    # NOTE(review): download_image is defined elsewhere; presumably it also
    # changes the working directory (see the chdir at the end) - confirm.
    gid,uc = download_image(avcode)

    # Query parameters of the AJAX call.
    params={
    'gid':gid,
    'uc':uc,
    'lang':'zh'
    }

    r2 = s.get("http://www.javbus.com/ajax/uncledatoolsbyajax.php",params=params, proxies=proxy, headers=Referer)
    # Undecodable bytes are dropped before parsing.
    soup = BeautifulSoup(r2.content.decode('utf-8', 'ignore'),'html.parser')

    trs  = soup.findAll('tr',attrs={"height":"35px"})
    print '[*] get magnet link'
    for tr in trs:
        # Each row is re-parsed as its own document.
        trsoup = BeautifulSoup(str(tr).decode('utf-8', 'ignore'),'html.parser')
        td2 = trsoup.findAll('td',attrs={"style":"text-align:center;white-space:nowrap"})
        # Only the first matching cell is used; its link holds the magnet URL
        # and its text is printed as the size.
        a = td2[0].find('a')
        magnet = a.get("href") #unicode object
        size = a.text.strip()
        print '[*] '+magnet,size

    # Side effect: moves the process working directory two levels up.
    os.chdir("../..")
http.py 文件源码 项目:transfert 作者: rbernand 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _connect(self):
        """Open a streaming GET on ``self.url`` with retry handling.

        Creates the session, mounts an ``HTTPAdapter`` whose retry policy is
        built from the class retry constants for the URL prefix
        ``str(self.url)``, and stores the streamed response.
        """
        self._session = http = requests.Session()

        adapter = requests.adapters.HTTPAdapter()
        adapter.max_retries = HttpRetry(
            read=self.READ_MAX_RETRIES,
            connect=self.CONN_MAX_RETRIES,
            backoff_factor=self.BACKOFF_FACTOR,
        )
        http.mount(str(self.url), adapter)

        # (connect timeout, read timeout)
        timeouts = (self.CONN_TIMEOUT, self.READ_TIMEOUT)
        self.__conn = http.get(self.url, stream=True, timeout=timeouts)
AttackSession.py 文件源码 项目:xxetimes 作者: ropnop 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def makeRequestSession(self):
        """Build a session that replays the intercepted request's headers.

        Also sets ``self.url`` from ``self.uri``, the request's ``host``
        header and its path. Every header except ``content-length`` is
        copied onto the session; ``self.proxies`` is applied when set.

        :return: the configured ``requests.Session``
        """
        incoming = self.requestHandler.headers
        host = incoming.getheader('host',None)
        path = self.requestHandler.path
        self.url = self.uri + "://" + host + path

        session = requests.Session()

        # content-length is left out of the copied headers
        replayed = {
            header: incoming.getheader(header)
            for header in incoming.keys()
            if header != 'content-length'
        }
        session.headers.update(replayed)

        if self.proxies:
            session.proxies = self.proxies
        return session
test_tls.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create a session whose https:// traffic uses the forced-TLS-1.2 adapter."""
        adapter = CbAPISessionAdapter(force_tls_1_2=True)
        http = requests.Session()
        http.mount("https://", adapter)

        self.tls_adapter = adapter
        self.session = http
base.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _url_to_key(self, url):
        """Return the key ``create_key`` yields for a plain GET of ``url``.

        The GET request is prepared through a throwaway session.
        """
        plain_get = requests.Request('GET', url)
        prepared = requests.Session().prepare_request(plain_get)
        return self.create_key(prepared)
core.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def uninstall_cache():
    """Restore ``requests.Session`` and disable the cache.

    Done by handing ``OriginalSession`` back to ``_patch_session_factory``.
    """
    _patch_session_factory(OriginalSession)
cbapi.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, server, ssl_verify=True, token=None, ignore_system_proxy=False,
                 use_https_proxy=None, ssl_verify_hostname=True, use_http_proxy=None):
        """ Requires:
                server -    URL to the Carbon Black server.  Usually the same as
                            the web GUI.
                ssl_verify - verify server SSL certificate
                token - this is for CLI API interface

            Optional:
                ignore_system_proxy - store a placeholder proxy mapping
                            instead of explicit proxies
                use_http_proxy / use_https_proxy - explicit proxy URLs
                ssl_verify_hostname - when false, https:// is served by a
                            HostNameIgnoringAdapter

            Raises TypeError when server does not start with "http" or when
            no token is given.
        """

        # We will uncomment this once cbapi 1.0.0 is released
        # warn("CbApi is deprecated and will be removed as of cbapi 2.0.0.", DeprecationWarning)

        if not server.startswith("http"):
            raise TypeError("Server must be URL: e.g, http://cb.example.com")
        if token is None:
            raise TypeError("Missing required authentication token.")

        self.server = server.rstrip("/")
        self.ssl_verify = ssl_verify
        self.token = token
        self.token_header = {'X-Auth-Token': self.token}

        http = requests.Session()
        self.session = http
        if not ssl_verify_hostname:
            http.mount("https://", HostNameIgnoringAdapter())

        if ignore_system_proxy:
            # see https://github.com/kennethreitz/requests/issues/879
            proxies = {'no': 'pass'}
        else:
            proxies = {}
            if use_http_proxy:
                proxies['http'] = use_http_proxy
            if use_https_proxy:
                proxies['https'] = use_https_proxy
        self.proxies = proxies


问题


面经


文章

微信
公众号

扫码关注公众号