def url_replace_param(url, name, value):
    """
    Replace a GET parameter in an URL.

    The query string of ``url`` is parsed, parameter ``name`` is set to
    ``value`` (or removed when ``value`` is ``None``) and the URL is put back
    together from its components.

    Removing a parameter that is not present in the URL leaves the query
    unchanged instead of raising ``KeyError``.

    Returns the rebuilt URL wrapped in ``mark_safe``.
    """
    url_components = urlparse(force_str(url))
    params = parse_qs(url_components.query)
    if value is None:
        # pop() with a default so that an absent parameter is simply ignored.
        params.pop(name, None)
    else:
        params[name] = value
    # NOTE(review): the result is marked safe although it is assembled from
    # the caller-supplied URL -- confirm that callers only pass trusted or
    # already-escaped URLs.
    return mark_safe(urlunparse([
        url_components.scheme,
        url_components.netloc,
        url_components.path,
        url_components.params,
        # doseq=True because parse_qs maps every name to a list of values.
        urlencode(params, doseq=True),
        url_components.fragment,
    ]))
python类force_str()的实例源码
def get_connection_params(self):
    """Build the keyword arguments used to open the database connection.

    Starts from the fixed ``conv``/``charset`` defaults, copies every
    non-empty entry of ``self.settings_dict`` under the driver's own key
    name and finally applies ``OPTIONS``, which therefore overrides
    anything set before it.
    """
    params = {
        'conv': django_conversions,
        'charset': 'utf8',
    }
    if six.PY2:
        params['use_unicode'] = True

    conf = self.settings_dict
    for setting, option in (('USER', 'user'), ('NAME', 'db')):
        if conf[setting]:
            params[option] = conf[setting]
    if conf['PASSWORD']:
        params['passwd'] = force_str(conf['PASSWORD'])

    # A HOST beginning with a slash is passed on as a socket path.
    host = conf['HOST']
    if host.startswith('/'):
        params['unix_socket'] = host
    elif host:
        params['host'] = host

    if conf['PORT']:
        params['port'] = int(conf['PORT'])

    # We need the number of potentially affected rows after an
    # "UPDATE", not the number of changed rows.
    params['client_flag'] = CLIENT.FOUND_ROWS
    params.update(conf['OPTIONS'])
    return params
def get_connection_params(self):
    """Build the connection parameter dict from ``self.settings_dict``.

    ``OPTIONS`` is merged in right after the database name (minus
    ``isolation_level``), then the non-empty ``USER``, ``PASSWORD``,
    ``HOST`` and ``PORT`` settings are applied on top of it.

    Raises ``ImproperlyConfigured`` when ``NAME`` is an empty string.
    """
    conf = self.settings_dict
    db_name = conf['NAME']
    # None may be used to connect to the default 'postgres' db
    if db_name == '':
        raise ImproperlyConfigured(
            "settings.DATABASES is improperly configured. "
            "Please supply the NAME value.")

    params = {'database': db_name or 'postgres'}
    params.update(conf['OPTIONS'])
    params.pop('isolation_level', None)

    credentials = (
        ('USER', 'user'),
        ('PASSWORD', 'password'),
        ('HOST', 'host'),
        ('PORT', 'port'),
    )
    for setting, option in credentials:
        setting_value = conf[setting]
        if not setting_value:
            continue
        if setting == 'PASSWORD':
            # Only the password is passed through force_str.
            setting_value = force_str(setting_value)
        params[option] = setting_value
    return params
def post_raw_data(self, path, post_data):
    """
    POST ``post_data`` to ``path`` without any encoding of the payload.

    The built-in test client's post() method assumes the data you pass
    is a dictionary and encodes it. This variant builds the request
    environment by hand so the data is sent unmodified, adds the headers
    returned by ``self.get_signature`` and returns the client's response.
    """
    url_parts = urlparse(path)
    environ = {
        'CONTENT_LENGTH': len(post_data),
        'CONTENT_TYPE': "text/plain",
        'PATH_INFO': self.client._get_path(url_parts),
        # Index 4 of the parse result is the query component.
        'QUERY_STRING': force_str(url_parts[4]),
        'REQUEST_METHOD': str('POST'),
        'wsgi.input': FakePayload(post_data),
    }
    # Sign the request and merge the signature headers into the environment.
    signature_headers = self.get_signature(path, method='POST', body=post_data)
    environ.update(signature_headers)
    # Hand the finished environment to the test client.
    return self.client.request(**environ)
