def view_autocomplete(self, request, group, **kwargs):
    """Return autocomplete suggestions for the ``issue_id`` field.

    Reads ``autocomplete_field`` and ``autocomplete_query`` from the GET
    parameters, queries the ``search`` API URL built for ``group`` and maps
    every returned story to a ``{'text': ..., 'id': ...}`` entry.

    An empty ``issue_id`` list is returned when the requested field is not
    ``issue_id`` or when no query was supplied.  Request failures and
    undecodable JSON bodies are delegated to ``self.handle_api_error``.
    """
    get_params = request.GET
    field = get_params.get('autocomplete_field')
    term = get_params.get('autocomplete_query')
    if field != 'issue_id' or not term:
        return Response({'issue_id': []})

    term = term.encode('utf-8')
    search_base = self.build_api_url(group, 'search')
    _url = '%s?%s' % (search_base, urlencode({'query': term}))

    try:
        raw_body = safe_urlread(self.make_api_request(group.project, _url))
    except (requests.RequestException, PluginError) as err:
        return self.handle_api_error(err)

    try:
        payload = json.loads(raw_body)
    except ValueError as err:
        return self.handle_api_error(err)

    # The story list is nested one level down, under the same key.
    stories = payload.get('stories', {}).get('stories', [])
    suggestions = []
    for story in stories:
        suggestions.append({
            'text': '(#%s) %s' % (story['id'], story['name']),
            'id': story['id'],
        })
    return Response({field: suggestions})
# Example source code using Python's urlencode()
def __call__(self):
    """Request an access token from ``self.endpoint``.

    POSTs ``client_id``, ``client_secret`` and ``jwt_token`` as a
    form-encoded body.  On success the ``expires_in`` value of the JSON
    response is handed to ``self.set_expiry``.

    Returns:
        The ``access_token`` value of the JSON response.

    Raises:
        RuntimeError: if the response status code is not 200.
    """
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Cache-Control": "no-cache",
    }
    body = urlparse.urlencode({
        "client_id": self.api_key,
        "client_secret": self.client_secret,
        "jwt_token": self.jwt_token
    })

    r = requests.post(self.endpoint, headers=headers, data=body)
    if r.status_code != 200:
        # The original message ended with an unbalanced "]".
        raise RuntimeError("Unable to authorize against {}:\n"
                           "Response Code: {:d}, Response Text: {}\n"
                           "Response Headers: {}".format(self.endpoint, r.status_code, r.text, r.headers))

    # Decode the JSON body once and reuse it for both fields.
    payload = r.json()
    self.set_expiry(payload['expires_in'])
    return payload['access_token']
def _build_query(self, params):
    """Builds a query string.

    Note that ``params`` is updated in place with the ``alt`` parameter
    when ``self.alt_param`` is set.

    Args:
      params: dict, the query parameters. A list value produces one
        ``key=value`` pair per element.

    Returns:
      The query parameters properly encoded into an HTTP URI query string,
      prefixed with '?'.
    """
    if self.alt_param is not None:
        params.update({'alt': self.alt_param})
    astuples = []
    for key, value in six.iteritems(params):
        # Treat scalars as one-element lists so both share one code path.
        items = value if isinstance(value, list) else [value]
        for item in items:
            # Only text needs encoding; calling .encode() on ints or bytes
            # (as the list branch used to do) raises AttributeError.
            if isinstance(item, six.text_type):
                item = item.encode('utf-8')
            astuples.append((key, item))
    return '?' + urlencode(astuples)
def _list_request(self, resource, permanent=False, **kwargs):
"""Get the list of objects of the specified type.
:param resource: The name of the REST resource, e.g., 'audits'.
"param **kw: Parameters for the request.
:return: A tuple with the server response and deserialized JSON list
of objects
"""
uri = self._get_uri(resource, permanent=permanent)
if kwargs:
uri += "?%s" % urlparse.urlencode(kwargs)
resp, body = self.get(uri)
self.expected_success(200, int(resp['status']))
return resp, self.deserialize(body)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def build_login_url(self, redirect_uri, state=None, scope=None):
    """Build the OAuth dialog URL for this client.

    A fresh token from ``generate_token()`` is used when ``state`` is falsy.
    ``scope`` is added to the query only when truthy.

    Returns a ``(login_url, state)`` tuple.
    """
    state = state or generate_token()
    query = {
        'client_id': self.client_id,
        'redirect_uri': redirect_uri,
        'state': state,
    }
    if scope:
        query['scope'] = scope
    return OAUTH_DIALOG_URL.format(urlencode(query)), state
