def test_send_set_password_email(staff_user):
    """Check that the staff "set password" e-mail carries the reset link.

    Sends the ``dashboard/staff/set_password`` templated mail to
    ``staff_user`` and asserts that exactly one message reached the outbox
    and that its body contains the reset URL built from the same context.
    """
    site = Site.objects.get_current()
    uid = urlsafe_base64_encode(force_bytes(staff_user.pk))
    ctx = {'protocol': 'http',
           'domain': site.domain,
           'site_name': site.name,
           'uid': uid,
           'token': default_token_generator.make_token(staff_user)}
    send_templated_mail(template_name='dashboard/staff/set_password',
                        from_email=DEFAULT_FROM_EMAIL,
                        recipient_list=[staff_user.email],
                        context=ctx)
    assert len(mail.outbox) == 1
    # urlsafe_base64_encode() returns bytes on older Django releases and str
    # from Django 2.2 on; an unconditional .decode() fails on the latter.
    uid_text = uid.decode('utf-8') if isinstance(uid, bytes) else uid
    generated_link = ('http://%s/account/password/reset/%s/%s/' %
                      (ctx['domain'], uid_text, ctx['token']))
    sent_message = mail.outbox[0].body
    assert generated_link in sent_message
# Python force_bytes(): example source code
def post_stats(request, response, data):
    """Post request statistics to Elasticsearch on a best-effort basis.

    Returns immediately when the ``CAVALRY_ELASTICSEARCH_URL_TEMPLATE``
    setting is missing or empty.  Otherwise the payload built from ``data``,
    ``request`` and ``response`` is JSON-encoded and POSTed to the URL
    produced by formatting the template with the payload fields plus ``ymd``
    (the current UTC date as ``YYYY-MM-DD``).  Errors raised while posting,
    and any response status other than 201, are logged as warnings.
    """
    url_template = getattr(settings, 'CAVALRY_ELASTICSEARCH_URL_TEMPLATE', None)
    if not url_template:
        return

    payload = build_payload(data, request, response)
    url_fields = dict(payload, ymd=datetime.utcnow().strftime('%Y-%m-%d'))
    es_url = url_template.format_map(url_fields)
    body = force_bytes(json.dumps(payload, cls=PayloadJSONEncoder))
    headers = {'Content-Type': 'application/json'}

    try:
        resp = sess.post(es_url, data=body, headers=headers, timeout=0.5)
        if resp.status_code != 201:
            log.warning('Unable to post data to %s (error %s): %s', es_url, resp.status_code, resp.text)
    except Exception as exc:
        # Deliberately broad: posting stats must never break the caller.
        log.warning('Unable to post data to %s: %s', es_url, exc)
def cast_primitive_value(spec, value):
    """Coerce ``value`` to the Python type described by ``spec``.

    ``spec`` is a mapping whose optional ``type`` and ``format`` entries
    select the conversion.  Rules are tried in the order written below and
    the first match wins; a value matching no rule is returned unchanged.
    """
    value_format = spec.get('format')
    value_type = spec.get('type')

    if value_type == 'boolean':
        truthy_words = ('1', 'yes', 'true')
        return force_text(value).lower() in truthy_words

    wants_int = value_type == 'integer' or value_format in ('integer', 'long')
    if wants_int:
        return int(value)

    wants_float = value_type == 'number' or value_format in ('float', 'double')
    if wants_float:
        return float(value)

    if value_format == 'byte':
        # base64 encoded characters
        return base64.b64decode(value)
    if value_format == 'binary':
        # any sequence of octets
        return force_bytes(value)
    if value_format == 'date':
        # ISO8601 date
        return iso8601.parse_date(value).date()
    if value_format == 'dateTime':
        # ISO8601 datetime
        return iso8601.parse_date(value)

    return force_text(value) if value_type == 'string' else value
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None):
    """
    Reverse of dumps(), raises BadSignature if signature fails.

    The serializer is expected to accept a bytestring.
    """
    signer = TimestampSigner(key, salt=salt)
    # unsign() hands back text, whereas base64 and zlib operate on bytes.
    payload = force_bytes(signer.unsign(s, max_age=max_age))
    # A leading dot marks a payload that was compressed before encoding.
    is_compressed = payload[:1] == b'.'
    if is_compressed:
        payload = payload[1:]
    data = b64_decode(payload)
    if is_compressed:
        data = zlib.decompress(data)
    return serializer().loads(data)
