def h_file_win32(fname):
    """
    Hash the contents of a file with md5, opening it through ``os.open``
    with the ``O_BINARY``, ``O_RDONLY`` and ``O_NOINHERIT`` flags.

    :param fname: path to the file to hash
    :return: md5 digest of the file contents
    :raise IOError: when ``os.open`` fails with ``OSError``
    """
    open_flags = os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT
    try:
        handle = os.open(fname, open_flags)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    stream = os.fdopen(handle, 'rb')
    hasher = md5()
    try:
        # Feed the file in 200000-byte pieces; an empty read marks the end.
        while True:
            chunk = stream.read(200000)
            hasher.update(chunk)
            if not chunk:
                break
    finally:
        stream.close()
    return hasher.digest()
# always save these
# Example source code for Python's md5()
def h_file_win32(fname):
    """
    Compute the md5 digest of a file that is opened via ``os.open`` using
    the ``O_BINARY``, ``O_RDONLY`` and ``O_NOINHERIT`` flags.

    :param fname: path to the file to hash
    :return: md5 digest of the file contents
    :raise IOError: when ``os.open`` fails with ``OSError``
    """
    flags = os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT
    try:
        descriptor = os.open(fname, flags)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    source = os.fdopen(descriptor, 'rb')
    checksum = md5()
    try:
        # Read 200000 bytes at a time until a read comes back empty.
        while True:
            block = source.read(200000)
            checksum.update(block)
            if not block:
                break
    finally:
        source.close()
    return checksum.digest()
# always save these
def h_file_win32(fname):
    """
    Return the md5 digest of a file read through a descriptor obtained from
    ``os.open`` (flags ``O_BINARY``, ``O_RDONLY``, ``O_NOINHERIT``).

    :param fname: path to the file to hash
    :return: md5 digest of the file contents
    :raise IOError: when ``os.open`` fails with ``OSError``
    """
    mode = os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT
    try:
        raw_fd = os.open(fname, mode)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    reader = os.fdopen(raw_fd, 'rb')
    digest = md5()
    try:
        # Consume the file in 200000-byte reads; stop after the empty one.
        while True:
            piece = reader.read(200000)
            digest.update(piece)
            if not piece:
                break
    finally:
        reader.close()
    return digest.digest()
# always save these
def mangleData(self, data, index):
# Mangle `data` through MangleFile.mangleData, then rewrite the MD5 and SHA
# digests stored inside the buffer so they are recomputed from the mangled
# content. Returns the updated `data` buffer.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# `if` blocks below cannot be read off the text; presumably the checksum
# rewrite runs unconditionally -- confirm against the upstream source.
# Fixed offsets into `data` used by the checksum rewrite further down.
self.sha1_offset = 208
self.md5_offset = 256
self.header_offset = 360
self.filedata_offset = 3170
data = MangleFile.mangleData(self, data, index)
if USE_HACHOIR:
#data.tofile(open('/tmp/oops', 'wb'))
hachoir_config.quiet = True
data_str = data.tostring()
# Let hachoir guess a parser for the mangled bytes; use it when one is found.
parser = guessParser(StringInputStream(data_str))
if parser:
self.useHachoirParser(parser)
# MD5 over everything from header_offset to the end; the 16-byte digest is
# written at md5_offset.
summary_data = data[self.header_offset:].tostring()
checksum = md5(summary_data).digest()
data[self.md5_offset:self.md5_offset+16] = array('B', checksum)
# SHA over header_offset..filedata_offset; the 40-character hex digest is
# written at sha1_offset.
summary_data = data[self.header_offset:self.filedata_offset].tostring()
checksum = sha(summary_data).hexdigest()
data[self.sha1_offset:self.sha1_offset+40] = array('B', checksum)
return data
def signRequestObject(self,
                      issuerDistinguishedName,
                      requestObject,
                      serialNumber,
                      secondsToExpiry=60 * 60 * 24 * 365,  # One year
                      digestAlgorithm='md5'):
    """
    Sign a CertificateRequest instance, returning a Certificate instance.

    The new certificate takes its subject and public key from the request,
    its issuer from C{issuerDistinguishedName}, is valid from now until
    C{secondsToExpiry} seconds from now, and is signed with C{self.original}
    using C{digestAlgorithm}.
    """
    request = requestObject.original
    # The subject lookup is kept exactly as before even though its result
    # is not used afterwards.
    requestObject.getSubject()
    certificate = crypto.X509()
    issuerDistinguishedName._copyInto(certificate.get_issuer())
    certificate.set_subject(request.get_subject())
    certificate.set_pubkey(request.get_pubkey())
    certificate.gmtime_adj_notBefore(0)
    certificate.gmtime_adj_notAfter(secondsToExpiry)
    certificate.set_serial_number(serialNumber)
    certificate.sign(self.original, digestAlgorithm)
    return Certificate(certificate)
