python类request()的实例源码

dss_op.py 文件源码 项目:jcsclient 作者: jiocloudservices 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        self.http_method = None
        # mandatory headers for each request
        self.http_headers = {
                              'Authorization': None,
                              'Date': None,
                            }
        self.dss_url = config.get_service_url('dss')
        if(self.dss_url.endswith('/')):
            self.dss_url = self.dss_url[:-1]
        self.access_key = config.get_access_key()
        self.secret_key = config.get_secret_key()
        self.is_secure_request = config.check_secure()
        self.dss_op_path = None
        self.dss_query_str = None
        self.dss_query_str_for_signature = None
rundeck.py 文件源码 项目:pyrundeck 作者: pschmitt 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __request(self, method, url, params=None):
        logger.info('{} {} Params: {}'.format(method, url, params))
        cookies = {'JSESSIONID': self.auth_cookie}
        h = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'X-Rundeck-Auth-Token': self.token
        }
        r = requests.request(
            method, url, cookies=cookies, headers=h, json=params,
            verify=self.verify
        )
        logger.debug(r.content)
        r.raise_for_status()
        try:
            return r.json()
        except ValueError as e:
            logger.error(e.message)
            return r.content
__init__.py 文件源码 项目:buffer-api 作者: barisariburnu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_request(self, method, path, options):
        """Creating a request with the given arguments

        If api_version is set, appends it immediately after host
        """
        version = '/' + options['api_version'] if 'api_version' in options else ''

        # Adds a suffix (ex: ".html", ".json") to url
        suffix = options['response_type'] if 'response_type' in options else 'json'
        path = path + '.' + suffix

        path = urlparse.urljoin(self.base, version + path)

        if 'api_version' in options:
            del options['api_version']

        if 'response_type' in options:
            del options['response_type']

        return requests.request(method, path, **options)
fetchers.py 文件源码 项目:memes-reposter 作者: vaniakosmos 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def fetch(self):
        section = 'hot'  # hot | top | user
        sort = 'viral'  # viral | top | time | rising (only available with user section)
        show_viral = 'true'
        show_mature = 'true'
        album_previews = 'false'

        url = f'https://api.imgur.com/3/gallery/{section}/{sort}'
        querystring = {"showViral": f"{show_viral}", "mature": f"{show_mature}", "album_previews": f"{album_previews}"}
        headers = {'authorization': f'Client-ID {IMGUR_CLIENT_ID}'}
        response = requests.request("GET", url, headers=headers, params=querystring)
        json = response.json()

        self.logger.debug(f'Fetched. Code: {response.status_code}')

        return json['data'][:FETCH_LIMIT]
python_wpapi.py 文件源码 项目:python_wpapi 作者: Lobosque 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _request(self, endpoint, method='GET', files=None, headers={}, **kwargs):
        params = {}
        if method == 'GET':
            params = kwargs
            data = None
            headers = {'Content-Length': '0'}
        else:
            data = kwargs

        response = requests.request(method,
            endpoint,
            auth=self.auth,
            params=params,
            json=data,
            headers=headers,
            files=files
        )
        if not response.status_code // 100 == 2:
            error = WpApiError.factory(response)
            raise error
        return response.json()
transports.py 文件源码 项目:healthchecks_asgards 作者: andela 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def request(self, method, url, **kwargs):
        try:
            options = dict(kwargs)
            if "headers" not in options:
                options["headers"] = {}

            options["timeout"] = 5
            options["headers"]["User-Agent"] = "healthchecks.io"

            r = requests.request(method, url, **options)
            if r.status_code not in (200, 201, 204):
                return "Received status code %d" % r.status_code
        except requests.exceptions.Timeout:
            # Well, we tried
            return "Connection timed out"
        except requests.exceptions.ConnectionError:
            return "Connection failed"
default.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def find_finger():
    ps = request.vars.ps
    import serial
    ser = serial.Serial('/dev/ttyACM0', 9600)
    ser.write('5')
    # ser.write(b'5') #Prefixo b necessario se estiver utilizando Python 3.X
    while True:
        line = ser.readline()
        print line
        if "FINGERFOUND" in line:
            id = line.split(",")[1]
            ser.close()
            r = requests.get("http://174.138.34.125:8081/walletapi/customer/%s/" % id)
            obj = json.loads(r.text)
            if ps == obj['password']:
                return r.text
            else:
                return 'error'
