def deleteUser(userName, SN):
    """Best-effort removal of a user and its virtual MFA device.

    Three calls are issued, in order, on the module-level ``client``
    (presumably a boto3 IAM client -- confirm against the file's setup
    code): ``deactivate_mfa_device``, ``delete_virtual_mfa_device`` and
    ``delete_user``.  Each step is attempted even when an earlier one
    failed; a failure is only reported on stdout.

    Args:
        userName: Name of the user to delete.
        SN: Serial number of the MFA device attached to the user.

    Returns:
        Always ``True``, whether or not the individual steps succeeded.
    """
    # ``except Exception`` (rather than a bare ``except``) keeps the
    # best-effort behaviour while letting KeyboardInterrupt / SystemExit
    # propagate, so the cleanup can still be aborted by the operator.
    try:
        client.deactivate_mfa_device(
            UserName=userName,
            SerialNumber=SN
        )
        print("MFA device deactivated, trying to delete device.")
    except Exception:
        print("Unable to deactivate MFA token. Could be that it's not created.")

    try:
        client.delete_virtual_mfa_device(
            SerialNumber=SN
        )
        print("MFA device deleted, trying to delete new user.")
    except Exception:
        print("Unable to delete MFA token. Could be that it's not created.")

    try:
        client.delete_user(
            UserName=userName
        )
        print("User deleted")
    except Exception:
        print("Unable to delete user: " + userName)

    return True
