def add_sharing_banner(page, response):
    """Insert the wagtailsharing banner right after the opening ``<body>`` tag.

    The response is modified in place. Nothing happens when the
    ``WAGTAILSHARING_BANNER`` setting is falsy (it is treated as enabled
    when absent) or when the rendered content has no ``<body>`` tag.
    ``page`` is accepted but not used here.
    """
    banner_enabled = getattr(settings, 'WAGTAILSHARING_BANNER', True)
    if not banner_enabled:
        return

    # Responses exposing a callable ``render`` are rendered first so that
    # ``response.content`` is available below.
    render = getattr(response, 'render', None)
    if callable(render):
        render()

    document = force_text(response.content)
    body_tag = re.search(r'(?i)<body.*?>', document)
    if not body_tag:
        return

    banner = force_text(
        loader.get_template('wagtailsharing/banner.html').render()
    )
    split_at = body_tag.end()
    response.content = document[:split_at] + banner + document[split_at:]
# Example source code using Python's force_text()
def message(self):
    """Build and return the message object with all headers populated.

    ``From`` and ``To`` values found in ``extra_headers`` take precedence
    over ``from_email`` and ``to``. ``Date`` and ``Message-ID`` are
    generated unless ``extra_headers`` already supplies them.
    """
    charset = self.encoding or settings.DEFAULT_CHARSET
    msg = self._create_message(
        SafeMIMEText(self.body, self.content_subtype, charset)
    )

    msg['Subject'] = self.subject
    msg['From'] = self.extra_headers.get('From', self.from_email)

    recipients = ', '.join(force_text(address) for address in self.to)
    msg['To'] = self.extra_headers.get('To', recipients)

    if self.cc:
        msg['Cc'] = ', '.join(force_text(address) for address in self.cc)

    if self.reply_to:
        reply_to = ', '.join(force_text(address) for address in self.reply_to)
        msg['Reply-To'] = self.extra_headers.get('Reply-To', reply_to)

    # Email header names are case-insensitive (RFC 2045), so compare on
    # the lower-cased names.
    supplied = {key.lower() for key in self.extra_headers}
    if 'date' not in supplied:
        msg['Date'] = formatdate()
    if 'message-id' not in supplied:
        # Use cached DNS_NAME for performance
        msg['Message-ID'] = make_msgid(domain=DNS_NAME)

    for name, value in self.extra_headers.items():
        # From and To were already written above.
        if name.lower() not in ('from', 'to'):
            msg[name] = value
    return msg
def get(self, request, **kwargs):
    """Finish registration when the uid and token in the URL are valid.

    Looks the user up from the base64-encoded ``uidb64`` keyword argument,
    verifies ``token`` with ``default_token_generator`` and, on success,
    saves the user with ``is_active = True`` before delegating to the
    parent ``get``.

    Raises:
        Http404: if the user cannot be resolved, the guard below fails, or
            the token does not check out.
    """
    token = kwargs.get('token')
    uidb64 = kwargs.get('uidb64')

    user = None
    try:
        uid = force_text(urlsafe_base64_decode(uidb64))
        user = User.objects.get(pk=uid)
    except (TypeError, ValueError, OverflowError, User.DoesNotExist):
        pass

    # NOTE(review): this guard requires the user to be active *already*,
    # which makes the ``is_active = True`` assignment below a no-op. It
    # looks like ``not user.is_active`` may have been intended -- confirm
    # against the registration flow before changing.
    token_ok = (
        user
        and user.is_active
        and default_token_generator.check_token(user, token)
    )
    if not token_ok:
        raise Http404

    user.is_active = True
    user.save()
    return super().get(request, **kwargs)
def get(self, request, *args, **kwargs):
    """Confirm a user's email address from an activation link.

    Reads ``uid64`` and ``token`` from ``kwargs``. When both are valid the
    user's first email address is marked verified, the user is logged in
    with the model backend, and a thank-you response is returned.
    Otherwise an "invalid link" response is returned.
    """
    try:
        uid = force_text(urlsafe_base64_decode(kwargs['uid64']))
        user = UserModel.objects.get(pk=uid)
    except Exception as e:
        # Any lookup/decoding failure is logged and treated as "no user".
        logger.info(e)
        user = None

    if user is None or not account_activation_token.check_token(user, kwargs['token']):
        return HttpResponse('Activation link is invalid!')

    primary_email = user.emailaddress_set.first()
    primary_email.verified = True
    primary_email.save()

    # ``login`` is given an explicit backend because the user was fetched
    # directly rather than through ``authenticate``.
    user.backend = 'django.contrib.auth.backends.ModelBackend'
    login(request, user)
    return HttpResponse('Thank you for your email confirmation. Now you can login your account.')
