python类urlencode()的实例源码

plugin.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def view_autocomplete(self, request, group, **kwargs):
    """Return autocomplete suggestions for the ``issue_id`` field.

    Reads ``autocomplete_field`` and ``autocomplete_query`` from the request's
    GET parameters. Any field other than ``issue_id``, or an empty query,
    yields an empty ``issue_id`` list. Otherwise the query is sent to the
    URL built for the ``'search'`` endpoint and each returned story is turned
    into a ``{'text': ..., 'id': ...}`` entry.

    Request failures and undecodable JSON bodies are delegated to
    ``self.handle_api_error``.
    """
    field = request.GET.get('autocomplete_field')
    query = request.GET.get('autocomplete_query')
    if not (field == 'issue_id' and query):
        return Response({'issue_id': []})

    encoded_query = query.encode('utf-8')
    search_url = '%s?%s' % (
        self.build_api_url(group, 'search'),
        urlencode({'query': encoded_query}),
    )

    try:
        body = safe_urlread(self.make_api_request(group.project, search_url))
    except (requests.RequestException, PluginError) as e:
        return self.handle_api_error(e)

    try:
        payload = json.loads(body)
    except ValueError as e:
        return self.handle_api_error(e)

    # The story list is nested under two levels of 'stories' keys.
    stories = payload.get('stories', {}).get('stories', [])
    issues = []
    for story in stories:
        issues.append({
            'text': '(#%s) %s' % (story['id'], story['name']),
            'id': story['id'],
        })

    return Response({field: issues})
auth.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __call__(self):
    """Exchange the JWT for an access token and return that token.

    POSTs the API key, client secret and JWT as a form-encoded body to
    ``self.endpoint``. On a 200 response the ``expires_in`` value is passed
    to ``self.set_expiry`` and the ``access_token`` value is returned.

    Raises:
        RuntimeError: if the response status code is anything but 200.
    """
    form_body = urlparse.urlencode({
        "client_id": self.api_key,
        "client_secret": self.client_secret,
        "jwt_token": self.jwt_token
    })
    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Cache-Control": "no-cache",
    }

    r = requests.post(self.endpoint, headers=request_headers, data=form_body)
    if r.status_code == 200:
        self.set_expiry(r.json()['expires_in'])
        return r.json()['access_token']

    raise RuntimeError("Unable to authorize against {}:\n"
                       "Response Code: {:d}, Response Text: {}\n"
                       "Response Headers: {}]".format(self.endpoint, r.status_code, r.text, r.headers))
model.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _build_query(self, params):
    """Encode ``params`` as a URI query string with a leading ``?``.

    Args:
      params: dict, the query parameters. It is updated in place with an
        ``alt`` entry when ``self.alt_param`` is not None.

    Returns:
      The query parameters encoded into an HTTP URI query string. A list
      value contributes one ``key=value`` pair per element.
    """
    if self.alt_param is not None:
        params.update(alt=self.alt_param)

    pairs = []
    for name, val in six.iteritems(params):
        if type(val) is list:
            # Every list element is UTF-8 encoded and emitted under the same key.
            pairs.extend((name, item.encode('utf-8')) for item in val)
            continue
        if isinstance(val, six.text_type) and callable(val.encode):
            val = val.encode('utf-8')
        pairs.append((name, val))
    return '?' + urlencode(pairs)
base.py 文件源码 项目:watcher-tempest-plugin 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _list_request(self, resource, permanent=False, **kwargs):
    """Fetch the list of objects of the given resource type.

    :param resource: The name of the REST resource, e.g., 'audits'.
    :param permanent: Forwarded to ``self._get_uri`` when building the URI.
    :param **kwargs: Parameters for the request, appended as a query string.
    :return: A tuple with the server response and deserialized JSON list
             of objects
    """
    uri = self._get_uri(resource, permanent=permanent)
    if kwargs:
        uri = uri + "?" + urlparse.urlencode(kwargs)

    resp, body = self.get(uri)
    status = int(resp['status'])
    self.expected_success(200, status)

    return resp, self.deserialize(body)
