def list_project_members(self, repo):
return self.request(
'GET',
'/projects/{}/members'.format(quote(repo, safe='')),
)
python类quote()的实例源码
def query_single(self, object_type, url_params, query_params=None):
# type: (str, list, dict) -> dict
"""
Query for a single object.
:param object_type: string query type (e.g., "users" or "groups")
:param url_params: required list of strings to provide as additional URL components
:param query_params: optional dictionary of query options
:return: the found object (a dictionary), which is empty if none were found
"""
# Server API convention (v2) is that the pluralized object type goes into the endpoint
# but the object type is the key in the response dictionary for the returned object.
self.local_status["single-query-count"] += 1
query_type = object_type + "s" # poor man's plural
query_path = "/organizations/{}/{}".format(self.org_id, query_type)
for component in url_params if url_params else []:
query_path += "/" + urlparse.quote(component, safe='/@')
if query_params: query_path += "?" + urlparse.urlencode(query_params)
try:
result = self.make_call(query_path)
body = result.json()
except RequestError as re:
if re.result.status_code == 404:
if self.logger: self.logger.debug("Ran %s query: %s %s (0 found)",
object_type, url_params, query_params)
return {}
else:
raise re
if body.get("result") == "success":
value = body.get(object_type, {})
if self.logger: self.logger.debug("Ran %s query: %s %s (1 found)", object_type, url_params, query_params)
return value
else:
raise ClientError("OK status but no 'success' result", result)
def query_multiple(self, object_type, page=0, url_params=None, query_params=None):
# type: (str, int, list, dict) -> tuple
"""
Query for a page of objects. Defaults to the (0-based) first page.
Sadly, the sort order is undetermined.
:param object_type: string query type (e.g., "users" or "groups")
:param page: numeric page (0-based) of results to get (up to 200 in a page)
:param url_params: optional list of strings to provide as additional URL components
:param query_params: optional dictionary of query options
:return: tuple (list of returned dictionaries (one for each query result), bool for whether this is last page)
"""
# Server API convention (v2) is that the pluralized object type goes into the endpoint
# and is also the key in the response dictionary for the returned objects.
self.local_status["multiple-query-count"] += 1
query_type = object_type + "s" # poor man's plural
query_path = "/{}/{}/{:d}".format(query_type, self.org_id, page)
for component in url_params if url_params else []:
query_path += "/" + urlparse.quote(component)
if query_params: query_path += "?" + urlparse.urlencode(query_params)
try:
result = self.make_call(query_path)
body = result.json()
except RequestError as re:
if re.result.status_code == 404:
if self.logger: self.logger.debug("Ran %s query: %s %s (0 found)",
object_type, url_params, query_params)
return [], True
else:
raise re
if body.get("result") == "success":
values = body.get(query_type, [])
last_page = body.get("lastPage", False)
if self.logger: self.logger.debug("Ran multi-%s query: %s %s (page %d: %d found)",
object_type, url_params, query_params, page, len(values))
return values, last_page
else:
raise ClientError("OK status but no 'success' result", result)
def __extract_metadata(self, doc, payload):
filename = os.path.basename(doc.path)
headers = {
'Accept': 'application/json',
'Content-Disposition': 'attachment; filename=%s' % url_quote(filename)
}
if doc.meta['Content-Type']:
headers['Content-Type'] = doc.meta['Content-Type']
tika_url = self.config.get(helper.TIKA_META)
connection = self.config[helper.INJECTOR].get_http_connection(tika_url)
payload.seek(0)
connection.request('PUT', '/meta', payload.read(), headers)
payload.seek(0)
response = connection.getresponse()
try:
if response.status >= 400:
logging.error('tika error %d (%s): %s', response.status,
response.reason, doc.path)
return {}
response_data = response.read()
finally:
response.close()
try:
result = json.loads(response_data.decode('utf-8'))
except (ValueError, UnicodeDecodeError):
logging.error('invalid response from tika for %s', doc.path)
result = {}
return result
def _id_to_header(self, id_):
"""Convert an id to a Content-ID header value.
Args:
id_: string, identifier of individual request.
Returns:
A Content-ID header with the id_ encoded into it. A UUID is prepended to
the value because Content-ID headers are supposed to be universally
unique.
"""
if self._base_id is None:
self._base_id = uuid.uuid4()
return '<%s+%s>' % (self._base_id, quote(id_))
def encode_params(obj, keys=()):
if isinstance(obj, dict):
# key[bar]=1&key[baz]=2
return "&".join(encode_params(val, keys + (key,)) for key, val in obj.items())
# key[foo][]=1&key[foo][]=2
elif isinstance(obj, (list, tuple)):
return "&".join(encode_params(val, keys + ("",)) for val in obj)
# `obj` is just a value, key=1
else:
encoded_keys = ""
for depth, key in enumerate(keys):
# All keys but top-level keys should be in brackets, i.e.
# `key[foo]=1`, not `[key][foo]=1`
encoded_keys += key if depth == 0 else "[" + key + "]"
return quote(encoded_keys) + "=" + quote(obj)
def swift_get_container(request, container_name, with_data=True):
if with_data:
headers, data = swift_api(request).get_object(container_name, "")
else:
data = None
headers = swift_api(request).head_container(container_name)
timestamp = None
is_public = False
public_url = None
try:
is_public = GLOBAL_READ_ACL in headers.get('x-container-read', '')
if is_public:
swift_endpoint = base.url_for(request,
'object-store',
endpoint_type='publicURL')
parameters = urlparse.quote(container_name.encode('utf8'))
public_url = swift_endpoint + '/' + parameters
ts_float = float(headers.get('x-timestamp'))
timestamp = datetime.utcfromtimestamp(ts_float).isoformat()
except Exception:
pass
container_info = {
'name': container_name,
'container_object_count': headers.get('x-container-object-count'),
'container_bytes_used': headers.get('x-container-bytes-used'),
'timestamp': timestamp,
'data': data,
'is_public': is_public,
'public_url': public_url,
}
return Container(container_info)
def get_pagination_options(limit=None, marker=None, sorts=None):
options = []
if limit:
options.append("limit=%d" % limit)
if marker:
options.append("marker=%s" % urllib_parse.quote(marker))
for sort in sorts or []:
options.append("sort=%s" % urllib_parse.quote(sort))
return "&".join(options)
def oauth2_authorize(request):
""" View to start the OAuth2 Authorization flow.
This view starts the OAuth2 authorization flow. If scopes is passed in
as a GET URL parameter, it will authorize those scopes, otherwise the
default scopes specified in settings. The return_url can also be
specified as a GET parameter, otherwise the referer header will be
checked, and if that isn't found it will return to the root path.
Args:
request: The Django request object.
Returns:
A redirect to Google OAuth2 Authorization.
"""
return_url = request.GET.get('return_url', None)
if not return_url:
return_url = request.META.get('HTTP_REFERER', '/')
scopes = request.GET.getlist('scopes', django_util.oauth2_settings.scopes)
# Model storage (but not session storage) requires a logged in user
if django_util.oauth2_settings.storage_model:
if not request.user.is_authenticated():
return redirect('{0}?next={1}'.format(
settings.LOGIN_URL, parse.quote(request.get_full_path())))
# This checks for the case where we ended up here because of a logged
# out user but we had credentials for it in the first place
else:
user_oauth = django_util.UserOAuth2(request, scopes, return_url)
if user_oauth.has_credentials():
return redirect(return_url)
flow = _make_flow(request=request, scopes=scopes, return_url=return_url)
auth_url = flow.step1_get_authorize_url()
return shortcuts.redirect(auth_url)
def _id_to_header(self, id_):
"""Convert an id to a Content-ID header value.
Args:
id_: string, identifier of individual request.
Returns:
A Content-ID header with the id_ encoded into it. A UUID is prepended to
the value because Content-ID headers are supposed to be universally
unique.
"""
if self._base_id is None:
self._base_id = uuid.uuid4()
return '<%s+%s>' % (self._base_id, quote(id_))
def quote_key(key, reverse=False):
"""Prepare key for storage data in MongoDB.
:param key: key that should be quoted
:param reverse: boolean, True --- if we need a reverse order of the keys
parts
:return: iter of quoted part of the key
"""
r = -1 if reverse else 1
for k in key.split('.')[::r]:
if k.startswith('$'):
k = parse.quote(k)
yield k
def improve_keys(data, metaquery=False):
"""Improves keys in dict if they contained '.' or started with '$'.
:param data: is a dictionary where keys need to be checked and improved
:param metaquery: boolean, if True dots are not escaped from the keys
:return: improved dictionary if keys contained dots or started with '$':
{'a.b': 'v'} -> {'a': {'b': 'v'}}
{'$ab': 'v'} -> {'%24ab': 'v'}
"""
if not isinstance(data, dict):
return data
if metaquery:
for key in six.iterkeys(data):
if '.$' in key:
key_list = []
for k in quote_key(key):
key_list.append(k)
new_key = '.'.join(key_list)
data[new_key] = data.pop(key)
else:
for key, value in data.items():
if isinstance(value, dict):
improve_keys(value)
if '.' in key:
new_dict = {}
for k in quote_key(key, reverse=True):
new = {}
new[k] = new_dict if new_dict else data.pop(key)
new_dict = new
data.update(new_dict)
else:
if key.startswith('$'):
new_key = parse.quote(key)
data[new_key] = data.pop(key)
return data
def get(self, user_id, return_request_id=None):
"""Get the details for a specific user.
:param user_id: ID of the user
:param return_request_id: If an empty list is provided, populate this
list with the request ID value from the header
x-openstack-request-id
"""
resp, body = self.client.get('/users/%s' % parse.quote(str(user_id)))
if return_request_id is not None:
return_request_id.append(resp.headers.get(OS_REQ_ID_HDR, None))
data = body.get('user')
return self.resource_class(self, data, loaded=True)
def action(self, user_id, **kwargs):
"""Perform specified action on user.
:param user_id: ID of the user
"""
url = '/users/%s/action' % parse.quote(str(user_id))
return_request_id = kwargs.pop('return_req_id', None)
resp, body = self.client.post(url, data=kwargs)
if return_request_id is not None:
return_request_id.append(resp.headers.get(OS_REQ_ID_HDR, None))
return self.resource_class(self, body.get('user'), loaded=True)
def get(self, policy_id):
"""Get a specific policy."""
url = '/policies/%s' % parse.quote(str(policy_id))
resq, body = self.client.get(url)
return self.resource_class(self, body.get('policy'), loaded=True)
def action(self, policy_id, **kwargs):
"""Perform specified action on a policy."""
url = '/policies/%s/action' % parse.quote(str(policy_id))
resq, body = self.client.post(url, data=kwargs)
return self.resource_class(self, body.get('policy'), loaded=True)
def delete(self, policy_id):
"""Delete a specific policy."""
return self._delete('/policies/%s' % parse.quote(str(policy_id)))
def delete(self, rule_id):
"""Delete a specific rule.
:param rule_id: Id of the rule to delete
"""
return self._delete('/rules/%s' % parse.quote(str(rule_id)))