def render_options(self, *args):
    """Render only the selected options as ``<option>`` markup.

    Accepts either ``(selected_choices,)`` or ``(choices, selected_choices)``.
    When ``self.choices`` is a :class:`ModelChoiceIterator`, the selected
    objects are fetched from its queryset (and ``self.queryset`` is set
    from it if still ``None``).
    """
    if len(args) == 1:
        selected_choices = args[0]
        choices = self.choices
    else:
        choices, selected_choices = args
        choices = chain(self.choices, choices)

    selected_choices = {force_text(v) for v in selected_choices}

    # A leading blank option is only emitted for optional single selects.
    if not self.is_required and not self.allow_multiple_selected:
        output = ['<option></option>']
    else:
        output = ['']

    if isinstance(self.choices, ModelChoiceIterator):
        if self.queryset is None:
            self.queryset = self.choices.queryset
        empty_values = self.choices.field.empty_values
        selected_choices = {c for c in selected_choices if c not in empty_values}
        selected_objects = self.choices.queryset.filter(pk__in=selected_choices)
        choices = [(obj.pk, self.label_from_instance(obj)) for obj in selected_objects]
    else:
        choices = [(k, v) for k, v in choices if force_text(k) in selected_choices]

    output.extend(
        self.render_option(selected_choices, option_value, option_label)
        for option_value, option_label in choices
    )
    return '\n'.join(output)
def label_from_instance(self, obj):
    """
    Build the option label for a model instance.

    Override this to customise how each choice is displayed.

    Example usage::

        class MyWidget(ModelSelect2Widget):
            def label_from_instance(obj):
                return force_text(obj.title).upper()

    Args:
        obj (django.db.models.Model): Instance of Django Model.

    Returns:
        str: Option label.
    """
    label = force_text(obj)
    return label
def get_header_name(self, model, field_name):
    """Return a title-cased header for ``field_name`` on ``model``.

    Override if a custom value or behaviour is required for specific fields.

    Names containing ``__`` are followed across relations recursively.
    A name that is not a model field but exists as an attribute on the
    model is turned into a header from the name itself.
    """
    if '__' in field_name:
        # Resolve the first segment, then recurse into the related model
        # with the remainder of the lookup path.
        first, _sep, remainder = field_name.partition('__')
        relation = model._meta.get_field(first)
        assert relation.is_relation
        return self.get_header_name(relation.related_model, remainder)

    try:
        field = model._meta.get_field(field_name)
    except FieldDoesNotExist:
        if not hasattr(model, field_name):
            raise
        # field_name is a property.
        return field_name.replace('_', ' ').title()
    return force_text(field.verbose_name).title()
def _get_volume_types(self):
    """Return the volume type descriptions as a JSON-encoded list.

    The first entry is always the synthetic ``no_type`` item; its
    description is only filled in when ``self.default_vol_type`` is
    ``None``. The remaining entries come from ``cinder.volume_type_list``.

    Returns:
        str: JSON array of ``{"name": ..., "description": ...}`` objects.
    """
    # Start from an empty list so that, if the lookup fails and
    # ``exceptions.handle`` returns instead of re-raising, the code below
    # does not hit an unbound ``volume_types``.
    volume_types = []
    try:
        volume_types = cinder.volume_type_list(self.request)
    except Exception:
        exceptions.handle(self.request,
                          _('Unable to retrieve volume type list.'))

    # check if we have default volume type so we can present the
    # description of no volume type differently
    no_type_description = None
    if self.default_vol_type is None:
        message = _("If \"No volume type\" is selected, the volume will be "
                    "created without a volume type.")
        no_type_description = encoding.force_text(message)

    type_descriptions = [{'name': 'no_type',
                          'description': no_type_description}]
    type_descriptions.extend(
        {'name': volume_type.name,
         'description': getattr(volume_type, "description", "")}
        for volume_type in volume_types
    )
    return json.dumps(type_descriptions)
