def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
python类quote()的实例源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def encode(data, mime_type='', charset='utf-8', base64=True):
"""
Encode data to DataURL
"""
if isinstance(data, six.text_type):
data = data.encode(charset)
else:
charset = None
if base64:
data = utils.text(b64encode(data))
else:
data = utils.text(quote(data))
result = ['data:', ]
if mime_type:
result.append(mime_type)
if charset:
result.append(';charset=')
result.append(charset)
if base64:
result.append(';base64')
result.append(',')
result.append(data)
return ''.join(result)
def test_encoding_rdf():
# With encoding specified
encoding = "ISO-8859-1"
csvw = CSVW(csv_path="./tests/iso_encoding.csv",
metadata_path="./tests/iso_encoding.csv-metadata.json",
csv_encoding=encoding)
rdf_output = csvw.to_rdf()
g = ConjunctiveGraph()
g.parse(data=rdf_output, format="turtle")
units = Namespace('http://example.org/units/')
cars = Namespace('http://example.org/cars/')
meta = Namespace("http://example.org/properties/")
expected_unit = units[quote(u"\xb5100".encode('utf-8'))]
assert (cars['1'], meta['UnitOfMeasurement'], expected_unit) in g
assert expected_unit in list(g.objects())
def encode_string(value, double=False):
"""
Url encode a string to ASCII in order to escape any characters not
allowed :, /, ?, #, &, =. If parameter 'double=True' perform two passes
of encoding which may be required for some REST api endpoints. This is
usually done to remove any additional special characters produce by the
single encoding pass.
:param value: Value to encode
:param double: Double encode string
:return:
"""
# Replace special characters in string using the %xx escape
encoded_str = quote(value, '')
if double: # double encode
encoded_str = quote(encoded_str, '')
return encoded_str
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def get(self, stack_id, resource_name, with_attr=None):
"""Get the details for a specific resource.
:param stack_id: ID of stack containing the resource
:param resource_name: ID of resource to get the details for
:param with_attr: Attributes to show
"""
stack_id = self._resolve_stack_id(stack_id)
url_str = '/stacks/%s/resources/%s' % (
parse.quote(stack_id, ''),
parse.quote(encodeutils.safe_encode(resource_name), ''))
if with_attr:
params = {'with_attr': with_attr}
url_str += '?%s' % parse.urlencode(params, True)
resp = self.client.get(url_str)
body = utils.get_response_body(resp)
return Resource(self, body.get('resource'))
def mark_unhealthy(self, stack_id, resource_name,
mark_unhealthy, resource_status_reason):
"""Mark a resource as healthy or unhealthy.
:param stack_id: ID of stack containing the resource
:param resource_name: ID of resource
:param mark_unhealthy: Mark resource unhealthy if set to True
:param resource_status_reason: Reason for resource status change.
"""
stack_id = self._resolve_stack_id(stack_id)
url_str = '/stacks/%s/resources/%s' % (
parse.quote(stack_id, ''),
parse.quote(encodeutils.safe_encode(resource_name), ''))
resp = self.client.patch(
url_str,
data={"mark_unhealthy": mark_unhealthy,
"resource_status_reason": resource_status_reason})
body = utils.get_response_body(resp)
return body
def _send_update(self, url, attribute_map, attribute_calculations, record):
d = record.date.replace(tzinfo=self.station_time_zone).astimezone(pytz.UTC).replace(tzinfo=None)
url = '%s&ID=%s&PASSWORD=%s&dateutc=%s' % (
url,
self.station_id,
url_quote(self.password, safe=''),
url_quote(d.strftime('%Y-%m-%d %H:%M:%S'), safe=''),
)
for parameter, field in attribute_map or []:
if field and record[field] is not None:
url += '&%s=%s' % (parameter, url_quote(record[field], safe=''), )
for parameter, function in attribute_calculations or []:
if function:
value = function(self, record)
if value is not None:
url += '&%s=%s' % (parameter, url_quote(value, safe=''), )
response = self._session.get(url)
assert 200 <= response.status_code < 300, 'Status code %s unexpected' % response.status_code
assert response.text == 'success', 'Response "%s" unexpected' % response.text
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def code(item):
"""
Turn a NameID class instance into a quoted string of comma separated
attribute,value pairs. The attribute names are replaced with digits.
Depends on knowledge on the specific order of the attributes for the
class that is used.
:param item: The class instance
:return: A quoted string
"""
_res = []
i = 0
for attr in ATTR:
val = getattr(item, attr)
if val:
_res.append("%d=%s" % (i, quote(val)))
i += 1
return ",".join(_res)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def check_connection(default='http://google.com', timeout=1):
"""Test the internet connection.
Parameters
----------
default : str
URL to test; defaults to a Google IP address.
timeout : number
Time in seconds to wait before giving up.
Returns
-------
success : bool
True if appears to be online, else False
"""
success = True
try:
surl = urlparse.quote(default, safe=':./')
urlrequest.urlopen(surl, timeout=timeout)
except urlerror.URLError as derp:
success = False
logger.debug("Network unreachable: {}".format(derp))
return success
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
uriparts = {
"type": type_name,
"label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
else quote(account_name)),
"parameters": urlencode(parameters),
}
return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def get_local_path(self, img, pfmri):
"""Return an opener for the license text from the local disk or
None if the data for the text is not on-disk."""
if img.version <= 3:
# Older images stored licenses without accounting for
# '/', spaces, etc. properly.
path = os.path.join(img.get_license_dir(pfmri),
"license." + self.attrs["license"])
else:
# Newer images ensure licenses are stored with encoded
# name so that '/', spaces, etc. are properly handled.
path = os.path.join(img.get_license_dir(pfmri),
"license." + quote(self.attrs["license"],
""))
return path
def abandon_0(self, *tokens):
"""Aborts an in-flight transaction for the Transaction ID
specified in the request path. Returns no output."""
try:
# cherrypy decoded it, but we actually need it encoded.
trans_id = quote(tokens[0], "")
except IndexError:
trans_id = None
try:
self.repo.abandon(trans_id)
except srepo.RepositoryError as e:
# Assume a bad request was made. A 404 can't be
# returned here as misc.versioned_urlopen will interpret
# that to mean that the server doesn't support this
# operation.
raise cherrypy.HTTPError(http_client.BAD_REQUEST, str(e))
def manifest(self, *tokens):
"""Manifest requests coming from the BUI need to be redirected
back through the RewriteRules defined in the Apache
configuration in order to be served directly.
pkg(1) will never hit this code, as those requests don't get
handled by this webapp.
"""
self.setup(cherrypy.request)
rel_uri = cherrypy.request.path_info
# we need to recover the escaped portions of the URI
redir = rel_uri.lstrip("/").split("/")
pub_mf = "/".join(redir[0:4])
pkg_name = "/".join(redir[4:])
# encode the URI so our RewriteRules can process them
pkg_name = quote(pkg_name)
pkg_name = pkg_name.replace("/", "%2F")
pkg_name = pkg_name.replace("%40", "@", 1)
# build a URI that we can redirect to
redir = "{0}/{1}".format(pub_mf, pkg_name)
redir = "/{0}".format(redir.lstrip("/"))
raise cherrypy.HTTPRedirect(redir)
def get_project(self, repo):
return self.request('GET', '/projects/{}'.format(quote(repo, safe='')))
def get_issue(self, repo, issue_id):
try:
return self.request(
'GET',
'/projects/{}/issues'.format(
quote(repo, safe=''),
),
params={
# XXX(dcramer): this is an undocumented API
'iid': issue_id,
}
)[0]
except IndexError:
raise ApiError('Issue not found with ID', 404)
def create_issue(self, repo, data):
return self.request(
'POST',
'/projects/{}/issues'.format(quote(repo, safe='')),
data=data,
)
def create_note(self, repo, global_issue_id, data):
return self.request(
'POST',
'/projects/{}/issues/{}/notes'.format(
quote(repo, safe=''),
global_issue_id,
),
data=data,
)