def render(self, name, value, attrs=None):
    """Render ``value`` as a paragraph containing a link.

    When ``value`` is ``None`` a ``text-error`` paragraph is rendered
    instead. The link text is ``self.text`` if set, otherwise the quoted URL.
    ``name`` is not used by this method.
    """
    if value is not None:
        link_attrs = self.build_attrs(attrs)
        url = smart_urlquote(value)
        label = self.text or url
        return format_html(
            '<p><a href="{href}" {attrs}>{text}</a></p>',
            href=url,
            attrs=flatatt(link_attrs),
            text=force_str(label),
        )
    # No value yet: show the placeholder message.
    return format_html(
        '<p class="text-error">{text}</p>',
        text=_('Populate form fields above'),
    )
def test_sign_unsign(self):
    """sign/unsign should be reversible"""
    signer = signing.Signer('predictable-secret')
    samples = [
        'q;wjmbk;wkmb',
        '3098247529087',
        '3098247:529:087:',
        'jkw osanteuh ,rcuh nthu aou oauh ,ud du',
        '\u2019',
    ]
    # A byte string sample is only exercised on Python 2.
    if six.PY2:
        samples += [b'a byte string']
    for plain in samples:
        token = signer.sign(plain)
        self.assertIsInstance(token, str)
        # The signed form must differ from the input ...
        self.assertNotEqual(force_str(plain), token)
        # ... and unsigning must give the input back.
        self.assertEqual(plain, signer.unsign(token))
def test_sign_unsign(self):
    """sign/unsign should be reversible"""
    signer = signing.BytesSigner('predictable-secret')
    samples = [
        b'q;wjmbk;wkmb',
        b'3098247529087',
        b'3098247:529:087:',
        b'jkw osanteuh ,rcuh nthu aou oauh ,ud du',
        b'\u2019',
    ]
    # One more sample is added on Python 2 only.
    if six.PY2:
        samples += [b'a byte string']
    for plain in samples:
        token = signer.sign(plain)
        self.assertIsInstance(token, six.binary_type)
        # The signed form must differ from the input ...
        self.assertNotEqual(force_str(plain), token)
        # ... and unsigning must give the input back.
        self.assertEqual(plain, signer.unsign(token))
utility_tags.py 文件源码
项目:Django-Web-Development-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def modify_query(context, *params_to_remove, **params_to_change):
    """Render a link to the current path with modified query parameters.

    Parameters named in ``params_to_remove`` are dropped, parameters named
    in ``params_to_change`` are replaced by (or added with) the given value,
    and every other parameter of ``context["request"].GET`` is kept as is.
    Pairs whose value is falsy are left out.

    Returns the request path, followed by ``?`` and the encoded query string
    when at least one pair remains. Because the result is meant to be placed
    in markup, the ``&`` separators are written as the HTML entity.
    """
    query_params = []
    for key, value_list in context["request"].GET._iterlists():
        if key in params_to_remove:
            continue
        if key in params_to_change:
            # A changed parameter collapses to the single new value; popping
            # it keeps it from being appended a second time below.
            query_params.append((key, params_to_change.pop(key)))
        else:
            # leave existing parameters as they were
            query_params.extend((key, value) for value in value_list)
    # attach the parameters that were not part of the current query
    query_params.extend(params_to_change.items())

    # Filter before deciding whether to add "?", so that a query made up of
    # empty values only does not leave a dangling "?" behind.
    encoded_pairs = [
        (key, force_str(value)) for (key, value) in query_params if value
    ]
    query_string = context["request"].path
    if encoded_pairs:
        # The original replace("&", "&") was a no-op; the separator has to
        # become the entity for the link to be valid inside HTML.
        query_string += "?%s" % urllib.urlencode(encoded_pairs).replace("&", "&amp;")
    return query_string