def render_option(self, selected_choices, option_value, option_label):
    """Render a single ``<option>`` element.

    Besides the ``selected`` flag, the element may carry attributes from
    ``self.transform_html_attrs`` and one ``data-*`` attribute per entry in
    ``self.data_attrs`` (only when the label is not a plain/lazy string).
    ``self.transform``, when callable, is applied to the label last.
    """
    option_value = force_text(option_value)

    if option_value in selected_choices:
        other_html = u' selected="selected"'
    else:
        other_html = ''

    if callable(self.transform_html_attrs):
        other_html += flatatt(self.transform_html_attrs(option_label))

    label_is_text = isinstance(option_label, (six.string_types, Promise))
    if not label_is_text:
        # The label is an object: expose selected attributes as data-*.
        for data_attr in self.data_attrs:
            raw_value = getattr(option_label, data_attr, "")
            data_value = html.conditional_escape(force_text(raw_value))
            other_html += ' data-%s="%s"' % (data_attr, data_value)

    if callable(self.transform):
        option_label = self.transform(option_label)

    escaped_value = html.escape(option_value)
    escaped_label = html.conditional_escape(force_text(option_label))
    return u'<option value="%s"%s>%s</option>' % (
        escaped_value, other_html, escaped_label)
def test_handle_translated(self):
    """``exceptions.handle`` copes with a non-ASCII (translated) message."""
    # Japanese translation of:
    # 'Because the container is not empty, it can not be deleted.'
    translated_unicode = (
        u'\u30b3\u30f3\u30c6\u30ca\u30fc\u304c'
        u'\u7a7a\u3067\u306f\u306a\u3044\u305f'
        u'\u3081\u3001\u524a\u9664\u3067\u304d'
        u'\u307e\u305b\u3093\u3002'
    )
    expected = ['error', force_text(translated_unicode), '']

    req = self.request
    req.META['HTTP_X_REQUESTED_WITH'] = 'XMLHttpRequest'

    try:
        raise exceptions.Conflict(translated_unicode)
    except exceptions.Conflict:
        exceptions.handle(req)

    # The real test here is to make sure the handle method doesn't throw a
    # UnicodeEncodeError, but making sure the message is correct could be
    # useful as well.
    self.assertItemsEqual(req.horizon['async_messages'], [expected])
def render(self, form, form_style, context, template_pack=TEMPLATE_PACK):
    """Render the form's action row (save button plus extra controls).

    The extra controls come from the template named by the
    ``form_actions_template`` option and receive the delete/success URLs
    and related flags read from ``form.opts``.
    """
    opts = form.opts
    success_url = opts.get('success_url', '')
    delete_url = opts.get('delete_url', '')

    if delete_url:
        # Append the post-delete redirect target as a query parameter.
        joiner = '&' if '?' in delete_url else '?'
        delete_url += joiner
        delete_url += 'success_url=' + force_text(opts.get('delete_success_url', success_url))

    template_name = opts.get('form_actions_template', 'ajaxviews/_form_controls.html')
    template = get_template(template_name)
    controls_context = {
        'delete_url': delete_url,
        'success_url': force_text(success_url),
        'modal_form': opts.get('modal_form', False),
        'form_preview': opts.get('preview_stage', False),
        'delete_confirmation': opts.get('delete_confirmation', False),
        'form_cfg': json.dumps(form.form_cfg) if getattr(form, 'form_cfg', None) else None,
    }
    btn_group = template.render(controls_context)

    layout_object = FormActions(
        Submit('save', opts.get('save_button_name', 'Save')),
        HTML(btn_group),
        style='margin-bottom: 0;'
    )
    return layout_object.render(form, form_style, context)