utils.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Assemble an ``otpauth://`` URI from an OTP object.

    The query string carries ``digits``, the base32-encoded ``secret`` and
    the upper-cased ``algorithm`` name, then ``issuer`` (when it is not
    None), then every pair in ``extra_parameters``. The label is the quoted
    account name, prefixed with ``<quoted issuer>:`` when ``issuer`` is
    truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items += [("issuer", issuer)]
    query_items += extra_parameters

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name,
        label=label,
        parameters=urlencode(query_items),
    )
utils.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Assemble an ``otpauth://`` URI from an OTP object.

    The query string carries ``digits``, the base32-encoded ``secret`` and
    the upper-cased ``algorithm`` name, then ``issuer`` (when it is not
    None), then every pair in ``extra_parameters``. The label is the quoted
    account name, prefixed with ``<quoted issuer>:`` when ``issuer`` is
    truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items += [("issuer", issuer)]
    query_items += extra_parameters

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name,
        label=label,
        parameters=urlencode(query_items),
    )
github.py 文件源码 项目:flask-api-skeleton 作者: ianunruh 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def build_login_url(self, redirect_uri, state=None, scope=None):
    """Build the login URL and return it together with the state token.

    A falsy ``state`` is replaced by a freshly generated token. The query
    string holds ``client_id``, ``redirect_uri`` and ``state``, plus
    ``scope`` when one is given, and is substituted into
    ``OAUTH_DIALOG_URL``.

    Returns a ``(login_url, state)`` tuple.
    """
    state = state or generate_token()

    query = dict(
        client_id=self.client_id,
        redirect_uri=redirect_uri,
        state=state,
    )
    if scope:
        query['scope'] = scope

    return OAUTH_DIALOG_URL.format(urlencode(query)), state
facebook.py 文件源码 项目:flask-api-skeleton 作者: ianunruh 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def build_login_url(self, redirect_uri, state=None, scope=None):
    """Build the login URL and return it together with the state token.

    A falsy ``state`` is replaced by a freshly generated token. The query
    string holds ``client_id``, ``redirect_uri`` and ``state``, plus
    ``scope`` when one is given, and is substituted into
    ``OAUTH_DIALOG_URL``.

    Returns a ``(login_url, state)`` tuple.
    """
    state = state or generate_token()

    query = dict(
        client_id=self.client_id,
        redirect_uri=redirect_uri,
        state=state,
    )
    if scope:
        query['scope'] = scope

    return OAUTH_DIALOG_URL.format(urlencode(query)), state
google.py 文件源码 项目:flask-api-skeleton 作者: ianunruh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def build_login_url(self, redirect_uri, state=None, scope='email'):
    """Build the login URL and return it together with the state token.

    A falsy ``state`` is replaced by a freshly generated token. The query
    string holds ``response_type=code``, ``client_id``, ``redirect_uri``
    and ``state``, plus ``scope`` unless it is falsy, and is substituted
    into ``OAUTH_DIALOG_URL``.

    Returns a ``(login_url, state)`` tuple.
    """
    state = state or generate_token()

    query = dict(
        response_type='code',
        client_id=self.client_id,
        redirect_uri=redirect_uri,
        state=state,
    )
    if scope:
        query['scope'] = scope

    return OAUTH_DIALOG_URL.format(urlencode(query)), state
__init__.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _redirect_with_params(url_name, *args, **kwargs):
    """Build a redirect target whose query string comes from ``kwargs``.

    Args:
        url_name: The name of the url to redirect to.
        args: Positional arguments used when reversing ``url_name``.
        kwargs: The query string params and their values; sequence values
            are expanded into repeated params.

    Returns:
        The reversed URL followed by ``?`` and the encoded query string.
    """
    return "{0}?{1}".format(
        urlresolvers.reverse(url_name, args=args),
        parse.urlencode(kwargs, doseq=True),
    )
