def request(self, method, request_uri, headers, content):
    """Sign the outgoing request with an HMACDigest ``authorization`` header.

    The header names returned by ``_get_end2end_headers`` and their values
    are combined with the method, the URI, a fresh client nonce and the
    server nonce of the challenge into one colon-separated string.  That
    string is keyed with ``self.key`` through HMAC and the lower-case hex
    digest is sent as the ``response`` field.

    ``headers`` is modified in place; ``content`` is not used.
    """
    signed_names = _get_end2end_headers(headers)
    # Every name is followed by one space, the last one included.
    keylist = "".join("%s " % name for name in signed_names)
    headers_val = "".join(headers[name] for name in signed_names)
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    cnonce = _cnonce()
    snonce = self.challenge['snonce']
    to_sign = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, snonce, headers_val)
    signature = hmac.new(self.key, to_sign, self.hashmod).hexdigest().lower()
    headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % (
        self.credentials[0],
        self.challenge['realm'],
        snonce,
        cnonce,
        request_uri,
        created,
        signature,
        keylist)
python类new()的实例源码
def request(self, method, request_uri, headers, content):
    """Add the HMACDigest ``authorization`` header to ``headers``.

    The signed text is ``method:uri:cnonce:snonce:header-values`` where the
    header values are those of the names returned by
    ``_get_end2end_headers``.  It is run through HMAC with ``self.key`` and
    ``self.hashmod``; the lower-case hex digest becomes the ``response``
    field of the header.  ``content`` is not used.
    """
    template = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"'
    names = _get_end2end_headers(headers)
    # Space-terminated list of the header names that take part in the digest.
    keylist = "".join(["%s " % name for name in names])
    headers_val = "".join([headers[name] for name in names])
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    cnonce = _cnonce()
    pieces = (method, request_uri, cnonce, self.challenge['snonce'], headers_val)
    request_digest = hmac.new(self.key, "%s:%s:%s:%s:%s" % pieces, self.hashmod)
    request_digest = request_digest.hexdigest().lower()
    fields = (
        self.credentials[0],
        self.challenge['realm'],
        self.challenge['snonce'],
        cnonce,
        request_uri,
        created,
        request_digest,
        keylist,
    )
    headers['authorization'] = template % fields
def audit(arg):
    """Fetch ``arg`` and raise a flash XSS warning when the body is a known file.

    The body returned by ``curl.curl2`` is only inspected for a 200 status.
    Its MD5 hex digest is compared with a fixed list of digests; on a match
    ``security_warning`` is called with ``arg`` plus the proof-of-concept
    query string.  Nothing is reported in any other case.
    """
    known_digests = [
        '3a1c6cc728dddc258091a601f28a9c12',
        '53fef78841c3fae1ee992ae324a51620',
        '4c2fc69dc91c885837ce55d03493a5f5',
    ]
    status, _head, body, _err, _ = curl.curl2(arg)
    if status != 200:
        # Target not found: nothing to report.
        return
    if md5.new(body).hexdigest() not in known_digests:
        # Body is not one of the known files: nothing to report.
        return
    security_warning(arg + '?movieName=%22]%29}catch%28e%29{if%28!window.x%29{window.x=1;alert%28document.cookie%29}}// flash xss')
def response(self, response, content):
    """Hook invoked with the last authorized response.

    It gives the scheme a chance to pick up new nonces or similar values
    returned by the server.  Override it in subclasses if necessary.
    Returning a true value asks for the request to be retried (Digest, for
    example, may answer ``stale=true``); this implementation never asks for
    a retry.
    """
    return False