def process_response(self, request, response):
    """Inject the rendered ``ajaxviews/_middleware.html`` before ``</body>``.

    Ajax requests, redirects, responses without ``content`` and documents
    without a closing ``</body>`` tag are returned untouched. An existing
    ``Content-Length`` header is refreshed after the content changes.
    """
    skip = (
        request.is_ajax()
        or isinstance(response, HttpResponseRedirect)
        or not hasattr(response, 'content')
    )
    if skip:
        return response

    content = force_text(response.content, encoding=response.charset)
    if '</body>' not in content:
        return response

    if hasattr(response, 'context_data'):
        json_cfg = response.context_data.get('json_cfg', {})
    else:
        json_cfg = {}

    snippet = get_template('ajaxviews/_middleware.html').render({
        'json_cfg': json_cfg,
        'main_name': settings.REQUIRE_MAIN_NAME,
    })

    # Split on the *last* closing body tag and re-assemble around the snippet.
    before, after = content.rsplit('</body>', 1)
    response.content = response.make_bytes(''.join([before, snippet, '</body>', after]))

    if response.get('Content-Length', None):
        response['Content-Length'] = len(response.content)
    return response
def get_insert_position(admin_menu, item_name):
    """
    Return the index at which ``item_name`` should be inserted so that the
    items between SHORTCUTS_BREAK and ADMINISTRATION_BREAK stay in
    alphabetical order. A SHORTCUTS_BREAK is added first if the menu does
    not have one yet.
    """
    start = admin_menu.find_first(Break, identifier=SHORTCUTS_BREAK)
    if not start:
        admin_break = admin_menu.find_first(Break, identifier=ADMINISTRATION_BREAK)
        admin_menu.add_break(SHORTCUTS_BREAK, position=admin_break.index)
        start = admin_menu.find_first(Break, identifier=SHORTCUTS_BREAK)

    # Looked up after the optional insertion above, which may have moved it.
    end = admin_menu.find_first(Break, identifier=ADMINISTRATION_BREAK)

    first_slot = start.index + 1
    candidates = admin_menu.get_items()[first_slot: end.index]
    for position, item in enumerate(candidates, first_slot):
        try:
            comes_before = force_text(item_name.lower()) < force_text(item.name.lower())
        except AttributeError:
            # Some item types do not have a 'name' attribute.
            continue
        if comes_before:
            return position
    return end.index
def _format_callback(self, obj, user, admin_site, perms_needed):
    """Describe ``obj`` for display, linking to its admin page if it has one.

    When the object's class is registered with ``admin_site`` and ``user``
    lacks the delete permission, the model's verbose name is added to
    ``perms_needed``.
    """
    has_admin = obj.__class__ in admin_site._registry
    opts = obj._meta

    if not has_admin:
        # Don't display link to edit, because it either has no
        # admin or is edited inline.
        return '%s: %s' % (capfirst(opts.verbose_name), force_text(obj))

    url_name = '%s:%s_%s_change' % (
        admin_site.name, opts.app_label, opts.object_name.lower())
    admin_url = reverse(url_name, None, (quote(obj._get_pk_val()),))

    delete_perm = get_delete_permission(opts)
    if not user.has_perm(delete_perm):
        perms_needed.add(opts.verbose_name)

    # Display a link to the admin page.
    link_parts = (escape(capfirst(opts.verbose_name)), admin_url, escape(obj))
    return mark_safe('%s: <a href="%s">%s</a>' % link_parts)
def cast_primitive_value(spec, value):
    """Convert ``value`` according to the ``type``/``format`` keys of ``spec``.

    The checks run in a fixed order (boolean, integer, number, byte,
    binary, date, dateTime, string); the first match wins. A value whose
    spec matches none of them is returned unchanged.
    """
    value_format = spec.get('format')
    value_type = spec.get('type')

    if value_type == 'boolean':
        return force_text(value).lower() in ('1', 'yes', 'true')

    if value_type == 'integer' or value_format in ('integer', 'long'):
        return int(value)

    if value_type == 'number' or value_format in ('float', 'double'):
        return float(value)

    if value_format == 'byte':
        # base64 encoded characters
        return base64.b64decode(value)

    if value_format == 'binary':
        # any sequence of octets
        return force_bytes(value)

    if value_format == 'date':
        # ISO8601 date
        return iso8601.parse_date(value).date()

    if value_format == 'dateTime':
        # ISO8601 datetime
        return iso8601.parse_date(value)

    if value_type == 'string':
        return force_text(value)

    return value