def encode(self, password, salt):
    """Hash ``password`` with bcrypt and return ``"<algorithm>$<hash>"``.

    When ``self.digest`` is set, the password is first hashed with it and
    hex-encoded, so bcrypt receives the digest rather than the raw password.
    """
    bcrypt = self._load_library()
    # Need to reevaluate the force_bytes call once bcrypt is supported on
    # Python 3
    secret = force_bytes(password)
    if self.digest is not None:
        # Hash the password prior to using bcrypt to prevent password
        # truncation.  See: https://code.djangoproject.com/ticket/20138
        # binascii.hexlify is used because it yields a bytestring, whereas
        # hexdigest() yields text on Python 3.
        secret = binascii.hexlify(self.digest(secret).digest())
    hashed = bcrypt.hashpw(secret, salt)
    return "%s$%s" % (self.algorithm, force_text(hashed))
def groups_for_user(environ, username):
    """
    Return the group names of ``username``, each passed through force_bytes.

    An unknown or inactive user yields an empty list.
    """
    UserModel = auth.get_user_model()
    db.reset_queries()
    try:
        try:
            account = UserModel._default_manager.get_by_natural_key(username)
        except UserModel.DoesNotExist:
            return []
        if account.is_active:
            return [force_bytes(grp.name) for grp in account.groups.all()]
        return []
    finally:
        # Runs on every exit path, including the early returns above.
        db.close_old_connections()
def decode(self, session_data):
    """Decode base64 ``session_data`` into a session dictionary.

    The decoded bytes are expected to look like ``<hash>:<serialized>``.
    If the hash does not match, or anything else fails while parsing or
    deserializing, an empty dictionary is returned instead.
    """
    raw = base64.b64decode(force_bytes(session_data))
    try:
        # Unpacking raises ValueError when there is no ':' separator.
        digest, payload = raw.split(b':', 1)
        expected = self._hash(payload)
        if constant_time_compare(digest.decode(), expected):
            return self.serializer().loads(payload)
        raise SuspiciousSession("Session data corrupted")
    except Exception as exc:
        # ValueError, SuspiciousOperation, unpickling exceptions. If any of
        # these happen, just return an empty dictionary (an empty session).
        if isinstance(exc, SuspiciousOperation):
            logger_name = 'django.security.%s' % exc.__class__.__name__
            logging.getLogger(logger_name).warning(force_text(exc))
        return {}
def encode_file(boundary, key, file):
    """Build the multipart/form-data lines for one uploaded file.

    Returns a list of bytestrings: the boundary line, the
    Content-Disposition and Content-Type headers, an empty line and the
    file contents.  The content type comes from ``file.content_type`` when
    that attribute exists, else is guessed from the file name, and falls
    back to ``application/octet-stream``.  ``key`` is used as the file name
    when the file has none.
    """
    def to_bytes(text):
        return force_bytes(text, settings.DEFAULT_CHARSET)

    filename = os.path.basename(file.name) if hasattr(file, 'name') else ''

    content_type = None
    if hasattr(file, 'content_type'):
        content_type = file.content_type
    elif filename:
        content_type = mimetypes.guess_type(filename)[0]
    if content_type is None:
        content_type = 'application/octet-stream'

    filename = filename or key

    boundary_line = to_bytes('--%s' % boundary)
    disposition = to_bytes('Content-Disposition: form-data; name="%s"; filename="%s"'
                           % (key, filename))
    type_header = to_bytes('Content-Type: %s' % content_type)
    contents = to_bytes(file.read())
    return [boundary_line, disposition, type_header, b'', contents]