def __init__(self, credentials, host, request_uri, headers, response, content, http):
    """Read the ``hmacdigest`` challenge from ``response`` and derive the signing key.

    Missing challenge parameters get defaults (``reason`` 'unauthorized',
    ``salt`` '', ``algorithm`` 'HMAC-SHA-1', ``pw-algorithm`` 'SHA-1'), and
    an unrecognised ``reason`` is reset to 'unauthorized'.

    Raises:
        UnimplementedHmacDigestAuthOptionError: the challenge has no (or an
            empty) ``snonce``, or names an unsupported ``algorithm`` or
            ``pw-algorithm``.
    """
    Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
    self.challenge = _parse_www_authenticate(response, 'www-authenticate')['hmacdigest']
    params = self.challenge
    # TODO: self.challenge['domain']

    reason = params.get('reason', 'unauthorized')
    if reason not in ['unauthorized', 'integrity']:
        reason = 'unauthorized'
    params['reason'] = reason

    params['salt'] = params.get('salt', '')
    if not params.get('snonce'):
        raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))

    algorithm = params.get('algorithm', 'HMAC-SHA-1')
    params['algorithm'] = algorithm
    if algorithm not in ['HMAC-SHA-1', 'HMAC-MD5']:
        raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for algorithm: %s." % algorithm))

    pw_algorithm = params.get('pw-algorithm', 'SHA-1')
    params['pw-algorithm'] = pw_algorithm
    if pw_algorithm not in ['SHA-1', 'MD5']:
        raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for pw-algorithm: %s." % pw_algorithm))

    # One hash module signs requests, the other one hashes the password.
    self.hashmod = _md5 if algorithm == 'HMAC-MD5' else _sha
    self.pwhashmod = _md5 if pw_algorithm == 'MD5' else _sha

    # key = H(username ":" H(password + salt) ":" realm), hex digests in lower case.
    username = self.credentials[0]
    salted_password = self.pwhashmod.new("".join([self.credentials[1], params['salt']])).hexdigest().lower()
    self.key = "".join([username, ":", salted_password, ":", params['realm']])
    self.key = self.pwhashmod.new(self.key).hexdigest().lower()
def __init__(self, cache, safe=safename):  # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
    """Store the cache directory and the ``safe`` callable; create the directory if it does not exist."""
    self.cache, self.safe = cache, safe
    if os.path.exists(cache):
        return
    os.makedirs(self.cache)
def baidufanyi(strin):
    """Translate ``strin`` from Japanese ('jp') to Chinese ('zh') with the Baidu Translate HTTP API.

    Args:
        strin: Text to translate.  It is concatenated into the signature and
            passed to ``urllib.quote`` unchanged.

    Returns:
        The ``dst`` field of the first ``trans_result`` entry of the JSON reply.

    Raises:
        Exception: the error of the last ``urllib2.urlopen`` attempt when all
            attempts fail.  (This case used to crash with an unrelated
            ``UnboundLocalError`` because ``response`` was never assigned.)
    """
    appId = '*???*'
    secretKey = '*???*'
    fromLang = 'jp'
    toLang = 'zh'
    max_attempts = 8
    salt = random.randint(32768, 65536)
    # Request signature: MD5 hex digest of appid + query + salt + secret key.
    sign = md5.new(appId + strin + str(salt) + secretKey).hexdigest()
    url = "http://api.fanyi.baidu.com/api/trans/vip/translate?appid=%s&q=%s&from=%s&to=%s&salt=%s&sign=%s" % (
        appId, urllib.quote(strin), fromLang, toLang, str(salt), sign)
    request = urllib2.Request(url=url)

    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            response = urllib2.urlopen(request, timeout=3)
        except Exception as error:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
            # SystemExit are not swallowed by the retry loop.
            last_error = error
            print("retry %d" % attempt)
        else:
            break
    else:
        # Every attempt failed: surface the real network error.
        raise last_error

    try:
        jsondata = response.read()
    finally:
        response.close()
    decodejson = json.loads(jsondata)
    return decodejson['trans_result'][0]['dst']
def youdaofanyi(strin):
    """Translate ``strin`` from Japanese ('ja') to Chinese ('zh-CHS') with the Youdao HTTP API.

    Args:
        strin: Text to translate.  It is concatenated into the signature and
            passed to ``urllib.quote`` unchanged.

    Returns:
        The first element of the ``translation`` list of the JSON reply.

    Raises:
        Exception: the error of the last ``urllib2.urlopen`` attempt when all
            attempts fail.  (This case used to crash with an unrelated
            ``UnboundLocalError`` because ``response`` was never assigned.)
    """
    appId = '*???*'
    appKey = '*???*'
    fromLang = 'ja'
    toLang = 'zh-CHS'
    max_attempts = 8
    salt = random.randint(32768, 65536)
    # Request signature: MD5 hex digest of app id + query + salt + app key.
    sign = md5.new(appId + strin + str(salt) + appKey).hexdigest()
    url = 'http://openapi.youdao.com/api?q=%s&from=%s&to=%s&appKey=%s&salt=%s&sign=%s' % (
        urllib.quote(strin), fromLang, toLang, appId, str(salt), sign)
    request = urllib2.Request(url=url)

    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            response = urllib2.urlopen(request, timeout=3)
        except Exception as error:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
            # SystemExit are not swallowed by the retry loop.
            last_error = error
            print("retry %d" % attempt)
        else:
            break
    else:
        # Every attempt failed: surface the real network error.
        raise last_error

    try:
        jsondata = response.read()
    finally:
        response.close()
    decodejson = json.loads(jsondata)
    return decodejson["translation"][0]