def DigestCalcHA1(
    pszAlg,
    pszUserName,
    pszRealm,
    pszPassword,
    pszNonce,
    pszCNonce,
):
    """
    Compute the hex-encoded H(A1) value used by HTTP digest authentication.

    The base value is MD5(username ":" realm ":" password). When C{pszAlg}
    is C{"md5-sess"} the raw (binary) digest of that value is hashed again
    together with the nonce and the client nonce.

    The separators are byte strings and the result comes from
    C{hexdigest()}, so the function gives the same result as before on
    Python 2 and also works on Python 3 when the arguments are bytes.

    @return: the lowercase hexadecimal digest.
    """
    m = md5.md5()
    m.update(pszUserName)
    m.update(b":")
    m.update(pszRealm)
    m.update(b":")
    m.update(pszPassword)
    # Accept the algorithm name as text or bytes; on Python 2 both are equal.
    if pszAlg in ("md5-sess", b"md5-sess"):
        HA1 = m.digest()
        m = md5.md5()
        m.update(HA1)
        m.update(b":")
        m.update(pszNonce)
        m.update(b":")
        m.update(pszCNonce)
    return m.hexdigest()
def _alg33_1(password, rev, keylen):
    """
    Derive the RC4 key from the owner password: pad or truncate it to 32
    bytes, MD5 it, re-hash 50 more times for revision 3 or greater, and
    keep the first ``keylen`` bytes.
    """
    # 1. Pad or truncate the owner password string as described in step 1 of
    # algorithm 3.2.  If there is no owner password, use the user password
    # instead.
    padded = (password + _encryption_padding)[:32]
    # 2. Initialize the MD5 hash function and pass the result of step 1 as
    # input to this function.
    digest = md5(padded).digest()
    # 3. (Revision 3 or greater) Do the following 50 times: Take the output
    # from the previous MD5 hash and pass it as input into a new MD5 hash.
    if rev >= 3:
        for _ in range(50):
            digest = md5(digest).digest()
    # 4. Create an RC4 encryption key using the first n bytes of the output
    # from the final MD5 hash, where n is always 5 for revision 2 but, for
    # revision 3 or greater, depends on the value of the encryption
    # dictionary's /Length entry.
    return digest[:keylen]
# Implementation of algorithm 3.4 of the PDF standard security handler,
# section 3.5.2 of the PDF 1.6 reference.
def _alg33_1(password, rev, keylen):
    """
    Derive the RC4 key from the owner password: pad or truncate it to 32
    characters, convert it with ``b_``, MD5 it, re-hash 50 more times for
    revision 3 or greater, and keep the first ``keylen`` bytes.
    """
    # 1. Pad or truncate the owner password string as described in step 1 of
    # algorithm 3.2.  If there is no owner password, use the user password
    # instead.
    padded = b_((password + str_(_encryption_padding))[:32])
    # 2. Initialize the MD5 hash function and pass the result of step 1 as
    # input to this function.
    digest = md5(padded).digest()
    # 3. (Revision 3 or greater) Do the following 50 times: Take the output
    # from the previous MD5 hash and pass it as input into a new MD5 hash.
    if rev >= 3:
        for _ in range(50):
            digest = md5(digest).digest()
    # 4. Create an RC4 encryption key using the first n bytes of the output
    # from the final MD5 hash, where n is always 5 for revision 2 but, for
    # revision 3 or greater, depends on the value of the encryption
    # dictionary's /Length entry.
    return digest[:keylen]