utility_tags.py 文件源码
项目:Django-Web-Development-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def add_to_query(context, *params_to_remove, **params_to_add):
    """Render a link to the current path with extra query parameters.

    Parameters named in ``params_to_remove`` are dropped; all other current
    parameters of ``context["request"].GET`` are kept. Each pair from
    ``params_to_add`` is appended unless the same key/value pair is already
    part of the query. Pairs whose value is falsy are left out.

    Returns the request path, followed by ``?`` and the encoded query string
    when at least one pair remains. Because the result is meant to be placed
    in markup, the ``&`` separators are written as the HTML entity.
    """
    query_params = []
    # go through current query params..
    for key, value_list in context["request"].GET._iterlists():
        if key in params_to_remove:
            continue
        # don't add key-value pairs which already exist in the query
        # (``unicode`` is the Python 2 builtin, like ``urllib.urlencode`` below)
        if key in params_to_add and unicode(params_to_add[key]) in value_list:
            params_to_add.pop(key)
        query_params.extend((key, value) for value in value_list)
    # add the remaining key-value pairs
    query_params.extend(params_to_add.items())

    # Filter before deciding whether to add "?", so that a query made up of
    # empty values only does not leave a dangling "?" behind.
    encoded_pairs = [
        (key, force_str(value)) for (key, value) in query_params if value
    ]
    query_string = context["request"].path
    if encoded_pairs:
        # The original replace("&", "&") was a no-op; the separator has to
        # become the entity for the link to be valid inside HTML.
        query_string += "?%s" % urllib.urlencode(encoded_pairs).replace("&", "&amp;")
    return query_string
utility_tags.py 文件源码
项目:Django-Web-Development-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def remove_from_query(context, *args, **kwargs):
    """Render a link to the current path with query parameters removed.

    Every parameter whose key is listed in ``args`` is dropped entirely; for
    keys given in ``kwargs`` only the pairs whose value equals the keyword
    value are dropped. Pairs whose value is falsy are left out as well.

    Returns the request path, followed by ``?`` and the encoded query string
    when at least one pair remains. Because the result is meant to be placed
    in markup, the ``&`` separators are written as the HTML entity.
    """
    query_params = []
    # go through current query params..
    for key, value_list in context["request"].GET._iterlists():
        # skip keys mentioned in the args
        if key in args:
            continue
        for value in value_list:
            # skip key-value pairs mentioned in kwargs
            # (``unicode`` is the Python 2 builtin, like ``urllib.urlencode`` below)
            if key in kwargs and unicode(value) == unicode(kwargs[key]):
                continue
            query_params.append((key, value))

    # Filter before deciding whether to add "?", so that a query made up of
    # empty values only does not leave a dangling "?" behind.
    encoded_pairs = [
        (key, force_str(value)) for (key, value) in query_params if value
    ]
    query_string = context["request"].path
    if encoded_pairs:
        # "+=" rather than "=": the plain assignment threw the path away.
        # The original replace("&", "&") was a no-op; the separator has to
        # become the entity for the link to be valid inside HTML.
        query_string += "?%s" % urllib.urlencode(encoded_pairs).replace("&", "&amp;")
    return query_string
utility_tags.py 文件源码
项目:Django-Web-Development-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def modify_query(context, *params_to_remove, **params_to_change):
    """Render a link to the current path with modified query parameters.

    Parameters named in ``params_to_remove`` are dropped, parameters named
    in ``params_to_change`` are replaced by (or added with) the given value,
    and every other parameter of ``context["request"].GET`` is kept as is.
    Pairs whose value is falsy are left out.

    Returns the request path, followed by ``?`` and the encoded query string
    when at least one pair remains. Because the result is meant to be placed
    in markup, the ``&`` separators are written as the HTML entity.
    """
    query_params = []
    for key, value_list in context["request"].GET._iterlists():
        if key in params_to_remove:
            continue
        if key in params_to_change:
            # A changed parameter collapses to the single new value; popping
            # it keeps it from being appended a second time below.
            query_params.append((key, params_to_change.pop(key)))
        else:
            # leave existing parameters as they were
            query_params.extend((key, value) for value in value_list)
    # attach the parameters that were not part of the current query
    query_params.extend(params_to_change.items())

    # Filter before deciding whether to add "?", so that a query made up of
    # empty values only does not leave a dangling "?" behind.
    encoded_pairs = [
        (key, force_str(value)) for (key, value) in query_params if value
    ]
    query_string = context["request"].path
    if encoded_pairs:
        # The original replace("&", "&") was a no-op; the separator has to
        # become the entity for the link to be valid inside HTML.
        query_string += "?%s" % urllib.urlencode(encoded_pairs).replace("&", "&amp;")
    return query_string