digitalocean.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer {0}'.format(self.options.get('auth_token'))
        }
        if not url.startswith(self.api_endpoint):
            url = self.api_endpoint + url

        r = requests.request(action, url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        if action == 'DELETE':
            return ''
        else:
            return r.json()
vultr.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}

        default_headers = {
            'Accept': 'application/json',
            # 'Content-Type': 'application/json',
            'API-Key': self.options['auth_token']
        }

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=data,
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.

        if action == 'DELETE' or action == 'PUT' or action == 'POST':
            return r.text # vultr handles succss/failure via HTTP Codes, Only GET returns a response.
        return r.json()
nsone.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'X-NSONE-Key': self.options['auth_token']
        }
        default_auth = None

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers,
                             auth=default_auth)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        return r.json()
easydns.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        query_params['format'] = 'json'
        query_params['_user'] = self.options['auth_username']
        query_params['_key'] = self.options['auth_token']
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        return r.json()
godaddy.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _request(self, action='GET', url='/', data=None, query_params=None):
        if not data:
            data = {}
        if not query_params:
            query_params = {}

        result = requests.request(action, self.api_endpoint + url,
                                  params=query_params,
                                  data=json.dumps(data),
                                  headers={
                                      'Content-Type': 'application/json',
                                      'Accept': 'application/json',
                                      # GoDaddy use a key/secret pair to authenticate
                                      'Authorization': 'sso-key {0}:{1}'.format(
                                          self.options.get('auth_key'),
                                          self.options.get('auth_secret'))
                                  })

        result.raise_for_status()
        return result.json()
glesys.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _request(self, action='GET', url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}

        query_params['format'] = 'json'
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

        credentials = (self.options['auth_username'], self.options['auth_token'])
        response = requests.request(action,
                                    self.api_endpoint + url,
                                    params=query_params,
                                    data=json.dumps(data),
                                    headers=default_headers,
                                    auth=credentials)

        # if the request fails for any reason, throw an error.
        response.raise_for_status()
        return response.json()

    # Adds TTL parameter if passed as argument to lexicon.
cloudns.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _request(self, action='GET', url='/', data=None, query_params=None):
        # Set default values for missing arguments
        data = data if data else {}
        query_params = query_params if query_params else {}

        # Merge authentication data into request
        if action == 'GET':
            query_params.update(self._build_authentication_data())
        else:
            data.update(self._build_authentication_data())

        # Fire request against ClouDNS API and parse result as JSON
        r = requests.request(action, self.api_endpoint + url, params=query_params, data=data)
        r.raise_for_status()
        payload = r.json()

        # Check ClouDNS specific status code and description
        if 'status' in payload and 'statusDescription' in payload and payload['status'] != 'Success':
            raise Exception('ClouDNS API request has failed: ' + payload['statusDescription'])

        # Return payload
        return payload
yandex.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'PddToken': self.options.get('auth_token')
        }

        if not url.startswith(self.api_endpoint):
            url = self.api_endpoint + url

        r = requests.request(action, url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        if action == 'DELETE':
            return ''
        else:
            return r.json()
dnspark.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }
        default_auth = (self.options['auth_username'], self.options['auth_token'])

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers,
                             auth=default_auth)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        return r.json()
linode.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _request(self, action='GET',  url='', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

        query_params['api_key'] = self.options.get('auth_token')
        query_params['resultFormat'] = 'JSON'
        query_params['api_action'] = url

        r = requests.request(action, self.api_endpoint, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        if action == 'DELETE':
            return ''
        else:
            result = r.json()
            if len(result['ERRORARRAY']) > 0:
                raise Exception('Linode api error: {0}'.format(result['ERRORARRAY']))
            return result
rage4.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}

        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
        }
        default_auth = requests.auth.HTTPBasicAuth(self.options['auth_username'], self.options['auth_token'])

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers,
                             auth=default_auth)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        return r.json()