def _normalize_name(self, name):
    """
    Resolve ``name`` against ``self.location`` so that paths such as
    /path/to/ignored/../foo.txt work, and reject any result that falls
    outside that location.

    Raises:
        SuspiciousOperation: if the resolved path is not under the location.
    """
    root = force_text(self.location).rstrip('/')
    resolved = urljoin(root + "/", name)

    # The character right after the root prefix must be a separator (or
    # nothing at all); otherwise the prefix only matched a sibling path.
    boundary = resolved[len(root):len(root) + 1]
    inside_root = resolved.startswith(root) and boundary in ('', '/')
    if not inside_root:
        raise SuspiciousOperation("Attempted access to '%s' denied." %
                                  name)
    return resolved.lstrip('/')
def regex(self):
    """
    Return the compiled regular expression for the currently active
    language, compiling and caching it in ``self._regex_dict`` on first use.

    Raises:
        ImproperlyConfigured: if the pattern does not compile.
    """
    language_code = get_language()
    if language_code in self._regex_dict:
        return self._regex_dict[language_code]

    pattern = self._regex
    if not isinstance(pattern, six.string_types):
        # Non-string patterns are coerced to text while this language is active.
        pattern = force_text(pattern)

    try:
        compiled = re.compile(pattern, re.UNICODE)
    except re.error as e:
        raise ImproperlyConfigured(
            '"%s" is not a valid regular expression: %s' %
            (pattern, six.text_type(e)))

    self._regex_dict[language_code] = compiled
    return self._regex_dict[language_code]
def popen_wrapper(args, os_err_exc_type=CommandError, universal_newlines=True):
    """
    Run ``args`` through ``Popen`` without a shell.

    Returns a 3-tuple of stdout output, stderr output and the OS status
    code. An ``OSError`` raised while starting the process is re-raised as
    ``os_err_exc_type`` with the original traceback.
    """
    try:
        process = Popen(
            args,
            shell=False,
            stdout=PIPE,
            stderr=PIPE,
            close_fds=os.name != 'nt',
            universal_newlines=universal_newlines,
        )
    except OSError as err:
        reason = force_text(err.strerror, DEFAULT_LOCALE_ENCODING,
                            strings_only=True)
        wrapped = os_err_exc_type('Error executing %s: %s' %
                                  (args[0], reason))
        six.reraise(os_err_exc_type, wrapped, sys.exc_info()[2])

    stdout_data, stderr_data = process.communicate()
    stderr_text = force_text(stderr_data, DEFAULT_LOCALE_ENCODING, strings_only=True)
    return stdout_data, stderr_text, process.returncode
def forbid_multi_line_headers(name, val, encoding):
    """Reject header values containing newlines, to prevent header injection.

    Returns ``(str(name), value)`` where the value has been encoded as
    needed: non-ASCII values are encoded with ``encoding`` (address headers
    per address), and ASCII ``Subject`` values go through ``Header``.

    Raises:
        BadHeaderError: if the value contains ``\\n`` or ``\\r``.
    """
    encoding = encoding or settings.DEFAULT_CHARSET
    val = force_text(val)
    if any(newline in val for newline in ('\n', '\r')):
        raise BadHeaderError("Header values can't contain newlines (got %r for header %r)" % (val, name))

    try:
        val.encode('ascii')
        is_ascii = True
    except UnicodeEncodeError:
        is_ascii = False

    header_key = name.lower()
    if is_ascii:
        if header_key == 'subject':
            val = Header(val).encode()
    elif header_key in ADDRESS_HEADERS:
        addresses = getaddresses((val,))
        val = ', '.join(sanitize_address(addr, encoding) for addr in addresses)
    else:
        val = Header(val, encoding).encode()
    return str(name), val
def sanitize_address(addr, encoding):
    """Return ``addr`` formatted for use in an email header.

    ``addr`` is either a ``(name, address)`` tuple or a value that is
    parsed with ``parseaddr``. The display name is header-encoded with
    ``encoding``; a non-ASCII address has its domain IDNA-encoded.
    """
    if isinstance(addr, tuple):
        display_name, address = addr
    else:
        display_name, address = parseaddr(force_text(addr))

    display_name = Header(display_name, encoding).encode()

    try:
        address.encode('ascii')
    except UnicodeEncodeError:  # IDN
        if '@' in address:
            local_part, _sep, domain = address.partition('@')
            local_part = str(Header(local_part, encoding))
            domain = domain.encode('idna').decode('ascii')
            address = '@'.join([local_part, domain])
        else:
            address = Header(address, encoding).encode()
    return formataddr((display_name, address))