def build_login_url(self, redirect_uri, state=None, scope=None):
    """Build the OAuth dialog URL for this client.

    A fresh token from ``generate_token()`` is used when ``state`` is falsy.
    ``scope`` is added to the query only when truthy.

    Returns a ``(login_url, state)`` tuple.
    """
    state = state or generate_token()
    query = {
        'client_id': self.client_id,
        'redirect_uri': redirect_uri,
        'state': state,
    }
    if scope:
        query['scope'] = scope
    return OAUTH_DIALOG_URL.format(urlencode(query)), state
def build_login_url(self, redirect_uri, state=None, scope='email'):
    """Build the OAuth dialog URL for an authorization-code request.

    A fresh token from ``generate_token()`` is used when ``state`` is falsy.
    ``scope`` (default ``'email'``) is added to the query only when truthy.

    Returns a ``(login_url, state)`` tuple.
    """
    state = state or generate_token()
    query = {
        'response_type': 'code',
        'client_id': self.client_id,
        'redirect_uri': redirect_uri,
        'state': state,
    }
    if scope:
        query['scope'] = scope
    return OAUTH_DIALOG_URL.format(urlencode(query)), state
def _redirect_with_params(url_name, *args, **kwargs):
    """Build a redirect target consisting of a reversed URL plus a query.

    Args:
        url_name: The name of the url to redirect to.
        args: positional arguments passed to ``urlresolvers.reverse``.
        kwargs: the query string params and their values; sequence values
            are expanded into repeated keys (``doseq`` is enabled).

    Returns:
        The string ``<reversed url>?<encoded query>``.
    """
    target = urlresolvers.reverse(url_name, args=args)
    return "{0}?{1}".format(target, parse.urlencode(kwargs, True))
def _build_query(self, params):
    """Builds a query string.

    Note that ``params`` is updated in place with the ``alt`` parameter
    when ``self.alt_param`` is set.

    Args:
      params: dict, the query parameters. A list value produces one
        ``key=value`` pair per element.

    Returns:
      The query parameters properly encoded into an HTTP URI query string,
      prefixed with '?'.
    """
    if self.alt_param is not None:
        params.update({'alt': self.alt_param})
    astuples = []
    for key, value in six.iteritems(params):
        # Treat scalars as one-element lists so both share one code path.
        items = value if isinstance(value, list) else [value]
        for item in items:
            # Only text needs encoding; calling .encode() on ints or bytes
            # (as the list branch used to do) raises AttributeError.
            if isinstance(item, six.text_type):
                item = item.encode('utf-8')
            astuples.append((key, item))
    return '?' + urlencode(astuples)
def get_authorize_url(self, state=None):
    """Return the URL used to authorize this app.

    ``scope`` is included only when ``self.scope`` is truthy.  The ``state``
    argument falls back to ``self.state`` when omitted and is included only
    when the resulting value is not ``None``.
    """
    query = {
        'client_id': self.client_id,
        'response_type': 'code',
        'redirect_uri': self.redirect_uri,
    }
    if self.scope:
        query['scope'] = self.scope

    effective_state = self.state if state is None else state
    if effective_state is not None:
        query['state'] = effective_state

    encoded = urllibparse.urlencode(query)
    return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, encoded)