# Implementation of algorithm 3.4 of the PDF standard security handler,
# section 3.5.2 of the PDF 1.6 reference.
def latex2html(node, source):
    """
    Render the LaTeX held by ``node`` to a PNG under ``_static`` and return
    the ``<img>`` tag that references it.

    The image name is built from the last 10 hex characters of the md5 of
    the LaTeX source. The relative path to ``_static`` is found by walking
    up from ``source``, at most once per ``/`` after its ``/doc/`` part.
    """
    is_inline = isinstance(node.parent, nodes.TextElement)
    formula = node['latex']
    name = 'math-%s' % md5(formula).hexdigest()[-10:]
    target = '_static/%s.png' % name
    depth = latex2png(formula, target, node['fontset'])

    static_dir = '_static'
    levels = source.split('/doc/')[-1].count('/')
    for _ in range(levels):
        if os.path.exists(static_dir):
            break
        static_dir = '../' + static_dir
    static_dir = '../' + static_dir  # specifically added for matplotlib

    cls = '' if is_inline else 'class="center" '
    if is_inline and depth != 0:
        # Shift inline images down by the reported depth plus one pixel.
        style = 'style="position: relative; bottom: -%dpx"' % (depth + 1)
    else:
        style = ''
    return '<img src="%s/%s.png" %s%s/>' % (static_dir, name, cls, style)
def compute_encryption_key(self, password):
    """
    Derive the encryption key from ``password`` (Algorithm 3.2).

    Returns the first 5 bytes of the MD5 digest for revision 2. For
    revision 3 or greater it returns ``self.length // 8`` bytes after 50
    extra MD5 rounds over the truncated digest.
    """
    # Algorithm 3.2
    padded = (password + self.PASSWORD_PADDING)[:32]  # 1
    digest = md5.md5(padded)  # 2
    digest.update(self.o)  # 3
    digest.update(struct.pack('<l', self.p))  # 4
    digest.update(self.docid[0])  # 5
    if self.r >= 4 and not self.encrypt_metadata:
        digest.update(b'\xff\xff\xff\xff')
    key = digest.digest()
    if self.r < 3:
        return key[:5]
    size = self.length // 8
    for _ in range(50):
        key = md5.md5(key[:size]).digest()
    return key[:size]
def authenticate_owner_password(self, password):
    """
    Check an owner password (Algorithm 3.7).

    An RC4 key is derived from the padded password, ``self.o`` is decrypted
    with it to recover the user password, and the outcome of
    ``self.authenticate_user_password`` on that value is returned.

    For revisions other than 2 the decryption is repeated 20 times, each
    time with the key XOR-ed by a counter running from 19 down to 0.
    """
    # Algorithm 3.7
    password = (password + self.PASSWORD_PADDING)[:32]
    digest = md5.md5(password)
    n = 5
    if self.r >= 3:
        for _ in range(50):
            digest = md5.md5(digest.digest())
        n = self.length // 8
    key = digest.digest()[:n]
    if self.r == 2:
        user_password = ARC4.new(key).decrypt(self.o)
    else:
        user_password = self.o
        for i in range(19, -1, -1):
            # Going through bytearray gives integer byte values on both
            # Python 2 and Python 3; chr(ord(c) ^ i) only worked on Python 2.
            k = bytes(bytearray(b ^ i for b in bytearray(key)))
            user_password = ARC4.new(k).decrypt(user_password)
    return self.authenticate_user_password(user_password)
def __init__(self, tpl, page):
    """
    Remember the original template, its text and the md5 of that text, and
    start every proposal-related field in its empty state.

    :param tpl: a mwparserfromhell template: the original template
        that we want to change
    :param page: stored unchanged as ``self.page``
    """
    self.template = tpl
    self.orig_string = unicode(self.template)
    # Hex md5 of the UTF-8 encoded original template text.
    self.orig_hash = md5.md5(self.orig_string.encode('utf-8')).hexdigest()
    self.classification = None
    self.conflicting_value = ''
    self.proposed_change = ''
    self.proposed_link = None
    self.index = None
    self.page = page
    self.proposed_link_policy = None
    self.issn = None
