python类new()的实例源码

__init__.py 文件源码 项目:wiobot 作者: idreamsi 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def request(self, method, request_uri, headers, content):
        """Sign the request by adding an HMACDigest ``authorization`` header.

        The signed string joins, with colons, the method, the request URI, a
        fresh client nonce, the server nonce taken from the challenge and the
        concatenated values of the headers selected by
        ``_get_end2end_headers``. ``headers`` is modified in place;
        ``content`` is not used.
        """
        signed_names = _get_end2end_headers(headers)
        # Space-terminated header names, echoed back in the ``headers="..."`` field.
        name_list = "".join("%s " % name for name in signed_names)
        value_blob = "".join(headers[name] for name in signed_names)
        timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        client_nonce = _cnonce()
        to_sign = "%s:%s:%s:%s:%s" % (
            method, request_uri, client_nonce, self.challenge['snonce'], value_blob)
        signature = hmac.new(self.key, to_sign, self.hashmod).hexdigest().lower()
        auth_template = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"'
        headers['authorization'] = auth_template % (
            self.credentials[0],
            self.challenge['realm'],
            self.challenge['snonce'],
            client_nonce,
            request_uri,
            timestamp,
            signature,
            name_list,
        )
__init__.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def request(self, method, request_uri, headers, content):
        """Attach an HMACDigest ``authorization`` header to ``headers``.

        An HMAC (keyed with ``self.key``, hash module ``self.hashmod``) is
        computed over ``method:uri:cnonce:snonce:header-values`` and sent as
        the ``response`` field. ``headers`` is updated in place; ``content``
        is not used.
        """
        end2end = _get_end2end_headers(headers)
        # Each signed header name is followed by a single space.
        names_field = "".join("%s " % hdr for hdr in end2end)
        joined_values = "".join(headers[hdr] for hdr in end2end)
        created_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        nonce = _cnonce()
        plain = "%s:%s:%s:%s:%s" % (
            method, request_uri, nonce, self.challenge['snonce'], joined_values)
        mac = hmac.new(self.key, plain, self.hashmod)
        digest_hex = mac.hexdigest().lower()
        fields = (
            self.credentials[0],
            self.challenge['realm'],
            self.challenge['snonce'],
            nonce,
            request_uri,
            created_at,
            digest_hex,
            names_field,
        )
        headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % fields
exp?1817.py 文件源码 项目:poc 作者: Chinalover 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def audit(arg):
    """Fetch ``arg`` and report a flash XSS when the body matches a known MD5.

    The resource is requested through ``curl.curl2``. If the status code is
    200 and the MD5 of the response body is one of the listed fingerprints,
    ``security_warning`` is called with a proof-of-concept URL. Nothing is
    reported otherwise.
    """
    known_fingerprints = [
        '3a1c6cc728dddc258091a601f28a9c12',
        '53fef78841c3fae1ee992ae324a51620',
        '4c2fc69dc91c885837ce55d03493a5f5',
    ]
    code, head, res, err, _ = curl.curl2(arg)
    if code != 200:
        # Resource not found (or any non-200 answer): nothing to check.
        return
    fingerprint = md5.new(res).hexdigest()
    if fingerprint in known_fingerprints:
        security_warning(arg + '?movieName=%22]%29}catch%28e%29{if%28!window.x%29{window.x=1;alert%28document.cookie%29}}// flash xss')
__init__.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def response(self, response, content):
        """Hook called with the last authorized response.

        Gives the authentication scheme a chance to pick up new nonces or
        similar values from that response. Override this in sub-classes if
        necessary.

        Return True if the request is to be retried (for example, Digest
        may return stale=true). This base implementation never asks for a
        retry.
        """
        return False