model.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _build_query(self, params):
    """Encode ``params`` as a URI query string with a leading ``?``.

    Args:
      params: dict, the query parameters. It is updated in place with an
        ``alt`` entry when ``self.alt_param`` is not None.

    Returns:
      The query parameters encoded into an HTTP URI query string. A list
      value contributes one ``key=value`` pair per element.
    """
    if self.alt_param is not None:
        params.update(alt=self.alt_param)

    pairs = []
    for name, val in six.iteritems(params):
        if type(val) is list:
            # Every list element is UTF-8 encoded and emitted under the same key.
            pairs.extend((name, item.encode('utf-8')) for item in val)
            continue
        if isinstance(val, six.text_type) and callable(val.encode):
            val = val.encode('utf-8')
        pairs.append((name, val))
    return '?' + urlencode(pairs)
oauth2.py 文件源码 项目:plugin.audio.spotify 作者: marcelveldt 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_authorize_url(self, state=None):
    """Return the URL used to authorize this app.

    The query string always contains ``client_id``, ``response_type=code``
    and ``redirect_uri``. ``scope`` is added when ``self.scope`` is truthy.
    ``state`` falls back to ``self.state`` when the argument is None, and is
    added only if the resulting value is not None.
    """
    params = dict(
        client_id=self.client_id,
        response_type='code',
        redirect_uri=self.redirect_uri,
    )
    if self.scope:
        params['scope'] = self.scope

    effective_state = self.state if state is None else state
    if effective_state is not None:
        params['state'] = effective_state

    query = urllibparse.urlencode(params)
    return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, query)