zonomi.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_record(self, type, name, content):
        request = {
                'action': 'SET', 
                'type': type, 
                'name': self.options['domain'], 
                'value': content 
        }

        if name is not None:
            request['name'] = self._full_name(name)

        if self.options.get('ttl'):
            request['ttl'] = self.options.get('ttl')

        if self.options.get('priority'):
            request['prio'] = self.options.get('priority')

        payload = self._get('/dns/dyndns.jsp', request)

        if payload.find('is_ok').text != 'OK:':
            raise Exception('An error occurred: {0}'.format(payload.find('is_ok').text))

        logger.debug('create_record: %s', True)
        return True
zonomi.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def delete_record(self, identifier=None, type=None, name=None, content=None):
        if identifier is not None:
            type, name, content = self._parse_identifier(identifier)

        request = {
            'action' : 'DELETE',
            'name': self.options['domain'] 
        }

        if type is not None:
            request['type'] = type
        if name is not None:
            request['name'] = self._full_name(name)
        if content is not None:
            request['value'] = content

        payload = self._get('/dns/dyndns.jsp', request)

        if payload.find('is_ok').text != 'OK:':
            raise Exception('An error occurred: {0}'.format(payload.find('is_ok').text))

        logger.debug('delete_record: %s', True)
        return True
numerapi.py 文件源码 项目:numerai 作者: gansanay 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def upload_prediction(self, file_path):
        filename, signedRequest, headers, status_code = self.authorize(file_path)
        if status_code!=200:
            return status_code

        dataset_id, comp_id, status_code = self.get_current_competition()
        if status_code!=200:
            return status_code

        with open(file_path, 'rb') as fp:
            r = requests.Request('PUT', signedRequest, data=fp.read())
            prepped = r.prepare()
            s = requests.Session()
            resp = s.send(prepped)
            if resp.status_code!=200:
                return resp.status_code

        r = requests.post(self._submissions_url,
                    data={'competition_id':comp_id, 'dataset_id':dataset_id, 'filename':filename},
                    headers=headers)

        return r.status_code
zmirror.py 文件源码 项目:zmirror 作者: aploium 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def extract_url_path_and_query(full_url=None, no_query=False):
    """
    Convert http://foo.bar.com/aaa/p.html?x=y to /aaa/p.html?x=y

    :param no_query:
    :type full_url: str
    :param full_url: full url
    :return: str
    """
    if full_url is None:
        full_url = request.url
    split = urlsplit(full_url)
    result = split.path or "/"
    if not no_query and split.query:
        result += '?' + split.query
    return result


# ################# End Client Request Handler #################


# ################# Begin Middle Functions #################
auth_web.py 文件源码 项目:AlexaOPi 作者: dony71 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def index(self):
        scope="alexa_all"
        sd = json.dumps({
            "alexa:all": {
                "productID": ProductID,
                "productInstanceAttributes": {
                    "deviceSerialNumber": "001"
                }
            }
        })
        url = "https://www.amazon.com/ap/oa"
        callback = cherrypy.url()  + "code" 
        payload = {"client_id" : Client_ID, "scope" : "alexa:all", "scope_data" : sd, "response_type" : "code", "redirect_uri" : callback }
        req = requests.Request('GET', url, params=payload)
        p = req.prepare()
        raise cherrypy.HTTPRedirect(p.url)
__init__.py 文件源码 项目:tap-freshdesk 作者: singer-io 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def request(url, params=None):
    params = params or {}
    headers = {}
    if 'user_agent' in CONFIG:
        headers['User-Agent'] = CONFIG['user_agent']

    req = requests.Request('GET', url, params=params, auth=(CONFIG['api_key'], ""), headers=headers).prepare()
    logger.info("GET {}".format(req.url))
    resp = session.send(req)

    if 'Retry-After' in resp.headers:
        retry_after = int(resp.headers['Retry-After'])
        logger.info("Rate limit reached. Sleeping for {} seconds".format(retry_after))
        time.sleep(retry_after)
        return request(url, params)

    resp.raise_for_status()

    return resp
auth_web.py 文件源码 项目:AlexaPi 作者: alexa-pi 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def index(self):
        sd = json.dumps({
            "alexa:all": {
                "productID": config['alexa']['Device_Type_ID'],
                "productInstanceAttributes": {
                    "deviceSerialNumber": hashlib.sha256(str(uuid.getnode()).encode()).hexdigest()
                }
            }
        })

        url = "https://www.amazon.com/ap/oa"
        callback = cherrypy.url() + "code"
        payload = {
            "client_id": config['alexa']['Client_ID'],
            "scope": "alexa:all",
            "scope_data": sd,
            "response_type": "code",
            "redirect_uri": callback
        }
        req = requests.Request('GET', url, params=payload)
        prepared_req = req.prepare()
        raise cherrypy.HTTPRedirect(prepared_req.url)