def salted_hmac(key_salt, value, secret=None):
    """
    Return the HMAC-SHA1 object for ``value``, keyed with a key derived from
    ``key_salt`` and a secret (``settings.SECRET_KEY`` when none is given).

    A different key_salt should be passed in for every application of HMAC.
    """
    if secret is None:
        secret = settings.SECRET_KEY

    salt_bytes = force_bytes(key_salt)
    secret_bytes = force_bytes(secret)

    # Derive the working key by running salt + base key through SHA1, used
    # here as a pseudo-random function.  For inputs longer than the hash
    # block size the hmac module would do the same thing by itself, but the
    # derivation must happen *always*, so it is done explicitly.
    derived_key = hashlib.sha1(salt_bytes + secret_bytes).digest()

    return hmac.new(derived_key, msg=force_bytes(value), digestmod=hashlib.sha1)
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None):
    """
    Reverse of dumps(), raises BadSignature if signature fails.

    The serializer is expected to accept a bytestring.
    """
    signer = TimestampSigner(key, salt=salt)
    # unsign() hands back text, whereas base64 and zlib operate on bytes.
    payload = force_bytes(signer.unsign(s, max_age=max_age))
    # A leading dot marks a payload that was compressed before encoding.
    is_compressed = payload[:1] == b'.'
    if is_compressed:
        payload = payload[1:]
    data = b64_decode(payload)
    if is_compressed:
        data = zlib.decompress(data)
    return serializer().loads(data)
def groups_for_user(environ, username):
    """
    Return the group names of ``username``, each passed through force_bytes.

    An unknown or inactive user yields an empty list.
    """
    UserModel = auth.get_user_model()
    db.reset_queries()
    try:
        try:
            account = UserModel._default_manager.get_by_natural_key(username)
        except UserModel.DoesNotExist:
            return []
        if account.is_active:
            return [force_bytes(grp.name) for grp in account.groups.all()]
        return []
    finally:
        # Runs on every exit path, including the early returns above.
        db.close_old_connections()
def _check_query(self, query, country=False, city=False, city_or_country=False):
    """Validate ``query`` and the requested databases; return query as bytes.

    Raises TypeError when ``query`` is not a string and GeoIPException when
    a database required by the given flags is not available.
    """
    # Only string queries are accepted.
    if not isinstance(query, six.string_types):
        raise TypeError('GeoIP query must be a string, not type %s' % type(query).__name__)

    # Every check below raises, so independent guards behave exactly like
    # an if/elif chain.
    if city_or_country and not self._country and not self._city:
        raise GeoIPException('Invalid GeoIP country and city data files.')
    if country and not self._country:
        raise GeoIPException('Invalid GeoIP country data file: %s' % self._country_file)
    if city and not self._city:
        raise GeoIPException('Invalid GeoIP city data file: %s' % self._city_file)

    # GeoIP only takes bytestrings, so hand the query back encoded.
    return force_bytes(query)
def cast_primitive_value(spec, value):
    """Coerce ``value`` to the Python type described by ``spec``.

    ``spec`` is a mapping whose optional ``type`` and ``format`` entries
    select the conversion.  Rules are tried in the order written below and
    the first match wins; a value matching no rule is returned unchanged.
    """
    value_format = spec.get('format')
    value_type = spec.get('type')

    if value_type == 'boolean':
        truthy_words = ('1', 'yes', 'true')
        return force_text(value).lower() in truthy_words

    wants_int = value_type == 'integer' or value_format in ('integer', 'long')
    if wants_int:
        return int(value)

    wants_float = value_type == 'number' or value_format in ('float', 'double')
    if wants_float:
        return float(value)

    if value_format == 'byte':
        # base64 encoded characters
        return base64.b64decode(value)
    if value_format == 'binary':
        # any sequence of octets
        return force_bytes(value)
    if value_format == 'date':
        # ISO8601 date
        return iso8601.parse_date(value).date()
    if value_format == 'dateTime':
        # ISO8601 datetime
        return iso8601.parse_date(value)

    return force_text(value) if value_type == 'string' else value
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None):
    """
    Reverse of dumps(), raises BadSignature if signature fails.

    The serializer is expected to accept a bytestring.
    """
    signer = TimestampSigner(key, salt=salt)
    # unsign() hands back text, whereas base64 and zlib operate on bytes.
    payload = force_bytes(signer.unsign(s, max_age=max_age))
    # A leading dot marks a payload that was compressed before encoding.
    is_compressed = payload[:1] == b'.'
    if is_compressed:
        payload = payload[1:]
    data = b64_decode(payload)
    if is_compressed:
        data = zlib.decompress(data)
    return serializer().loads(data)