utility_tags.py 文件源码
项目:Django-Web-Development-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def remove_from_query(context, *args, **kwargs):
    """Render a link to the current path with query parameters removed.

    Every parameter whose key is listed in ``args`` is dropped entirely; for
    keys given in ``kwargs`` only the pairs whose value equals the keyword
    value are dropped. Pairs whose value is falsy are left out as well.

    Returns the request path, followed by ``?`` and the encoded query string
    when at least one pair remains. Because the result is meant to be placed
    in markup, the ``&`` separators are written as the HTML entity.
    """
    query_params = []
    # go through current query params..
    for key, value_list in context["request"].GET._iterlists():
        # skip keys mentioned in the args
        if key in args:
            continue
        for value in value_list:
            # skip key-value pairs mentioned in kwargs
            # (``unicode`` is the Python 2 builtin, like ``urllib.urlencode`` below)
            if key in kwargs and unicode(value) == unicode(kwargs[key]):
                continue
            query_params.append((key, value))

    # Filter before deciding whether to add "?", so that a query made up of
    # empty values only does not leave a dangling "?" behind.
    encoded_pairs = [
        (key, force_str(value)) for (key, value) in query_params if value
    ]
    query_string = context["request"].path
    if encoded_pairs:
        # "+=" rather than "=": the plain assignment threw the path away.
        # The original replace("&", "&") was a no-op; the separator has to
        # become the entity for the link to be valid inside HTML.
        query_string += "?%s" % urllib.urlencode(encoded_pairs).replace("&", "&amp;")
    return query_string
utility_tags.py 文件源码
项目:Django-Web-Development-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def modify_query(context, *params_to_remove, **params_to_change):
    """Render a link to the current path with modified query parameters.

    Parameters named in ``params_to_remove`` are dropped, parameters named
    in ``params_to_change`` are replaced by (or added with) the given value,
    and every other parameter of ``context['request'].GET`` is kept as is.
    Pairs whose value is falsy are left out.

    Returns the request path, followed by ``?`` and the encoded query string
    when at least one pair remains. Because the result is meant to be placed
    in markup, the ``&`` separators are written as the HTML entity.
    """
    query_params = []
    for key, value_list in context['request'].GET._iterlists():
        if key in params_to_remove:
            continue
        if key in params_to_change:
            # A changed parameter collapses to the single new value; popping
            # it keeps it from being appended a second time below.
            query_params.append((key, params_to_change.pop(key)))
        else:
            # leave existing parameters as they were
            query_params.extend((key, value) for value in value_list)
    # attach the parameters that were not part of the current query
    query_params.extend(params_to_change.items())

    # Filter before deciding whether to add "?", so that a query made up of
    # empty values only does not leave a dangling "?" behind.
    encoded_pairs = [
        (key, force_str(value)) for (key, value) in query_params if value
    ]
    query_string = context['request'].path
    if encoded_pairs:
        # The original replace("&", "&") was a no-op; the separator has to
        # become the entity for the link to be valid inside HTML.
        query_string += "?%s" % urllib.urlencode(encoded_pairs).replace("&", "&amp;")
    return query_string