utils.py 文件源码 项目:python-bileanclient 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def sort_url_by_query_keys(url):
    """Return ``url`` with its query-string parameters ordered by key.

       For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
       returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This keeps
       non-deterministic ordering of the query string from causing
       problems with unit tests.
    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    parsed = urlparse.urlparse(url)
    # Blank values are kept so 'a=' survives the round trip.
    pairs = urlparse.parse_qsl(parsed.query, True)
    pairs = sorted(pairs, key=lambda pair: pair[0])

    reordered = parsed._replace(query=urlparse.urlencode(pairs, True))
    return urlparse.urlunparse(reordered)
base.py 文件源码 项目:python-bileanclient 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def find(self, base_url=None, **kwargs):
    """Return the single item whose attributes match ``**kwargs``.

    The kwargs are first passed through ``self._filter_kwargs``; the
    filtered values are used both to build the URL and, when any remain,
    as its query string.

    :param base_url: if provided, the generated URL will be appended to it
    :raises exceptions.NotFound: when nothing matches
    :raises exceptions.NoUniqueMatch: when more than one item matches
    """
    kwargs = self._filter_kwargs(kwargs)

    base = self.build_url(base_url=base_url, **kwargs)
    query = ('?%s' % parse.urlencode(kwargs)) if kwargs else ''
    matches = self._list('%s%s' % (base, query), self.collection_key)

    count = len(matches)
    if count == 1:
        return matches[0]
    if count > 1:
        raise exceptions.NoUniqueMatch

    msg = _("No %(name)s matching %(args)s.") % {
        'name': self.resource_class.__name__,
        'args': kwargs
    }
    raise exceptions.NotFound(msg)
query.py 文件源码 项目:pynetbox 作者: digitalocean 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_session_key(self):
    """Requests session key

    Issues a POST request to the `get-session-key` endpoint, sending the
    private key (with leading/trailing newlines stripped) as a form-encoded
    body, for subsequent use in requests from the `secrets` endpoint.

    :Returns: String containing session key.
    :Raises: RequestError if the response is not OK.
    """
    url = '{}/secrets/get-session-key/?preserve_key=True'.format(self.base)
    request_headers = {
        'accept': 'application/json',
        'authorization': 'Token {}'.format(self.token),
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    form_body = urlencode({
        'private_key': self.private_key.strip('\n')
    })

    req = requests.post(url, headers=request_headers, data=form_body)
    if not req.ok:
        raise RequestError(req)
    return json.loads(req.text)['session_key']
base.py 文件源码 项目:ironic-tempest-plugin 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _list_request(self, resource, permanent=False, headers=None,
                  extra_headers=False, **kwargs):
    """Fetch the list of objects of the given resource type.

    :param resource: The name of the REST resource, e.g., 'nodes'.
    :param permanent: Forwarded to ``self._get_uri`` when building the URI.
    :param headers: List of headers to use in request.
    :param extra_headers: Specify whether to use headers.
    :param **kwargs: Parameters for the request, appended as a query string.
    :returns: A tuple with the server response and deserialized JSON list
             of objects

    """
    uri = self._get_uri(resource, permanent=permanent)
    if kwargs:
        uri = uri + "?" + urllib.urlencode(kwargs)

    response = self.get(uri, headers=headers, extra_headers=extra_headers)
    resp, body = response
    self.expected_success(http_client.OK, resp.status)

    return resp, self.deserialize(body)
base.py 文件源码 项目:python-kingbirdclient 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def find(self, base_url=None, **kwargs):
    """Return the single item whose attributes match ``**kwargs``.

    The kwargs are first passed through ``self._filter_kwargs``; the
    filtered values are used both to build the URL and, when any remain,
    as its query string.

    :param base_url: if provided, the generated URL will be appended to it
    :raises exceptions.NotFound: when nothing matches
    :raises exceptions.NoUniqueMatch: when more than one item matches
    """
    kwargs = self._filter_kwargs(kwargs)

    base = self.build_url(base_url=base_url, **kwargs)
    query = ('?%s' % parse.urlencode(kwargs)) if kwargs else ''
    matches = self._list('%s%s' % (base, query), self.collection_key)

    count = len(matches)
    if count == 1:
        return matches[0]
    if count > 1:
        raise exceptions.NoUniqueMatch

    msg = _("No %(name)s matching %(args)s.") % {
        'name': self.resource_class.__name__,
        'args': kwargs
    }
    raise exceptions.NotFound(404, msg)
utils.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Assemble an ``otpauth://`` URI from an OTP object.

    The query string carries ``digits``, the base32-encoded ``secret`` and
    the upper-cased ``algorithm`` name, then ``issuer`` (when it is not
    None), then every pair in ``extra_parameters``. The label is the quoted
    account name, prefixed with ``<quoted issuer>:`` when ``issuer`` is
    truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items += [("issuer", issuer)]
    query_items += extra_parameters

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name,
        label=label,
        parameters=urlencode(query_items),
    )
utils.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Assemble an ``otpauth://`` URI from an OTP object.

    The query string carries ``digits``, the base32-encoded ``secret`` and
    the upper-cased ``algorithm`` name, then ``issuer`` (when it is not
    None), then every pair in ``extra_parameters``. The label is the quoted
    account name, prefixed with ``<quoted issuer>:`` when ``issuer`` is
    truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items += [("issuer", issuer)]
    query_items += extra_parameters

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name,
        label=label,
        parameters=urlencode(query_items),
    )
utils.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Assemble an ``otpauth://`` URI from an OTP object.

    The query string carries ``digits``, the base32-encoded ``secret`` and
    the upper-cased ``algorithm`` name, then ``issuer`` (when it is not
    None), then every pair in ``extra_parameters``. The label is the quoted
    account name, prefixed with ``<quoted issuer>:`` when ``issuer`` is
    truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items += [("issuer", issuer)]
    query_items += extra_parameters

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name,
        label=label,
        parameters=urlencode(query_items),
    )
utils.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Assemble an ``otpauth://`` URI from an OTP object.

    The query string carries ``digits``, the base32-encoded ``secret`` and
    the upper-cased ``algorithm`` name, then ``issuer`` (when it is not
    None), then every pair in ``extra_parameters``. The label is the quoted
    account name, prefixed with ``<quoted issuer>:`` when ``issuer`` is
    truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items += [("issuer", issuer)]
    query_items += extra_parameters

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name,
        label=label,
        parameters=urlencode(query_items),
    )
utils.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Assemble an ``otpauth://`` URI from an OTP object.

    The query string carries ``digits``, the base32-encoded ``secret`` and
    the upper-cased ``algorithm`` name, then ``issuer`` (when it is not
    None), then every pair in ``extra_parameters``. The label is the quoted
    account name, prefixed with ``<quoted issuer>:`` when ``issuer`` is
    truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items += [("issuer", issuer)]
    query_items += extra_parameters

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name,
        label=label,
        parameters=urlencode(query_items),
    )
__init__.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _redirect_with_params(url_name, *args, **kwargs):
    """Build a redirect target whose query string comes from ``kwargs``.

    Args:
        url_name: The name of the url to redirect to.
        args: Positional arguments used when reversing ``url_name``.
        kwargs: The query string params and their values; sequence values
            are expanded into repeated params.

    Returns:
        The reversed URL followed by ``?`` and the encoded query string.
    """
    return "{0}?{1}".format(
        urlresolvers.reverse(url_name, args=args),
        parse.urlencode(kwargs, doseq=True),
    )
oauth2.py 文件源码 项目:plugin.audio.connectcontrol 作者: NicolasHaeffner 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_authorize_url(self, state=None):
    """Return the URL used to authorize this app.

    The query string always contains ``client_id``, ``response_type=code``
    and ``redirect_uri``. ``scope`` is added when ``self.scope`` is truthy.
    ``state`` falls back to ``self.state`` when the argument is None, and is
    added only if the resulting value is not None.
    """
    params = dict(
        client_id=self.client_id,
        response_type='code',
        redirect_uri=self.redirect_uri,
    )
    if self.scope:
        params['scope'] = self.scope

    effective_state = self.state if state is None else state
    if effective_state is not None:
        params['state'] = effective_state

    query = urllibparse.urlencode(params)
    return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, query)
sync.py 文件源码 项目:boartty 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def syncStories(sync, priority, **kw):
    """Fetch stories from the remote and submit one sync task per story.

    Keyword arguments are sent as query parameters on the ``v1/stories``
    request; any whose value is None is left out. For every story returned,
    a ``SyncStoryTask`` is created with the given ``priority`` and handed to
    ``sync.submitTask``.

    Returns the list of submitted tasks, in the order the stories were
    returned.
    """
    # Drop unset filters so they are not sent as the literal string 'None'.
    params = urlparse.urlencode(
        {k: v for k, v in kw.items() if v is not None})
    remote = sync.get('v1/stories?%s' % (params,))

    tasks = []
    for remote_story in remote:
        t = SyncStoryTask(remote_story['id'], remote_story,
                          priority=priority)
        sync.submitTask(t)
        tasks.append(t)
    return tasks
quandl.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _format_metadata_url(self, api_key, page_number):
    """Build the query URL for the quandl WIKI metadata.

    ``api_key`` is placed first in the query string when it is not None;
    ``page_number`` is sent as the ``page`` parameter.
    """
    pairs = [] if api_key is None else [('api_key', api_key)]
    pairs.extend([
        ('per_page', '100'),
        ('sort_by', 'id'),
        ('page', str(page_number)),
        ('database_code', 'WIKI'),
    ])
    base = 'https://www.quandl.com/api/v3/datasets.csv?'
    return base + urlencode(pairs)
quandl.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _format_wiki_url(self,
                     api_key,
                     symbol,
                     start_date,
                     end_date,
                     data_frequency):
    """
    Build a query URL for a quandl WIKI dataset.

    The dates are formatted as ``YYYY-MM-DD`` and rows are requested in
    ascending order. ``api_key`` is placed first in the query string when
    it is not None. ``data_frequency`` is not used when building the URL.
    """
    pairs = [] if api_key is None else [('api_key', api_key)]
    pairs.extend([
        ('start_date', start_date.strftime('%Y-%m-%d')),
        ('end_date', end_date.strftime('%Y-%m-%d')),
        ('order', 'asc'),
    ])

    template = (
        "https://www.quandl.com/api/v3/datasets/WIKI/"
        "{symbol}.csv?{query}"
    )
    return template.format(symbol=symbol, query=urlencode(pairs))
usher.py 文件源码 项目:script.module.python.twitch 作者: MrSprigster 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def live_request(channel):
    """Build the request description for a channel's live HLS playlist.

    Obtains a token for ``channel``; if the token contains the error key it
    is returned unchanged. Otherwise returns a dict with the playlist
    ``url`` (including the signature, token and allow-* query parameters)
    and the query's ``headers``.
    """
    token = channel_token(channel)
    if keys.ERROR in token:
        return token

    query = UsherQuery('api/channel/hls/{channel}.m3u8')
    query.add_urlkw(keys.CHANNEL, channel)
    query.add_param(keys.SIG, token[keys.SIG])
    query.add_param(keys.TOKEN, token[keys.TOKEN])
    query.add_param(keys.ALLOW_SOURCE, Boolean.TRUE)
    query.add_param(keys.ALLOW_SPECTRE, Boolean.TRUE)
    query.add_param(keys.ALLOW_AUDIO_ONLY, Boolean.TRUE)

    encoded_params = urlencode(query.params)
    request_dict = {
        'url': '?'.join((query.url, encoded_params)),
        'headers': query.headers,
    }
    log.debug('live_request: |{0}|'.format(str(request_dict)))
    return request_dict
usher.py 文件源码 项目:script.module.python.twitch 作者: MrSprigster 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def video_request(video_id):
    """Build the request description for a VOD playlist.

    ``video_id`` is first normalised by ``valid_video_id``; a falsy result
    raises ``NotImplementedError``. A token is then obtained for the video;
    if it contains the error key it is returned unchanged. Otherwise returns
    a dict with the playlist ``url`` (including the signature, token and
    allow-* query parameters) and the query's ``headers``.
    """
    video_id = valid_video_id(video_id)
    if not video_id:
        raise NotImplementedError('Unknown Video Type')

    token = vod_token(video_id)
    if keys.ERROR in token:
        return token

    query = UsherQuery('vod/{id}')
    query.add_urlkw(keys.ID, video_id)
    query.add_param(keys.NAUTHSIG, token[keys.SIG])
    query.add_param(keys.NAUTH, token[keys.TOKEN])
    query.add_param(keys.ALLOW_SOURCE, Boolean.TRUE)
    query.add_param(keys.ALLOW_AUDIO_ONLY, Boolean.TRUE)

    encoded_params = urlencode(query.params)
    request_dict = {
        'url': '?'.join((query.url, encoded_params)),
        'headers': query.headers,
    }
    log.debug('video_request: |{0}|'.format(str(request_dict)))
    return request_dict
test_datastore.py 文件源码 项目:deb-oslo.vmware 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_get_transfer_ticket(self):
    """The transfer ticket is rendered as ``<cookie key>="<ticket id>"``.

    A datastore URL is parsed from a folder URL carrying ``dcPath`` and
    ``dsName`` query parameters, and the session's ``invoke_api`` is stubbed
    to return an object whose ``id`` is ``'fake_id'``.
    """
    class Ticket(object):
        id = 'fake_id'

    query = urlparse.urlencode({'dcPath': 'datacenter-1',
                                'dsName': 'datastore-1'})
    url = 'https://13.37.73.31/folder/images/aa.vmdk?%s' % query

    session = mock.Mock()
    session.invoke_api = mock.Mock(return_value=Ticket())

    ds_url = datastore.DatastoreURL.urlparse(url)
    ticket = ds_url.get_transfer_ticket(session, 'PUT')

    expected = '%s="%s"' % (constants.CGI_COOKIE_KEY, 'fake_id')
    self.assertEqual(expected, ticket)


问题


面经


文章

微信
公众号

扫码关注公众号