def sort_url_by_query_keys(url):
    """Return ``url`` with its query parameters ordered by key.

    For example, '/v2/tasks?sort_key=id&sort_dir=asc&limit=10' becomes
    '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'.  This keeps unit tests
    independent of the order in which a query string was generated.

    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    parsed = urlparse.urlparse(url)
    # Blank values are kept so that "a=&b=1" round-trips.
    pairs = urlparse.parse_qsl(parsed.query, True)
    pairs.sort(key=lambda pair: pair[0])
    reordered = parsed._replace(query=urlparse.urlencode(pairs, True))
    return urlparse.urlunparse(reordered)
def find(self, base_url=None, **kwargs):
    """Return the one item whose attributes match ``**kwargs``.

    :param base_url: if provided, the generated URL will be appended to it
    :raises exceptions.NotFound: when nothing matches
    :raises exceptions.NoUniqueMatch: when more than one item matches
    """
    kwargs = self._filter_kwargs(kwargs)
    url = self.build_url(base_url=base_url, **kwargs)
    query = '?%s' % parse.urlencode(kwargs) if kwargs else ''
    matches = self._list(
        '%(base_url)s%(query)s' % {'base_url': url, 'query': query},
        self.collection_key)

    count = len(matches)
    if count == 1:
        return matches[0]
    if count > 1:
        raise exceptions.NoUniqueMatch

    msg = _("No %(name)s matching %(args)s.") % {
        'name': self.resource_class.__name__,
        'args': kwargs
    }
    raise exceptions.NotFound(msg)
def get_session_key(self):
    """Requests session key

    Issues a POST request to the `get-session-key` endpoint, sending the
    private key as a form-encoded body, for subsequent use in requests
    from the `secrets` endpoint.

    :Returns: String containing session key.
    :Raises: RequestError when the response is not OK.
    """
    endpoint = '{}/secrets/get-session-key/?preserve_key=True'.format(self.base)
    request_headers = {
        'accept': 'application/json',
        'authorization': 'Token {}'.format(self.token),
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    form_body = urlencode({
        'private_key': self.private_key.strip('\n')
    })

    req = requests.post(endpoint, headers=request_headers, data=form_body)
    if not req.ok:
        raise RequestError(req)
    return json.loads(req.text)['session_key']
def _list_request(self, resource, permanent=False, headers=None,
extra_headers=False, **kwargs):
"""Get the list of objects of the specified type.
:param resource: The name of the REST resource, e.g., 'nodes'.
:param headers: List of headers to use in request.
:param extra_headers: Specify whether to use headers.
:param **kwargs: Parameters for the request.
:returns: A tuple with the server response and deserialized JSON list
of objects
"""
uri = self._get_uri(resource, permanent=permanent)
if kwargs:
uri += "?%s" % urllib.urlencode(kwargs)
resp, body = self.get(uri, headers=headers,
extra_headers=extra_headers)
self.expected_success(http_client.OK, resp.status)
return resp, self.deserialize(body)
def find(self, base_url=None, **kwargs):
    """Return the one item whose attributes match ``**kwargs``.

    :param base_url: if provided, the generated URL will be appended to it
    :raises exceptions.NotFound: (with code 404) when nothing matches
    :raises exceptions.NoUniqueMatch: when more than one item matches
    """
    kwargs = self._filter_kwargs(kwargs)
    url = self.build_url(base_url=base_url, **kwargs)
    query = '?%s' % parse.urlencode(kwargs) if kwargs else ''
    matches = self._list(
        '%(base_url)s%(query)s' % {'base_url': url, 'query': query},
        self.collection_key)

    count = len(matches)
    if count == 1:
        return matches[0]
    if count > 1:
        raise exceptions.NoUniqueMatch

    msg = _("No %(name)s matching %(args)s.") % {
        'name': self.resource_class.__name__,
        'args': kwargs
    }
    raise exceptions.NotFound(404, msg)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _redirect_with_params(url_name, *args, **kwargs):
    """Build a redirect target consisting of a reversed URL plus a query.

    Args:
        url_name: The name of the url to redirect to.
        args: positional arguments passed to ``urlresolvers.reverse``.
        kwargs: the query string params and their values; sequence values
            are expanded into repeated keys (``doseq`` is enabled).

    Returns:
        The string ``<reversed url>?<encoded query>``.
    """
    target = urlresolvers.reverse(url_name, args=args)
    return "{0}?{1}".format(target, parse.urlencode(kwargs, True))
def get_authorize_url(self, state=None):
    """Return the URL used to authorize this app.

    ``scope`` is included only when ``self.scope`` is truthy.  The ``state``
    argument falls back to ``self.state`` when omitted and is included only
    when the resulting value is not ``None``.
    """
    query = {
        'client_id': self.client_id,
        'response_type': 'code',
        'redirect_uri': self.redirect_uri,
    }
    if self.scope:
        query['scope'] = self.scope

    effective_state = self.state if state is None else state
    if effective_state is not None:
        query['state'] = effective_state

    encoded = urllibparse.urlencode(query)
    return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, encoded)
def syncStories(sync, priority, **kw):
    """Fetch stories from ``v1/stories`` and submit a sync task for each.

    Keyword arguments whose value is ``None`` are omitted from the query
    string.  Every remote story becomes a ``SyncStoryTask`` that is handed
    to ``sync.submitTask``; the list of created tasks is returned.
    """
    app = sync.app  # read as in the original; not used further in this function
    query = urlparse.urlencode(
        {name: value for name, value in kw.items() if value is not None})
    remote = sync.get('v1/stories?%s' % (query,))

    tasks = []
    for remote_story in remote:
        task = SyncStoryTask(remote_story['id'], remote_story,
                             priority=priority)
        sync.submitTask(task)
        tasks.append(task)
    return tasks
def _format_metadata_url(self, api_key, page_number):
"""Build the query RL for the quandl WIKI metadata.
"""
query_params = [
('per_page', '100'),
('sort_by', 'id'),
('page', str(page_number)),
('database_code', 'WIKI'),
]
if api_key is not None:
query_params = [('api_key', api_key)] + query_params
return (
'https://www.quandl.com/api/v3/datasets.csv?'
+ urlencode(query_params)
)
def _format_wiki_url(self,
api_key,
symbol,
start_date,
end_date,
data_frequency):
"""
Build a query URL for a quandl WIKI dataset.
"""
query_params = [
('start_date', start_date.strftime('%Y-%m-%d')),
('end_date', end_date.strftime('%Y-%m-%d')),
('order', 'asc'),
]
if api_key is not None:
query_params = [('api_key', api_key)] + query_params
return (
"https://www.quandl.com/api/v3/datasets/WIKI/"
"{symbol}.csv?{query}".format(
symbol=symbol,
query=urlencode(query_params),
)
)
def live_request(channel):
    """Build the request description for a channel's live HLS playlist.

    The token dict from ``channel_token`` is returned unchanged when it
    contains ``keys.ERROR``.  Otherwise a dict with the playlist ``url``
    (query string included) and the query's ``headers`` is returned.
    """
    token = channel_token(channel)
    if keys.ERROR in token:
        return token

    query = UsherQuery('api/channel/hls/{channel}.m3u8')
    query.add_urlkw(keys.CHANNEL, channel)
    query.add_param(keys.SIG, token[keys.SIG])
    query.add_param(keys.TOKEN, token[keys.TOKEN])
    query.add_param(keys.ALLOW_SOURCE, Boolean.TRUE)
    query.add_param(keys.ALLOW_SPECTRE, Boolean.TRUE)
    query.add_param(keys.ALLOW_AUDIO_ONLY, Boolean.TRUE)

    full_url = '?'.join([query.url, urlencode(query.params)])
    request_dict = {'url': full_url, 'headers': query.headers}
    log.debug('live_request: |{0}|'.format(str(request_dict)))
    return request_dict
def video_request(video_id):
    """Build the request description for a VOD playlist.

    ``video_id`` is first passed through ``valid_video_id``; a falsy result
    raises ``NotImplementedError``.  The token dict from ``vod_token`` is
    returned unchanged when it contains ``keys.ERROR``.  Otherwise a dict
    with the playlist ``url`` (query string included) and the query's
    ``headers`` is returned.
    """
    video_id = valid_video_id(video_id)
    if not video_id:
        raise NotImplementedError('Unknown Video Type')

    token = vod_token(video_id)
    if keys.ERROR in token:
        return token

    query = UsherQuery('vod/{id}')
    query.add_urlkw(keys.ID, video_id)
    query.add_param(keys.NAUTHSIG, token[keys.SIG])
    query.add_param(keys.NAUTH, token[keys.TOKEN])
    query.add_param(keys.ALLOW_SOURCE, Boolean.TRUE)
    query.add_param(keys.ALLOW_AUDIO_ONLY, Boolean.TRUE)

    full_url = '?'.join([query.url, urlencode(query.params)])
    request_dict = {'url': full_url, 'headers': query.headers}
    log.debug('video_request: |{0}|'.format(str(request_dict)))
    return request_dict
def test_get_transfer_ticket(self):
    """The transfer ticket is rendered as ``<cookie key>="<ticket id>"``."""
    query = urlparse.urlencode({'dcPath': 'datacenter-1',
                                'dsName': 'datastore-1'})
    url = 'https://13.37.73.31/folder/images/aa.vmdk?%s' % query

    class Ticket(object):
        id = 'fake_id'

    # The mocked session hands back the fake ticket from invoke_api.
    session = mock.Mock()
    session.invoke_api = mock.Mock(return_value=Ticket())

    ds_url = datastore.DatastoreURL.urlparse(url)
    ticket = ds_url.get_transfer_ticket(session, 'PUT')

    expected = '%s="%s"' % (constants.CGI_COOKIE_KEY, 'fake_id')
    self.assertEqual(expected, ticket)