def signRequestObject(self,
                      issuerDistinguishedName,
                      requestObject,
                      serialNumber,
                      secondsToExpiry=60 * 60 * 24 * 365,  # One year
                      digestAlgorithm='md5'):
    """
    Sign a CertificateRequest instance, returning a Certificate instance.

    Subject and public key are copied from the request, the issuer from
    C{issuerDistinguishedName}; validity runs from now for
    C{secondsToExpiry} seconds and the signature is made with
    C{self.original} using C{digestAlgorithm}.
    """
    csr = requestObject.original
    # Subject lookup retained from the original; its result is not used.
    requestObject.getSubject()
    signed = crypto.X509()
    issuerDistinguishedName._copyInto(signed.get_issuer())
    signed.set_subject(csr.get_subject())
    signed.set_pubkey(csr.get_pubkey())
    signed.gmtime_adj_notBefore(0)
    signed.gmtime_adj_notAfter(secondsToExpiry)
    signed.set_serial_number(serialNumber)
    signed.sign(self.original, digestAlgorithm)
    return Certificate(signed)
def DigestCalcHA1(
    pszAlg,
    pszUserName,
    pszRealm,
    pszPassword,
    pszNonce,
    pszCNonce,
):
    """
    Compute the hex-encoded H(A1) value used by HTTP digest authentication.

    The base value is MD5(username ":" realm ":" password). When C{pszAlg}
    is C{"md5-sess"} the raw (binary) digest of that value is hashed again
    together with the nonce and the client nonce.

    The separators are byte strings and the result comes from
    C{hexdigest()}, so the function gives the same result as before on
    Python 2 and also works on Python 3 when the arguments are bytes.

    @return: the lowercase hexadecimal digest.
    """
    m = md5.md5()
    m.update(pszUserName)
    m.update(b":")
    m.update(pszRealm)
    m.update(b":")
    m.update(pszPassword)
    # Accept the algorithm name as text or bytes; on Python 2 both are equal.
    if pszAlg in ("md5-sess", b"md5-sess"):
        HA1 = m.digest()
        m = md5.md5()
        m.update(HA1)
        m.update(b":")
        m.update(pszNonce)
        m.update(b":")
        m.update(pszCNonce)
    return m.hexdigest()
def _audit_sniff(self, url, head, data):
# Run every plugin in self._sniff_plugins against (url, head, data) and
# record one 'auditlog' row per plugin with the elapsed time.
# NOTE(review): indentation was lost in this listing; presumably the timing
# and insert run once per plugin after the try/except, and the final restore
# runs once after the loop -- confirm against the original file.
# NOTE(review): `Scan_ThreadLocal.__pid` is written with a double underscore,
# which Python name-mangles inside a class body -- verify the intended
# attribute name.
# Remember the current plugin id so it can be put back at the end.
__opid = Scan_ThreadLocal.__pid
for OO0O0 in self._sniff_plugins:
# Timestamps are in milliseconds.
Day_time_1 = time.time() * 1000
try:
Scan_ThreadLocal.__pid = OO0O0
self._sniff_plugins[OO0O0].audit(url, head, data)
# NOTE(review): bare except also catches KeyboardInterrupt/SystemExit.
except:
Logging_Obj.exception('[M:%d] %s' % (OO0O0, repr(url)))
Day_time_2 = time.time() * 1000
# The row is keyed by the md5 hex digest of the url.
Mysql_table_insert('auditlog', uuid=md5.md5(url).hexdigest(), plugin_id=OO0O0, type=1, arg=repr(url),
time=int(Day_time_2 - Day_time_1))
Scan_ThreadLocal.__pid = __opid
def _check_final_md5(self, key, etag):
"""
Checks that etag from server agrees with md5 computed before upload.
This is important, since the upload could have spanned a number of
hours and multiple processes (e.g., gsutil runs), and the user could
change some of the file and not realize they have inconsistent data.
"""
if key.bucket.connection.debug >= 1:
print 'Checking md5 against etag.'
if key.md5 != etag.strip('"\''):
# Call key.open_read() before attempting to delete the
# (incorrect-content) key, so we perform that request on a
# different HTTP connection. This is neededb because httplib
# will return a "Response not ready" error if you try to perform
# a second transaction on the connection.
key.open_read()
key.close()
key.delete()
raise ResumableUploadException(
'File changed during upload: md5 signature doesn\'t match etag '
'(incorrect uploaded object deleted)',
ResumableTransferDisposition.ABORT)
def H(self, data):
    """Return the hexadecimal md5 digest of ``data``."""
    hasher = md5.md5(data)
    return hasher.hexdigest()
