python类HTTPAdapter()的实例源码

requests_client.py 文件源码 项目:apimatic-cli 作者: apimatic 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, timeout=60, cache=False, max_retries=None, retry_interval=None):
        """Create the HTTP session used by this client.

        Args:
            timeout (float): The default global timeout (seconds).
            cache: When truthy, the session is wrapped in ``CacheControl``.
            max_retries: Forwarded to ``Retry`` as ``total``. Retries are only
                configured when both this and ``retry_interval`` are truthy.
            retry_interval: Forwarded to ``Retry`` as ``backoff_factor``.

        """
        self.timeout = timeout
        self.session = requests.session()

        retries_requested = max_retries and retry_interval
        if retries_requested:
            retry_policy = Retry(total=max_retries, backoff_factor=retry_interval)
            # A separate adapter instance per scheme, both sharing one policy.
            for scheme in ('http://', 'https://'):
                self.session.mount(scheme, HTTPAdapter(max_retries=retry_policy))

        # NOTE(review): CacheControl() presumably mounts its own adapters on
        # the session, which may replace the retry adapters mounted above --
        # confirm that retries still apply when ``cache`` is enabled.
        if cache:
            self.session = CacheControl(self.session)
utils.py 文件源码 项目:resultsdb-updater 作者: release-engineering 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def retry_session():
    """Build a ``requests.Session`` that retries HTTP and HTTPS requests.

    The same adapter (and therefore the same ``Retry`` policy) is mounted on
    both URL schemes.
    """
    # Total wait time in minutes, as computed by the original author:
    # >>> sum([min((0.3 * (2 ** (i - 1))), 120) / 60 for i in range(24)])
    # >>> 30.5575
    # Each term is the smaller of the computed backoff time (in seconds) and
    # the maximum backoff time, which defaults to 120 seconds. The backoff
    # time grows after every failed attempt.
    retry_policy = Retry(
        total=24,
        read=5,
        connect=24,
        backoff_factor=0.3,
        status_forcelist=(500, 502, 504),
        method_whitelist=('GET', 'POST'),
    )
    http = requests.Session()
    shared_adapter = HTTPAdapter(max_retries=retry_policy)
    for prefix in ('http://', 'https://'):
        http.mount(prefix, shared_adapter)
    return http
