python类post()的实例源码

send_messages.py 文件源码 项目:BackendAllStars 作者: belatrix 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def send_message_android(destination, message):
    headers = {
        'Authorization': 'key=' + settings.FIREBASE_SERVER_KEY,
        'Content - Type': 'application/json'
    }
    payload = {
        "to": destination,
        "data": {
            "title": config.TITLE_PUSH_NOTIFICATION,
            "detail": message
        }
    }
    request = requests.post(
        settings.FIREBASE_API_URL,
        json=payload,
        headers=headers
    )
    print(request.text)
send_messages.py 文件源码 项目:BackendAllStars 作者: belatrix 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def send_message_ios(destination, message):
    headers = {
        'Authorization': 'key=' + settings.FIREBASE_SERVER_KEY,
        'Content - Type': 'application/json'
    }
    payload = {
        "to": destination,
        "priority": "high",
        "badge": 0,
        "notification": {
            "title": config.TITLE_PUSH_NOTIFICATION,
            "text": message,
            "sound": "default",
        }
    }
    request = requests.post(
        settings.FIREBASE_API_URL,
        json=payload,
        headers=headers
    )
    print(request.text)
auth_server.py 文件源码 项目:cbas 作者: ImmobilienScout24 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def obtain_access_token(config, password):
    info("Will now attempt to obtain an JWT...")
    auth_request_data = {'client_id': 'jumpauth',
                         'username': config.username,
                         'password': password,
                         'grant_type': 'password'}
    auth_url = get_auth_url(config)
    auth_response = requests.post(auth_url, auth_request_data)

    if auth_response.status_code not in [200, 201]:
        log.info("Authentication failed: {0}".
                 format(auth_response.json().get("error")))
        auth_response.raise_for_status()
    else:
        log.info("Authentication OK!")

    # TODO bail out if there was no access token in the answer
    access_token = auth_response.json()['access_token']
    info("Access token was received.")
    verbose("Access token is:\n'{0}'".format(access_token))
    return access_token
snk.py 文件源码 项目:Netkeeper 作者: 1941474711 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def authenticate(username, password):
    global Cookies, Host
    try:
        postData = "password=" + getPasswordEnc(password) + "&clientType=android&username=" + \
                   username + "&key=" + getSessionEnc(AccessToken) + "&code=8&clientip=" + LocalIpAddress
        header['Content-Length'] = '219'
        url = 'http://' + Host + '/wf.do'
        resp = requests.post(url=url, headers=header, cookies=Cookies, data=postData)
        resp.encoding = "GBK"
        print resp.request.headers
        text = resp.text
        print text
        if 'auth00' in text > 0:
            print "????."
            return True
        else:
            print "????[" + getErrorMsg(resp) + "]"
            return False
    except Exception as e:
        print e
        print "(6/10) ????????"
        return False
auth.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __call__(self):
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Cache-Control": "no-cache",
        }
        body = urlparse.urlencode({
            "client_id": self.api_key,
            "client_secret": self.client_secret,
            "jwt_token": self.jwt_token
        })

        r = requests.post(self.endpoint, headers=headers, data=body)
        if r.status_code != 200:
            raise RuntimeError("Unable to authorize against {}:\n"
                               "Response Code: {:d}, Response Text: {}\n"
                               "Response Headers: {}]".format(self.endpoint, r.status_code, r.text, r.headers))

        self.set_expiry(r.json()['expires_in'])

        return r.json()['access_token']
tasks.py 文件源码 项目:django-zerodowntime 作者: rentlytics 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def nightly_build(ctx, project_name, branch_name, circle_token):
    url = 'https://circleci.com/api/v1/project/{}/tree/{}?circle-token={}'.format(
        project_name,
        branch_name,
        circle_token
    )

    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }

    body = {
        'build_parameters': {
            'RUN_NIGHTLY_BUILD': "true",
        }
    }

    response = requests.post(url, json=body, headers=headers)

    print("Nightly Build queued on CircleCI: http_status={}, response={}".format(
        response.status_code,
        response.json()
    ))
jobs.py 文件源码 项目:python-freezerclient 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def start_job(self, job_id):
        """
        Request to start a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already started'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"start": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
jobs.py 文件源码 项目:python-freezerclient 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def stop_job(self, job_id):
        """
        Request to stop a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already stopped'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"stop": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
jobs.py 文件源码 项目:python-freezerclient 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def abort_job(self, job_id):
        """
        Request to abort a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already stopped'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"abort": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
sessions.py 文件源码 项目:python-freezerclient 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def end_session(self, session_id, job_id, session_tag, result):
        """
        Informs the freezer service that the job has ended.
        Provides information about the job's result and the session tag

        :param session_id:
        :param job_id:
        :param session_tag:
        :param result:
        :return:
        """
        # endpoint /v1/sessions/{sessions_id}/action
        endpoint = '{0}{1}/action'.format(self.endpoint, session_id)
        doc = {"end": {
            "job_id": job_id,
            "current_tag": session_tag,
            "result": result
        }}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
jobs.py 文件源码 项目:python-freezerclient 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def stop_job(self, job_id):
        """
        Request to stop a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already stopped'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"stop": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