utility_tags.py 文件源码
项目:Django-Web-Development-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def add_to_query(context, *params_to_remove, **params_to_add):
    """Render a link to the current path with extra query parameters.

    Parameters named in ``params_to_remove`` are dropped; all other current
    parameters of ``context['request'].GET`` are kept. Each pair from
    ``params_to_add`` is appended unless the same key/value pair is already
    part of the query. Pairs whose value is falsy are left out.

    Returns the request path, followed by ``?`` and the encoded query string
    when at least one pair remains. Because the result is meant to be placed
    in markup, the ``&`` separators are written as the HTML entity.
    """
    query_params = []
    # go through current query params..
    for key, value_list in context['request'].GET._iterlists():
        if key in params_to_remove:
            continue
        # don't add key-value pairs which already exist in the query
        # (``unicode`` is the Python 2 builtin, like ``urllib.urlencode`` below)
        if key in params_to_add and unicode(params_to_add[key]) in value_list:
            params_to_add.pop(key)
        query_params.extend((key, value) for value in value_list)
    # add the remaining key-value pairs
    query_params.extend(params_to_add.items())

    # Filter before deciding whether to add "?", so that a query made up of
    # empty values only does not leave a dangling "?" behind.
    encoded_pairs = [
        (key, force_str(value)) for (key, value) in query_params if value
    ]
    query_string = context['request'].path
    if encoded_pairs:
        # The original replace("&", "&") was a no-op; the separator has to
        # become the entity for the link to be valid inside HTML.
        query_string += "?%s" % urllib.urlencode(encoded_pairs).replace("&", "&amp;")
    return query_string
utility_tags.py 文件源码
项目:Django-Web-Development-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def remove_from_query(context, *args, **kwargs):
    """Render a link to the current path with query parameters removed.

    Every parameter whose key is listed in ``args`` is dropped entirely; for
    keys given in ``kwargs`` only the pairs whose value equals the keyword
    value are dropped. Pairs whose value is falsy are left out as well.

    Returns the request path, followed by ``?`` and the encoded query string
    when at least one pair remains. Because the result is meant to be placed
    in markup, the ``&`` separators are written as the HTML entity.
    """
    query_params = []
    # go through current query params..
    for key, value_list in context['request'].GET._iterlists():
        # skip keys mentioned in the args
        if key in args:
            continue
        for value in value_list:
            # skip key-value pairs mentioned in kwargs
            # (``unicode`` is the Python 2 builtin, like ``urllib.urlencode`` below)
            if key in kwargs and unicode(value) == unicode(kwargs[key]):
                continue
            query_params.append((key, value))

    # Filter before deciding whether to add "?", so that a query made up of
    # empty values only does not leave a dangling "?" behind.
    encoded_pairs = [
        (key, force_str(value)) for (key, value) in query_params if value
    ]
    query_string = context['request'].path
    if encoded_pairs:
        # "+=" rather than "=": the plain assignment threw the path away.
        # The original replace("&", "&") was a no-op; the separator has to
        # become the entity for the link to be valid inside HTML.
        query_string += "?%s" % urllib.urlencode(encoded_pairs).replace("&", "&amp;")
    return query_string
def __str__(self):
    """Format the message as ``<obj>: (<id>) <msg>`` plus an optional hint line.

    ``obj`` is shown as ``?`` when unset and as ``app_label.object_name``
    for model classes; anything else goes through ``force_str``.
    """
    from django.db import models

    target = self.obj
    if target is None:
        label = "?"
    elif isinstance(target, models.base.ModelBase):
        # We need to hardcode ModelBase and Field cases because its __str__
        # method doesn't return "applabel.modellabel" and cannot be changed.
        meta = target._meta
        label = '%s.%s' % (meta.app_label, meta.object_name)
    else:
        label = force_str(target)

    id_part = ""
    if self.id:
        id_part = "(%s) " % self.id
    hint_part = ''
    if self.hint:
        hint_part = "\n\tHINT: %s" % self.hint
    return "%s: %s%s%s" % (label, id_part, self.msg, hint_part)