def groups_for_user(environ, username):
    """
    Return the group names of ``username``, each passed through force_bytes.

    An unknown or inactive user yields an empty list.  ``UserModel`` is not
    bound in this function and is looked up from the enclosing scope.
    """
    db.reset_queries()
    try:
        try:
            account = UserModel._default_manager.get_by_natural_key(username)
        except UserModel.DoesNotExist:
            return []
        if account.is_active:
            return [force_bytes(grp.name) for grp in account.groups.all()]
        return []
    finally:
        # Runs on every exit path, including the early returns above.
        db.close_old_connections()
def _check_query(self, query, country=False, city=False, city_or_country=False):
    """Validate ``query`` and the requested databases; return query as bytes.

    Raises TypeError when ``query`` is not a string and GeoIPException when
    a database required by the given flags is not available.
    """
    # Only string queries are accepted.
    if not isinstance(query, six.string_types):
        raise TypeError('GeoIP query must be a string, not type %s' % type(query).__name__)

    # Every check below raises, so independent guards behave exactly like
    # an if/elif chain.
    if city_or_country and not self._country and not self._city:
        raise GeoIPException('Invalid GeoIP country and city data files.')
    if country and not self._country:
        raise GeoIPException('Invalid GeoIP country data file: %s' % self._country_file)
    if city and not self._city:
        raise GeoIPException('Invalid GeoIP city data file: %s' % self._city_file)

    # GeoIP only takes bytestrings, so hand the query back encoded.
    return force_bytes(query)
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None):
    """
    Reverse of dumps(), raises BadSignature if signature fails.

    The serializer is expected to accept a bytestring.
    """
    signer = TimestampSigner(key, salt=salt)
    # unsign() hands back text, whereas base64 and zlib operate on bytes.
    payload = force_bytes(signer.unsign(s, max_age=max_age))
    # A leading dot marks a payload that was compressed before encoding.
    is_compressed = payload[:1] == b'.'
    if is_compressed:
        payload = payload[1:]
    data = b64_decode(payload)
    if is_compressed:
        data = zlib.decompress(data)
    return serializer().loads(data)
def groups_for_user(environ, username):
    """
    Return the group names of ``username``, each passed through force_bytes.

    An unknown or inactive user yields an empty list.  ``UserModel`` is not
    bound in this function and is looked up from the enclosing scope.
    """
    db.reset_queries()
    try:
        try:
            account = UserModel._default_manager.get_by_natural_key(username)
        except UserModel.DoesNotExist:
            return []
        if account.is_active:
            return [force_bytes(grp.name) for grp in account.groups.all()]
        return []
    finally:
        # Runs on every exit path, including the early returns above.
        db.close_old_connections()
def _check_query(self, query, country=False, city=False, city_or_country=False):
    """Validate ``query`` and the requested databases; return query as bytes.

    Raises TypeError when ``query`` is not a string and GeoIPException when
    a database required by the given flags is not available.
    """
    # Only string queries are accepted.
    if not isinstance(query, six.string_types):
        raise TypeError('GeoIP query must be a string, not type %s' % type(query).__name__)

    # Every check below raises, so independent guards behave exactly like
    # an if/elif chain.
    if city_or_country and not self._country and not self._city:
        raise GeoIPException('Invalid GeoIP country and city data files.')
    if country and not self._country:
        raise GeoIPException('Invalid GeoIP country data file: %s' % self._country_file)
    if city and not self._city:
        raise GeoIPException('Invalid GeoIP city data file: %s' % self._city_file)

    # GeoIP only takes bytestrings, so hand the query back encoded.
    return force_bytes(query)
