def __init__(self, uri, transport=None, encoding=None, verbose=0,
             allow_none=0, use_datetime=0, context=None):
    """Set up a logical connection to the XML-RPC server at *uri*.

    Parameters:
        uri: server URL.  Only the ``http`` and ``https`` schemes are
            accepted.  A ``unicode`` value is encoded as ISO-8859-1 first.
        transport: transport object to use.  When None, a ``SafeTransport``
            (https) or a ``Transport`` (http) is created.
        encoding, verbose, allow_none: stored on the instance unchanged.
        use_datetime: forwarded to the default transport only.
        context: forwarded to the default ``SafeTransport`` only.

    Raises:
        IOError: if the URL scheme is neither ``http`` nor ``https``.
    """
    # establish a "logical" server connection
    if isinstance(uri, unicode):
        uri = uri.encode('ISO-8859-1')

    # get the url
    import urllib
    # Named ``scheme`` rather than ``type`` so the builtin is not shadowed.
    scheme, uri = urllib.splittype(uri)
    if scheme not in ("http", "https"):
        # Call-form raise: same exception, valid on both Python 2 and 3.
        raise IOError("unsupported XML-RPC protocol")
    self.__host, self.__handler = urllib.splithost(uri)
    if not self.__handler:
        # No path in the URL: fall back to the "/RPC2" handler.
        self.__handler = "/RPC2"

    if transport is None:
        if scheme == "https":
            transport = SafeTransport(use_datetime=use_datetime,
                                      context=context)
        else:
            transport = Transport(use_datetime=use_datetime)
    self.__transport = transport

    self.__encoding = encoding
    self.__verbose = verbose
    self.__allow_none = allow_none
python类encode()的实例源码
def wx_signature(data):
    """Return the SHA-1 hex digest of the ``key=value`` string built from *data*.

    Keys are visited in sorted order and lower-cased in the output.  Entries
    whose value is None or the empty string are left out.  The pairs are
    joined with ``&`` and hashed as UTF-8.
    """
    # Compare with ``!=`` rather than ``is not``: identity against a string
    # literal depends on interning and is not a reliable emptiness test.
    pairs = [
        '%s=%s' % (key.lower(), data[key])
        for key in sorted(data)
        if data[key] is not None and data[key] != ''
    ]
    return hashlib.sha1('&'.join(pairs).encode('utf-8')).hexdigest()
def wx_sign_for_pay(params):
    """Return the upper-case MD5 hex signature of *params*.

    Entries whose value is None or the empty string are skipped.  The rest
    are joined as ``key=value`` pairs in sorted key order, ``&key=`` plus
    ``settings.WEIXIN_KEY`` is appended, and the result is hashed as UTF-8.
    """
    # Compare with ``!=`` rather than ``is not``: identity against a string
    # literal depends on interning and is not a reliable emptiness test.
    pairs = [
        '%s=%s' % (key, params[key])
        for key in sorted(params)
        if params[key] is not None and params[key] != ''
    ]
    content = '&'.join(pairs)
    content += '&key=' + settings.WEIXIN_KEY
    return hashlib.md5(content.encode('utf-8')).hexdigest().upper()
def _stringify(string):
    """Return *string* encoded as ASCII, or unchanged if that fails."""
    try:
        ascii_form = string.encode("ascii")
    except UnicodeError:
        # Not representable in 7-bit ASCII: hand the input back untouched.
        return string
    return ascii_form
def encode(self, out):
    """Write ``self.value`` to *out* as a ``<value><boolean>`` element.

    The value is formatted with ``%d`` and the element ends with a newline.
    """
    markup = "<value><boolean>%d</boolean></value>\n" % self.value
    out.write(markup)