auth_web.py 文件源码 项目:intel-edison-alexa 作者: pedrominatel 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def index(self):
        scope="alexa_all"
        sd = json.dumps({
            "alexa:all": {
                "productID": ProductID,
                "productInstanceAttributes": {
                    "deviceSerialNumber": "001"
                }
            }
        })
        url = "https://www.amazon.com/ap/oa"
        callback = cherrypy.url()  + "code" 
        payload = {"client_id" : Client_ID, "scope" : "alexa:all", "scope_data" : sd, "response_type" : "code", "redirect_uri" : callback }
        req = requests.Request('GET', url, params=payload)
        p = req.prepare()
        raise cherrypy.HTTPRedirect(p.url)
github_api_demo.py 文件源码 项目:learn_requests 作者: shfanzie 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def hard_requests():
    from requests import Request, Session
    s = Session()
    headers = {'User-Agent': 'fake1.3.4'}
    req = Request('GET', build_uri('user/emails'), auth=('shfanzie', '**********'), headers = headers)
    prepped = req.prepare()
    print prepped.body
    print prepped.headers

    try:
        resp = s.send(prepped, timeout=1)
        resp.raise_for_status()
    except exceptions.Timeout as e:
        print e.message
    except exceptions.HTTPError as e:
        print e.message
    else:
        print resp.status_code
        print resp.reason
        print resp.request.headers
        print resp.text
http.py 文件源码 项目:storj-python-sdk 作者: Storj 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _prepare_request(self, **kwargs):
        """Prepares a HTTP request.
        Args:
            kwargs (dict): keyword arguments for the authentication function
                (``_add_ecdsa_signature()`` or ``_add_basic_auth()``) and
                :py:class:`requests.Request` class.
        Raises:
            AssertionError: in case ``kwargs['path']`` doesn't start with ``/``.
        """

        kwargs.setdefault('headers', {})

        # Add appropriate authentication headers
        if isinstance(self.private_key, SigningKey):
            self._add_ecdsa_signature(kwargs)
        elif self.email and self.password:
            self._add_basic_auth(kwargs)

        # Generate URL from path
        path = kwargs.pop('path')
        assert path.startswith('/')
        kwargs['url'] = urljoin(self.api_url, path)

        return requests.Request(**kwargs).prepare()
__init__.py 文件源码 项目:tap-adwords 作者: singer-io 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_unfiltered_page(sdk_client, fields, start_index, stream):
    service_name = GENERIC_ENDPOINT_MAPPINGS[stream]['service_name']
    service_caller = sdk_client.GetService(service_name, version=VERSION)
    selector = {
        'fields': fields,
        'paging': {
            'startIndex': str(start_index),
            'numberResults': str(PAGE_SIZE)
        }
    }
    with metrics.http_request_timer(stream):
        LOGGER.info("Request %s %s from start_index %s for customer %s",
                    PAGE_SIZE,
                    stream,
                    start_index,
                    sdk_client.client_customer_id)
        page = attempt_get_from_service(service_caller, selector)
        return page
__init__.py 文件源码 项目:tap-adwords 作者: singer-io 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_field_list(stream_schema, stream, stream_metadata):
    #NB> add synthetic keys
    field_list = get_fields_to_sync(stream_schema, stream_metadata)
    LOGGER.info("Request fields: %s", field_list)
    field_list = filter_fields_by_stream_name(stream, field_list)
    LOGGER.info("Filtered fields: %s", field_list)
    # These are munged because of the nature of the API. When you pass
    # the field to the API you need to change its first letter to
    # upper case.
    #
    # See:
    # https://developers.google.com/adwords/api/docs/reference/v201708/AdGroupAdService.AdGroupAd
    # for instance
    field_list = [f[0].upper()+f[1:] for f in field_list]
    LOGGER.info("Munged fields: %s", field_list)
    return field_list


问题


面经


文章

微信
公众号

扫码关注公众号