def loads(s,
          key=None,
          salt='django.core.signing',
          serializer=JSONSerializer,
          max_age=None):
    """
    Reverse of dumps(), raises BadSignature if signature fails.

    The serializer is expected to accept a bytestring.
    """
    signer = TimestampSigner(key, salt=salt)
    # unsign() hands back text, whereas base64 and zlib operate on bytes.
    payload = force_bytes(signer.unsign(s, max_age=max_age))
    # A leading dot marks a payload that was compressed before encoding.
    is_compressed = payload[:1] == b'.'
    if is_compressed:
        payload = payload[1:]
    data = b64_decode(payload)
    if is_compressed:
        data = zlib.decompress(data)
    return serializer().loads(data)
def pbkdf2(password, salt, iterations, dklen=0, digest=None):
    """
    Implements PBKDF2 with the same API as Django's existing
    implementation, using cryptography.

    ``digest`` defaults to ``settings.CRYPTOGRAPHY_DIGEST`` and a falsy
    ``dklen`` defaults to the digest's ``digest_size``.

    :type password: any
    :type salt: any
    :type iterations: int
    :type dklen: int
    :type digest: cryptography.hazmat.primitives.hashes.HashAlgorithm
    """
    algorithm = settings.CRYPTOGRAPHY_DIGEST if digest is None else digest
    key_length = dklen or algorithm.digest_size
    password_bytes = force_bytes(password)
    salt_bytes = force_bytes(salt)
    return PBKDF2HMAC(
        algorithm=algorithm,
        length=key_length,
        salt=salt_bytes,
        iterations=iterations,
        backend=settings.CRYPTOGRAPHY_BACKEND,
    ).derive(password_bytes)
def send_reset_password_email(request, user):
    """Send a password-reset e-mail to ``user``.

    The subject and body are rendered from the
    ``registration/password_reset_subject.txt`` and
    ``registration/password_reset_email.html`` templates with a context
    holding the current site, the user's base64-encoded primary key
    (``uid``), a fresh reset token and the request's protocol.
    """
    from_email = settings.DEFAULT_FROM_EMAIL
    current_site = get_current_site(request)
    site_name = current_site.name
    domain = current_site.domain
    token_generator = default_token_generator
    use_https = request.is_secure()

    uid = urlsafe_base64_encode(force_bytes(user.pk))
    # urlsafe_base64_encode() returns bytes on older Django releases and str
    # from Django 2.2 on; calling .decode() on a str raises AttributeError.
    if isinstance(uid, bytes):
        uid = uid.decode()

    context = {
        'email': user.email,
        'domain': domain,
        'site_name': site_name,
        'uid': uid,
        'user': user,
        'token': token_generator.make_token(user),
        'protocol': 'https' if use_https else 'http',
    }
    subject = loader.render_to_string('registration/password_reset_subject.txt', context)
    # A mail subject must be a single line.
    subject = ''.join(subject.splitlines())
    body = loader.render_to_string('registration/password_reset_email.html', context)
    email_message = EmailMultiAlternatives(subject, body, from_email, [user.email])
    email_message.send()
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None):
    """
    Reverse of dumps(), raises BadSignature if signature fails.

    The serializer is expected to accept a bytestring.
    """
    signer = TimestampSigner(key, salt=salt)
    # unsign() hands back text, whereas base64 and zlib operate on bytes.
    payload = force_bytes(signer.unsign(s, max_age=max_age))
    # A leading dot marks a payload that was compressed before encoding.
    is_compressed = payload[:1] == b'.'
    if is_compressed:
        payload = payload[1:]
    data = b64_decode(payload)
    if is_compressed:
        data = zlib.decompress(data)
    return serializer().loads(data)
def groups_for_user(environ, username):
    """
    Return the group names of ``username``, each passed through force_bytes.

    An unknown or inactive user yields an empty list.
    """
    UserModel = auth.get_user_model()
    db.reset_queries()
    try:
        try:
            account = UserModel._default_manager.get_by_natural_key(username)
        except UserModel.DoesNotExist:
            return []
        if account.is_active:
            return [force_bytes(grp.name) for grp in account.groups.all()]
        return []
    finally:
        # Runs on every exit path, including the early returns above.
        db.close_old_connections()