def __init__(
        self,
        payload,
        filename=None,
        content_id=None,
        content_type=None,
        encoding='utf-8'):
    """Build a base64-encoded MIME attachment part.

    Parameters:
        payload: a ``str`` treated as a path and loaded with ``read_file``,
            or any other object, whose ``read()`` result is used.
        filename: attachment name.  Defaults to the basename of the path
            when *payload* is a ``str``; required otherwise.
        content_id: optional value for the ``Content-Id`` header.
        content_type: ``type/subtype`` string; looked up from the file
            name via ``contenttype`` when None.
        encoding: codec used to encode *filename* and *content_id*.

    Raises:
        Exception: if *payload* is not a ``str`` and no *filename* is given.
    """
    payload_is_path = isinstance(payload, str)

    # Settle the attachment name first; only a path can supply a default.
    if filename is None:
        if not payload_is_path:
            raise Exception('Missing attachment name')
        filename = os.path.basename(payload)

    # Load the attachment bytes from the path or from the readable object.
    if payload_is_path:
        payload = read_file(payload, 'rb')
    else:
        payload = payload.read()

    filename = filename.encode(encoding)
    if content_type is None:
        content_type = contenttype(filename)

    self.my_filename = filename
    self.my_payload = payload

    MIMEBase.MIMEBase.__init__(self, *content_type.split('/', 1))
    self.set_payload(payload)
    self['Content-Disposition'] = 'attachment; filename="%s"' % filename
    if content_id is not None:
        self['Content-Id'] = '<%s>' % content_id.encode(encoding)
    Encoders.encode_base64(self)
def jwt_b64e(string):
    """Return *string* in URL-safe base64 with the ``=`` characters stripped.

    A ``unicode`` argument is encoded as UTF-8 (strict) before encoding.
    """
    raw = string.encode('utf-8', 'strict') if isinstance(string, unicode) else string
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.strip(b'=')
def jwt_b64d(string):
    """Decode a URL-safe base64 bytestring and return the decoded bytestring.

    A ``unicode`` argument is tolerated: it is converted to ASCII first, and
    characters that cannot be encoded are dropped.  Enough ``=`` characters
    are appended to make the length a multiple of four before decoding.
    """
    if isinstance(string, unicode):
        string = string.encode('ascii', 'ignore')
    missing = -len(string) % 4
    padded = string + '=' * missing
    return base64.urlsafe_b64decode(padded)
def generate_token(self, payload):
    """Serialise *payload* and return it as a signed token string.

    The result is ``<header>.<payload>.<signature>``.  The header is
    ``self.cached_b64h``, the payload is ``serializers.json(payload)`` run
    through ``self.jwt_b64e``, and the signature is the HMAC
    (``self.digestmod``) of the first two parts, also through
    ``self.jwt_b64e``.

    The HMAC key is ``self.secret_key``.  When ``self.salt`` is truthy the
    key becomes ``"<secret_key>$<salt>"``; a callable salt is invoked with
    the payload to obtain its value.
    """
    key = self.secret_key
    if self.salt:
        salt_value = self.salt(payload) if callable(self.salt) else self.salt
        key = "%s$%s" % (key, salt_value)
    if isinstance(key, unicode):
        # Non-ASCII characters are dropped rather than raising.
        key = key.encode('ascii', 'ignore')

    header_b64 = self.cached_b64h
    payload_b64 = self.jwt_b64e(serializers.json(payload))
    signing_input = header_b64 + '.' + payload_b64

    mac = hmac.new(key=key, msg=signing_input, digestmod=self.digestmod)
    signature_b64 = self.jwt_b64e(mac.digest())
    return signing_input + '.' + signature_b64