def validate(self, password, user=None):
    """Reject passwords that are too similar to the user's attributes.

    Each attribute named in ``self.user_attributes`` is compared against
    the password, both as a whole and split into its word-like parts.

    Args:
        password: The candidate password.
        user: The user to compare against; validation is skipped when
            falsy.

    Raises:
        ValidationError: with code ``password_too_similar`` when any part's
            ``quick_ratio`` against the password exceeds
            ``self.max_similarity``.
    """
    if not user:
        return

    for attribute_name in self.user_attributes:
        value = getattr(user, attribute_name, None)
        if not value or not isinstance(value, string_types):
            continue
        # Raw string: '\W' in a plain literal is an invalid escape sequence.
        value_parts = re.split(r'\W+', value) + [value]
        for value_part in value_parts:
            if SequenceMatcher(a=password.lower(), b=value_part.lower()).quick_ratio() > self.max_similarity:
                verbose_name = force_text(user._meta.get_field(attribute_name).verbose_name)
                raise ValidationError(
                    _("The password is too similar to the %(verbose_name)s."),
                    code='password_too_similar',
                    params={'verbose_name': verbose_name},
                )
def encode(self, password, salt):
    """Hash ``password`` with bcrypt and return ``"<algorithm>$<hash>"``.

    When ``self.digest`` is set, the password is digested and hex-encoded
    before being handed to bcrypt.
    """
    bcrypt = self._load_library()
    # Need to reevaluate the force_bytes call once bcrypt is supported on
    # Python 3
    raw_password = force_bytes(password)

    if self.digest is None:
        secret = raw_password
    else:
        # Hash the password prior to using bcrypt to prevent password truncation
        # See: https://code.djangoproject.com/ticket/20138
        # binascii.hexlify is used because it yields a bytestring.
        secret = binascii.hexlify(self.digest(raw_password).digest())

    hashed = bcrypt.hashpw(secret, salt)
    return "%s$%s" % (self.algorithm, force_text(hashed))
def decode(self, session_data):
    """Decode and verify serialized session data.

    Returns the deserialized session on success. Any failure (bad format,
    hash mismatch, deserialization error) yields an empty dict; failures
    that are ``SuspiciousOperation`` instances are also logged.
    """
    payload = base64.b64decode(force_bytes(session_data))
    try:
        # could produce ValueError if there is no ':'
        received_hash, serialized = payload.split(b':', 1)
        expected_hash = self._hash(serialized)
        if constant_time_compare(received_hash.decode(), expected_hash):
            return self.serializer().loads(serialized)
        raise SuspiciousSession("Session data corrupted")
    except Exception as e:
        # ValueError, SuspiciousOperation, unpickling exceptions. If any of
        # these happen, just return an empty dictionary (an empty session).
        if isinstance(e, SuspiciousOperation):
            logger_name = 'django.security.%s' % e.__class__.__name__
            logging.getLogger(logger_name).warning(force_text(e))
        return {}
def units(self):
    """
    Return a 2-tuple of the units value and the units name, choosing
    linear units for projected/local systems and angular units for
    geographic ones. Both entries are ``None`` when neither applies.
    """
    if self.projected or self.local:
        value, label = capi.linear_units(self.ptr, byref(c_char_p()))
    elif self.geographic:
        value, label = capi.angular_units(self.ptr, byref(c_char_p()))
    else:
        return (None, None)

    if label is not None:
        label = force_text(label)
    return (value, label)
# #### Spheroid/Ellipsoid Properties ####
def model_format_dict(obj):
    """
    Build a `dict` with the keys 'verbose_name' and 'verbose_name_plural',
    typically for use with string formatting.

    `obj` may be a `Model` instance, `Model` subclass, or `QuerySet`
    instance; anything else is used directly as the options object.
    """
    if isinstance(obj, models.query.QuerySet):
        opts = obj.model._meta
    elif isinstance(obj, (models.Model, models.base.ModelBase)):
        opts = obj._meta
    else:
        opts = obj

    singular = force_text(opts.verbose_name)
    plural = force_text(opts.verbose_name_plural)
    return {
        'verbose_name': singular,
        'verbose_name_plural': plural,
    }