def ExistsInCache(cacheLocation, url):
    """Tell whether both cache files for ``url`` exist under ``cacheLocation``.

    The files are named ``<md5 hex digest of url>.headers`` and
    ``<md5 hex digest of url>.body``.

    Args:
        cacheLocation: Directory holding the cache files.
        url: URL used as the cache key.  Text (unicode) values are encoded
            as UTF-8 before hashing, which for ASCII URLs gives the same
            digest as hashing them directly.

    Returns:
        True only when the ``.headers`` and the ``.body`` file both exist.
    """
    # Function-scope import: ``hashlib`` replaces the deprecated ``md5``
    # module and computes the same digest.
    import hashlib

    if not isinstance(url, bytes):
        url = url.encode("utf-8")
    prefix = cacheLocation + "/" + hashlib.md5(url).hexdigest()
    return os.path.exists(prefix + ".headers") and os.path.exists(prefix + ".body")
def StoreInCache(cacheLocation, url, response):
    """Write ``response`` into the cache files for ``url`` under ``cacheLocation``.

    ``str(response.info())`` goes to ``<md5 hex digest of url>.headers`` and
    ``response.read()`` to ``<md5 hex digest of url>.body``.

    Args:
        cacheLocation: Existing directory that receives the two files.
        url: URL used as the cache key.  Text (unicode) values are encoded
            as UTF-8 before hashing, which for ASCII URLs gives the same
            digest as hashing them directly.
        response: Object providing ``info()`` and ``read()``.
    """
    # Function-scope import: ``hashlib`` replaces the deprecated ``md5``
    # module and computes the same digest.
    import hashlib

    if not isinstance(url, bytes):
        url = url.encode("utf-8")
    prefix = cacheLocation + "/" + hashlib.md5(url).hexdigest()
    # ``with`` closes each file even when info()/read()/write() raises.
    with open(prefix + ".headers", "w") as header_file:
        header_file.write(str(response.info()))
    with open(prefix + ".body", "w") as body_file:
        body_file.write(response.read())
def __init__(self, cacheLocation,url,setCacheHeader=True):
    """Build a response object from the cache files stored for ``url``.

    The ``.body`` file becomes the content of the underlying ``StringIO``
    and the ``.headers`` file is parsed into ``self.headers``.  ``code`` and
    ``msg`` are always set to 200 / "OK".

    Args:
        cacheLocation: Directory holding the cache files.
        url: URL used as the cache key.  Text (unicode) values are encoded
            as UTF-8 before hashing, which for ASCII URLs gives the same
            digest as hashing them directly.
        setCacheHeader: When true, an ``x-cache`` header naming the cache
            entry is appended to the stored headers.
    """
    # Function-scope import: ``hashlib`` replaces the deprecated ``md5``
    # module and computes the same digest.
    import hashlib

    self.cacheLocation = cacheLocation
    key = url if isinstance(url, bytes) else url.encode("utf-8")
    digest = hashlib.md5(key).hexdigest()
    prefix = self.cacheLocation + "/" + digest
    # ``with open`` instead of ``file(...).read()`` so the handles are closed
    # deterministically instead of whenever they get garbage collected.
    with open(prefix + ".body") as body_file:
        StringIO.StringIO.__init__(self, body_file.read())
    self.url = url
    self.code = 200
    self.msg = "OK"
    with open(prefix + ".headers") as header_file:
        headerbuf = header_file.read()
    if setCacheHeader:
        headerbuf += "x-cache: %s/%s\r\n" % (self.cacheLocation, digest)
    self.headers = httplib.HTTPMessage(StringIO.StringIO(headerbuf))
