def check(self, data, digest):
    """Assert that hashing *data* with ``sha`` gives the hex string *digest*.

    Three things are verified on a single hash object:
    the hex digest equals the expected value, a second ``hexdigest()`` call
    returns the same value, and ``hexdigest()`` agrees with a manual hex
    encoding of ``digest()``.
    """
    hasher = sha.new(data)

    # The hex digest must match the expected value.
    computed = hasher.hexdigest()
    self.assertTrue(computed == digest)

    # Two consecutive digest operations must not change the value.
    repeated = hasher.hexdigest()
    self.assertTrue(computed == repeated)

    # hexdigest() must equal digest() rendered as two hex characters per item.
    raw = hasher.digest()
    encoded = "".join(['%02x' % ord(ch) for ch in raw])
    self.assertTrue(computed == encoded)
# Example source code for Python's new()
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt *dataToEncrypt* with AES in CFB mode (segment_size=128).

    ``privParameters`` is unpacked as (snmpEngineBoots, snmpEngineTime, salt).
    The salt that is returned is the one produced by the key helper, not the
    one passed in.

    Returns a tuple of two ``univ.OctetString`` objects: (ciphertext, salt).
    Raises ``error.StatusInformation`` with ``errind.encryptionError`` when
    ``AES`` is None.
    """
    if AES is None:
        raise error.StatusInformation(
            errorIndication=errind.encryptionError
        )

    engineBoots, engineTime, _incomingSalt = privParameters

    # 3.3.1.1
    aesKey, iv, salt = self.__getEncryptionKey(
        encryptKey, engineBoots, engineTime
    )

    # 3.3.1.3
    cipher = AES.new(aesKey, AES.MODE_CFB, iv, segment_size=128)

    # PyCrypto seems to require padding; between 1 and 16 zero octets are
    # always appended (a full 16 when the input is already aligned).
    padLength = 16 - len(dataToEncrypt) % 16
    padding = univ.OctetString((0,) * padLength).asOctets()
    ciphertext = cipher.encrypt(dataToEncrypt + padding)

    # 3.3.1.4
    return univ.OctetString(ciphertext), univ.OctetString(salt)
# 3.2.4.2
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt *dataToEncrypt* with DES in CBC mode.

    ``privParameters`` is unpacked as (snmpEngineBoots, snmpEngineTime, salt);
    only snmpEngineBoots is passed on to the key helper. The returned salt is
    the one the key helper produced.

    Returns a tuple of two ``univ.OctetString`` objects: (ciphertext, salt).
    Raises ``error.StatusInformation`` with ``errind.encryptionError`` when
    ``DES`` is None.
    """
    if DES is None:
        raise error.StatusInformation(
            errorIndication=errind.encryptionError
        )

    engineBoots, engineTime, _incomingSalt = privParameters

    # 8.3.1.1
    desKey, salt, iv = self.__getEncryptionKey(encryptKey, engineBoots)

    # 8.1.1.2
    cipher = DES.new(desKey, DES.MODE_CBC, iv)

    # Between 1 and 8 zero octets are always appended (a full 8 when the
    # input length is already a multiple of 8).
    padLength = 8 - len(dataToEncrypt) % 8
    padding = univ.OctetString((0,) * padLength).asOctets()
    ciphertext = cipher.encrypt(dataToEncrypt + padding)

    # 8.3.1.2, 8.3.1.3 & 4
    return univ.OctetString(ciphertext), univ.OctetString(salt)