def process_response(self, request, response):
    """
    Email the managers about 404 NOT FOUND responses that are worth
    reporting. Nothing is sent while ``settings.DEBUG`` is on or when the
    request is considered ignorable. The response is returned unchanged.
    """
    if response.status_code != 404 or settings.DEBUG:
        return response

    domain = request.get_host()
    path = request.get_full_path()
    referer = force_text(request.META.get('HTTP_REFERER', ''), errors='replace')
    if self.is_ignorable_request(request, path, domain, referer):
        return response

    ua = force_text(request.META.get('HTTP_USER_AGENT', '<none>'), errors='replace')
    ip = request.META.get('REMOTE_ADDR', '<none>')

    scope = 'INTERNAL ' if self.is_internal_request(domain, referer) else ''
    subject = "Broken %slink on %s" % (scope, domain)
    body = (
        "Referrer: %s\nRequested URL: %s\nUser agent: %s\n"
        "IP address: %s\n" % (referer, path, ua, ip)
    )
    mail_managers(subject, body, fail_silently=True)
    return response
def last_executed_query(self, cursor, sql, params):
    """
    Return a text description of the last executed query: the raw ``sql``
    together with its ``params`` converted to text.

    ``params`` may be a list/tuple, a mapping, or ``None``. This default
    implementation exists so that database backends can provide a better
    one according to their own quoting schemes.
    """
    def to_unicode(s):
        # Convert a single parameter to a Unicode value.
        return force_text(s, strings_only=True, errors='replace')

    if params is None:
        u_params = ()
    elif isinstance(params, (list, tuple)):
        u_params = tuple(to_unicode(val) for val in params)
    else:
        u_params = {to_unicode(k): to_unicode(v) for k, v in params.items()}

    template = six.text_type("QUERY = %r - PARAMS = %r")
    return template % (sql, u_params)
def contribute_to_class(self, cls, name, virtual_only=False):
    """Attach this related field to ``cls``.

    For non-abstract classes, the ``%(class)s`` / ``%(app_label)s``
    placeholders in ``related_name`` are filled in, and resolution of the
    related model is scheduled through ``lazy_related_operation``.
    """
    super(RelatedField, self).contribute_to_class(cls, name, virtual_only=virtual_only)
    self.opts = cls._meta
    if cls._meta.abstract:
        return

    if self.remote_field.related_name:
        substitutions = {
            'class': cls.__name__.lower(),
            'app_label': cls._meta.app_label.lower()
        }
        self.remote_field.related_name = (
            force_text(self.remote_field.related_name) % substitutions
        )

    def resolve_related_class(model, related, field):
        # Record the resolved related model, then finish wiring the relation.
        field.remote_field.model = related
        field.do_related_class(related, model)

    lazy_related_operation(resolve_related_class, cls, self.remote_field.model, field=self)
def cache_key(self, template_name, template_dirs, skip=None):
    """
    Build the cache key from the template name, dirs, and skip list.

    If skip is provided, only origins that match template_name are included
    in the cache key. This ensures each template is only parsed and cached
    once if contained in different extend chains like:

        x -> a -> a
        y -> a -> a
        z -> a -> a
    """
    skip_prefix = ''
    if skip:
        matching = [origin.name for origin in skip if origin.template_name == template_name]
        if matching:
            skip_prefix = self.generate_hash(matching)

    dirs_prefix = self.generate_hash(template_dirs) if template_dirs else ''

    # Empty components are left out of the key.
    components = [force_text(template_name), skip_prefix, dirs_prefix]
    return '-'.join(part for part in components if part)
def append(self, element):
    """Add ``element`` as a child, merging and normalizing adjacent text.

    A string is whitespace-normalized and concatenated onto a trailing
    string child if there is one. A non-string element first drops a
    trailing whitespace-only string child. Falsy elements are not added.
    """
    if isinstance(element, six.string_types):
        element = normalize_whitespace(force_text(element))
        if self.children and isinstance(self.children[-1], six.string_types):
            merged = self.children[-1] + element
            self.children[-1] = normalize_whitespace(merged)
            return
    elif self.children:
        # removing last children if it is only whitespace
        # this can result in incorrect dom representations since
        # whitespace between inline tags like <span> is significant
        last_child = self.children[-1]
        if isinstance(last_child, six.string_types) and last_child.isspace():
            self.children.pop()

    if element:
        self.children.append(element)