def verify(self):
    """Recompute the MD5 checksum of this log and compare it with the stored one.

    The checksum covers the metadata file (``self.mdf``) followed by the log
    file (``self.ldf``).  The outcome is only printed: warnings when no
    checksum is stored yet (``self.md5`` missing), errors when the checksums
    differ, and extra DEBUG lines when ``self.debug`` is 'on'.

    Returns:
        None in every case.
    """
    # If there is no self.md5, no checksum exists for this log yet...
    if not hasattr(self, 'md5'):
        print('WARNING: No MD5 checksum was found for log %s' % (self.name,))
        print('WARNING: Log %s may be newly created, or it may be corrupt!' % (self.name,))
        return

    # Otherwise, create the MD5 checksum from the metadata file and the log
    # file for verification.  ``hashlib`` replaces the deprecated ``md5``
    # module and computes the same digest.
    import hashlib
    checksum = hashlib.md5()
    for path in (self.mdf, self.ldf):
        # ``with`` closes the file even if reading fails; iterating the
        # handle avoids loading the whole file as ``readlines()`` did.
        with open(path, 'r') as handle:
            for line in handle:
                checksum.update(line)
    cs = checksum.hexdigest()

    debugging = self.debug == 'on'
    if debugging:
        print('DEBUG: The MD5 digest of the metadata and log files is %s' % (cs,))
    if self.md5 == cs:
        if debugging:
            print('DEBUG: The calculated MD5 checksum %s matches the stored MD5 checksum %s' % (cs, self.md5))
    else:
        if debugging:
            print('DEBUG: The calculated MD5 checksum %s does not match the stored MD5 checksum %s' % (cs, self.md5))
        print('ERROR: The MD5 checksum for log %s is inconsistent!' % (self.name,))
        print('ERROR: Log %s may be corrupt!' % (self.name,))
def passwordToKey(password,bits=64):
    """Derive a ``(public, private, n)`` triple from ``password``.

    The MD5 hex digest of the password seeds ``passwordToPrimePair``; ``n``
    is the product of the two primes it returns.  The private value starts
    from the digest read as a hexadecimal number (zero-padded on the right
    up to ``bits // 4`` digits) with its lowest bit set, and the public
    value is ``modInverse(private, totient(p, q))``.

    ``bits`` must be at least 64 and a multiple of 4 (checked with
    ``assert``).
    """
    assert 64<=bits
    assert bits%4==0
    hex_digits = bits // 4
    digest = md5.new(password).hexdigest()
    p, q = passwordToPrimePair(digest, bits)
    n = p * q
    padding = "0" * (hex_digits - len(digest))
    # Setting the lowest bit makes the candidate odd.
    candidate = int(digest + padding, 16) | 1
    # NOTE(review): this loop only runs while gcd(...) is falsy (i.e. 0);
    # a "!= 1" test may have been intended -- confirm before changing, since
    # it would alter the derived keys.
    while not gcd(candidate, n):
        candidate += 2  # stepping by two keeps it odd
    return modInverse(candidate, totient(p, q)), candidate, n
def md5(src):
    """Return the MD5 hex digest of ``src``."""
    return imd5.new(src).hexdigest()
def md5file(fobj):
    """Return the MD5 hex digest of what ``fobj`` yields from its current position.

    The data is consumed in reads of 8096 bytes until ``read`` returns an
    empty value; afterwards the file object is rewound with ``seek(0)``.
    """
    digest = imd5.new()
    chunk = fobj.read(8096)
    while chunk:
        digest.update(chunk)
        chunk = fobj.read(8096)
    fobj.seek(0)
    return digest.hexdigest()
def response(self, response, content):
    """Post-response hook; the default implementation has nothing to refresh.

    Called with the last authorized response so that a scheme can update
    nonces or similar values the server sent back.  Subclasses override it
    when needed and return a true value to have the request retried (Digest,
    for example, may answer ``stale=true``).
    """
    return False
def __init__(self, credentials, host, request_uri, headers, response, content, http):
    """Set up HMACDigest authentication from the ``www-authenticate`` challenge.

    Defaults are filled in for absent challenge parameters (``reason``
    'unauthorized', ``salt`` '', ``algorithm`` 'HMAC-SHA-1', ``pw-algorithm``
    'SHA-1'); an unrecognised ``reason`` falls back to 'unauthorized'.  The
    hash modules and the signing key are then derived from the challenge and
    the credentials.

    Raises:
        UnimplementedHmacDigestAuthOptionError: when ``snonce`` is missing or
            empty, or when ``algorithm`` / ``pw-algorithm`` is not supported.
    """
    Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
    parsed = _parse_www_authenticate(response, 'www-authenticate')
    self.challenge = parsed['hmacdigest']
    ch = self.challenge
    # TODO: self.challenge['domain']
    ch['reason'] = ch.get('reason', 'unauthorized')
    if not (ch['reason'] in ['unauthorized', 'integrity']):
        ch['reason'] = 'unauthorized'
    ch['salt'] = ch.get('salt', '')
    if not ch.get('snonce'):
        raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))
    ch['algorithm'] = ch.get('algorithm', 'HMAC-SHA-1')
    if not (ch['algorithm'] in ['HMAC-SHA-1', 'HMAC-MD5']):
        raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for algorithm: %s." % ch['algorithm']))
    ch['pw-algorithm'] = ch.get('pw-algorithm', 'SHA-1')
    if not (ch['pw-algorithm'] in ['SHA-1', 'MD5']):
        raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for pw-algorithm: %s." % ch['pw-algorithm']))

    # Anything other than the MD5 variants was validated above to be SHA-1.
    self.hashmod = _sha
    if ch['algorithm'] == 'HMAC-MD5':
        self.hashmod = _md5
    self.pwhashmod = _sha
    if ch['pw-algorithm'] == 'MD5':
        self.pwhashmod = _md5

    # The key is the hash of "user:hash(password + salt):realm".
    key_parts = [
        self.credentials[0],
        ":",
        self.pwhashmod.new("".join([self.credentials[1], ch['salt']])).hexdigest().lower(),
        ":",
        ch['realm'],
    ]
    self.key = "".join(key_parts)
    self.key = self.pwhashmod.new(self.key).hexdigest().lower()