__init__.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, credentials, host, request_uri, headers, response, content, http):
        """Parse the ``hmacdigest`` challenge and derive the signing key.

        Missing challenge fields get defaults (``reason``, ``salt``,
        ``algorithm``, ``pw-algorithm``); an unknown ``reason`` is reset to
        ``'unauthorized'``.

        Raises UnimplementedHmacDigestAuthOptionError when the server nonce
        is missing or empty, or when ``algorithm`` / ``pw-algorithm`` has an
        unsupported value.
        """
        Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
        parsed = _parse_www_authenticate(response, 'www-authenticate')
        params = parsed['hmacdigest']
        self.challenge = params
        # TODO: self.challenge['domain']

        reason = params.get('reason', 'unauthorized')
        params['reason'] = reason
        if reason not in ('unauthorized', 'integrity'):
            params['reason'] = 'unauthorized'
        params['salt'] = params.get('salt', '')
        if not params.get('snonce'):
            raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))

        algorithm = params.get('algorithm', 'HMAC-SHA-1')
        params['algorithm'] = algorithm
        if algorithm not in ('HMAC-SHA-1', 'HMAC-MD5'):
            raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for algorithm: %s." % algorithm))

        pw_algorithm = params.get('pw-algorithm', 'SHA-1')
        params['pw-algorithm'] = pw_algorithm
        if pw_algorithm not in ('SHA-1', 'MD5'):
            raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for pw-algorithm: %s." % pw_algorithm))

        # Hash module for the request HMAC and for the password/key hashing.
        self.hashmod = _md5 if algorithm == 'HMAC-MD5' else _sha
        self.pwhashmod = _md5 if pw_algorithm == 'MD5' else _sha

        # key = H(username ":" H(password + salt) ":" realm), lower-case hex.
        username = self.credentials[0]
        salted_password = "".join([self.credentials[1], params['salt']])
        password_hash = self.pwhashmod.new(salted_password).hexdigest().lower()
        self.key = "".join([username, ":", password_hash, ":", params['realm']])
        self.key = self.pwhashmod.new(self.key).hexdigest().lower()
__init__.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, cache, safe=safename):
        """Store the cache directory and create it when it does not exist.

        cache -- path of the cache directory.
        safe  -- callable kept as ``self.safe``; use
                 ``safe=lambda x: md5.new(x).hexdigest()`` for the old
                 behavior.
        """
        self.safe = safe
        self.cache = cache
        directory_present = os.path.exists(cache)
        if not directory_present:
            os.makedirs(self.cache)
tools.py 文件源码 项目:idol 作者: nondanee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def baidufanyi(strin):

    appId = '*???*' 
    secretKey = '*???*'
    fromLang = 'jp'
    toLang = 'zh'

    salt = random.randint(32768, 65536)
    sign = appId+strin+str(salt)+secretKey
    m1 = md5.new()
    m1.update(sign)
    sign = m1.hexdigest()

    url = "http://api.fanyi.baidu.com/api/trans/vip/translate?appid=%s&q=%s&from=%s&to=%s&salt=%s&sign=%s"%(appId,urllib.quote(strin),fromLang,toLang,str(salt),sign)

    request = urllib2.Request(url=url)

    reconnect = 0
    while reconnect < 8:
        try:
            response = urllib2.urlopen(request,timeout = 3)
        except:
            reconnect = reconnect + 1
            print "retry",reconnect
        else:
            break
    jsondata = response.read()

    decodejson = json.loads(jsondata)
    strout = decodejson['trans_result'][0]['dst']   
    return strout
tools.py 文件源码 项目:idol 作者: nondanee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def youdaofanyi(strin):

    appId = '*???*'
    appKey = '*???*'
    fromLang = 'ja'
    toLang = 'zh-CHS'


    salt = random.randint(32768, 65536)
    sign = appId+strin+str(salt)+appKey
    m1 = md5.new()
    m1.update(sign)
    sign = m1.hexdigest()

    url = 'http://openapi.youdao.com/api?q=%s&from=%s&to=%s&appKey=%s&salt=%s&sign=%s'%(urllib.quote(strin),fromLang,toLang,appId,str(salt),sign)

    request = urllib2.Request(url=url)

    reconnect = 0
    while reconnect < 8:
        try:
            response = urllib2.urlopen(request,timeout = 3)
        except:
            reconnect = reconnect + 1
            print "retry",reconnect
        else:
            break
    jsondata = response.read()

    decodejson = json.loads(jsondata)
    strout = decodejson["translation"][0]
    return strout