def get_connection_params(self):
    """Build the keyword arguments used to open the database connection.

    Starts from the fixed ``conv``/``charset`` defaults, copies every
    non-empty entry of ``self.settings_dict`` under the driver's own key
    name and finally applies ``OPTIONS``, which therefore overrides
    anything set before it.
    """
    params = {
        'conv': django_conversions,
        'charset': 'utf8',
    }
    if six.PY2:
        params['use_unicode'] = True

    conf = self.settings_dict
    for setting, option in (('USER', 'user'), ('NAME', 'db')):
        if conf[setting]:
            params[option] = conf[setting]
    if conf['PASSWORD']:
        params['passwd'] = force_str(conf['PASSWORD'])

    # A HOST beginning with a slash is passed on as a socket path.
    host = conf['HOST']
    if host.startswith('/'):
        params['unix_socket'] = host
    elif host:
        params['host'] = host

    if conf['PORT']:
        params['port'] = int(conf['PORT'])

    # We need the number of potentially affected rows after an
    # "UPDATE", not the number of changed rows.
    params['client_flag'] = CLIENT.FOUND_ROWS
    params.update(conf['OPTIONS'])
    return params
def __str__(self):
    """Format the message as ``<obj>: (<id>) <msg>`` plus an optional hint line.

    ``obj`` is shown as ``?`` when unset and as ``app_label.object_name``
    for model classes; anything else goes through ``force_str``.
    """
    from django.db import models

    target = self.obj
    if target is None:
        label = "?"
    elif isinstance(target, models.base.ModelBase):
        # We need to hardcode ModelBase and Field cases because its __str__
        # method doesn't return "applabel.modellabel" and cannot be changed.
        meta = target._meta
        label = '%s.%s' % (meta.app_label, meta.object_name)
    else:
        label = force_str(target)

    id_part = ""
    if self.id:
        id_part = "(%s) " % self.id
    hint_part = ''
    if self.hint:
        hint_part = "\n\tHINT: %s" % self.hint
    return "%s: %s%s%s" % (label, id_part, self.msg, hint_part)
def get_connection_params(self):
    """Build the keyword arguments used to open the database connection.

    Starts from the fixed ``conv``/``charset`` defaults, copies every
    non-empty entry of ``self.settings_dict`` under the driver's own key
    name and finally applies ``OPTIONS``, which therefore overrides
    anything set before it.
    """
    params = {
        'conv': django_conversions,
        'charset': 'utf8',
    }
    if six.PY2:
        params['use_unicode'] = True

    conf = self.settings_dict
    for setting, option in (('USER', 'user'), ('NAME', 'db')):
        if conf[setting]:
            params[option] = conf[setting]
    if conf['PASSWORD']:
        params['passwd'] = force_str(conf['PASSWORD'])

    # A HOST beginning with a slash is passed on as a socket path.
    host = conf['HOST']
    if host.startswith('/'):
        params['unix_socket'] = host
    elif host:
        params['host'] = host

    if conf['PORT']:
        params['port'] = int(conf['PORT'])

    # We need the number of potentially affected rows after an
    # "UPDATE", not the number of changed rows.
    params['client_flag'] = CLIENT.FOUND_ROWS
    params.update(conf['OPTIONS'])
    return params
def __str__(self):
    """Format the message as ``<obj>: (<id>) <msg>`` plus an optional hint line.

    ``obj`` is shown as ``?`` when unset and as ``app_label.object_name``
    for model classes; anything else goes through ``force_str``.
    """
    from django.db import models

    target = self.obj
    if target is None:
        label = "?"
    elif isinstance(target, models.base.ModelBase):
        # We need to hardcode ModelBase and Field cases because its __str__
        # method doesn't return "applabel.modellabel" and cannot be changed.
        meta = target._meta
        label = '%s.%s' % (meta.app_label, meta.object_name)
    else:
        label = force_str(target)

    id_part = ""
    if self.id:
        id_part = "(%s) " % self.id
    hint_part = ''
    if self.hint:
        hint_part = "\n\tHINT: %s" % self.hint
    return "%s: %s%s%s" % (label, id_part, self.msg, hint_part)
def path_and_rename(instance, filename):
    """Return the storage path for an uploaded file.

    The file keeps the text after the last dot of ``filename`` as its
    extension and is renamed to ``instance.pk`` when that is truthy, or to
    a random UUID hex string otherwise. It is placed in a directory named
    after the current date (``uploads/<year>/<month>/<day>/``).
    """
    extension = filename.split('.')[-1]
    stem = instance.pk if instance.pk else uuid4().hex
    renamed = '{}.{}'.format(stem, extension)
    date_pattern = force_str('uploads/%Y/%m/%d/')
    upload_dir = force_text(datetime.datetime.now().strftime(date_pattern))
    return os.path.join(upload_dir, renamed)
