def _authenticate_cram_md5(credentials, sock_info):
"""Authenticate using CRAM-MD5 (RFC 2195)
"""
source = credentials.source
username = credentials.username
password = credentials.password
# The password used as the mac key is the
# same as what we use for MONGODB-CR
passwd = _password_digest(username, password)
cmd = SON([('saslStart', 1),
('mechanism', 'CRAM-MD5'),
('payload', Binary(b'')),
('autoAuthorize', 1)])
response = sock_info.command(source, cmd)
# MD5 as implicit default digest for digestmod is deprecated
# in python 3.4
mac = hmac.HMAC(key=passwd.encode('utf-8'), digestmod=md5)
mac.update(response['payload'])
challenge = username.encode('utf-8') + b' ' + b(mac.hexdigest())
cmd = SON([('saslContinue', 1),
('conversationId', response['conversationId']),
('payload', Binary(challenge))])
sock_info.command(source, cmd)
python类md5()的实例源码
def create_checksum(self):
"""
Creating a checksums of checksums to detect a change
even if there are multiple files used to configure a service.
Cleanup old checksumfiles if the config was removed.
"""
checksums = ''
for _file in self.service_conf_files:
if os.path.exists(_file):
log.debug("Generating checksum for {}".format(_file))
md5 = hashlib.md5(open(_file, 'rb').read()).hexdigest()
log.debug("Checksum: {}".format(md5))
checksums += md5
if checksums:
return hashlib.md5(checksums).hexdigest()
log.debug(("No file found to generate a checksum from. Looked for "
"{}".format(self.service_conf_files)))
if os.path.exists(self.checksum_file):
os.remove(self.checksum_file)
return None
def solution_part2():
result = 8 * ['']
assigned = 0
start_int = 0
digits = {'0', '1', '2', '3', '4', '5', '6', '7'}
while assigned < 8:
hexhash = md5(('ugkcyxxp' + str(start_int)).encode('ascii')).hexdigest()
as_str = str(hexhash)
if as_str.startswith('00000') and as_str[5] in digits:
idx = int(as_str[5])
if not result[idx]:
result[idx] = as_str[6]
assigned = len([1 for i in result if i])
print(result)
start_int += 1
return ''.join(result)
def createVideoIcon(file):
"""
Creating a video icon/thumbnail
:type file: str
:param file: path to the file name
:rtype: image
:return: icon/thumbnail for the video
"""
# should install ffmpeg for the method to work successfully
ffmpeg = Utils.checkFFMPEG()
if ffmpeg:
# generate thumbnail
preview = os.path.join(mkdtemp(), hashlib.md5(file) + '.jpg')
try:
os.unlink(preview)
except Exception:
pass
# capture video preview
command = [ffmpeg, '-i', file, '-f', 'mjpeg', '-ss', '00:00:01', '-vframes', '1', preview, '2>&1']
exec_php(command)
return file_get_contents(preview)
def ExpandBitly(self, text):
if os.environ['HTTP_HOST'].find('localhost') == -1:
p = re.compile('http:\/\/bit\.ly/[a-zA-Z0-9]+')
m = p.findall(text)
if len(m) > 0:
api = bitly.Api(login='livid', apikey='R_40ab00809faf431d53cfdacc8d8b8d7f')
last = None
for s in m:
if s != last:
cache_tag = 'bitly_' + hashlib.md5(s).hexdigest()
expanded = memcache.get(cache_tag)
if expanded is None:
expanded = api.expand(s)
memcache.set(cache_tag, expanded, 2678400)
last = s
text = text.replace(s, expanded)
return text
def gravatar(value,arg):
default = "http://v2ex.appspot.com/static/img/avatar_" + str(arg) + ".png"
if type(value).__name__ != 'Member':
return '<img src="' + default + '" border="0" align="absmiddle" />'
if arg == 'large':
number_size = 73
member_avatar_url = value.avatar_large_url
elif arg == 'normal':
number_size = 48
member_avatar_url = value.avatar_normal_url
elif arg == 'mini':
number_size = 24
member_avatar_url = value.avatar_mini_url
if member_avatar_url:
return '<img src="'+ member_avatar_url +'" border="0" alt="' + value.username + '" />'
else:
gravatar_url = "http://www.gravatar.com/avatar/" + hashlib.md5(value.email.lower()).hexdigest() + "?"
gravatar_url += urllib.urlencode({'s' : str(number_size), 'd' : default})
return '<img src="' + gravatar_url + '" border="0" alt="' + value.username + '" align="absmiddle" />'
# avatar filter
def __init__(self, method):
super(auth_chain_a, self).__init__(method)
self.hashfunc = hashlib.md5
self.recv_buf = b''
self.unit_len = 2800
self.raw_trans = False
self.has_sent_header = False
self.has_recv_header = False
self.client_id = 0
self.connection_id = 0
self.max_time_dif = 60 * 60 * 24 # time dif (second) setting
self.salt = b"auth_chain_a"
self.no_compatible_method = 'auth_chain_a'
self.pack_id = 1
self.recv_id = 1
self.user_id = None
self.user_id_num = 0
self.user_key = None
self.overhead = 4
self.client_over_head = 4
self.last_client_hash = b''
self.last_server_hash = b''
self.random_client = xorshift128plus()
self.random_server = xorshift128plus()
self.encryptor = None
def EVP_BytesToKey(password, key_len, iv_len):
# equivalent to OpenSSL's EVP_BytesToKey() with count 1
# so that we make the same key and iv as nodejs version
if hasattr(password, 'encode'):
password = password.encode('utf-8')
cached_key = '%s-%d-%d' % (password, key_len, iv_len)
r = cached_keys.get(cached_key, None)
if r:
return r
m = []
i = 0
while len(b''.join(m)) < (key_len + iv_len):
md5 = hashlib.md5()
data = password
if i > 0:
data = m[i - 1] + password
md5.update(data)
m.append(md5.digest())
i += 1
ms = b''.join(m)
key = ms[:key_len]
iv = ms[key_len:key_len + iv_len]
cached_keys[cached_key] = (key, iv)
return key, iv
def post(self):
#
# ???????
# ????session
#
password = self.get_body_argument('password', '')
if password:
user = yield self.db.user.find_one({
'username': self.current_user['username']
})
_ = md5(password + self.settings['salt'])
if user['password'] == _.hexdigest():
if self.get_cookie('TORNADOSESSION'):
self.clear_cookie('TORNADOSESSION')
self.db.user.remove({
'_id': ObjectId(self.current_user['_id'])
})
self.session.delete('user_session')
self.redirect('/')
self.custom_error('????????')
def user_action(self, *args, **kwargs):
uid = self.get_body_argument('uid')
user = {
'email': self.get_body_argument('email'),
'website': self.get_body_argument('website'),
'qq': self.get_body_argument('qq'),
'address': self.get_body_argument('address')
}
# model ??
#
#
password = self.get_body_argument('password', '')
# password ?????
if password:
user['password'] = md5(password, self.settings['salt'])
user = yield self.db.user.find_and_modify({
'_id': ObjectId(uid)
}, {
'$set': user
})
self.redirect('/manage/userdetail/{}'.format(uid))
def h_file_win32(fname):
try:
fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
except OSError:
raise IOError('Cannot read from %r' % fname)
f = os.fdopen(fd, 'rb')
m = md5()
try:
while fname:
fname = f.read(200000)
m.update(fname)
finally:
f.close()
return m.digest()
# always save these
def h_file_win32(fname):
try:
fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
except OSError:
raise IOError('Cannot read from %r' % fname)
f = os.fdopen(fd, 'rb')
m = md5()
try:
while fname:
fname = f.read(200000)
m.update(fname)
finally:
f.close()
return m.digest()
# always save these
def h_file_win32(fname):
try:
fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
except OSError:
raise IOError('Cannot read from %r' % fname)
f = os.fdopen(fd, 'rb')
m = md5()
try:
while fname:
fname = f.read(200000)
m.update(fname)
finally:
f.close()
return m.digest()
# always save these
def file_already_exists(self):
"""Check to see if there is a remote file with the same
name and md5 sum.
Returns:
``True`` if exists, ``False`` otherwise.
"""
if not self.remote_dir_exists:
return False
try:
dst_hash = self._get_remote_md5()
except NCError:
return False
src_hash = self._get_local_md5()
if src_hash == dst_hash:
return True
return False
def _get_remote_md5(self):
"""Return the md5 sum of the remote file,
if it exists.
"""
E = action_element_maker()
top = E.top(
E.FileSystem(
E.Files(
E.File(
E.SrcName(self.dst),
E.Operations(
E.md5sum()
)
)
)
)
)
nc_get_reply = self.device.action(top)
reply_ele = etree.fromstring(nc_get_reply.xml)
md5sum = find_in_action('md5sum', reply_ele)
if md5sum is not None:
return md5sum.text.strip()
def get_md5(url):
#str??unicode?
if isinstance(url, str):
url = url.encode("utf-8")
m = hashlib.md5()
m.update(url)
return m.hexdigest()
def md5(self):
return hashlib.md5(self.stream).hexdigest();
def run(self):
searchhash = ''
if self.data_type == 'hash':
searchhash = self.getData()
if len(searchhash) != 32:
self.report({'isonvs': 'unknown',
'hash': searchhash})
elif self.data_type == 'file':
filepath = self.getParam('file')
hasher = hashlib.md5()
with io.open(filepath, mode='rb') as afile:
for chunk in iter(lambda: afile.read(65536), b''):
hasher.update(chunk)
searchhash = hasher.hexdigest()
else:
self.error('Unsupported data type.')
# Read files
for file in self.filelist:
filepath = os.path.join(self.path, file)
if not os.path.isfile(filepath):
continue
with io.open(filepath, 'r') as afile:
for line in afile:
# Skipping comments
if line[0] == '#':
continue
if searchhash.lower() in line:
self.report({'isonvs': True,
'md5': searchhash})
self.report({'isonvs': False,
'md5': searchhash})
def getTempfileChecksum(self):
with open(self.tempfile, 'rb') as fileHandler:
checksum = hashlib.md5()
while True:
data = fileHandler.read(8192)
if not data:
break
checksum.update(data)
return checksum.hexdigest()
def md5(self):
"""
Simply return the md5 hash value associated with the content file.
If the file can't be accessed, then None is returned.
"""
md5 = hashlib.md5()
if self.open(mode=NNTPFileMode.BINARY_RO):
for chunk in \
iter(lambda: self.stream.read(128*md5.block_size), b''):
md5.update(chunk)
return md5.hexdigest()
return None