recipe-491261.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def ExistsInCache(cacheLocation, url):
        """Return True when both cache files for ``url`` are present.

        The cache entry is named after the MD5 hex digest of ``url`` and
        consists of ``<digest>.headers`` and ``<digest>.body`` inside
        ``cacheLocation``.
        """
        # hashlib replaces the deprecated ``md5`` module (same digests).
        import hashlib

        digest = hashlib.md5(url).hexdigest()
        entry = cacheLocation + "/" + digest
        return (os.path.exists(entry + ".headers") and
                os.path.exists(entry + ".body"))
recipe-491261.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def StoreInCache(cacheLocation, url, response):
        """Write ``response`` into the cache entry for ``url``.

        ``str(response.info())`` goes to ``<digest>.headers`` and
        ``response.read()`` goes to ``<digest>.body`` inside
        ``cacheLocation``, where ``<digest>`` is the MD5 hex digest of
        ``url``.
        """
        # hashlib replaces the deprecated ``md5`` module (same digests).
        import hashlib

        digest = hashlib.md5(url).hexdigest()
        entry = cacheLocation + "/" + digest
        # ``with`` closes each file even if info()/read()/write() raises.
        with open(entry + ".headers", "w") as header_file:
            header_file.write(str(response.info()))
        with open(entry + ".body", "w") as body_file:
            body_file.write(response.read())
recipe-491261.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, cacheLocation,url,setCacheHeader=True):
        """Load the cached body and headers for ``url`` into this object.

        The body file initialises the underlying ``StringIO``; the headers
        file is parsed into ``self.headers``. ``code`` and ``msg`` are set
        to 200 / "OK". When ``setCacheHeader`` is true an ``x-cache`` header
        naming the cache entry is appended before parsing.
        """
        # hashlib replaces the deprecated ``md5`` module (same digests).
        import hashlib

        self.cacheLocation = cacheLocation
        digest = hashlib.md5(url).hexdigest()
        entry = self.cacheLocation + "/" + digest
        # ``with`` closes the cache files instead of leaving them to the GC.
        with open(entry + ".body") as body_file:
            StringIO.StringIO.__init__(self, body_file.read())
        self.url     = url
        self.code    = 200
        self.msg     = "OK"
        with open(entry + ".headers") as header_file:
            headerbuf = header_file.read()
        if setCacheHeader:
            headerbuf += "x-cache: %s/%s\r\n" % (self.cacheLocation,digest)
        self.headers = httplib.HTTPMessage(StringIO.StringIO(headerbuf))
recipe-66452.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def verify(self):
        "Compute the MD5 checksum for this log to see if the logfiles have been corrupted."
        # If there is no self.md5, no checksum exists for this log yet...
        if ( not hasattr(self,'md5') ):
            print 'WARNING: No MD5 checksum was found for log',self.name
            print 'WARNING: Log',self.name,'may be newly created, or it may be corrupt!'
            return

        # Otherwise, create the MD5 checksum from the log file and metadata file for verification
        import md5
        checksum = md5.new()
        mfn = open(self.mdf, 'r')
        for line in mfn.readlines():
            checksum.update(line)
        mfn.close()
        lfn = open(self.ldf, 'r')
        for line in lfn.readlines():
            checksum.update(line)
        lfn.close()
        cs = checksum.hexdigest()
        if ( self.debug == 'on' ):
            print 'DEBUG: The MD5 digest of the metadata and log files is',cs   
        if ( self.md5 == cs ):
            if ( self.debug == 'on' ):
                print 'DEBUG: The calculated MD5 checksum',cs,'matches the stored MD5 checksum',self.md5
        else:
            if ( self.debug == 'on' ):
                print 'DEBUG: The calculated MD5 checksum',cs,'does not match the stored MD5 checksum',self.md5                
            print 'ERROR: The MD5 checksum for log',self.name,'is inconsistent!'
            print 'ERROR: Log',self.name,'may be corrupt!'