def __init__(self, cache, safe=safename):  # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
    """Keep ``cache`` and ``safe`` on the instance and make sure the ``cache`` path exists."""
    self.cache = cache
    self.safe = safe
    missing = not os.path.exists(cache)
    if missing:
        os.makedirs(self.cache)
def get_md5():
    """Return a new, empty MD5 hash object.

    ``hashlib`` is used on Python 2.6 and later; older interpreters fall
    back to the legacy ``md5`` module.
    """
    if sys.version_info < (2, 6):
        import md5
        return md5.new()
    import hashlib
    return hashlib.md5()
#LOG_LEVEL can be one of DEBUG INFO ERROR CRITICAL WARNNING
def get_assign(secret_access_key, method, headers=None, resource="/", result=None, debug=DEBUG):
    '''
    Create the authorization for OSS based on header input.
    You should put it into "Authorization" parameter of header.

    The string to sign is made of the method, the Content-MD5, Content-Type
    and Date headers, the sorted headers whose name starts with
    SELF_DEFINE_HEADER_PREFIX, and the resource.  It is signed with HMAC
    using secret_access_key and returned base64 encoded.

    secret_access_key: key used for the HMAC signature.
    method: HTTP method, first line of the string to sign.
    headers: request headers; a false value is treated as no headers.
    resource: canonicalized resource, last part of the string to sign.
    result: optional list; the string to sign is appended to it so that the
        caller can inspect it.
    debug: passed to Logger when the module logger is first created.
    '''
    global OSS_LOGGER_SET
    if not headers:
        headers = {}
    # Keep a caller-supplied list even when it is still empty: it is an
    # out-parameter, and replacing it (as ``if not result`` did) meant the
    # caller never received the string to sign.  Other false values are
    # replaced as before.
    if not result and not isinstance(result, list):
        result = []
    secret_access_key = convert_utf8(secret_access_key)
    if not OSS_LOGGER_SET:
        OSS_LOGGER_SET = Logger(debug, "log.txt", LOG_LEVEL, "oss_util").getlogger()
    # NOTE(review): this writes the secret key to the debug log -- consider
    # masking or removing it.
    OSS_LOGGER_SET.debug("secret_access_key: %s" % secret_access_key)
    content_md5 = safe_get_element('Content-MD5', headers)
    content_type = safe_get_element('Content-Type', headers)
    date = safe_get_element('Date', headers)
    canonicalized_resource = resource
    tmp_headers = _format_header(headers)
    # Self-defined headers take part in the signature in sorted name order.
    canonicalized_oss_headers = "".join(
        "%s:%s\n" % (name, tmp_headers[name])
        for name in sorted(tmp_headers.keys())
        if name.startswith(SELF_DEFINE_HEADER_PREFIX))
    string_to_sign = method + "\n" + content_md5.strip() + "\n" + content_type + "\n" + date + "\n" + canonicalized_oss_headers + canonicalized_resource
    result.append(string_to_sign)
    OSS_LOGGER_SET.debug("method:%s\n content_md5:%s\n content_type:%s\n data:%s\n canonicalized_oss_headers:%s\n canonicalized_resource:%s\n" % (method, content_md5, content_type, date, canonicalized_oss_headers, canonicalized_resource))
    OSS_LOGGER_SET.debug("string_to_sign:%s\n \nlength of string_to_sign:%d\n" % (string_to_sign, len(string_to_sign)))
    h = hmac.new(secret_access_key, string_to_sign, sha)
    sign_result = base64.encodestring(h.digest()).strip()
    OSS_LOGGER_SET.debug("sign result:%s" % sign_result)
    return sign_result