def load_token(self, token):
    """Validate a signed token and return its decoded payload.

    Steps, in order: compare the header segment with ``self.cached_b64h``,
    decode the payload segment, rebuild the signing key
    (``self.secret_key`` plus the optional ``self.salt``), check the
    signature with ``self.verify_signature``, check the ``exp`` entry when
    ``self.verify_expiration`` is set, and finally call
    ``self.before_authorization`` when it is callable.

    Raises:
        HTTP: status 400 on a header mismatch, a failed signature check or
            an expired token.
    """
    if isinstance(token, unicode):
        token = token.encode('utf-8', 'strict')

    # The signature is everything after the last dot; the header is
    # everything before the first one.
    signed_part, signature = token.rsplit('.', 1)
    header_b64, payload_b64 = signed_part.split('.', 1)
    if header_b64 != self.cached_b64h:
        # header not the same
        raise HTTP(400, u'Invalid JWT Header')

    key = self.secret_key
    claims = serializers.loads_json(self.jwt_b64d(payload_b64))
    if self.salt:
        salt_value = self.salt(claims) if callable(self.salt) else self.salt
        key = "%s$%s" % (key, salt_value)
    if isinstance(key, unicode):
        key = key.encode('ascii', 'ignore')

    signature_ok = self.verify_signature(signed_part, signature, key)
    if not signature_ok:
        # signature verification failed
        raise HTTP(400, u'Token signature is invalid')

    if self.verify_expiration:
        now = time.mktime(datetime.datetime.utcnow().timetuple())
        if claims['exp'] + self.leeway < now:
            raise HTTP(400, u'Token is expired')

    if callable(self.before_authorization):
        self.before_authorization(claims)
    return claims
def getHash(self, string):
    """Return the SHA-1 hex digest of *string* encoded as UTF-8."""
    hasher = hashlib.sha1()
    hasher.update(string.encode('utf-8'))
    return hasher.hexdigest()
def getHash(url):
    """Return the SHA-1 hex digest of *url* encoded as UTF-8."""
    hasher = hashlib.sha1()
    hasher.update(url.encode('utf-8'))
    return hasher.hexdigest()
def sign(self):
    """Add a SHA-1 ``signature`` entry to ``self.ret`` and return that dict.

    Every entry currently in ``self.ret`` is rendered as ``key=value`` with
    the key lower-cased, in sorted key order, joined with ``&`` and hashed
    as UTF-8.
    """
    fields = self.ret
    pairs = []
    for key in sorted(fields):
        pairs.append('%s=%s' % (key.lower(), fields[key]))
    canonical = '&'.join(pairs)
    fields['signature'] = hashlib.sha1(canonical.encode('utf-8')).hexdigest()
    return fields
def __init__(
        self,
        payload,
        filename=None,
        content_id=None,
        content_type=None,
        encoding='utf-8'):
    """Build a base64-encoded MIME attachment part.

    Parameters:
        payload: a ``str`` treated as a path and loaded with ``read_file``,
            or any other object, whose ``read()`` result is used.
        filename: attachment name.  Defaults to the basename of the path
            when *payload* is a ``str``; required otherwise.
        content_id: optional value for the ``Content-Id`` header.
        content_type: ``type/subtype`` string; looked up from the file
            name via ``contenttype`` when None.
        encoding: codec used to encode *filename* and *content_id*.

    Raises:
        Exception: if *payload* is not a ``str`` and no *filename* is given.
    """
    payload_is_path = isinstance(payload, str)

    # Settle the attachment name first; only a path can supply a default.
    if filename is None:
        if not payload_is_path:
            raise Exception('Missing attachment name')
        filename = os.path.basename(payload)

    # Load the attachment bytes from the path or from the readable object.
    if payload_is_path:
        payload = read_file(payload, 'rb')
    else:
        payload = payload.read()

    filename = filename.encode(encoding)
    if content_type is None:
        content_type = contenttype(filename)

    self.my_filename = filename
    self.my_payload = payload

    MIMEBase.MIMEBase.__init__(self, *content_type.split('/', 1))
    self.set_payload(payload)
    self['Content-Disposition'] = 'attachment; filename="%s"' % filename
    if content_id is not None:
        self['Content-Id'] = '<%s>' % content_id.encode(encoding)
    Encoders.encode_base64(self)
