def salted_hmac(key_salt, value, secret=None):
"""
Returns the HMAC-SHA1 of 'value', using a key generated from key_salt and a
secret (which defaults to settings.SECRET_KEY).
A different key_salt should be passed in for every application of HMAC.
"""
if secret is None:
secret = settings.SECRET_KEY
key_salt = force_bytes(key_salt)
secret = force_bytes(secret)
# We need to generate a derived key from our base key. We can do this by
# passing the key_salt and our base key through a pseudo-random function and
# SHA1 works nicely.
key = hashlib.sha1(key_salt + secret).digest()
# If len(key_salt + secret) > sha_constructor().block_size, the above
# line is redundant and could be replaced by key = key_salt + secret, since
# the hmac module does the same thing for keys longer than the block size.
# However, we need to ensure that we *always* do this.
return hmac.new(key, msg=force_bytes(value), digestmod=hashlib.sha1)
python类SECRET_KEY的实例源码
def process_request(self, request):
url = match(r'^/django_dev_protector/$', request.path)
if url and request.method == 'POST':
import json
data = json.loads(request.body.decode('utf-8'))
if data['key'] == settings.SECRET_KEY:
from .setup import save_status
environ[PROTECT_STATUS_VARIABLE] = str(data['status'])
save_status(data['status'])
return redirect('/')
if environ.get(PROTECT_STATUS_VARIABLE) == 'True':
from django.shortcuts import render
return render(request, TEMPLATE_NAME, {
'redirect_url': REDIRECT_URL
})
def get_user(request):
expiration = settings.COOKIE_EXPIRES
token = request.GET.get('token')
sso_dict = {}
if token:
token_confirm = Token(settings.SECRET_KEY)
try:
username = token_confirm.confirm_validate_token(token, expiration=expiration)
ret = User.objects.filter(username=username)
if ret:
sso_dict['username'] = ret[0].username
sso_dict['email'] = ret[0].email
sso_dict['cn'] = ret[0].last_name
except Exception as e:
sso_dict['error'] = 'token error'
else:
sso_dict['error'] = 'args error'
return HttpResponse(json.dumps(sso_dict))
def logout(request):
""" Logout a user
"""
try:
token = request.environ['HTTP_X_API_TOKEN']
except (KeyError, IndexError, TypeError):
raise BadRequest('Missing HTTP X-Api-Token header')
try:
data = jwt.decode(token, settings.SECRET_KEY)
data = json.loads(CRYPTO.decrypt(str(data['data'])))
user = User.objects.get(id=data['id'])
user.last_login = datetime.fromtimestamp(0)
user.save()
return {'message': 'Logged out'}
except (utils.CryptoException, KeyError, jwt.DecodeError,
jwt.ExpiredSignature, User.DoesNotExist):
raise BadRequest('Invalid token')
def post(self, request, pk):
user = User.objects.get(id=pk)
sign = hashlib.md5(user.email + settings.SECRET_KEY).hexdigest()
url = urlparse.ParseResult(
scheme=request.scheme,
netloc=urlparse.urlparse(request.get_raw_uri()).netloc,
path=reverse(('core:SetPassword')),
params='',
query = urllib.urlencode({'email': user.email, 'sign': sign}),
fragment='',
).geturl()
msg = EmailMultiAlternatives(
subject='??????',
body=get_template('users/user_email_activate.html').render({'url': url}),
from_email=settings.EMAIL_HOST_USER,
to=[user.email,],
)
msg.content_subtype = 'html'
status = msg.send(fail_silently=True)
response = '??????' if status else '??????, ???'
return HttpResponse(response)
def get_url_protection_options(user=None):
defaults = {
'TOKEN_LENGTH': 20,
'SIGNING_KEY': settings.SECRET_KEY,
'SIGNING_SALT': 'qr_code_url_protection_salt',
'ALLOWS_EXTERNAL_REQUESTS_FOR_REGISTERED_USER': False,
'ALLOWS_EXTERNAL_REQUESTS': False
}
options = defaults
if hasattr(settings, 'QR_CODE_URL_PROTECTION') and isinstance(settings.QR_CODE_URL_PROTECTION, dict):
options.update(settings.QR_CODE_URL_PROTECTION)
# Evaluate the callable if required.
if callable(options['ALLOWS_EXTERNAL_REQUESTS_FOR_REGISTERED_USER']):
options['ALLOWS_EXTERNAL_REQUESTS'] = user and options['ALLOWS_EXTERNAL_REQUESTS_FOR_REGISTERED_USER'](user)
elif options['ALLOWS_EXTERNAL_REQUESTS_FOR_REGISTERED_USER'] and user:
if callable(user.is_authenticated):
# Django version < 1.10
options['ALLOWS_EXTERNAL_REQUESTS'] = user.is_authenticated()
else:
# Django version >= 1.10
options['ALLOWS_EXTERNAL_REQUESTS'] = user.is_authenticated
else:
options['ALLOWS_EXTERNAL_REQUESTS'] = False
return options
def post(self, request):
serializer = LoginSerializer(data=request.data)
if serializer.is_valid():
employee = Employee.objects.get(emp_id=request.data.get('emp_id'))
encode = jwt.encode({'emp_id': employee.emp_id,
'auth': employee.auth,
'part_id': employee.part_id,
'create_time': time(),
'ip_addr': request.META.get('REMOTE_ADDR')
},
settings.SECRET_KEY, algorithm='HS256')
token = dict()
token['token'] = 'JWT ' + encode
return Response(token)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def handle(self, **kwargs):
self.write('Bootstrapping Promgen')
if not os.path.exists(settings.CONFIG_DIR):
self.write('Creating config directory {} ', settings.CONFIG_DIR)
os.makedirs(settings.CONFIG_DIR)
if not os.path.exists(settings.PROMGEN_CONFIG):
path = os.path.join(settings.BASE_DIR, 'promgen', 'tests', 'examples', 'promgen.yml')
self.write('Creating promgen config {} from {}', settings.PROMGEN_CONFIG, path)
shutil.copy(path, settings.PROMGEN_CONFIG)
self.write_setting('SECRET_KEY', default=settings.SECRET_KEY)
self.write_setting('DATABASE_URL', test=dj_database_url.parse)
# Schemes based on list of supported brokers
# http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
self.write_setting('CELERY_BROKER_URL', test=URLValidator(schemes=['redis', 'amqp', 'sqs']))
def get(self, request, *args, **kwargs):
if request.GET.get('key'):
serializer = URLSafeTimedSerializer(settings.SECRET_KEY)
try:
user_id = serializer.loads(
request.GET.get('key'),
max_age=60 * 2, # Signature expires after 2 minutes
)
user = get_object_or_404(User, id=user_id)
user.backend = 'django.contrib.auth.backends.ModelBackend'
login(request, user)
return redirect('home')
except (BadSignature, BadTimeSignature):
return redirect('login')
return super().get(request, *args, **kwargs)
def form_valid(self, form):
email = form.cleaned_data['email']
user = User.objects.get(username=email)
safe = URLSafeTimedSerializer(settings.SECRET_KEY)
url = '{site}{path}?key={key}'.format(
site=settings.SITE_URL,
path=reverse('login'),
key=safe.dumps(user.id),
)
send_mail(
_('Link to login into the Knowledge Base'),
url,
settings.DEFAULT_FROM_EMAIL,
[email],
fail_silently=False,
html_message=render_to_string(
'login_email.html', {'url': url}
),
)
return redirect('home')
def salted_hmac(key_salt, value, secret=None):
"""
Returns the HMAC-SHA1 of 'value', using a key generated from key_salt and a
secret (which defaults to settings.SECRET_KEY).
A different key_salt should be passed in for every application of HMAC.
"""
if secret is None:
secret = settings.SECRET_KEY
key_salt = force_bytes(key_salt)
secret = force_bytes(secret)
# We need to generate a derived key from our base key. We can do this by
# passing the key_salt and our base key through a pseudo-random function and
# SHA1 works nicely.
key = hashlib.sha1(key_salt + secret).digest()
# If len(key_salt + secret) > sha_constructor().block_size, the above
# line is redundant and could be replaced by key = key_salt + secret, since
# the hmac module does the same thing for keys longer than the block size.
# However, we need to ensure that we *always* do this.
return hmac.new(key, msg=force_bytes(value), digestmod=hashlib.sha1)
def _authenticate_credentials(self, request, token):
"""
Try to authenticate the given credentials. If authentication is
successful, return the user and token. If not, throw an error.
"""
try:
payload = jwt.decode(token, settings.SECRET_KEY)
except:
msg = 'Invalid authentication. Could not decode token.'
raise exceptions.AuthenticationFailed(msg)
try:
user = User.objects.get(pk=payload['id'])
except User.DoesNotExist:
msg = 'No user matching this token was found.'
raise exceptions.AuthenticationFailed(msg)
if not user.is_active:
msg = 'This user has been deactivated.'
raise exceptions.AuthenticationFailed(msg)
return (user, token)
def salted_hmac(key_salt, value, secret=None):
"""
Returns the HMAC-SHA1 of 'value', using a key generated from key_salt and a
secret (which defaults to settings.SECRET_KEY).
A different key_salt should be passed in for every application of HMAC.
"""
if secret is None:
secret = settings.SECRET_KEY
key_salt = force_bytes(key_salt)
secret = force_bytes(secret)
# We need to generate a derived key from our base key. We can do this by
# passing the key_salt and our base key through a pseudo-random function and
# SHA1 works nicely.
key = hashlib.sha1(key_salt + secret).digest()
# If len(key_salt + secret) > sha_constructor().block_size, the above
# line is redundant and could be replaced by key = key_salt + secret, since
# the hmac module does the same thing for keys longer than the block size.
# However, we need to ensure that we *always* do this.
return hmac.new(key, msg=force_bytes(value), digestmod=hashlib.sha1)
def salted_hmac(key_salt, value, secret=None):
"""
Returns the HMAC-SHA1 of 'value', using a key generated from key_salt and a
secret (which defaults to settings.SECRET_KEY).
A different key_salt should be passed in for every application of HMAC.
"""
if secret is None:
secret = settings.SECRET_KEY
key_salt = force_bytes(key_salt)
secret = force_bytes(secret)
# We need to generate a derived key from our base key. We can do this by
# passing the key_salt and our base key through a pseudo-random function and
# SHA1 works nicely.
key = hashlib.sha1(key_salt + secret).digest()
# If len(key_salt + secret) > sha_constructor().block_size, the above
# line is redundant and could be replaced by key = key_salt + secret, since
# the hmac module does the same thing for keys longer than the block size.
# However, we need to ensure that we *always* do this.
return hmac.new(key, msg=force_bytes(value), digestmod=hashlib.sha1)
def salted_hmac(key_salt, value, secret=None):
"""
Returns the HMAC-SHA1 of 'value', using a key generated from key_salt and a
secret (which defaults to settings.SECRET_KEY).
A different key_salt should be passed in for every application of HMAC.
"""
if secret is None:
secret = settings.SECRET_KEY
key_salt = force_bytes(key_salt)
secret = force_bytes(secret)
# We need to generate a derived key from our base key. We can do this by
# passing the key_salt and our base key through a pseudo-random function and
# SHA1 works nicely.
key = hashlib.sha1(key_salt + secret).digest()
# If len(key_salt + secret) > sha_constructor().block_size, the above
# line is redundant and could be replaced by key = key_salt + secret, since
# the hmac module does the same thing for keys longer than the block size.
# However, we need to ensure that we *always* do this.
return hmac.new(key, msg=force_bytes(value), digestmod=hashlib.sha1)
def save(self, *args, **kwargs):
"""save"""
try:
int(self.gender)
except ValueError:
self.gender = 0
super(Contact, self).save(*args, **kwargs)
if not self.uuid:
ascii_name = unicodedata.normalize('NFKD', unicode(self.fullname)).encode("ascii", 'ignore')
name = u'{0}-contact-{1}-{2}-{3}'.format(project_settings.SECRET_KEY, self.id, ascii_name, self.email)
name = unicodedata.normalize('NFKD', unicode(name)).encode("ascii", 'ignore')
self.uuid = unicode(uuid.uuid5(uuid.NAMESPACE_URL, name))
return super(Contact, self).save()
if self.entity.is_single_contact:
#force the entity name for ordering
self.entity.save()
def auth_return(request):
if not xsrfutil.validate_token(settings.SECRET_KEY, str(request.GET['state']),
request.user.username):
return HttpResponseBadRequest()
credential = FLOW.step2_exchange(request.GET)
http = httplib2.Http()
http = credential.authorize(http)
resp, data = http.request("https://beam.pro/api/v1/users/current")
data = json.loads(data)
print data
channelId = None
if 'channel' in data:
channelId = data['channel']['id']
name = data['channel'].get("name") or "(unnamed)"
internal_label = "%s-%s" % (channelId, name)
ac = BeamAppCreds(user=request.user, label=internal_label)
ac.save()
storage = Storage(BeamCredentialsModel, 'id', ac, 'credential')
storage.put(credential)
pu = BeamUpdate(credentials=ac, user=request.user, type="beam")
pu.save()
return HttpResponseRedirect("/beam/")
def auth_return(request):
if not xsrfutil.validate_token(settings.SECRET_KEY, str(request.GET['state']),
request.user.username):
return HttpResponseBadRequest()
credential = FLOW.step2_exchange(request.GET)
http = httplib2.Http()
http = credential.authorize(http)
resp, data = http.request("https://api.patreon.com/oauth2/api/current_user")
data = json.loads(data)
name = data['data']['attributes'].get("full_name") or "(unnamed)"
internal_label = "%s-%s" % (request.user.id, name)
ac = PatreonAppCreds(user=request.user, label=internal_label)
ac.save()
storage = Storage(PatreonCredentialsModel, 'id', ac, 'credential')
storage.put(credential)
pu = PatreonUpdate(credentials=ac, user=request.user, type="patreon")
pu.save()
return HttpResponseRedirect("/patreon/")
def salted_hmac(key_salt, value, secret=None):
"""
Returns the HMAC-SHA1 of 'value', using a key generated from key_salt and a
secret (which defaults to settings.SECRET_KEY).
A different key_salt should be passed in for every application of HMAC.
"""
if secret is None:
secret = settings.SECRET_KEY
key_salt = force_bytes(key_salt)
secret = force_bytes(secret)
# We need to generate a derived key from our base key. We can do this by
# passing the key_salt and our base key through a pseudo-random function and
# SHA1 works nicely.
key = hashlib.sha1(key_salt + secret).digest()
# If len(key_salt + secret) > sha_constructor().block_size, the above
# line is redundant and could be replaced by key = key_salt + secret, since
# the hmac module does the same thing for keys longer than the block size.
# However, we need to ensure that we *always* do this.
return hmac.new(key, msg=force_bytes(value), digestmod=hashlib.sha1)
def get_cookie_signer(salt='django.core.signing.get_cookie_signer'):
Signer = import_string(settings.SIGNING_BACKEND)
key = force_bytes(settings.SECRET_KEY)
return Signer(b'django.http.cookies' + key, salt=salt)
def dumps(obj, key=None, salt='django.core.signing', serializer=JSONSerializer, compress=False):
"""
Returns URL-safe, sha1 signed base64 compressed JSON string. If key is
None, settings.SECRET_KEY is used instead.
If compress is True (not the default) checks if compressing using zlib can
save some space. Prepends a '.' to signify compression. This is included
in the signature, to protect against zip bombs.
Salt can be used to namespace the hash, so that a signed string is
only valid for a given namespace. Leaving this at the default
value or re-using a salt value across different parts of your
application without good cause is a security risk.
The serializer is expected to return a bytestring.
"""
data = serializer().dumps(obj)
# Flag for if it's been compressed or not
is_compressed = False
if compress:
# Avoid zlib dependency unless compress is being used
compressed = zlib.compress(data)
if len(compressed) < (len(data) - 1):
data = compressed
is_compressed = True
base64d = b64_encode(data)
if is_compressed:
base64d = b'.' + base64d
return TimestampSigner(key, salt=salt).sign(base64d)
def __init__(self, key=None, sep=':', salt=None):
# Use of native strings in all versions of Python
self.key = key or settings.SECRET_KEY
self.sep = force_str(sep)
if _SEP_UNSAFE.match(self.sep):
warnings.warn('Unsafe Signer separator: %r (cannot be empty or consist of only A-z0-9-_=)' % sep,
RemovedInDjango110Warning)
self.salt = force_str(salt or
'%s.%s' % (self.__class__.__module__, self.__class__.__name__))
def check_secret_key(app_configs, **kwargs):
passed_check = (
getattr(settings, 'SECRET_KEY', None) and
len(set(settings.SECRET_KEY)) >= SECRET_KEY_MIN_UNIQUE_CHARACTERS and
len(settings.SECRET_KEY) >= SECRET_KEY_MIN_LENGTH
)
return [] if passed_check else [W009]
def get_cookie_signer(salt='django.core.signing.get_cookie_signer'):
Signer = import_string(settings.SIGNING_BACKEND)
key = force_bytes(settings.SECRET_KEY)
return Signer(b'django.http.cookies' + key, salt=salt)
def dumps(obj, key=None, salt='django.core.signing', serializer=JSONSerializer, compress=False):
"""
Returns URL-safe, sha1 signed base64 compressed JSON string. If key is
None, settings.SECRET_KEY is used instead.
If compress is True (not the default) checks if compressing using zlib can
save some space. Prepends a '.' to signify compression. This is included
in the signature, to protect against zip bombs.
Salt can be used to namespace the hash, so that a signed string is
only valid for a given namespace. Leaving this at the default
value or re-using a salt value across different parts of your
application without good cause is a security risk.
The serializer is expected to return a bytestring.
"""
data = serializer().dumps(obj)
# Flag for if it's been compressed or not
is_compressed = False
if compress:
# Avoid zlib dependency unless compress is being used
compressed = zlib.compress(data)
if len(compressed) < (len(data) - 1):
data = compressed
is_compressed = True
base64d = b64_encode(data)
if is_compressed:
base64d = b'.' + base64d
return TimestampSigner(key, salt=salt).sign(base64d)
def __init__(self, key=None, sep=':', salt=None):
# Use of native strings in all versions of Python
self.key = key or settings.SECRET_KEY
self.sep = force_str(sep)
if _SEP_UNSAFE.match(self.sep):
raise ValueError(
'Unsafe Signer separator: %r (cannot be empty or consist of '
'only A-z0-9-_=)' % sep,
)
self.salt = force_str(salt or '%s.%s' % (self.__class__.__module__, self.__class__.__name__))
def check_secret_key(app_configs, **kwargs):
passed_check = (
getattr(settings, 'SECRET_KEY', None) and
len(set(settings.SECRET_KEY)) >= SECRET_KEY_MIN_UNIQUE_CHARACTERS and
len(settings.SECRET_KEY) >= SECRET_KEY_MIN_LENGTH
)
return [] if passed_check else [W009]
def encode(the_id, sub_key):
assert 0 <= the_id < 2 ** 64
crc = binascii.crc32(bytes(the_id)) & 0xffffffff
message = struct.pack(b"<IQxxxx", crc, the_id)
assert len(message) == 16
key = settings.SECRET_KEY
iv = hashlib.sha256((key + sub_key).encode('ascii')).digest()[:16]
cypher = AES.new(key[:32], AES.MODE_CBC, iv)
eid = base64.urlsafe_b64encode(cypher.encrypt(message)).replace(b"=", b"")
return eid.decode('utf-8')
def decode(e, sub_key):
if isinstance(e, basestring):
e = bytes(e.encode("ascii"))
try:
padding = (3 - len(e) % 3) * b"="
e = base64.urlsafe_b64decode(e + padding)
except (TypeError, AttributeError, binascii.Error):
raise EncryptedIDDecodeError()
for key in getattr(settings, "SECRET_KEYS", [settings.SECRET_KEY]):
iv = hashlib.sha256((key + sub_key).encode('ascii')).digest()[:16]
cypher = AES.new(key[:32], AES.MODE_CBC, iv)
try:
msg = cypher.decrypt(e)
except ValueError:
raise EncryptedIDDecodeError()
try:
crc, the_id = struct.unpack(b"<IQxxxx", msg)
except struct.error:
raise EncryptedIDDecodeError()
try:
if crc != binascii.crc32(bytes(the_id)) & 0xffffffff:
continue
except (MemoryError, OverflowError):
raise EncryptedIDDecodeError()
return the_id
raise EncryptedIDDecodeError("Failed to decrypt, CRC never matched.")
def _sign(self, q, a, expires):
plain = [getattr(settings, 'SITE_URL', ''), settings.SECRET_KEY,
q, a, expires]
plain = "".join([str(p) for p in plain])
return sha1(plain).hexdigest()