def _createNonce(self):
return md5.md5("%d:%s" % (time.time(), self.realm)).hexdigest()
def hashPassword(password, salt):
    """
    Return ``md5(md5(password).hexdigest() + '.' + salt.strip())`` as a hex
    string.

    Text values are encoded to UTF-8 before hashing so the function also
    works on Python 3; byte strings (Python 2 ``str``) are hashed unchanged.

    NOTE(review): salted MD5 is a weak scheme for password storage; it is
    kept as is so that existing hashes continue to match.
    """
    def _as_bytes(value):
        # md5 only accepts bytes on Python 3.
        return value if isinstance(value, bytes) else value.encode('utf-8')

    inner = md5.md5(_as_bytes(password)).hexdigest()
    salted = inner + '.' + salt.strip()
    return md5.md5(_as_bytes(salted)).hexdigest()
def h_file(fname):
    """
    Return the md5 digest of a file's contents. A faster implementation may
    be substituted when needed; for instance, this variant hashes the
    timestamp, the size and the file name instead of the contents::

        import stat
        from waflib import Utils
        def h_file(fname):
            st = os.stat(fname)
            if stat.S_ISDIR(st[stat.ST_MODE]): raise IOError('not a file')
            m = Utils.md5()
            m.update(str(st.st_mtime))
            m.update(str(st.st_size))
            m.update(fname)
            return m.digest()
        Utils.h_file = h_file

    :type fname: string
    :param fname: path to the file to hash
    :return: hash of the file contents
    """
    stream = open(fname, 'rb')
    hasher = md5()
    try:
        # Feed the file in 200000-byte pieces; an empty read marks the end.
        while True:
            chunk = stream.read(200000)
            hasher.update(chunk)
            if not chunk:
                break
    finally:
        stream.close()
    return hasher.digest()
def h_list(lst):
    """
    Return the md5 digest of the encoded ``str()`` form of a list.

    For tuples, ``hash(tup)`` is much more efficient, except on
    python >= 3.3 where hash randomization is enabled.

    :param lst: list to hash
    :type lst: list of strings
    :return: hash of the list
    """
    encoded = str(lst).encode()
    return md5(encoded).digest()
def h_file(fname):
    """
    Compute the md5 digest of the contents of a file. This function may be
    swapped for a quicker one; the example below hashes the modification
    time, the size and the name of the file rather than its contents::

        import stat
        from waflib import Utils
        def h_file(fname):
            st = os.stat(fname)
            if stat.S_ISDIR(st[stat.ST_MODE]): raise IOError('not a file')
            m = Utils.md5()
            m.update(str(st.st_mtime))
            m.update(str(st.st_size))
            m.update(fname)
            return m.digest()
        Utils.h_file = h_file

    :type fname: string
    :param fname: path to the file to hash
    :return: hash of the file contents
    """
    source = open(fname, 'rb')
    checksum = md5()
    try:
        # Read 200000 bytes at a time until a read comes back empty.
        while True:
            block = source.read(200000)
            checksum.update(block)
            if not block:
                break
    finally:
        source.close()
    return checksum.digest()
def h_list(lst):
    """
    Hash a list by taking the md5 digest of its encoded ``str()`` form.

    For tuples, ``hash(tup)`` is much more efficient, except on
    python >= 3.3 where hash randomization is enabled.

    :param lst: list to hash
    :type lst: list of strings
    :return: hash of the list
    """
    text = str(lst)
    return md5(text.encode()).digest()
def signature(self):
    """
    Return an md5 digest over the start symbol, the precedence entries, the
    token names and the fourth field of each entry in ``self.pfuncs``.

    Hashing is best-effort: a TypeError or ValueError raised part-way
    through is ignored and the digest of what was hashed so far is returned.
    """
    try:
        from hashlib import md5
    except ImportError:
        from md5 import md5
    try:
        hasher = md5()
        if self.start:
            hasher.update(self.start.encode('latin-1'))
        if self.prec:
            prec_text = "".join("".join(entry) for entry in self.prec)
            hasher.update(prec_text.encode('latin-1'))
        if self.tokens:
            token_text = " ".join(self.tokens)
            hasher.update(token_text.encode('latin-1'))
        for func in self.pfuncs:
            doc = func[3]
            if doc:
                hasher.update(doc.encode('latin-1'))
    except (TypeError, ValueError):
        pass
    return hasher.digest()