def _check_query(self, query, country=False, city=False, city_or_country=False):
    """Validate ``query`` and the requested databases; return query as bytes.

    Raises TypeError when ``query`` is not a string and GeoIPException when
    a database required by the given flags is not available.
    """
    # Only string queries are accepted.
    if not isinstance(query, six.string_types):
        raise TypeError('GeoIP query must be a string, not type %s' % type(query).__name__)

    # Every check below raises, so independent guards behave exactly like
    # an if/elif chain.
    if city_or_country and not self._country and not self._city:
        raise GeoIPException('Invalid GeoIP country and city data files.')
    if country and not self._country:
        raise GeoIPException('Invalid GeoIP country data file: %s' % self._country_file)
    if city and not self._city:
        raise GeoIPException('Invalid GeoIP city data file: %s' % self._city_file)

    # GeoIP only takes bytestrings, so hand the query back encoded.
    return force_bytes(query)
def _extarct_only_robot(xmlfile):
    """Strip non-Robot elements from ``xmlfile`` and parse what remains.

    Robot elements are: 'suite', 'statistics' and 'errors'.  A ``devices``
    child of the root element is removed when present.

    Returns a ``(execution_result, patched_file)`` tuple: the result built
    from the stripped XML, and a ``File`` named after ``xmlfile`` that wraps
    the stripped XML as bytes.
    """
    root = ET.parse(xmlfile).getroot()
    devices_node = root.find("devices")
    if devices_node is not None:
        root.remove(devices_node)

    source = StringIO(ET.tostring(root))
    builder = ExecutionResultBuilder(ETSource(source))
    execution_result = builder.build(Result())

    stripped_xml = BytesIO(force_bytes(source.getvalue()))
    patched_file = File(stripped_xml, name=xmlfile.name)
    return (execution_result, patched_file)
def md5(*bits):
    """Return ``_md5`` of all ``bits`` joined with ``:``.

    Each bit is converted with ``force_bytes(..., errors='replace')``, so
    the separator has to be a bytestring as well: joining bytes items with
    a text ``':'`` raises TypeError on Python 3.
    """
    return _md5(b':'.join(force_bytes(bit, errors='replace') for bit in bits))
def form_valid(self, form):
    """Save the submitted user and e-mail them the registration message.

    The subject and message are rendered from the
    ``easy_regist/mailtemplate/new`` templates with the protocol, site
    domain, base64-encoded user primary key, a token and the user itself.
    """
    user = form.save()
    request = self.request
    domain = get_current_site(request).domain

    subject_template = get_template(
        'easy_regist/mailtemplate/new/subject.txt')
    message_template = get_template(
        'easy_regist/mailtemplate/new/message.txt')

    if request.is_secure():
        protocol = 'https'
    else:
        protocol = 'http'
    context = {
        'protocol': protocol,
        'domain': domain,
        'uid': urlsafe_base64_encode(force_bytes(user.pk)),
        'token': default_token_generator.make_token(user),
        'user': user,
    }

    subject = subject_template.render(context)
    message = message_template.render(context)
    send_mail(subject, message, settings.EMAIL_HOST_USER, [user.email])
    return super().form_valid(form)
def _hexdigest(cls, *args):
h = hashlib.md5()
if args:
for a in args:
h.update(force_bytes(a))
hexdigest = h.hexdigest()[:8]
return hexdigest
def active_email(request, user):
    """Send the account-activation e-mail to ``user``.

    The message is rendered from ``email/active_email.txt`` with the user,
    the request's host, the base64-encoded user primary key and an
    activation token, and is sent from ``settings.DEFAULT_FROM_EMAIL``.
    """
    context = {
        'user': user,
        'domain': request.get_host(),
        'uid': urlsafe_base64_encode(force_bytes(user.pk)),
        'token': account_activation_token.make_token(user),
    }
    message = render_to_string('email/active_email.txt', context)
    send_mail(
        'Activate your blog account.',
        message,
        settings.DEFAULT_FROM_EMAIL,
        recipient_list=[user.email],
    )