eclair.py 文件源码 项目:lightning-integration 作者: cdecker 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Return a session whose HTTP and HTTPS requests use a retry policy.

    :param retries: used as the ``total``, ``read`` and ``connect`` limits
        of the ``Retry`` object.
    :param backoff_factor: forwarded to ``Retry``.
    :param status_forcelist: forwarded to ``Retry``.
    :param session: existing session to configure; a new
        ``requests.Session`` is created when this is falsy.
    :return: the configured session.
    """
    if not session:
        session = requests.Session()

    policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    shared_adapter = HTTPAdapter(max_retries=policy)
    # One adapter instance serves both schemes.
    for prefix in ('http://', 'https://'):
        session.mount(prefix, shared_adapter)
    return session
appengine.py 文件源码 项目:flickr_downloader 作者: Denisolt 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def monkeypatch(validate_certificate=True):
    """Make every Session use an App Engine adapter by default.

    Useful when you would rather not configure Sessions yourself, or when
    a library you depend on calls requests directly (e.g. ``requests.post``).

    .. warning::

        If ``validate_certificate`` is ``False``, certificate validation will
        effectively be disabled for all requests.
    """
    _check_version()
    # HACK: We should consider modifying urllib3 to support this cleanly,
    # so that we can set a module-level variable in the sessions module,
    # instead of overriding an imported HTTPAdapter as is done here.
    if validate_certificate:
        replacement = AppEngineAdapter
    else:
        replacement = InsecureAppEngineAdapter

    # Rebind the name in both modules that expose HTTPAdapter.
    for module in (sessions, adapters):
        module.HTTPAdapter = replacement
appengine.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def monkeypatch(validate_certificate=True):
    """Install an App Engine adapter as the default for all Sessions.

    Prefer this when you do not want to configure Sessions by hand, or when
    third-party code calls requests directly (e.g. ``requests.post``).

    .. warning::

        If ``validate_certificate`` is ``False``, certificate validation will
        effectively be disabled for all requests.
    """
    _check_version()
    # HACK: We should consider modifying urllib3 to support this cleanly,
    # so that we can set a module-level variable in the sessions module,
    # instead of overriding an imported HTTPAdapter as is done here.
    chosen_adapter = (
        AppEngineAdapter if validate_certificate else InsecureAppEngineAdapter
    )
    sessions.HTTPAdapter = adapters.HTTPAdapter = chosen_adapter
common.py 文件源码 项目:clusterfuzz-tools 作者: google 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_http():
  """Get the http object.

  Ensures CLUSTERFUZZ_TESTCASES_DIR exists, then returns a
  ``requests_cache.CachedSession`` backed by an sqlite cache stored in that
  directory. GET and POST responses with status 200 are cacheable and expire
  after HTTP_CACHE_TTL. A retrying adapter is mounted for ``https://`` URLs.
  """
  ensure_dir(CLUSTERFUZZ_TESTCASES_DIR)
  http = requests_cache.CachedSession(
      cache_name=os.path.join(CLUSTERFUZZ_TESTCASES_DIR, 'http_cache'),
      backend='sqlite',
      allowable_methods=('GET', 'POST'),
      allowable_codes=[200],
      expire_after=HTTP_CACHE_TTL)
  http.mount(
      'https://',
      adapters.HTTPAdapter(
          # The policy must be passed by keyword: HTTPAdapter's first
          # positional parameter is pool_connections, not max_retries, so a
          # positional Retry object would not configure retries at all.
          # backoff_factor is 0.5. Therefore, the max wait time is 16s.
          max_retries=retry.Retry(
              total=5, backoff_factor=0.5,
              status_forcelist=[500, 502, 503, 504]))
  )
  return http
sessions.py 文件源码 项目:Cayenne-Agent 作者: myDevicesIoT 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, executor=None, max_workers=2, session=None, *args,
                 **kwargs):
        """Creates a FuturesSession

        Notes
        ~~~~~

        * ProcessPoolExecutor is not supported b/c Response objects are
          not picklable.

        * If you provide both `executor` and `max_workers`, the latter is
          ignored and provided executor is used as is.
        """
        super(FuturesSession, self).__init__(*args, **kwargs)

        if executor is None:
            executor = ThreadPoolExecutor(max_workers=max_workers)
            # Grow the connection pools to match the worker count when the
            # default pool size would be smaller than the number of workers.
            if max_workers > DEFAULT_POOLSIZE:
                for prefix in ('https://', 'http://'):
                    self.mount(prefix, HTTPAdapter(pool_connections=max_workers,
                                                   pool_maxsize=max_workers))

        self.executor = executor
        self.session = session
nubank.py 文件源码 项目:bankscraper 作者: kamushadenes 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, document, password, omit_sensitive_data=False, quiet=False):
        """Prepare the account record and HTTP session for the Nubank parser.

        :param document: forwarded to ``Account`` as its first argument.
        :param password: forwarded to ``Account`` as its third argument.
        :param omit_sensitive_data: stored on the instance unchanged.
        :param quiet: when falsy, a start-up banner is printed.
        """
        if not quiet:
            print('[*] Nubank Parser is starting...')

        self.omit_sensitive_data = omit_sensitive_data
        self.quiet = quiet

        self.account = Account(document, None, password, account_type='card')
        self.account.currency = 'R$'
        self.account.bank = 'Nubank'

        # Requests to the API endpoint go through an adapter with a large
        # retry budget and connection pool.
        http = requests.Session()
        http.mount(self.api_endpoint,
                   HTTPAdapter(max_retries=32, pool_connections=50, pool_maxsize=50))
        http.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36',
            'Content-Type': 'application/json',
            'Referer': 'https://conta.nubank.com.br/',
        })
        self.session = http
ticket.py 文件源码 项目:bankscraper 作者: kamushadenes 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, card, omit_sensitive_data=False, quiet=False, dbc_username=None, dbc_password=None, validator=TicketValidator):
        """Prepare the account record and HTTP session for the Ticket parser.

        :param card: card identifier, forwarded to ``Account``.
        :param omit_sensitive_data: stored on the instance unchanged.
        :param quiet: when falsy, a start-up banner is printed.
        :param dbc_username: accepted for interface compatibility; not used
            by this constructor.
        :param dbc_password: accepted for interface compatibility; not used
            by this constructor.
        :param validator: class (or factory) instantiated with no arguments
            and stored as ``self.validator``.
        """
        if not quiet:
            print('[*] Ticket Parser is starting...')

        self.validator = validator()

        self.account = Account(card=card, account_type='card')

        # Validation runs before any further state is attached.
        self.validate()

        self.omit_sensitive_data = omit_sensitive_data
        self.quiet = quiet
        # The currency used to be assigned twice with the same value; a
        # single assignment is sufficient.
        self.account.currency = 'R$'
        self.account.bank = 'Ticket'

        self.captcha = ''
        self.token = ''

        self.session = requests.Session()
        self.session.mount(self.api_endpoint, HTTPAdapter(max_retries=32, pool_connections=50, pool_maxsize=50))
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Referer': 'http://www.ticket.com.br/portal/consulta-de-saldo/',
        })
sodexo.py 文件源码 项目:bankscraper 作者: kamushadenes 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, card, document, omit_sensitive_data=False, quiet=False, validator=SodexoValidator):
        """Prepare the account record and HTTP session for the Sodexo parser.

        :param card: card identifier, forwarded to ``Account``.
        :param document: document identifier, forwarded to ``Account``.
        :param omit_sensitive_data: stored on the instance unchanged.
        :param quiet: when falsy, a start-up banner is printed.
        :param validator: class (or factory) instantiated with no arguments
            and stored as ``self.validator``.
        """
        if not quiet:
            print('[*] Sodexo Parser is starting...')

        self.validator = validator()
        self.account = Account(document=document, card=card, account_type='card')

        # Validation runs before any further state is attached.
        self.validate()

        self.omit_sensitive_data = omit_sensitive_data
        self.quiet = quiet
        self.account.currency = 'R$'
        self.account.bank = 'Sodexo'

        http = requests.Session()
        http.mount(self.api_endpoint,
                   HTTPAdapter(max_retries=32, pool_connections=50, pool_maxsize=50))
        http.headers.update({
            'User-Agent': 'Apache-HttpClient/android/Nexus 5',
            'Content-Type': 'application/x-www-form-urlencoded',
        })
        self.session = http
api.py 文件源码 项目:crysadm 作者: HuiMi24 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ubus_cd(session_id, account_id, action, out_params, url_param=None):
    """POST a JSON-RPC style ``call`` request to the ubus_cd endpoint.

    :param session_id: placed in the query string and used (as a string) as
        the first element of the JSON ``params`` list.
    :param account_id: placed in the query string.
    :param action: placed in the query string.
    :param out_params: list appended after the session id in ``params``.
    :param url_param: optional raw suffix appended to the URL as-is.
    :return: the object decoded from the text between the first ``{`` and the
        last ``}`` of the response body, or the result of
        ``__handle_exception`` when a ``RequestException`` is raised.
    :raises ValueError: if the response body contains no braces or the
        extracted text is not valid JSON.
    """
    url = "http://kjapi.peiluyou.com:5171/ubus_cd?account_id=%s&session_id=%s&action=%s" % (
        account_id, session_id, action)
    if url_param is not None:
        url += url_param

    params = ["%s" % session_id] + out_params

    data = {"jsonrpc": "2.0", "id": 1, "method": "call", "params": params}
    try:
        body = dict(data=json.dumps(data), action='onResponse%d' %
                    int(time.time() * 1000))
        # The session is created per call, so close it deterministically
        # instead of leaving its connection pool open.
        with requests.Session() as s:
            s.mount('http://', HTTPAdapter(max_retries=5))
            proxies = api_proxies()
            r = s.post(url, data=body, proxies=proxies)
            text = r.text
        # Strip whatever surrounds the JSON object in the response body.
        result = text[text.index('{'):text.rindex('}') + 1]
        return json.loads(result)

    except requests.exceptions.RequestException as e:
        return __handle_exception(e=e)

# ??????
connection.py 文件源码 项目:python-percy-client 作者: percy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _requests_retry_session(
        self,
        retries=3,
        backoff_factor=0.3,
        method_whitelist=['HEAD', 'GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'TRACE'],
        status_forcelist=(500, 502, 503, 504, 520, 524),
        session=None,
    ):
        """Return a session whose HTTP and HTTPS requests use a retry policy.

        :param retries: used as the ``total``, ``read``, ``connect`` and
            ``status`` limits of the ``Retry`` object.
        :param backoff_factor: forwarded to ``Retry``.
        :param method_whitelist: forwarded to ``Retry``.
        :param status_forcelist: forwarded to ``Retry``.
        :param session: existing session to configure; a new
            ``requests.Session`` is created when this is falsy.
        :return: the configured session.
        """
        if not session:
            session = requests.Session()

        retry_options = dict(
            total=retries,
            read=retries,
            connect=retries,
            status=retries,
            method_whitelist=method_whitelist,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
        shared_adapter = HTTPAdapter(max_retries=Retry(**retry_options))
        # One adapter instance serves both schemes.
        for prefix in ('http://', 'https://'):
            session.mount(prefix, shared_adapter)
        return session
api.py 文件源码 项目:crysadm 作者: seatom 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ubus_cd(session_id, account_id, action, out_params, url_param=None):
    """POST a JSON-RPC style ``call`` request to the ubus_cd endpoint.

    :param session_id: placed in the query string and used (as a string) as
        the first element of the JSON ``params`` list.
    :param account_id: placed in the query string.
    :param action: placed in the query string.
    :param out_params: list appended after the session id in ``params``.
    :param url_param: optional raw suffix appended to the URL as-is.
    :return: the object decoded from the text between the first ``{`` and the
        last ``}`` of the response body, or the result of
        ``__handle_exception`` when a ``RequestException`` is raised.
    :raises ValueError: if the response body contains no braces or the
        extracted text is not valid JSON.
    """
    url = "http://kjapi.peiluyou.com:5171/ubus_cd?account_id=%s&session_id=%s&action=%s" % (account_id, session_id, action)
    if url_param is not None:
        url += url_param

    params = ["%s" % session_id] + out_params

    data = {"jsonrpc": "2.0", "id": 1, "method": "call", "params": params}
    try:
        body = dict(data=json.dumps(data), action='onResponse%d' % int(time.time() * 1000))
        # The session is created per call, so close it deterministically
        # instead of leaving its connection pool open.
        with requests.Session() as s:
            s.mount('http://', HTTPAdapter(max_retries=5))
            proxies = api_proxies()
            r = s.post(url, data=body, proxies=proxies)
            text = r.text
        # Strip whatever surrounds the JSON object in the response body.
        result = text[text.index('{'):text.rindex('}')+1]
        return json.loads(result)

    except requests.exceptions.RequestException as e:
        return __handle_exception(e=e)

# ??????
scrape_page.py 文件源码 项目:ebay 作者: fgscivittaro 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_soup(url, num_retries = 10):
    """
    Fetch ``url`` and return its body parsed by BeautifulSoup with the
    'html.parser' backend.

    Requests are retried up to ``num_retries`` times (backoff factor 0.1),
    including when the server answers 500, 502, 503 or 504. The retry policy
    is applied to both http:// and https:// URLs.
    """

    retries = Retry(
        total = num_retries,
        backoff_factor = 0.1,
        status_forcelist = [500, 502, 503, 504]
        )

    # The session only lives for this one request, so close it when done.
    with requests.Session() as s:
        # Mount on both schemes; mounting only 'http://' would leave https
        # URLs on the default adapter, without this retry policy.
        for prefix in ('http://', 'https://'):
            s.mount(prefix, HTTPAdapter(max_retries = retries))
        html = s.get(url).text

    return BeautifulSoup(html, 'html.parser')
requests_downloader.py 文件源码 项目:Sasila 作者: DarkSand 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, loginer=None, use_proxy=False):
        """Store the login helper, proxy settings and default request state.

        :param loginer: stored on the instance unchanged.
        :param use_proxy: when truthy a ``ProxyPool`` is created; proxy usage
            is switched off again if that pool is empty.
        """
        self.loginer = loginer
        self.use_proxy = use_proxy
        if use_proxy:
            self.proxy_pool = ProxyPool()
            # An empty pool cannot serve any proxy, so fall back to direct use.
            if len(self.proxy_pool) < 1:
                self.use_proxy = False

        # Default headers sent with requests.
        self._headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate, sdch",
            "Accept-Language": "zh-CN,zh;q=0.8",
        }
        self._request_retry = HTTPAdapter(max_retries=3)

        # Cookies start out as an empty mapping.
        self._cookies = {}
asyncadapter.py 文件源码 项目:plex-for-kodi-mod 作者: mrclemds 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK):
        """Set up the ``AsyncPoolManager`` for this adapter.

        Not intended to be called from user code; it is exposed only for use
        when subclassing the
        :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

        :param connections: The number of urllib3 connection pools to cache.
        :param maxsize: The maximum number of connections to save in the pool.
        :param block: Block when no free connections are available.
        """
        # Remember the pool settings (the original notes they are kept for
        # pickling).
        self._pool_connections, self._pool_maxsize, self._pool_block = (
            connections, maxsize, block)

        self.poolmanager = AsyncPoolManager(
            num_pools=connections,
            maxsize=maxsize,
            block=block,
        )
        # Starts empty; connections are recorded here as they are handed out.
        self.connections = list()
asyncadapter.py 文件源码 项目:plex-for-kodi-mod 作者: mrclemds 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_connection(self, url, proxies=None):
        """Return a urllib3 connection for ``url`` and record it.

        Not intended to be called from user code; it is exposed only for use
        when subclassing the
        :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

        :param url: The URL to connect to.
        :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
        """
        scheme = urlparse(url.lower()).scheme
        proxy = (proxies or {}).get(scheme)

        if not proxy:
            # Only scheme should be lower case
            conn = self.poolmanager.connection_from_url(urlparse(url).geturl())
        else:
            proxy_headers = self.proxy_headers(proxy)

            # Create the manager for this proxy on first use.
            if proxy not in self.proxy_manager:
                self.proxy_manager[proxy] = proxy_from_url(
                    proxy,
                    proxy_headers=proxy_headers,
                    num_pools=self._pool_connections,
                    maxsize=self._pool_maxsize,
                    block=self._pool_block
                )

            conn = self.proxy_manager[proxy].connection_from_url(url)

        # Every connection handed out is remembered on the adapter.
        self.connections.append(conn)
        return conn
cli_login_demo.py 文件源码 项目:open-mic 作者: cosmir 项目源码 文件源码 阅读 68 收藏 0 点赞 0 评论 0
def demo(base_url):
    """Login through a third-party OAuth handler and print some stats.

    Parameters
    ----------
    base_url : str
        Base URL of the CMS server.

    Raises
    ------
    AssertionError
        If the authentication request does not return HTTP 200.
    """
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=0.02))
    session.mount('{}://'.format(urlparse(base_url).scheme), adapter)

    # Build URLs with '/' explicitly; os.path.join would use the platform
    # path separator, which is wrong for URLs on Windows.
    root = base_url.rstrip('/')

    wb = webbrowser.get()
    login_url = '{}/login?complete=no'.format(root)
    session.get(login_url)
    wb.open(login_url)

    auth_url = input("Enter the URL returned after authentication:")
    response = session.get(auth_url.replace("complete=no", 'complete=yes'))
    # Explicit check rather than ``assert`` so it still runs under ``-O``;
    # the exception type is kept for backward compatibility.
    if response.status_code != 200:
        raise AssertionError(
            'Authentication request returned HTTP {}'.format(
                response.status_code))

    print(session.get('{}/me'.format(root)).content)
cloud.py 文件源码 项目:rvmi-rekall 作者: fireeye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_requests_session(self):
        """Return the shared requests session, creating it on first use.

        The session is looked up under the "requests_session" parameter and,
        when absent, built and stored in the session cache under that name.
        """
        cached = self._session.GetParameter("requests_session")
        # NOTE(review): the equality comparison (rather than ``is None``) is
        # kept as-is; GetParameter presumably can return a None-like
        # placeholder object -- confirm before changing it.
        if cached == None:
            # To make sure we can use the requests session in the threadpool we
            # need to make sure that the connection pool can block. Otherwise it
            # will raise when it runs out of connections and the threads will be
            # terminated.
            adapter_options = dict(
                pool_connections=10, pool_maxsize=300, max_retries=10,
                pool_block=True)

            cached = requests.Session()
            # Each scheme gets its own adapter instance.
            for scheme in ("https://", "http://"):
                cached.mount(scheme, adapters.HTTPAdapter(**adapter_options))

            self._session.SetCache("requests_session", cached)

        return cached
http.py 文件源码 项目:rvmi-rekall 作者: fireeye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_requests_session(self):
        """Fetch the shared requests session, building it when missing.

        The session is read from the "requests_session" parameter; if it is
        not there, a new one is configured and stored in the session cache.
        """
        session = self._session.GetParameter("requests_session")
        # NOTE(review): ``== None`` is preserved deliberately; GetParameter
        # presumably can return a None-like placeholder -- confirm before
        # switching to an identity check.
        if session == None:
            session = self._build_blocking_session() if False else requests.Session()
            # To make sure we can use the requests session in the threadpool we
            # need to make sure that the connection pool can block. Otherwise it
            # will raise when it runs out of connections and the threads will be
            # terminated.
            for prefix in ("https://", "http://"):
                blocking_adapter = adapters.HTTPAdapter(
                    pool_connections=10,
                    pool_maxsize=300,
                    max_retries=10,
                    pool_block=True,
                )
                session.mount(prefix, blocking_adapter)

            self._session.SetCache("requests_session", session)

        return session
api.py 文件源码 项目:crysadm 作者: sanzuwu 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def ubus_cd(session_id, account_id, action, out_params, url_param=None):
    """POST a JSON-RPC style ``call`` request to the ubus_cd endpoint.

    :param session_id: placed in the query string and used (as a string) as
        the first element of the JSON ``params`` list.
    :param account_id: placed in the query string.
    :param action: placed in the query string.
    :param out_params: list appended after the session id in ``params``.
    :param url_param: optional raw suffix appended to the URL as-is.
    :return: the object decoded from the text between the first ``{`` and the
        last ``}`` of the response body, or the result of
        ``__handle_exception`` when a ``RequestException`` is raised.
    :raises ValueError: if the response body contains no braces or the
        extracted text is not valid JSON.
    """
    url = "http://kjapi.peiluyou.com:5171/ubus_cd?account_id=%s&session_id=%s&action=%s" % (account_id, session_id, action)
    if url_param is not None:
        url += url_param

    params = ["%s" % session_id] + out_params

    data = {"jsonrpc": "2.0", "id": 1, "method": "call", "params": params}
    try:
        body = dict(data=json.dumps(data), action='onResponse%d' % int(time.time() * 1000))
        # The session is created per call, so close it deterministically
        # instead of leaving its connection pool open.
        with requests.Session() as s:
            s.mount('http://', HTTPAdapter(max_retries=5))
            proxies = api_proxies()
            r = s.post(url, data=body, proxies=proxies)
            text = r.text
        # Strip whatever surrounds the JSON object in the response body.
        result = text[text.index('{'):text.rindex('}')+1]
        return json.loads(result)

    except requests.exceptions.RequestException as e:
        return __handle_exception(e=e)

# ??????
uidextract.py 文件源码 项目:openmailbox_downloader 作者: appleorange1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def list_folders(csrftoken, sessionid):
    """Request the webmail folder list and print each folder's name.

    :param csrftoken: value sent as the ``csrftoken`` cookie.
    :param sessionid: value sent as the ``sessionid`` cookie.
    """
    print("Getting list of folders")

    # Session with retries for https:// requests, including on 500/502/504.
    retry_policy = Retry(total=10, backoff_factor=1,
                         status_forcelist=[500, 502, 504])
    http = requests.Session()
    http.mount('https://', HTTPAdapter(max_retries=retry_policy))

    cookie_header = 'csrftoken={0};sessionid={1}'.format(csrftoken, sessionid)
    http.headers.update({'Cookie': cookie_header})

    mdatareq = 'https://app.openmailbox.org/requests/webmail?action=folderlist'
    print(mdatareq)

    reply = http.get(mdatareq)
    metadata = json.loads(reply.text)
    print(metadata)

    print('\nFolder names:')
    for folder in metadata['folders']:
        print(folder['name'])
pact.py 文件源码 项目:pact-python 作者: pact-foundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _wait_for_server_start(self):
        """
        Wait for the mock service to be ready for requests.

        Issues a GET to ``self.uri`` with retries (up to 15, backoff factor
        0.1). If the response status is not 200, the mock service process is
        terminated before the error is raised.

        :rtype: None
        :raises RuntimeError: If there is a problem starting the mock service.
        """
        retries = Retry(total=15, backoff_factor=0.1)
        # The session is only needed for this readiness probe.
        with requests.Session() as s:
            s.mount('http://', HTTPAdapter(max_retries=retries))
            resp = s.get(self.uri, headers=self.HEADERS)

        if resp.status_code != 200:
            self._process.terminate()
            self._process.communicate()
            # Interpolate the response text; passing it as a second argument
            # to RuntimeError would leave the '%s' placeholder unformatted.
            raise RuntimeError(
                'There was a problem starting the mock service: %s'
                % resp.text)
helper.py 文件源码 项目:ws-backend-community 作者: lavalamp- 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
        """
        Set up the urllib3 PoolManager for this adapter.

        Not intended to be called from user code; it is exposed only for
        use when subclassing the
        :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

        :param connections: The number of urllib3 connection pools to cache.
        :param maxsize: The maximum number of connections to save in the pool.
        :param block: Block when no free connections are available.
        :param pool_kwargs: Extra keyword arguments used to initialize the Pool Manager.
        """
        # Remember the pool settings on the adapter.
        self._pool_connections, self._pool_maxsize, self._pool_block = (
            connections, maxsize, block)

        # The manager is created in strict mode with the SSL version taken
        # from this adapter's SSL_VERSION attribute.
        self.poolmanager = PoolManager(
            num_pools=connections, maxsize=maxsize, block=block,
            strict=True, ssl_version=self.SSL_VERSION,
            **pool_kwargs
        )
app.py 文件源码 项目:github-bugzilla-pr-linker 作者: mozilla 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(502, 504),
    session=None,
):
    """Configure (or create) a session that retries HTTP and HTTPS requests.

    :param retries: used as the ``total``, ``read`` and ``connect`` limits
        of the ``Retry`` object.
    :param backoff_factor: forwarded to ``Retry``.
    :param status_forcelist: forwarded to ``Retry``.
    :param session: existing session to configure; a new
        ``requests.Session`` is created when this is falsy.
    :return: the configured session.
    """
    target = session if session else requests.Session()
    retry_options = {
        'total': retries,
        'read': retries,
        'connect': retries,
        'backoff_factor': backoff_factor,
        'status_forcelist': status_forcelist,
    }
    shared_adapter = HTTPAdapter(max_retries=Retry(**retry_options))
    # The same adapter instance is mounted on both schemes.
    target.mount('http://', shared_adapter)
    target.mount('https://', shared_adapter)
    return target
shared.py 文件源码 项目:Hockey-Scraper 作者: HarryShomer 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_url(url):
    """
    Get the url

    The request uses a 5 second timeout and is retried up to 10 times
    (backoff factor 0.1) for both http:// and https:// URLs.

    :param url: given url

    :return: page (the response object), or None when an HTTP error status
             or a connection error occurs
    """
    session = requests.Session()
    retries = Retry(total=10, backoff_factor=.1)
    # Mount on both schemes; mounting only 'http://' would leave https URLs
    # on the default adapter, without this retry policy.
    for prefix in ('http://', 'https://'):
        session.mount(prefix, HTTPAdapter(max_retries=retries))

    try:
        response = session.get(url, timeout=5)
        response.raise_for_status()
    except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError):
        return None
    finally:
        # The session is single-use; release its connections either way.
        session.close()

    return response
dirbruter.py 文件源码 项目:faze 作者: KhasMek 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def dir_bruter(self, target_url, word_queue, user_agent):
        """Request ``target_url`` + each queued word and collect the hits.

        :param target_url: base URL that every queued word is appended to.
        :param word_queue: queue of path fragments; drained until empty.
        :param user_agent: value sent as the ``User-Agent`` header.
        :return: dict mapping each URL that answered 200 to its status code.

        Responses with status 200 are printed and logged at info level;
        statuses other than 200 and 404 are logged as errors. Certificate
        verification is disabled for the requests.
        """
        results = {}
        session = requests.Session()
        # Mount on '<scheme>://'. A bare '<scheme>' prefix is shorter than
        # the session's default 'http://' / 'https://' prefixes, and the
        # longest matching prefix wins, so the retry adapter would never be
        # selected.
        scheme = target_url.split(':', 1)[0]
        session.mount(scheme + '://', HTTPAdapter(max_retries=3))
        headers = {"User-Agent": user_agent}
        while not word_queue.empty():
            brute = word_queue.get()
            response = session.get(target_url + brute, headers=headers, verify=False)
            if response.status_code == 200:
                message = "{i}     [{r}] => {u}".format(i=ctinfo, r=response.status_code, u=response.url)
                print(message)
                logging.info(message)
                results[response.url] = response.status_code
            elif response.status_code != 404:
                # TODO: add a setting `only_save_200` or something like that, if no, save these results.
                logging.error("{e}     {c} => {u}".format(e=cterr, c=response.status_code, u=response.url))
        return results
tvmaze.py 文件源码 项目:tvmaze-plex-agent 作者: srob650 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _endpoint_premium_get(self, url):
        """GET ``url`` with the stored credentials and return its JSON body.

        The request is authenticated with ``(self.username, self.api_key)``
        and retried up to 5 times, including on HTTP 429 responses.

        :param url: endpoint to request.
        :return: the decoded JSON body, or None when the status is 404 or
            422 or the decoded body is falsy.
        :raises ConnectionError: when requests reports a connection error.
        :raises BadRequest: when the status is 400.
        """
        retries = Retry(total=5,
                        backoff_factor=0.1,
                        status_forcelist=[429])
        # The context manager closes the session even when the request
        # raises, which a trailing s.close() call would not.
        with requests.Session() as s:
            # Mount on both schemes so https URLs also get the retry policy.
            for prefix in ('http://', 'https://'):
                s.mount(prefix, HTTPAdapter(max_retries=retries))
            try:
                r = s.get(url, auth=(self.username, self.api_key))
            except requests.exceptions.ConnectionError as e:
                raise ConnectionError(repr(e))

        if r.status_code in [404, 422]:
            return None

        if r.status_code == 400:
            raise BadRequest('Bad Request for url {}'.format(url))

        results = r.json()
        # Falsy payloads (e.g. an empty list) are reported as None.
        return results or None
tvmaze.py 文件源码 项目:tvmaze-plex-agent 作者: srob650 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _endpoint_premium_delete(self, url):
        """Send DELETE to ``url`` with the stored credentials.

        The request is authenticated with ``(self.username, self.api_key)``
        and retried up to 5 times, including on HTTP 429 responses.

        :param url: endpoint to request.
        :return: True when the status is 200; None when it is 404 or any
            other status not handled below.
        :raises ConnectionError: when requests reports a connection error.
        :raises BadRequest: when the status is 400.
        """
        retries = Retry(total=5,
                        backoff_factor=0.1,
                        status_forcelist=[429])
        # The context manager closes the session even when the request
        # raises, which a trailing s.close() call would not.
        with requests.Session() as s:
            # Mount on both schemes so https URLs also get the retry policy.
            for prefix in ('http://', 'https://'):
                s.mount(prefix, HTTPAdapter(max_retries=retries))
            try:
                r = s.delete(url, auth=(self.username, self.api_key))
            except requests.exceptions.ConnectionError as e:
                raise ConnectionError(repr(e))

        if r.status_code == 400:
            raise BadRequest('Bad Request for url {}'.format(url))

        if r.status_code == 200:
            return True

        if r.status_code == 404:
            return None
tvmaze.py 文件源码 项目:tvmaze-plex-agent 作者: srob650 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _endpoint_premium_put(self, url, payload=None):
        """Send PUT to ``url`` with the stored credentials.

        The request is authenticated with ``(self.username, self.api_key)``
        and retried up to 5 times, including on HTTP 429 responses.

        :param url: endpoint to request.
        :param payload: optional request body, passed as ``data``.
        :return: True when the status is 200; None when it is 404, 422 or
            any other status not handled below.
        :raises ConnectionError: when requests reports a connection error.
        :raises BadRequest: when the status is 400.
        """
        retries = Retry(total=5,
                        backoff_factor=0.1,
                        status_forcelist=[429])
        # The context manager closes the session even when the request
        # raises, which a trailing s.close() call would not.
        with requests.Session() as s:
            # Mount on both schemes so https URLs also get the retry policy.
            for prefix in ('http://', 'https://'):
                s.mount(prefix, HTTPAdapter(max_retries=retries))
            try:
                r = s.put(url, data=payload, auth=(self.username, self.api_key))
            except requests.exceptions.ConnectionError as e:
                raise ConnectionError(repr(e))

        if r.status_code == 400:
            raise BadRequest('Bad Request for url {}'.format(url))

        if r.status_code == 200:
            return True

        if r.status_code in [404, 422]:
            return None

    # Get Show object


问题


面经


文章

微信
公众号

扫码关注公众号