recipe-572196.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def passwordToKey(password,bits=64):
    """Derive a ``(public, private, n)`` key triple from ``password``.

    ``bits`` must be at least 64 and a multiple of 4. The MD5 hex digest of
    the password seeds ``passwordToPrimePair`` and, right-padded with "0" to
    ``bits // 4`` hex digits, the private exponent candidate.
    """
    assert bits>=64
    assert bits%4==0
    hex_digits=bits//4
    digest=md5.new(password).hexdigest()
    p,q=passwordToPrimePair(digest,bits)
    n=p*q
    # Pad the digest on the right with zeros and force the low bit so the
    # candidate is odd.
    candidate=int(digest.ljust(hex_digits,"0"),16)|1
    # NOTE(review): gcd() of positive integers is never 0, so this loop looks
    # like it never runs -- confirm whether a coprimality test was intended.
    while not gcd(candidate,n):
        candidate+=2 # keep it odd
    private=candidate
    public=modInverse(private,totient(p,q))
    return public,private,n
upyun.py 文件源码 项目:v2ex-tornado-2 作者: coderyy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def md5(src):
    """Return the MD5 hex digest of ``src``."""
    return imd5.new(src).hexdigest()
upyun.py 文件源码 项目:v2ex-tornado-2 作者: coderyy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def md5file(fobj):
    """Return the MD5 hex digest of everything readable from ``fobj``.

    The object is read in 8096-byte chunks from its current position and
    rewound to offset 0 afterwards.
    """
    digest = imd5.new()
    chunk = fobj.read(8096)
    while chunk:
        digest.update(chunk)
        chunk = fobj.read(8096)
    fobj.seek(0)
    return digest.hexdigest()
__init__.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def response(self, response, content):
        """Hook invoked with the last authorized response.

        Lets the authentication scheme update itself with new nonces or
        similar values returned by the server. Override this in sub-classes
        if necessary.

        Return True if the request is to be retried (for example, Digest
        may return stale=true). The default never requests a retry.
        """
        return False
__init__.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, credentials, host, request_uri, headers, response, content, http):
        """Read the ``hmacdigest`` challenge and compute the HMAC key.

        Defaults are filled in for ``reason``, ``salt``, ``algorithm`` and
        ``pw-algorithm``; an unrecognised ``reason`` falls back to
        ``'unauthorized'``.

        Raises UnimplementedHmacDigestAuthOptionError if the server nonce is
        absent or empty, or if ``algorithm`` / ``pw-algorithm`` is not one
        of the supported values.
        """
        Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
        all_challenges = _parse_www_authenticate(response, 'www-authenticate')
        chal = all_challenges['hmacdigest']
        self.challenge = chal
        # TODO: self.challenge['domain']

        chal['reason'] = chal.get('reason', 'unauthorized')
        reason_is_known = chal['reason'] in ('unauthorized', 'integrity')
        if not reason_is_known:
            chal['reason'] = 'unauthorized'
        chal['salt'] = chal.get('salt', '')
        if not chal.get('snonce'):
            raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))

        chal['algorithm'] = chal.get('algorithm', 'HMAC-SHA-1')
        if chal['algorithm'] not in ('HMAC-SHA-1', 'HMAC-MD5'):
            raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for algorithm: %s." % chal['algorithm']))

        chal['pw-algorithm'] = chal.get('pw-algorithm', 'SHA-1')
        if chal['pw-algorithm'] not in ('SHA-1', 'MD5'):
            raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for pw-algorithm: %s." % chal['pw-algorithm']))

        # Pick the hash modules: one for the request HMAC, one for the key.
        if chal['algorithm'] != 'HMAC-MD5':
            self.hashmod = _sha
        else:
            self.hashmod = _md5
        if chal['pw-algorithm'] != 'MD5':
            self.pwhashmod = _sha
        else:
            self.pwhashmod = _md5

        # key = H(username ":" H(password + salt) ":" realm), lower-case hex.
        user = self.credentials[0]
        pw_digest = self.pwhashmod.new("".join([self.credentials[1], chal['salt']])).hexdigest().lower()
        self.key = "".join([user, ":", pw_digest, ":", chal['realm']])
        self.key = self.pwhashmod.new(self.key).hexdigest().lower()