def jwt_b64e(string):
    """Return *string* in URL-safe base64 with the ``=`` characters stripped.

    A ``unicode`` argument is encoded as UTF-8 (strict) before encoding.
    """
    raw = string.encode('utf-8', 'strict') if isinstance(string, unicode) else string
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.strip(b'=')
def jwt_b64d(string):
    """Decode a URL-safe base64 bytestring and return the decoded bytestring.

    A ``unicode`` argument is tolerated: it is converted to ASCII first, and
    characters that cannot be encoded are dropped.  Enough ``=`` characters
    are appended to make the length a multiple of four before decoding.
    """
    if isinstance(string, unicode):
        string = string.encode('ascii', 'ignore')
    missing = -len(string) % 4
    padded = string + '=' * missing
    return base64.urlsafe_b64decode(padded)
def generate_token(self, payload):
    """Serialise *payload* and return it as a signed token string.

    The result is ``<header>.<payload>.<signature>``.  The header is
    ``self.cached_b64h``, the payload is ``serializers.json(payload)`` run
    through ``self.jwt_b64e``, and the signature is the HMAC
    (``self.digestmod``) of the first two parts, also through
    ``self.jwt_b64e``.

    The HMAC key is ``self.secret_key``.  When ``self.salt`` is truthy the
    key becomes ``"<secret_key>$<salt>"``; a callable salt is invoked with
    the payload to obtain its value.
    """
    key = self.secret_key
    if self.salt:
        salt_value = self.salt(payload) if callable(self.salt) else self.salt
        key = "%s$%s" % (key, salt_value)
    if isinstance(key, unicode):
        # Non-ASCII characters are dropped rather than raising.
        key = key.encode('ascii', 'ignore')

    header_b64 = self.cached_b64h
    payload_b64 = self.jwt_b64e(serializers.json(payload))
    signing_input = header_b64 + '.' + payload_b64

    mac = hmac.new(key=key, msg=signing_input, digestmod=self.digestmod)
    signature_b64 = self.jwt_b64e(mac.digest())
    return signing_input + '.' + signature_b64
def load_token(self, token):
    """Validate a signed token and return its decoded payload.

    Steps, in order: compare the header segment with ``self.cached_b64h``,
    decode the payload segment, rebuild the signing key
    (``self.secret_key`` plus the optional ``self.salt``), check the
    signature with ``self.verify_signature``, check the ``exp`` entry when
    ``self.verify_expiration`` is set, and finally call
    ``self.before_authorization`` when it is callable.

    Raises:
        HTTP: status 400 on a header mismatch, a failed signature check or
            an expired token.
    """
    if isinstance(token, unicode):
        token = token.encode('utf-8', 'strict')

    # The signature is everything after the last dot; the header is
    # everything before the first one.
    signed_part, signature = token.rsplit('.', 1)
    header_b64, payload_b64 = signed_part.split('.', 1)
    if header_b64 != self.cached_b64h:
        # header not the same
        raise HTTP(400, u'Invalid JWT Header')

    key = self.secret_key
    claims = serializers.loads_json(self.jwt_b64d(payload_b64))
    if self.salt:
        salt_value = self.salt(claims) if callable(self.salt) else self.salt
        key = "%s$%s" % (key, salt_value)
    if isinstance(key, unicode):
        key = key.encode('ascii', 'ignore')

    signature_ok = self.verify_signature(signed_part, signature, key)
    if not signature_ok:
        # signature verification failed
        raise HTTP(400, u'Token signature is invalid')

    if self.verify_expiration:
        now = time.mktime(datetime.datetime.utcnow().timetuple())
        if claims['exp'] + self.leeway < now:
            raise HTTP(400, u'Token is expired')

    if callable(self.before_authorization):
        self.before_authorization(claims)
    return claims
def _stringify(string):
    """Return *string* encoded as ASCII, or unchanged if that fails."""
    try:
        ascii_form = string.encode("ascii")
    except UnicodeError:
        # Not representable in 7-bit ASCII: hand the input back untouched.
        return string
    return ascii_form
def encode(self, out):
    """Write ``self.value`` to *out* as a ``<value><boolean>`` element.

    The value is formatted with ``%d`` and the element ends with a newline.
    """
    markup = "<value><boolean>%d</boolean></value>\n" % self.value
    out.write(markup)