jobs.py 文件源码 项目:python-freezerclient 作者: openstack 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def abort_job(self, job_id):
        """
        Request to abort a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already stopped'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"abort": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
sessions.py 文件源码 项目:python-freezerclient 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def end_session(self, session_id, job_id, session_tag, result):
        """
        Informs the freezer service that the job has ended.
        Provides information about the job's result and the session tag

        :param session_id:
        :param job_id:
        :param session_tag:
        :param result:
        :return:
        """
        # endpoint /v1/sessions/{sessions_id}/action
        endpoint = '{0}{1}/action'.format(self.endpoint, session_id)
        doc = {"end": {
            "job_id": job_id,
            "current_tag": session_tag,
            "result": result
        }}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
slack.py 文件源码 项目:boss 作者: kabirbaidhya 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def send(notif_type, **params):
    ''' Send slack notifications. '''
    url = config()['base_url'] + config()['endpoint']

    (text, color) = notification.get(
        notif_type,
        config=get_config(),
        notif_config=config(),
        create_link=create_link,
        **params
    )

    payload = {
        'attachments': [
            {
                'color': color,
                'text': text
            }
        ]
    }

    requests.post(url, json=payload)
hipchat.py 文件源码 项目:boss 作者: kabirbaidhya 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send(notif_type, **params):
    ''' Send hipchat notifications. '''

    url = API_BASE_URL.format(
        company_name=config()['company_name'],
        room_id=config()['room_id'],
        auth_token=config()['auth_token']
    )

    (text, color) = notification.get(
        notif_type,
        config=get_config(),
        notif_config=config(),
        create_link=create_link,
        **params
    )

    payload = {
        'color': color,
        'message': text,
        'notify': config()['notify'],
        'message_format': 'html'
    }

    requests.post(url, json=payload)
api.py 文件源码 项目:lecli 作者: rapid7 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def create_logset(logset_name=None, params=None):
    """
    Add a new logset to the current account.
    If a filename is given, load the contents of the file
    as json parameters for the request.
    If a name is given, create a new logset with the given name
    """
    if params is not None:
        request_params = params
    else:
        request_params = {
            'logset': {
                'name': logset_name
            }
        }

    headers = api_utils.generate_headers('rw')

    try:
        response = requests.post(_url()[1], json=request_params, headers=headers)
        handle_response(response, 'Creating logset failed.\n', 201)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
api.py 文件源码 项目:lecli 作者: rapid7 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def create(payload):
    """
    Create an api key with the provided ID
    """
    action, url = _url()

    headers = api_utils.generate_headers('owner', method='POST', body=json.dumps(payload),
                                         action=action)

    try:
        response = requests.post(url, headers=headers, json=payload)
        if response_utils.response_error(response):
            sys.stderr.write('Create api key failed.')
            sys.exit(1)
        elif response.status_code == 201:
            handle_api_key_response(response)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
requests.py 文件源码 项目:gql 作者: graphql-python 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def execute(self, document, variable_values=None, timeout=None):
        query_str = print_ast(document)
        payload = {
            'query': query_str,
            'variables': variable_values or {}
        }

        data_key = 'json' if self.use_json else 'data'
        post_args = {
            'headers': self.headers,
            'auth': self.auth,
            'cookies': self.cookies,
            'timeout': timeout or self.default_timeout,
            data_key: payload
        }
        request = requests.post(self.url, **post_args)
        request.raise_for_status()

        result = request.json()
        assert 'errors' in result or 'data' in result, 'Received non-compatible response "{}"'.format(result)
        return ExecutionResult(
            errors=result.get('errors'),
            data=result.get('data')
        )
spotify.py 文件源码 项目:spotify-connect-scrobbler 作者: jeschkies 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def refresh_access_token(self, refresh_token):
        """ Returns the access token for Spotify Web API using a refresh token.

        Args:
            refresh_token (string): The token has been returned by the access
            token request.

        Returns:
            SpotifyCredentials: The parsed response from Spotify.
        """
        # Get new auth token
        payload = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token
        }
        headers = self._make_authorization_headers()
        return requests.post(
            'https://accounts.spotify.com/api/token',
            data=payload,
            headers=headers
        ).json()
lastfm.py 文件源码 项目:spotify-connect-scrobbler 作者: jeschkies 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def request_access_token(self, token):
        """ Request access token from Last.fm.

        Args:
            token (string): Token from redirect.

        Return:
            dict: Response from get session call.
        """
        payload = {
            'api_key': self.__key,
            'method': 'auth.getSession',
            'token': token
        }
        payload['api_sig'] = self.sign(payload)
        payload['format'] = 'json'
        response = requests.post(
            'https://ws.audioscrobbler.com/2.0/', params=payload).json()

        return LastfmCredentials(response['session']['key'])


问题


面经


文章

微信
公众号

扫码关注公众号