# 8.2.4.2
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt *dataToEncrypt* with triple DES in CBC mode.

    ``privParameters`` is unpacked as (snmpEngineBoots, snmpEngineTime, salt);
    only snmpEngineBoots is passed on to the key helper. The returned salt is
    the one the key helper produced.

    Returns a tuple of two ``univ.OctetString`` objects: (ciphertext, salt).
    Raises ``error.StatusInformation`` with ``errind.encryptionError`` when
    ``DES3`` is None.
    """
    if DES3 is None:
        raise error.StatusInformation(
            errorIndication=errind.encryptionError
        )

    engineBoots, engineTime, _incomingSalt = privParameters
    des3Key, salt, iv = self.__getEncryptionKey(encryptKey, engineBoots)

    cipher = DES3.new(des3Key, DES3.MODE_CBC, iv)

    # Between 1 and 8 zero octets are always appended (a full 8 when the
    # input length is already a multiple of 8).
    padLength = 8 - len(dataToEncrypt) % 8
    padding = univ.OctetString((0,) * padLength).asOctets()
    ciphertext = cipher.encrypt(dataToEncrypt + padding)

    return univ.OctetString(ciphertext), univ.OctetString(salt)
# 5.1.1.3
def _build_token(self):
    """Build the ``X-WSSE`` request header for this client's credentials.

    A fresh UUID4 string is used as the nonce and the current UTC time
    (ISO format with a trailing ``Z``) as the creation stamp. The password
    digest is the base64 encoding of SHA-1(nonce + created + self.secret).

    Returns a one-entry dict: ``{'X-WSSE': 'UsernameToken <properties>'}``,
    where the properties are rendered by ``self._serialize_header``.
    """
    # Local import: hashlib replaces the deprecated ``sha`` module and
    # produces the same SHA-1 digest.
    import hashlib

    nonce = str(uuid.uuid4())
    base64nonce = binascii.b2a_base64(binascii.a2b_qp(nonce))
    created_date = datetime.utcnow().isoformat() + 'Z'

    sha_object = hashlib.sha1(nonce + created_date + self.secret)
    password_64 = binascii.b2a_base64(sha_object.digest())

    # b2a_base64 appends a newline, hence the strip() on both encoded values.
    properties = {
        "Username": self.username,
        "PasswordDigest": password_64.strip(),
        "Nonce": base64nonce.strip(),
        "Created": created_date,
    }
    header = 'UsernameToken ' + self._serialize_header(properties)
    return {'X-WSSE': header}
def _build_token(self):
    """Build the ``X-WSSE`` request header for this client's credentials.

    A fresh UUID4 string is used as the nonce and the current UTC time
    (ISO format with a trailing ``Z``) as the creation stamp. The password
    digest is the base64 encoding of SHA-1(nonce + created + self.secret).

    Returns a one-entry dict: ``{'X-WSSE': 'UsernameToken <properties>'}``,
    where the properties are rendered by ``self._serialize_header``.
    """
    # Local import: hashlib replaces the deprecated ``sha`` module and
    # produces the same SHA-1 digest.
    import hashlib

    nonce = str(uuid.uuid4())
    base64nonce = binascii.b2a_base64(binascii.a2b_qp(nonce))
    created_date = datetime.utcnow().isoformat() + 'Z'

    sha_object = hashlib.sha1(nonce + created_date + self.secret)
    password_64 = binascii.b2a_base64(sha_object.digest())

    # b2a_base64 appends a newline, hence the strip() on both encoded values.
    properties = {
        "Username": self.username,
        "PasswordDigest": password_64.strip(),
        "Nonce": base64nonce.strip(),
        "Created": created_date,
    }
    header = 'UsernameToken ' + self._serialize_header(properties)
    return {'X-WSSE': header}
def __init__(self, seed=None):
    """Initialise the pool state and, when *seed* is given, stir it in.

    The hash object is chosen by availability: ``hashlib.sha1`` first, then
    the legacy ``sha`` module, then ``md5``. ``hash_len`` is set to the
    matching digest size (20 or 16) and the pool starts as that many zero
    bytes.

    ``seeded`` records whether a seed was supplied.
    """
    self.pool_index = 0
    self.digest = None
    self.next_byte = 0
    self.lock = _threading.Lock()

    # Only a missing module should trigger the fallback; a bare ``except``
    # here would also swallow KeyboardInterrupt and genuine errors.
    try:
        import hashlib
        self.hash = hashlib.sha1()
        self.hash_len = 20
    except ImportError:
        try:
            import sha
            self.hash = sha.new()
            self.hash_len = 20
        except ImportError:
            import md5
            self.hash = md5.new()
            self.hash_len = 16

    self.pool = bytearray(b'\0' * self.hash_len)

    if seed is not None:
        self.stir(bytearray(seed))
        self.seeded = True
    else:
        self.seeded = False
def moveTo(self, destination):
    """Move this path to *destination*, falling back to copy-and-remove.

    First tries ``os.rename`` followed by ``self.restat(False)``. If that
    raises ``OSError`` with ``errno.EXDEV``, the tree is copied instead;
    any other ``OSError`` is re-raised unchanged.

    *destination* must provide ``path``, ``temporarySibling()`` and be a
    valid target for ``copyTo`` / ``moveTo``.
    """
    try:
        os.rename(self.path, destination.path)
        self.restat(False)
    except OSError as ose:
        if ose.errno != errno.EXDEV:
            raise
        # man 2 rename, ubuntu linux 5.10 "breezy":
        #   oldpath and newpath are not on the same mounted filesystem.
        #   (Linux permits a filesystem to be mounted at multiple
        #   points, but rename(2) does not work across different mount
        #   points, even if the same filesystem is mounted on both.)
        # that means it's time to copy trees of directories!
        secsib = destination.temporarySibling()
        self.copyTo(secsib)  # slow
        secsib.moveTo(destination)  # visible
        # done creating new stuff.  let's clean me up.
        mysecsib = self.temporarySibling()
        self.moveTo(mysecsib)  # visible
        mysecsib.remove()  # slow
def testAuth(self):
    """Drive a component handshake and check the emitted handshake element.

    An ``XmlStream`` is built around a ``ConnectComponentAuthenticator`` for
    "cjid"/"secret", fed a stream header with id '12345', and the item at
    index 1 of the transport's output list is compared with the SHA hex
    digest of id + secret. After an empty ``<handshake/>`` is received,
    ``self.authComplete`` must be True (presumably set by ``authPassed``).
    """
    self.authComplete = False
    sent = []

    authenticator = ConnectComponentAuthenticator("cjid", "secret")
    stream = xmlstream.XmlStream(authenticator)
    stream.transport = DummyTransport(sent)
    stream.addObserver(xmlstream.STREAM_AUTHD_EVENT, self.authPassed)

    # Go...
    stream.connectionMade()
    stream.dataReceived("<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' from='cjid' id='12345'>")

    # Calculate what we expect the handshake value to be
    expectedDigest = sha.new("%s%s" % ("12345", "secret")).hexdigest()
    expectedElement = "<handshake>%s</handshake>" % (expectedDigest)
    self.assertEquals(sent[1], expectedElement)

    stream.dataReceived("<handshake/>")
    self.assertEquals(self.authComplete, True)
def _continueGEX_GROUP(self, ignored, pubKey, f, signature):
    """Finish the key exchange using the fixed ``DH_PRIME`` group.

    Computes the shared secret from *f*, hashes the exchange transcript with
    SHA, and verifies *signature* over that hash with the server key parsed
    from *pubKey*. On a bad signature the connection is disconnected with
    ``DISCONNECT_KEY_EXCHANGE_FAILED``; otherwise ``_keySetup`` is called
    with the shared secret and the exchange hash.

    *ignored* is not used.
    """
    serverKey = keys.getPublicKeyObject(pubKey)
    sharedSecret = _MPpow(f, self.x, DH_PRIME)

    # Transcript fields, in the order they are fed to the hash.
    transcript = [
        NS(self.ourVersionString),
        NS(self.otherVersionString),
        NS(self.ourKexInitPayload),
        NS(self.serverKexInitPayload),
        NS(pubKey),
        MP(self.DHpubKey),
        MP(f),
        sharedSecret,
    ]
    hasher = sha.new()
    for field in transcript:
        hasher.update(field)
    exchangeHash = hasher.digest()

    if keys.verifySignature(serverKey, signature, exchangeHash):
        self._keySetup(sharedSecret, exchangeHash)
    else:
        self.sendDisconnect(DISCONNECT_KEY_EXCHANGE_FAILED, 'bad signature')
def _continueGEX_REPLY(self, ignored, pubKey, f, signature):
    """Finish the key exchange using the negotiated group ``self.p``/``self.g``.

    Computes the shared secret from *f*, hashes the exchange transcript with
    SHA (including a fixed 4-byte field, the prime and the generator), and
    verifies *signature* over that hash with the server key parsed from
    *pubKey*. On a bad signature the connection is disconnected with
    ``DISCONNECT_KEY_EXCHANGE_FAILED``; otherwise ``_keySetup`` is called
    with the shared secret and the exchange hash.

    *ignored* is not used.
    """
    serverKey = keys.getPublicKeyObject(pubKey)
    sharedSecret = _MPpow(f, self.x, self.p)

    # Transcript fields, in the order they are fed to the hash.
    transcript = [
        NS(self.ourVersionString),
        NS(self.otherVersionString),
        NS(self.ourKexInitPayload),
        NS(self.serverKexInitPayload),
        NS(pubKey),
        '\x00\x00\x08\x00',  # presumably the requested group size (2048) -- confirm
        MP(self.p),
        MP(self.g),
        MP(self.DHpubKey),
        MP(f),
        sharedSecret,
    ]
    hasher = sha.new()
    for field in transcript:
        hasher.update(field)
    exchangeHash = hasher.digest()

    if keys.verifySignature(serverKey, signature, exchangeHash):
        self._keySetup(sharedSecret, exchangeHash)
    else:
        self.sendDisconnect(DISCONNECT_KEY_EXCHANGE_FAILED, 'bad signature')
def request(self, method, request_uri, headers, content):
    """Add an ``HMACDigest`` authorization header to *headers* in place.

    The signed string is "method:uri:cnonce:snonce:<header values>", where
    the header values are those of the names returned by
    ``_get_end2end_headers``. It is signed with ``self.key`` using
    ``self.hashmod`` and the lower-cased hex digest is sent as ``response``.

    *content* is not used. Nothing is returned.
    """
    signed_names = _get_end2end_headers(headers)
    keylist = "".join(["%s " % name for name in signed_names])
    headers_val = "".join([headers[name] for name in signed_names])
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime())
    cnonce = _cnonce()

    signing_fields = (method, request_uri, cnonce, self.challenge['snonce'], headers_val)
    mac = hmac.new(self.key, "%s:%s:%s:%s:%s" % signing_fields, self.hashmod)
    request_digest = mac.hexdigest().lower()

    header_fields = (
        self.credentials[0],
        self.challenge['realm'],
        self.challenge['snonce'],
        cnonce,
        request_uri,
        created,
        request_digest,
        keylist,
    )
    headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % header_fields
def request(self, method, request_uri, headers, content):
    """Add an ``HMACDigest`` authorization header to *headers* in place.

    The signed string is "method:uri:cnonce:snonce:<header values>", where
    the header values are those of the names returned by
    ``_get_end2end_headers``. It is signed with ``self.key`` using
    ``self.hashmod`` and the lower-cased hex digest is sent as ``response``.

    *content* is not used. Nothing is returned.
    """
    signed_names = _get_end2end_headers(headers)
    keylist = "".join(["%s " % name for name in signed_names])
    headers_val = "".join([headers[name] for name in signed_names])
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime())
    cnonce = _cnonce()

    signing_fields = (method, request_uri, cnonce, self.challenge['snonce'], headers_val)
    mac = hmac.new(self.key, "%s:%s:%s:%s:%s" % signing_fields, self.hashmod)
    request_digest = mac.hexdigest().lower()

    header_fields = (
        self.credentials[0],
        self.challenge['realm'],
        self.challenge['snonce'],
        cnonce,
        request_uri,
        created,
        request_digest,
        keylist,
    )
    headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % header_fields
def request(self, method, request_uri, headers, content):
    """Add an ``HMACDigest`` authorization header to *headers* in place.

    The signed string is "method:uri:cnonce:snonce:<header values>", where
    the header values are those of the names returned by
    ``_get_end2end_headers``. It is signed with ``self.key`` using
    ``self.hashmod`` and the lower-cased hex digest is sent as ``response``.

    *content* is not used. Nothing is returned.
    """
    signed_names = _get_end2end_headers(headers)
    keylist = "".join(["%s " % name for name in signed_names])
    headers_val = "".join([headers[name] for name in signed_names])
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime())
    cnonce = _cnonce()

    signing_fields = (method, request_uri, cnonce, self.challenge['snonce'], headers_val)
    mac = hmac.new(self.key, "%s:%s:%s:%s:%s" % signing_fields, self.hashmod)
    request_digest = mac.hexdigest().lower()

    header_fields = (
        self.credentials[0],
        self.challenge['realm'],
        self.challenge['snonce'],
        cnonce,
        request_uri,
        created,
        request_digest,
        keylist,
    )
    headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % header_fields
def __init__(self, seed=None):
    """Initialise the pool state and, when *seed* is given, stir it in.

    The hash object is chosen by availability: ``hashlib.sha1`` first, then
    the legacy ``sha`` module, then ``md5``. ``hash_len`` is set to the
    matching digest size (20 or 16) and the pool starts as a string of that
    many NUL characters.

    ``seeded`` records whether a seed was supplied.
    """
    self.pool_index = 0
    self.digest = None
    self.next_byte = 0
    self.lock = _threading.Lock()

    # Only a missing module should trigger the fallback; a bare ``except``
    # here would also swallow KeyboardInterrupt and genuine errors.
    try:
        import hashlib
        self.hash = hashlib.sha1()
        self.hash_len = 20
    except ImportError:
        try:
            import sha
            self.hash = sha.new()
            self.hash_len = 20
        except ImportError:
            import md5
            self.hash = md5.new()
            self.hash_len = 16

    self.pool = '\0' * self.hash_len

    if seed is not None:
        self.stir(seed)
        self.seeded = True
    else:
        self.seeded = False
def _SetDefaultsFromSchema(self):
    """Fill in schema defaults for required properties that are not yet set.

    A schema entry contributes a default only when it is marked required,
    carries a fifth element (the default value), and its name is absent from
    ``self._properties``. Properties that are already set are never
    overwritten.
    """
    pending = {}
    for name, attributes in self._schema.iteritems():
        (is_list, property_type, is_strong, is_required) = attributes[0:4]
        if not is_required or len(attributes) < 5 or name in self._properties:
            continue
        pending[name] = attributes[4]

    if pending:
        # Use do_copy=True so that each new object gets its own copy of strong
        # objects, lists, and dicts.
        self.UpdateProperties(pending, do_copy=True)
def AddFile(self, path, settings=None):
    """Add the file at *path* to this phase.

    The file reference is obtained from the group returned by
    ``self.FileGroup(path)``. If that reference is a ``PBXVariantGroup``
    that already has a build file in this phase, *path* is registered
    against the existing build file. Otherwise a new ``PBXBuildFile`` is
    created (with *settings* when given) and appended.
    """
    (file_group, hierarchical) = self.FileGroup(path)
    file_ref = file_group.AddOrGetFileByPath(path, hierarchical)

    already_present = file_ref in self._files_by_xcfilelikeelement
    if already_present and isinstance(file_ref, PBXVariantGroup):
        # There's already a PBXBuildFile in this phase corresponding to the
        # PBXVariantGroup.  path just provides a new variant that belongs to
        # the group.  Add the path to the dict.
        existing = self._files_by_xcfilelikeelement[file_ref]
        self._AddBuildFileToDicts(existing, path)
        return

    # Add a new PBXBuildFile to get file_ref into the phase.
    build_file_properties = {'fileRef': file_ref}
    if settings is not None:
        build_file_properties['settings'] = settings
    self.AppendBuildFile(PBXBuildFile(build_file_properties), path)
def response(self, response, content):
    """Hook for updating state from the last authorized response.

    Gives the scheme a chance to pick up new nonces or similar values.
    Override this in sub-classes if necessary.

    Return True if the request is to be retried (for example, Digest may
    return stale=true). This implementation ignores both arguments and
    never asks for a retry.
    """
    retry = False
    return retry
def __init__(self, credentials, host, request_uri, headers, response, content, http):
    """Parse the ``hmacdigest`` challenge and derive the signing key.

    Defaults are applied for missing challenge fields (``reason``, ``salt``,
    ``algorithm``, ``pw-algorithm``), and an unrecognised ``reason`` is
    reset to 'unauthorized'.

    ``self.key`` is hash("user:hash(password + salt):realm") using the
    password hash module, with both hex digests lower-cased.

    Raises ``UnimplementedHmacDigestAuthOptionError`` when the server nonce
    is missing or empty, or when ``algorithm`` / ``pw-algorithm`` is not one
    of the supported values.
    """
    Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
    challenge = _parse_www_authenticate(response, 'www-authenticate')
    self.challenge = challenge['hmacdigest']
    # TODO: self.challenge['domain']
    self.challenge['reason'] = self.challenge.get('reason', 'unauthorized')
    if self.challenge['reason'] not in ['unauthorized', 'integrity']:
        self.challenge['reason'] = 'unauthorized'
    self.challenge['salt'] = self.challenge.get('salt', '')
    if not self.challenge.get('snonce'):
        raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))

    # Interpolate AFTER the _() lookup: formatting first would hand _() an
    # already-filled string that can never match a catalog entry.
    self.challenge['algorithm'] = self.challenge.get('algorithm', 'HMAC-SHA-1')
    if self.challenge['algorithm'] not in ['HMAC-SHA-1', 'HMAC-MD5']:
        raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for algorithm: %s.") % self.challenge['algorithm'])
    self.challenge['pw-algorithm'] = self.challenge.get('pw-algorithm', 'SHA-1')
    if self.challenge['pw-algorithm'] not in ['SHA-1', 'MD5']:
        raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for pw-algorithm: %s.") % self.challenge['pw-algorithm'])

    # Pick the hash modules for request signing and for password hashing.
    if self.challenge['algorithm'] == 'HMAC-MD5':
        self.hashmod = _md5
    else:
        self.hashmod = _sha
    if self.challenge['pw-algorithm'] == 'MD5':
        self.pwhashmod = _md5
    else:
        self.pwhashmod = _sha

    self.key = "".join([self.credentials[0], ":",
                        self.pwhashmod.new("".join([self.credentials[1], self.challenge['salt']])).hexdigest().lower(),
                        ":", self.challenge['realm']])
    self.key = self.pwhashmod.new(self.key).hexdigest().lower()
def __init__(self, cache, safe=safename):
    """Remember the cache directory and key-sanitising function.

    *cache* is the directory path; it is created (including parents) when
    it does not exist yet. *safe* is stored as ``self.safe``.
    """
    # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
    self.cache, self.safe = cache, safe
    if os.path.exists(cache):
        return
    os.makedirs(self.cache)