# -----------------------------------------------------------------------------
# validate_file()
#
# This method checks to see if there are duplicated p_rulename() functions
# in the parser module file. Without this function, it is really easy for
# users to make mistakes by cutting and pasting code fragments (and it's a real
# bugger to try and figure out why the resulting parser doesn't work). Therefore,
# we just do a little regular expression pattern matching of def statements
# to try and detect duplicates.
# -----------------------------------------------------------------------------
def signature(self):
    """
    Compute an md5 digest from the start symbol, the precedence entries,
    the token names and the fourth field of every ``self.pfuncs`` entry.

    A TypeError or ValueError during hashing is swallowed on purpose; the
    digest of whatever was hashed up to that point is returned.
    """
    try:
        from hashlib import md5
    except ImportError:
        from md5 import md5
    try:
        digest = md5()
        if self.start:
            digest.update(self.start.encode('latin-1'))
        if self.prec:
            joined_prec = "".join("".join(item) for item in self.prec)
            digest.update(joined_prec.encode('latin-1'))
        if self.tokens:
            joined_tokens = " ".join(self.tokens)
            digest.update(joined_tokens.encode('latin-1'))
        for record in self.pfuncs:
            text = record[3]
            if text:
                digest.update(text.encode('latin-1'))
    except (TypeError, ValueError):
        pass
    return digest.digest()
# -----------------------------------------------------------------------------
# validate_file()
#
# This method checks to see if there are duplicated p_rulename() functions
# in the parser module file. Without this function, it is really easy for
# users to make mistakes by cutting and pasting code fragments (and it's a real
# bugger to try and figure out why the resulting parser doesn't work). Therefore,
# we just do a little regular expression pattern matching of def statements
# to try and detect duplicates.
# -----------------------------------------------------------------------------
def download_setuptools(packagename, to_dir):
    """
    Download the source archive of ``packagename`` into ``to_dir`` unless a
    file with the archive's name is already there, and return the real path
    of the archive.

    When ``get_pypi_src_download`` supplies a checksum, the downloaded data
    must match it (md5, hex) or RuntimeError is raised before anything is
    written.
    """
    # making sure we use the absolute path
    to_dir = os.path.abspath(to_dir)
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen

    chksum, url = get_pypi_src_download(packagename)
    saveto = os.path.join(to_dir, os.path.basename(url))

    if os.path.exists(saveto):  # Avoid repeated downloads
        return os.path.realpath(saveto)

    src = dst = None
    try:
        log.warn("Downloading %s", url)
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()

        if chksum is not None and md5(data).hexdigest() != chksum:
            raise RuntimeError("Downloading %s failed: corrupt checksum"%(url,))

        dst = open(saveto, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()
    return os.path.realpath(saveto)
def digest(self, method='md5'):
    """
    Hash this certificate with the named algorithm by delegating to the
    wrapped object's C{digest}.

    @param method: One of C{'md5'} or C{'sha'}.

    @rtype: C{str}
    """
    wrapped = self.original
    return wrapped.digest(method)
def certificateRequest(self, format=crypto.FILETYPE_ASN1,
                       digestAlgorithm='md5'):
    """
    Build a certificate request for this object's subject by delegating to
    C{self.privateKey.certificateRequest}.
    """
    make_request = self.privateKey.certificateRequest
    return make_request(self.getSubject(), format, digestAlgorithm)
def signRequestObject(self, certificateRequest, serialNumber,
                      secondsToExpiry=60 * 60 * 24 * 365,  # One year
                      digestAlgorithm='md5'):
    """
    Sign C{certificateRequest} with this object's private key, using this
    object's subject as the issuer, and return whatever
    C{self.privateKey.signRequestObject} returns.
    """
    sign = self.privateKey.signRequestObject
    return sign(self.getSubject(),
                certificateRequest,
                serialNumber,
                secondsToExpiry,
                digestAlgorithm)
def keyHash(self):
    """
    MD5 hex digest of signature on an empty certificate request with this
    key (the value held in C{self._emptyReq}).
    """
    hasher = md5.md5(self._emptyReq)
    return hasher.hexdigest()