python类new()的实例源码
def generate_token(seed, timestamp=None):
    """Compute a 6-digit time-based one-time password (HMAC-SHA1, 30 s step).

    Args:
        seed: Base32-encoded shared secret (upper or lower case).
        timestamp: Optional time in seconds since the epoch for which the
            token is computed.  Defaults to the current time.

    Returns:
        The token as an ``int`` in ``range(10 ** 6)``.  Leading zeros are
        therefore not preserved; zero-pad to six digits when displaying.
    """
    key = base64.b32decode(seed, True)
    if timestamp is None:
        timestamp = time.time()
    counter = struct.pack(">Q", int(timestamp // 30))
    # bytearray indexing yields ints on both Python 2 and Python 3, unlike
    # ``ord(digest[19])`` which raises TypeError on Python 3 bytes.
    digest = bytearray(hmac.new(key, counter, hashlib.sha1).digest())
    # Dynamic truncation: the low nibble of the last byte selects a 4-byte
    # window, whose top bit is masked off before reducing to six digits.
    offset = digest[19] & 0xf
    window = bytes(digest[offset:offset + 4])
    token = (struct.unpack(">I", window)[0] & 0x7fffffff) % 10 ** 6
    return token
def build_hmac_sha1(pkt, pw=b'\0' * 20, ip4l=(), ip6l=()):
    """Compute the HMAC-SHA1 digest for the CARP layer of ``pkt``.

    The digest covers, in order: a fixed version/type byte (0x21), the
    layer's ``vhid`` and the packed IPv4 addresses in ascending order.

    Args:
        pkt: Packet object exposing ``haslayer`` and item access by layer.
        pw: HMAC key.  Defaults to 20 zero bytes.  A text string is
            converted byte-for-byte (latin-1) so older callers keep working.
        ip4l: Iterable of dotted-quad IPv4 address strings.
        ip6l: Currently unused; IPv6 addresses are not yet folded in.

    Returns:
        The raw digest, or ``None`` when ``pkt`` has no CARP layer.
    """
    if not pkt.haslayer(CARP):
        return None
    p = pkt[CARP]
    # hmac requires a bytes key on Python 3; latin-1 maps code points
    # 0-255 one-to-one onto bytes.
    if not isinstance(pw, bytes):
        pw = pw.encode('latin-1')
    h = hmac.new(pw, digestmod=hashlib.sha1)
    # XXX: this is a dirty hack. it needs to pack version and type into a single 8bit field
    h.update(b'\x21')
    # XXX: mac addy if different from special link layer. comes before vhid
    h.update(struct.pack('!B', p.vhid))
    # Sort the packed addresses from smallest to largest before hashing.
    for packed in sorted(inet_aton(addr) for addr in ip4l):
        h.update(packed)
    # XXX: do ip6l sorting
    return h.digest()
def post(self):
    """
    Handle the new-post form submission.

    Anonymous visitors are redirected to ``/blog``.  When both the subject
    and the content are supplied the post is stored and the client is
    redirected to its page; otherwise the form is rendered again with an
    error message.
    """
    if not self.user:
        return self.redirect('/blog')

    subject = self.request.get('subject')
    content = self.request.get('content')

    if not (subject and content):
        error = "subject and content, please!"
        self.render("newpost.html", subject=subject,
                    content=content, error=error)
        return

    p = Post(parent=blog_key(), user_id=self.user.key().id(),
             subject=subject, content=content)
    p.put()
    self.redirect('/blog/%s' % str(p.key().id()))
def gen_user_breadcrumb(size):
    """
    Build the signed "user breadcrumb" used in comments posting.

    The payload is the ASCII text ``"<size> <elapsed> <count> <dt>"`` where
    ``elapsed`` is a randomised typing time, ``count`` a randomised number
    of text-change events (at least 1) and ``dt`` the current time in
    milliseconds.

    :param size: length of the text being posted (integer)
    :return: two newline-terminated lines: the base64 HMAC-SHA256 of the
        payload, then the base64 payload itself
    """
    key = 'iN4$aGr0m'
    dt = int(time.time() * 1000)

    # typing time elapsed
    time_elapsed = randint(500, 1500) + size * randint(500, 1500)
    # Floor division keeps the count an integer on Python 3 as well.
    text_change_event_count = max(1, size // randint(3, 5))

    data = '{size!s} {elapsed!s} {count!s} {dt!s}'.format(**{
        'size': size, 'elapsed': time_elapsed, 'count': text_change_event_count, 'dt': dt
    }).encode('ascii')
    signature = hmac.new(
        key.encode('ascii'), data, digestmod=hashlib.sha256).digest()

    # b64encode returns bytes; decode so the result does not contain a
    # literal "b'...'" wrapper on Python 3.
    return '{0!s}\n{1!s}\n'.format(
        base64.b64encode(signature).decode('ascii'),
        base64.b64encode(data).decode('ascii'))
def kciCloudHelper(iCloudKey):
    """Decrypt the iCloud accounts token file and summarise its contents.

    Args:
        iCloudKey: Base64-encoded key material.

    Returns:
        A two-line string: ``[primaryEmail | fullName]`` followed by
        ``dsPrsID:mmeAuthToken``, taken from the decrypted property list.

    Raises:
        subprocess.CalledProcessError: if the ``openssl`` command exits
            with a non-zero status.

    ``get_bella_user``, ``send_msg`` and ``blue_star`` are defined outside
    this block.  NOTE(review): passing a text-string key to ``hmac.new``
    suggests this targets Python 2 -- confirm.
    """
    #this function is tailored to keychaindump. Takes an iCloud key, and returns tokens
    msg = base64.b64decode(iCloudKey)
    key = "t9s\"lx^awe.580Gj%'ld+0LG<#9xa?>vb)-fkwb92[}"
    # The hex-encoded HMAC-MD5 of the decoded key is used as the AES key below.
    hashed = hmac.new(key, msg, digestmod=hashlib.md5).digest()
    hexedKey = binascii.hexlify(hashed)
    # Passed verbatim to openssl's -iv option.
    IV = 16 * '0'
    mme_token_file = glob("/Users/%s/Library/Application Support/iCloud/Accounts/*" % get_bella_user()) #this doesnt need to be globber bc only current user's info can be decrypted
    # Keep the entry whose file name parses as an integer; when several
    # match, the last one wins.
    # NOTE(review): if none match, mme_token_file is still the glob list
    # when it is interpolated into the message and command below.
    for x in mme_token_file:
        try:
            int(x.split("/")[-1])
            mme_token_file = x
        except ValueError:
            continue
    send_msg("\t%sDecrypting token plist\n\t [%s]\n" % (blue_star, mme_token_file), False)
    # NOTE(review): the command is a shell string (shell=True) with the
    # file path interpolated inside single quotes.
    decryptedBinary = subprocess.check_output("openssl enc -d -aes-128-cbc -iv '%s' -K %s < '%s'" % (IV, hexedKey, mme_token_file), shell=True)
    # Imported here rather than at file scope; presumably because the
    # Foundation bridge only exists on macOS -- confirm.
    from Foundation import NSData, NSPropertyListSerialization
    binToPlist = NSData.dataWithBytes_length_(decryptedBinary, len(decryptedBinary))
    # Only the first element of the returned tuple (the parsed plist) is used.
    token_plist = NSPropertyListSerialization.propertyListWithData_options_format_error_(binToPlist, 0, None, None)[0]
    tokz = "[%s | %s]\n" % (token_plist["appleAccountInfo"]["primaryEmail"], token_plist["appleAccountInfo"]["fullName"])
    tokz += "%s:%s\n" % (token_plist["appleAccountInfo"]["dsPrsID"], token_plist["tokens"]["mmeAuthToken"])
    return tokz
def request(self, method, request_uri, headers, content):
    """Add an ``HMACDigest`` authorization header to ``headers`` in place.

    The signed text is ``method:uri:cnonce:snonce:<header values>`` where
    the header values are those of the end-to-end headers, concatenated in
    the order returned by ``_get_end2end_headers``.  ``content`` is not
    used.
    """
    keys = _get_end2end_headers(headers)
    keylist = "".join("%s " % name for name in keys)
    headers_val = "".join(headers[name] for name in keys)
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    cnonce = _cnonce()

    to_sign = "%s:%s:%s:%s:%s" % (
        method, request_uri, cnonce, self.challenge['snonce'], headers_val)
    signature = hmac.new(self.key, to_sign, self.hashmod).hexdigest().lower()

    fields = (
        self.credentials[0],
        self.challenge['realm'],
        self.challenge['snonce'],
        cnonce,
        request_uri,
        created,
        signature,
        keylist,
    )
    headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % fields
def auth_sub_string_from_body(http_body):
    """Pull the AuthSub token value out of an HTTP response body.

    Typically applied to the server's reply to an "upgrade token" request
    in order to obtain the new session token.

    Args:
        http_body: str The response body, scanned line by line.

    Returns:
        The text following ``Token=`` on the first line that starts with
        that prefix, or None when no line does.
    """
    prefix = 'Token='
    token_values = (
        line[len(prefix):]
        for line in http_body.splitlines()
        if line.startswith(prefix)
    )
    return next(token_values, None)
def generate_request_for_access_token(
        request_token, auth_server_url=ACCESS_TOKEN_URL):
    """Builds the HTTP request that exchanges a request token for an access token.

    The request token must already have been authorized by the user. See
    the documentation on OAuth with Google Data for more details:
    http://code.google.com/apis/accounts/docs/OAuth.html#AccessToken

    Args:
        request_token: An OAuthHmacToken or OAuthRsaToken which the user has
            approved using their browser.
        auth_server_url: (optional) The URL the access token is requested
            from. Defaults to ACCESS_TOKEN_URL.

    Returns:
        Whatever request_token.modify_request returns for the prepared
        POST request (the request to send to the OAuth server).
    """
    access_request = atom.http_core.HttpRequest(auth_server_url, 'POST')
    # The POST carries no body, so its length is declared as zero.
    access_request.headers['Content-Length'] = '0'
    signed_request = request_token.modify_request(access_request)
    return signed_request
def upgrade_to_access_token(request_token, server_response_body):
    """Turns an authorized request token into an access token, in place.

    The token and token secret are parsed out of the server's response and
    stored on request_token; the token is then marked as an access token
    and the fields only needed during authorization are cleared.

    Args:
        request_token: An OAuth token object; it is modified by this call.
        server_response_body: str The server's response to the upgrade
            request, containing the new token and token_secret.

    Returns:
        The same token object which was passed in.
    """
    request_token.token, request_token.token_secret = (
        oauth_token_info_from_body(server_response_body))
    request_token.auth_state = ACCESS_TOKEN
    # These two are no longer relevant once the token has been upgraded.
    request_token.next = request_token.verifier = None
    return request_token
def generate_token(key, user_id, action_id='', when=None):
    """Generates a URL-safe token for the given user, action, time tuple.

    Args:
        key: secret key to use.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
            authorization for.
        when: the time in seconds since the epoch at which the user was
            authorized for this action. If not set (or falsy, e.g. 0) the
            current time is used.

    Returns:
        A string XSRF protection token: the URL-safe base64 encoding of
        ``digest + DELIMITER + when``.
    """
    # Imported locally so this block does not depend on a file-level import.
    import hashlib

    # hmac.new() requires an explicit digestmod on Python 3.8+.  MD5 was the
    # implicit default before that, so naming it keeps tokens compatible
    # with ones produced by earlier versions of this function.
    digester = hmac.new(_helpers._to_bytes(key, encoding='utf-8'),
                        digestmod=hashlib.md5)
    digester.update(_helpers._to_bytes(str(user_id), encoding='utf-8'))
    digester.update(DELIMITER)
    digester.update(_helpers._to_bytes(action_id, encoding='utf-8'))
    digester.update(DELIMITER)
    when = _helpers._to_bytes(str(when or int(time.time())), encoding='utf-8')
    digester.update(when)
    digest = digester.digest()

    # The timestamp is appended in the clear after the digest.
    token = base64.urlsafe_b64encode(digest + DELIMITER + when)
    return token
def generate_csrf_token(self, csrf_context):
    """Create the CSRF token string ``<expires>##<hmac hexdigest>``.

    A random per-session secret is stored under ``session['csrf']`` on
    first use and copied to ``self.csrf_token.csrf_key``.  The HMAC-SHA1,
    keyed with ``SECRET_KEY``, covers that secret followed by the expiry
    stamp (empty when ``TIME_LIMIT`` is not set).

    Args:
        csrf_context: A session-like mapping, or an object exposing one as
            its ``session`` attribute.

    Raises:
        Exception: if ``SECRET_KEY`` is None.
        TypeError: if ``csrf_context`` is None.
    """
    if self.SECRET_KEY is None:
        raise Exception('must set SECRET_KEY in a subclass of this form for it to work')
    if csrf_context is None:
        raise TypeError('Must provide a session-like object as csrf context')

    session = getattr(csrf_context, 'session', csrf_context)
    if 'csrf' not in session:
        session['csrf'] = sha1(os.urandom(64)).hexdigest()
    session_secret = session['csrf']
    self.csrf_token.csrf_key = session_secret

    if not self.TIME_LIMIT:
        expires = ''
        signed_text = session_secret
    else:
        expires = (datetime.now() + self.TIME_LIMIT).strftime(self.TIME_FORMAT)
        signed_text = '%s%s' % (session_secret, expires)

    digest = hmac.new(
        self.SECRET_KEY, signed_text.encode('utf8'), digestmod=sha1).hexdigest()
    return '%s##%s' % (expires, digest)
def validate_csrf_token(self, field):
    """Check a submitted CSRF token of the form ``<expires>##<hmac>``.

    The HMAC-SHA1 of ``field.csrf_key + expires``, keyed with
    ``SECRET_KEY``, must equal the submitted digest.  When ``TIME_LIMIT``
    is set, the current time formatted with ``TIME_FORMAT`` must not sort
    after the expiry stamp.

    Args:
        field: The CSRF field; ``data`` holds the submitted token and
            ``csrf_key`` the per-session secret.

    Raises:
        ValidationError: if the token is missing, does not match, or has
            expired.
    """
    if not field.data or '##' not in field.data:
        raise ValidationError(field.gettext('CSRF token missing'))

    # Split on the first delimiter only: a submitted value with several
    # '##' must be rejected as an invalid token, not crash the unpacking
    # with ValueError.
    expires, hmac_csrf = field.data.split('##', 1)

    check_val = (field.csrf_key + expires).encode('utf8')
    hmac_compare = hmac.new(self.SECRET_KEY, check_val, digestmod=sha1)
    if hmac_compare.hexdigest() != hmac_csrf:
        raise ValidationError(field.gettext('CSRF failed'))

    if self.TIME_LIMIT:
        # Both sides are strings; TIME_FORMAT is presumably chosen so that
        # string order matches chronological order -- confirm.
        now_formatted = datetime.now().strftime(self.TIME_FORMAT)
        if now_formatted > expires:
            raise ValidationError(field.gettext('CSRF token expired'))
def validate_csrf_token(self, form, field):
    """Verify a submitted ``<expires>##<hmac>`` CSRF token.

    The expected digest is the HMAC-SHA1 of ``session['csrf'] + expires``
    keyed with ``form_meta.csrf_secret``.  ``form`` is not used.

    Raises:
        ValidationError: if the token is missing, does not match, or (when
            ``time_limit`` is set) has expired.
    """
    meta = self.form_meta
    submitted = field.data
    if not submitted or '##' not in submitted:
        raise ValidationError(field.gettext('CSRF token missing'))

    # Everything before the first '##' is the expiry stamp; the remainder
    # is the digest.
    expires, _, received_mac = submitted.partition('##')

    signed_text = (self.session['csrf'] + expires).encode('utf8')
    expected_mac = hmac.new(meta.csrf_secret, signed_text, digestmod=sha1).hexdigest()
    if expected_mac != received_mac:
        raise ValidationError(field.gettext('CSRF failed'))

    if self.time_limit and self.now().strftime(self.TIME_FORMAT) > expires:
        raise ValidationError(field.gettext('CSRF token expired'))
def derive_key(self):
    """Derive the signing key from ``secret_key`` and ``salt``.

    The strategy is selected by ``self.key_derivation``:

    * ``'concat'``: digest of ``salt + secret_key``
    * ``'django-concat'``: digest of ``salt + b'signer' + secret_key``
    * ``'hmac'``: HMAC of ``salt`` keyed with ``secret_key``
    * ``'none'``: ``secret_key`` unchanged

    Override this method to use a different derivation.  Note that the
    derivation is not meant to turn a short password into a strong key;
    use large random secret keys instead.

    Raises:
        TypeError: for any other ``key_derivation`` value.
    """
    salt = want_bytes(self.salt)
    method = self.key_derivation

    if method == 'concat':
        return self.digest_method(salt + self.secret_key).digest()
    if method == 'django-concat':
        material = salt + b'signer' + self.secret_key
        return self.digest_method(material).digest()
    if method == 'hmac':
        return hmac.new(
            self.secret_key, salt, digestmod=self.digest_method).digest()
    if method == 'none':
        return self.secret_key
    raise TypeError('Unknown key derivation method')
def generate_csrf_token(self, csrf_context):
    """Return a CSRF token of the form ``<expires>##<hmac hexdigest>``.

    On first use a random secret is saved as ``session['csrf']``; it is
    also exposed as ``self.csrf_token.csrf_key``.  The digest is an
    HMAC-SHA1 keyed with ``SECRET_KEY`` over the session secret plus the
    expiry stamp, which is empty unless ``TIME_LIMIT`` is set.

    Args:
        csrf_context: Either a session-like mapping or an object carrying
            one in its ``session`` attribute.

    Raises:
        Exception: if ``SECRET_KEY`` is None.
        TypeError: if ``csrf_context`` is None.
    """
    if self.SECRET_KEY is None:
        raise Exception('must set SECRET_KEY in a subclass of this form for it to work')
    if csrf_context is None:
        raise TypeError('Must provide a session-like object as csrf context')

    session = getattr(csrf_context, 'session', csrf_context)
    if 'csrf' not in session:
        session['csrf'] = sha1(os.urandom(64)).hexdigest()
    stored_secret = session['csrf']
    self.csrf_token.csrf_key = stored_secret

    if not self.TIME_LIMIT:
        expires = ''
        payload = stored_secret
    else:
        expires = (datetime.now() + self.TIME_LIMIT).strftime(self.TIME_FORMAT)
        payload = '%s%s' % (stored_secret, expires)

    mac = hmac.new(self.SECRET_KEY, payload.encode('utf8'), digestmod=sha1)
    return '%s##%s' % (expires, mac.hexdigest())
def validate_csrf_token(self, field):
    """Validate a submitted CSRF token shaped like ``<expires>##<hmac>``.

    The submitted digest must equal the HMAC-SHA1 (keyed with
    ``SECRET_KEY``) of ``field.csrf_key + expires``.  With ``TIME_LIMIT``
    set, the token is also rejected once the current time, formatted with
    ``TIME_FORMAT``, sorts after the expiry stamp.

    Args:
        field: The CSRF field; reads ``data`` (submitted token) and
            ``csrf_key`` (per-session secret).

    Raises:
        ValidationError: if the token is missing, does not match, or has
            expired.
    """
    if not field.data or '##' not in field.data:
        raise ValidationError(field.gettext('CSRF token missing'))

    # maxsplit=1 guarantees exactly two parts; without it a value holding
    # more than one '##' raised ValueError instead of failing validation.
    expires, hmac_csrf = field.data.split('##', 1)

    check_val = (field.csrf_key + expires).encode('utf8')
    hmac_compare = hmac.new(self.SECRET_KEY, check_val, digestmod=sha1)
    if hmac_compare.hexdigest() != hmac_csrf:
        raise ValidationError(field.gettext('CSRF failed'))

    if self.TIME_LIMIT:
        # String comparison; presumably TIME_FORMAT sorts chronologically
        # -- confirm.
        now_formatted = datetime.now().strftime(self.TIME_FORMAT)
        if now_formatted > expires:
            raise ValidationError(field.gettext('CSRF token expired'))
def generate_csrf_token(self, csrf_token_field):
    """Return a session-bound CSRF token ``<expires>##<hmac hexdigest>``.

    A random secret is stored as ``session['csrf']`` on first use.  The
    digest is an HMAC-SHA1 keyed with ``form_meta.csrf_secret`` over that
    secret plus the expiry stamp (empty when ``time_limit`` is not set).
    ``csrf_token_field`` is not used.

    Raises:
        Exception: if ``form_meta.csrf_secret`` is None.
        TypeError: if ``form_meta.csrf_context`` is None.
    """
    meta = self.form_meta
    if meta.csrf_secret is None:
        raise Exception('must set `csrf_secret` on class Meta for SessionCSRF to work')
    if meta.csrf_context is None:
        raise TypeError('Must provide a session-like object as csrf context')

    session = self.session
    if 'csrf' not in session:
        session['csrf'] = sha1(os.urandom(64)).hexdigest()

    if not self.time_limit:
        expires = ''
        signed_text = session['csrf']
    else:
        expires = (self.now() + self.time_limit).strftime(self.TIME_FORMAT)
        signed_text = '%s%s' % (session['csrf'], expires)

    digest = hmac.new(
        meta.csrf_secret, signed_text.encode('utf8'), digestmod=sha1).hexdigest()
    return '%s##%s' % (expires, digest)
def validate_csrf_token(self, form, field):
    """Check a submitted CSRF token of the form ``<expires>##<hmac>``.

    The digest must equal the HMAC-SHA1, keyed with
    ``form_meta.csrf_secret``, of ``session['csrf'] + expires``.  ``form``
    is not used.

    Raises:
        ValidationError: if the token is missing, does not match, or (when
            ``time_limit`` is set) has expired.
    """
    meta = self.form_meta
    token = field.data
    if not token or '##' not in token:
        raise ValidationError(field.gettext('CSRF token missing'))

    # The expiry stamp is whatever precedes the first '##'.
    expires, _, claimed_digest = token.partition('##')

    message = (self.session['csrf'] + expires).encode('utf8')
    actual_digest = hmac.new(meta.csrf_secret, message, digestmod=sha1).hexdigest()
    if actual_digest != claimed_digest:
        raise ValidationError(field.gettext('CSRF failed'))

    if self.time_limit and self.now().strftime(self.TIME_FORMAT) > expires:
        raise ValidationError(field.gettext('CSRF token expired'))
def derive_key(self):
    """Compute the key used for signing from ``secret_key`` and ``salt``.

    ``self.key_derivation`` picks the scheme:

    * ``'concat'``: digest of ``salt + secret_key``
    * ``'django-concat'``: digest of ``salt + b'signer' + secret_key``
    * ``'hmac'``: HMAC of ``salt`` keyed with ``secret_key``
    * ``'none'``: ``secret_key`` returned as is

    Subclasses may override this to change the derivation.  It is not a
    password-stretching mechanism: supply large random secret keys rather
    than short passwords.

    Raises:
        TypeError: for any other ``key_derivation`` value.
    """
    salt = want_bytes(self.salt)
    scheme = self.key_derivation

    if scheme == 'concat':
        return self.digest_method(salt + self.secret_key).digest()
    if scheme == 'django-concat':
        combined = salt + b'signer' + self.secret_key
        return self.digest_method(combined).digest()
    if scheme == 'hmac':
        return hmac.new(
            self.secret_key, salt, digestmod=self.digest_method).digest()
    if scheme == 'none':
        return self.secret_key
    raise TypeError('Unknown key derivation method')