__init__.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, cache, safe=safename):
        """Keep the cache directory path and make sure the directory exists.

        cache -- path of the cache directory; created if missing.
        safe  -- callable kept as ``self.safe``; use
                 ``safe=lambda x: md5.new(x).hexdigest()`` for the old
                 behavior.
        """
        self.safe = safe
        self.cache = cache
        if os.path.exists(cache):
            return
        os.makedirs(self.cache)
oss_util.py 文件源码 项目:mongodb_backup_script 作者: hxt168 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_md5():
    """Return a new, empty MD5 hash object.

    Uses ``hashlib`` on Python 2.6 and later, and the legacy ``md5`` module
    on older interpreters.
    """
    if sys.version_info < (2, 6):
        import md5
        return md5.new()
    import hashlib
    return hashlib.md5()

#LOG_LEVEL can be one of DEBUG INFO ERROR CRITICAL WARNING
oss_util.py 文件源码 项目:mongodb_backup_script 作者: hxt168 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_assign(secret_access_key, method, headers=None, resource="/", result=None, debug=DEBUG):
    '''
    Create the authorization for OSS based on header input.
    You should put it into "Authorization" parameter of header.

    secret_access_key -- signing key; passed through convert_utf8 first.
    method            -- HTTP method, first line of the string to sign.
    headers           -- request headers; Content-MD5, Content-Type, Date and
                         every header starting with SELF_DEFINE_HEADER_PREFIX
                         take part in the signature.
    resource          -- canonicalized resource, appended last.
    result            -- optional list; the string to sign is appended to it
                         so the caller can inspect it (an empty list is now
                         honoured as well).
    debug             -- passed to Logger when the module logger is first
                         created.

    Returns the base64-encoded HMAC digest of the string to sign.
    '''
    global OSS_LOGGER_SET
    if not headers:
        headers = {}
    # ``is None`` rather than truthiness: a caller-supplied empty list must be
    # kept, otherwise the caller never sees the appended string_to_sign.
    if result is None:
        result = []
    secret_access_key = convert_utf8(secret_access_key)
    if not OSS_LOGGER_SET:
        OSS_LOGGER_SET = Logger(debug, "log.txt", LOG_LEVEL, "oss_util").getlogger()
    # The secret access key is deliberately NOT logged: writing credentials
    # to the log file would leak them.
    content_md5 = safe_get_element('Content-MD5', headers)
    content_type = safe_get_element('Content-Type', headers)
    date = safe_get_element('Date', headers)
    canonicalized_resource = resource
    tmp_headers = _format_header(headers)
    # Self-defined headers are signed in sorted key order, one "key:value" per line.
    canonicalized_oss_headers = "".join(
        "%s:%s\n" % (k, tmp_headers[k])
        for k in sorted(tmp_headers)
        if k.startswith(SELF_DEFINE_HEADER_PREFIX))
    string_to_sign = method + "\n" + content_md5.strip() + "\n" + content_type + "\n" + date + "\n" + canonicalized_oss_headers + canonicalized_resource
    result.append(string_to_sign)
    OSS_LOGGER_SET.debug("method:%s\n content_md5:%s\n content_type:%s\n data:%s\n canonicalized_oss_headers:%s\n canonicalized_resource:%s\n" % (method, content_md5, content_type, date, canonicalized_oss_headers, canonicalized_resource))
    OSS_LOGGER_SET.debug("string_to_sign:%s\n \nlength of string_to_sign:%d\n" % (string_to_sign, len(string_to_sign)))
    h = hmac.new(secret_access_key, string_to_sign, sha)
    sign_result = base64.encodestring(h.digest()).strip()
    OSS_LOGGER_SET.debug("sign result:%s" % sign_result)
    return sign_result


问题


面经


文章

微信
公众号

扫码关注公众号