def _mjml_recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock``; raise RuntimeError if the peer closes early."""
    chunks = []
    remaining = size
    while remaining > 0:
        # recv() may return fewer bytes than requested, so keep reading.
        chunk = sock.recv(remaining)
        if not chunk:
            raise RuntimeError(
                'MJML compile error (via MJML TCP server): connection closed unexpectedly')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def _mjml_render_by_tcpserver(mjml_code):
    """Render ``mjml_code`` through one of the configured MJML TCP servers.

    The servers from ``mjml_settings.MJML_TCPSERVERS`` are tried in random
    order (when there is more than one) until a connection succeeds. The
    UTF-8 encoded code is sent, then the reply is read as a 1-byte status,
    a 9-byte length and a payload of that many bytes.

    Returns the decoded payload when the status is ``'0'``.

    Raises ``RuntimeError`` when the server reports a failure, when the
    connection ends before the full reply arrived, or when no server could
    be reached.
    """
    servers = list(mjml_settings.MJML_TCPSERVERS)
    if len(servers) > 1:
        random.shuffle(servers)
    # The fallback has to be bytes as well, otherwise sending it fails on
    # Python 3 for empty input.
    payload = mjml_code.encode('utf8') or b' '
    for host, port in servers:
        # A fresh socket per attempt: a socket is not reliably reusable
        # after a failed connect(), and this way each one is closed below.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                s.connect((host, port))
            except socket.error:
                continue
            # sendall() keeps writing until the whole payload is out.
            s.sendall(payload)
            ok = force_str(_mjml_recv_exact(s, 1)) == '0'
            result_len = int(force_str(_mjml_recv_exact(s, 9)))
            result = force_str(_mjml_recv_exact(s, result_len))
            if ok:
                return result
            raise RuntimeError('MJML compile error (via MJML TCP server): {}'.format(result))
        finally:
            s.close()
    raise RuntimeError('MJML compile error (via MJML TCP server): no working server')
def strptime(self, value, format):
    """Parse ``value`` according to ``format`` and return the date part."""
    parsed = datetime.datetime.strptime(force_str(value), format)
    return parsed.date()
def strptime(self, value, format):
    """Parse ``value`` according to ``format`` and return the time part."""
    parsed = datetime.datetime.strptime(force_str(value), format)
    return parsed.time()
def strptime(self, value, format):
    """Parse ``value`` according to ``format`` and return the datetime."""
    text = force_str(value)
    return datetime.datetime.strptime(text, format)
def __repr__(self):
    """Return ``<ClassName name pattern>`` passed through ``force_str``."""
    description = '<%s %s %s>' % (
        self.__class__.__name__,
        self.name,
        self.regex.pattern,
    )
    return force_str(description)
def __repr__(self):
    """Return ``<ClassName: name (content_type)>`` passed through ``force_str``."""
    class_name = self.__class__.__name__
    description = "<%s: %s (%s)>" % (class_name, self.name, self.content_type)
    return force_str(description)
def signature(self, value):
    """Return the signature of ``value`` computed with this signer's salt and key."""
    salted = self.salt + 'signer'
    digest = base64_hmac(salted, value, self.key)
    # Convert the signature from bytes to str only on Python 3
    return force_str(digest)
def sign(self, value):
    """Return ``value`` followed by the separator and its signature."""
    text = force_str(value)
    parts = (text, self.sep, self.signature(text))
    return str('%s%s%s') % parts
def unsign(self, signed_value):
    """Verify ``signed_value`` and return the value it carries.

    Raises ``BadSignature`` when the separator is missing or when the
    signature after the last separator does not match the value before it.
    """
    signed_text = force_str(signed_value)
    if self.sep not in signed_text:
        raise BadSignature('No "%s" found in value' % self.sep)
    # Split on the last separator only: the value itself may contain it.
    payload, given_sig = signed_text.rsplit(self.sep, 1)
    if not constant_time_compare(given_sig, self.signature(payload)):
        raise BadSignature('Signature "%s" does not match' % given_sig)
    return force_text(payload)