def _hi(data, salt, iterations):
"""A simple implementation of PBKDF2."""
mac = hmac.HMAC(data, None, sha1)
def _digest(msg, mac=mac):
"""Get a digest for msg."""
_mac = mac.copy()
_mac.update(msg)
return _mac.digest()
from_bytes = _from_bytes
to_bytes = _to_bytes
_u1 = _digest(salt + b'\x00\x00\x00\x01')
_ui = from_bytes(_u1, 'big')
for _ in range(iterations - 1):
_u1 = _digest(_u1)
_ui ^= from_bytes(_u1, 'big')
return to_bytes(_ui, 20, 'big')
python类sha1()的实例源码
def mime_multipart(_, context, arg):
"""mime_multipart composes a MIME multipart string.
Example:
"UserData": {
"Fn::Base64": {
"CFPP::MimeMultipart": [
["text/x-shellscript", {"CFPP::FileToString": "userdata.sh"}],
["text/cloud-config", {"CFPP::FileToString": "cloud-init.yaml"}]
]
}
}
"""
_raise_unless_mime_params(context, arg)
mime_doc = MIMEMultipart()
# set boundary explicitly so that they are stable based on path in the template.
mime_doc.set_boundary("=" * 10 + hashlib.sha1(".".join(context)).hexdigest() + "=" * 3)
for mime_type, contents in arg:
sub_message = MIMEText(contents, contents, sys.getdefaultencoding())
mime_doc.attach(sub_message)
return mime_doc.as_string()
def pbkdf2_hex(data, salt, iterations=DEFAULT_PBKDF2_ITERATIONS,
keylen=None, hashfunc=None):
"""Like :func:`pbkdf2_bin`, but returns a hex-encoded string.
.. versionadded:: 0.9
:param data: the data to derive.
:param salt: the salt for the derivation.
:param iterations: the number of iterations.
:param keylen: the length of the resulting key. If not provided,
the digest size will be used.
:param hashfunc: the hash function to use. This can either be the
string name of a known hash function, or a function
from the hashlib module. Defaults to sha1.
"""
rv = pbkdf2_bin(data, salt, iterations, keylen, hashfunc)
return to_native(codecs.encode(rv, 'hex_codec'))
def get(self, key):
member = CheckAuth(self)
t = self.request.arguments['t'][0]
if member:
if (member.level == 0) and (str(member.created_ts) == str(t)):
one = Member.get(key)
if one:
if one.num != 1:
memcache.delete(one.auth)
one.deactivated = int(time.time())
one.password = hashlib.sha1(str(time.time())).hexdigest()
one.auth = hashlib.sha1(str(one.num) + ':' + one.password).hexdigest()
one.newbie = 1
one.noob = 1
one.sync()
store.commit() #jon add
memcache.delete('Member_' + str(one.num))
return self.redirect('/member/' + one.username)
return self.redirect('/')
def get_disqus_sso_payload(user):
"""Return remote_auth_s3 and api_key for user."""
DISQUS_PUBLIC_KEY = current_app.config.get('DISQUS_PUBLIC_KEY')
DISQUS_SECRET_KEY = current_app.config.get('DISQUS_SECRET_KEY')
if DISQUS_PUBLIC_KEY and DISQUS_SECRET_KEY:
if user:
data = simplejson.dumps({
'id': user.id,
'username': user.name,
'email': user.email_addr,
})
else:
data = simplejson.dumps({})
# encode the data to base64
message = base64.b64encode(data)
# generate a timestamp for signing the message
timestamp = int(time.time())
# generate our hmac signature
sig = hmac.HMAC(DISQUS_SECRET_KEY, '%s %s' % (message, timestamp),
hashlib.sha1).hexdigest()
return message, timestamp, sig, DISQUS_PUBLIC_KEY
else:
return None, None, None, None
def test_disqus_sso_payload_auth_user(self, mock_b64encode, mock_hmac):
"""Test Disqus SSO payload auth works."""
user = UserFactory.create()
DISQUS_PUBLIC_KEY = 'public'
DISQUS_SECRET_KEY = 'secret'
patch_dict = {'DISQUS_PUBLIC_KEY': DISQUS_PUBLIC_KEY,
'DISQUS_SECRET_KEY': DISQUS_SECRET_KEY}
data = json.dumps({'id': user.id,
'username': user.name,
'email': user.email_addr})
mock_b64encode.return_value = data
with patch.dict(self.flask_app.config, patch_dict):
message, timestamp, sig, pub_key = util.get_disqus_sso_payload(user)
mock_b64encode.assert_called_with(data)
mock_hmac.assert_called_with(DISQUS_SECRET_KEY, '%s %s' % (data, timestamp),
hashlib.sha1)
assert timestamp
assert sig
assert pub_key == DISQUS_PUBLIC_KEY
def test_disqus_sso_payload_anon_user(self, mock_b64encode, mock_hmac):
"""Test Disqus SSO payload anon works."""
DISQUS_PUBLIC_KEY = 'public'
DISQUS_SECRET_KEY = 'secret'
patch_dict = {'DISQUS_PUBLIC_KEY': DISQUS_PUBLIC_KEY,
'DISQUS_SECRET_KEY': DISQUS_SECRET_KEY}
data = json.dumps({})
mock_b64encode.return_value = data
with patch.dict(self.flask_app.config, patch_dict):
message, timestamp, sig, pub_key = util.get_disqus_sso_payload(None)
mock_b64encode.assert_called_with(data)
mock_hmac.assert_called_with(DISQUS_SECRET_KEY, '%s %s' % (data, timestamp),
hashlib.sha1)
assert timestamp
assert sig
assert pub_key == DISQUS_PUBLIC_KEY
def pbkdf2_hex(data, salt, iterations=DEFAULT_PBKDF2_ITERATIONS,
keylen=None, hashfunc=None):
"""Like :func:`pbkdf2_bin`, but returns a hex-encoded string.
.. versionadded:: 0.9
:param data: the data to derive.
:param salt: the salt for the derivation.
:param iterations: the number of iterations.
:param keylen: the length of the resulting key. If not provided,
the digest size will be used.
:param hashfunc: the hash function to use. This can either be the
string name of a known hash function, or a function
from the hashlib module. Defaults to sha1.
"""
rv = pbkdf2_bin(data, salt, iterations, keylen, hashfunc)
return to_native(codecs.encode(rv, 'hex_codec'))
def _oauth_signature(consumer_token, method, url, parameters={}, token=None):
"""Calculates the HMAC-SHA1 OAuth signature for the given request.
See http://oauth.net/core/1.0/#signing_process
"""
parts = urlparse.urlparse(url)
scheme, netloc, path = parts[:3]
normalized_url = scheme.lower() + "://" + netloc.lower() + path
base_elems = []
base_elems.append(method.upper())
base_elems.append(normalized_url)
base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
for k, v in sorted(parameters.items())))
base_string = "&".join(_oauth_escape(e) for e in base_elems)
key_elems = [escape.utf8(consumer_token["secret"])]
key_elems.append(escape.utf8(token["secret"] if token else ""))
key = b"&".join(key_elems)
hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
return binascii.b2a_base64(hash.digest())[:-1]
def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None):
"""Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.
See http://oauth.net/core/1.0a/#signing_process
"""
parts = urlparse.urlparse(url)
scheme, netloc, path = parts[:3]
normalized_url = scheme.lower() + "://" + netloc.lower() + path
base_elems = []
base_elems.append(method.upper())
base_elems.append(normalized_url)
base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
for k, v in sorted(parameters.items())))
base_string = "&".join(_oauth_escape(e) for e in base_elems)
key_elems = [escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))]
key_elems.append(escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else ""))
key = b"&".join(key_elems)
hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
return binascii.b2a_base64(hash.digest())[:-1]
def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None):
"""Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.
See http://oauth.net/core/1.0a/#signing_process
"""
parts = urlparse.urlparse(url)
scheme, netloc, path = parts[:3]
normalized_url = scheme.lower() + "://" + netloc.lower() + path
base_elems = []
base_elems.append(method.upper())
base_elems.append(normalized_url)
base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
for k, v in sorted(parameters.items())))
base_string = "&".join(_oauth_escape(e) for e in base_elems)
key_elems = [escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))]
key_elems.append(escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else ""))
key = b"&".join(key_elems)
hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
return binascii.b2a_base64(hash.digest())[:-1]
def _key_identifier_from_public_key(public_key):
# This is a very slow way to do this.
serialized = public_key.public_bytes(
serialization.Encoding.DER,
serialization.PublicFormat.SubjectPublicKeyInfo
)
spki, remaining = decoder.decode(
serialized, asn1Spec=_SubjectPublicKeyInfo()
)
assert not remaining
# the univ.BitString object is a tuple of bits. We need bytes and
# pyasn1 really doesn't want to give them to us. To get it we'll
# build an integer and convert that to bytes.
bits = 0
for bit in spki.getComponentByName("subjectPublicKey"):
bits = bits << 1 | bit
data = utils.int_to_bytes(bits)
return hashlib.sha1(data).digest()
def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None):
"""Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.
See http://oauth.net/core/1.0a/#signing_process
"""
parts = urlparse.urlparse(url)
scheme, netloc, path = parts[:3]
normalized_url = scheme.lower() + "://" + netloc.lower() + path
base_elems = []
base_elems.append(method.upper())
base_elems.append(normalized_url)
base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
for k, v in sorted(parameters.items())))
base_string = "&".join(_oauth_escape(e) for e in base_elems)
key_elems = [escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))]
key_elems.append(escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else ""))
key = b"&".join(key_elems)
hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
return binascii.b2a_base64(hash.digest())[:-1]
def _key_identifier_from_public_key(public_key):
# This is a very slow way to do this.
serialized = public_key.public_bytes(
serialization.Encoding.DER,
serialization.PublicFormat.SubjectPublicKeyInfo
)
spki, remaining = decoder.decode(
serialized, asn1Spec=_SubjectPublicKeyInfo()
)
assert not remaining
# the univ.BitString object is a tuple of bits. We need bytes and
# pyasn1 really doesn't want to give them to us. To get it we'll
# build an integer and convert that to bytes.
bits = 0
for bit in spki.getComponentByName("subjectPublicKey"):
bits = bits << 1 | bit
data = utils.int_to_bytes(bits)
return hashlib.sha1(data).digest()
def show_group(group_num):
""" Returns info about group `group_num` using VTPM_ORD_GROUP_SHOW"""
out = {'num': group_num, 'vtpms': []}
body = vtpm_raw(0x1C2, struct.pack('>II', 0x02000107, group_num))
(uuid, pk, cfg) = struct.unpack('16s 256s 16s', body)
uuid = stringify_uuid(uuid)
logger.info('Group [%d] UUID: %s', group_num, uuid)
pk_hash = hashlib.sha1(pk).hexdigest()
logger.info(' PK Hash: %s', pk_hash)
logger.info(' Cfg list: %s', cfg.encode('hex'))
body = vtpm_cmd(VTPM_ORD_VTPM_LIST, struct.pack('>II', group_num, 0))
((num_vtpms,), body) = unpack('>I', body)
if num_vtpms > 0:
logger.info(' vTPMs: ')
vtpms = struct.unpack('16s' * num_vtpms, body)
vtpms = [stringify_uuid(vtpm) for vtpm in vtpms]
for i, vtpm in enumerate(vtpms):
logger.info(' [%d]: %s', i, vtpm)
out['vtpms'].append(vtpm)
out['uuid'] = uuid
return out
def test_collision():
import resource
resource.setrlimit(resource.RLIMIT_NOFILE, (102400, 102400))
dd = {}
ls = []
for i in range(1 << 15):
key = str(hashlib.sha1(str(i)).hexdigest())
lck = key
print 'lock is', i, lck
l = Portlock(lck, timeout=8)
r = l.try_lock()
if not r:
print 'collide', i, l.addr
print l.socks
dd[l.addr] = i
ls.append(l)
def collect_cancel(html):
try:
cancels_list = []
for tr in html.findAll('tr', attrs={'class': re.compile('^gen_')}):
td = tr.findAll('td')
# ????
lec_cancel = map(text, td[2:9])
# ?????????
s = lec_cancel[0] + lec_cancel[1] + lec_cancel[2] + \
lec_cancel[3] + lec_cancel[4] + lec_cancel[6]
unique_hash = hashlib.sha1(s.encode('utf-8')).hexdigest()
lec_cancel.append(unique_hash)
cancels_list.append(lec_cancel)
else:
return cancels_list
except Exception as e:
log.exception(e)
def collect_news(html):
try:
news_list = []
now_notice = html.find('div', attrs={'id': 'now_notice_area'})
for tr in now_notice.findAll('tr'):
td = tr.findAll('td')
news = map(text, td[0:2])
# ???link????????
links = []
if td[1].a is not None:
for a in td[1].findAll('a'):
link = a.get('href')
links.append(link)
urls = " ".join(links)
# URL???
news.append(urls)
# ?????????
s = news[0] + news[1]
unique_hash = hashlib.sha1(s.encode('utf-8')).hexdigest()
news.append(unique_hash)
news_list.append(news)
else:
return news_list
except Exception as e:
log.exception(e)
def _acquaint_(self, peer_location, peer_signature, addrinfo, task=None):
"""
Internal use only.
"""
if peer_signature in _Peer._sign_locations:
raise StopIteration
sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM),
keyfile=self._keyfile, certfile=self._certfile)
sock.settimeout(MsgTimeout)
req = _NetRequest('peer', kwargs={'signature': self._signature, 'name': self._name,
'from': addrinfo.location, 'version': __version__},
dst=peer_location)
req.auth = hashlib.sha1((peer_signature + self._secret).encode()).hexdigest()
try:
yield sock.connect((peer_location.addr, peer_location.port))
yield sock.send_msg(serialize(req))
peer_info = yield sock.recv_msg()
peer_info = deserialize(peer_info)
assert peer_info['version'] == __version__
if peer_signature not in _Peer._sign_locations:
_Peer(peer_info['name'], peer_location, peer_signature,
self._keyfile, self._certfile, addrinfo)
reply = 0
except:
logger.debug(traceback.format_exc())
reply = -1
sock.close()
raise StopIteration(0)
def _udp_proc(self, location, addrinfo, task=None):
"""
Internal use only.
"""
task.set_daemon()
sock = addrinfo.udp_sock
while 1:
msg, addr = yield sock.recvfrom(1024)
if not msg.startswith('ping:'):
logger.warning('ignoring UDP message from %s:%s', addr[0], addr[1])
continue
try:
ping_info = deserialize(msg[len('ping:'):])
except:
continue
peer_location = ping_info.get('location', None)
if not isinstance(peer_location, Location) or peer_location in self._locations:
continue
if ping_info['version'] != __version__:
logger.warning('Peer %s version %s is not %s',
peer_location, ping_info['version'], __version__)
continue
if self._ignore_peers:
continue
if self._secret is None:
auth_code = None
else:
auth_code = ping_info.get('signature', '') + self._secret
auth_code = hashlib.sha1(auth_code.encode()).hexdigest()
_Peer._lock.acquire()
peer = _Peer.peers.get((peer_location.addr, peer_location.port), None)
_Peer._lock.release()
if peer and peer.auth != auth_code:
_Peer.remove(peer_location)
peer = None
if not peer:
SysTask(self._acquaint_, peer_location, ping_info['signature'], addrinfo)