def do_POST(self):
    """Handle a webhook POST request and enqueue the decoded update.

    The request is checked with ``self._validate_post()`` and its body
    length is obtained from ``self._get_content_len()``. If either raises
    ``_InvalidPost``, the exception's ``http_code`` is returned to the
    client as an error reply and nothing is enqueued. Otherwise the body
    is read, a 200 response is sent, and the JSON payload is converted
    with ``Update.de_json`` and put on ``self.server.update_queue``.
    """
    self.logger.debug('Webhook triggered')
    try:
        self._validate_post()
        clen = self._get_content_len()
    except _InvalidPost as e:
        # send_error() emits a complete reply on its own (status line,
        # headers and body). Calling end_headers() afterwards would write
        # a stray blank line after that body, so it is not called here.
        self.send_error(e.http_code)
        return

    buf = self.rfile.read(clen)
    json_string = bytes_to_native_str(buf)

    # The 200 reply is sent before the payload is parsed.
    self.send_response(200)
    self.end_headers()

    # Arguments are handed to the logger instead of being pre-formatted,
    # so the message is only built when DEBUG output is enabled.
    self.logger.debug('Webhook received data: %s', json_string)
    update = Update.de_json(json.loads(json_string), self.server.bot)
    self.logger.debug('Received Update with ID %d on Webhook', update.update_id)
    self.server.update_queue.put(update)
# Example source code using Python's bytes_to_native_str()
def do_POST(self):
    """Handle a webhook POST request and enqueue the decoded update.

    The request is checked with ``self._validate_post()`` and its body
    length is obtained from ``self._get_content_len()``. If either raises
    ``_InvalidPost``, the exception's ``http_code`` is returned to the
    client as an error reply and nothing is enqueued. Otherwise the body
    is read, a 200 response is sent, and the JSON payload is converted
    with ``Update.de_json`` and put on ``self.server.update_queue``.
    """
    self.logger.debug('Webhook triggered')
    try:
        self._validate_post()
        clen = self._get_content_len()
    except _InvalidPost as e:
        # send_error() emits a complete reply on its own (status line,
        # headers and body). Calling end_headers() afterwards would write
        # a stray blank line after that body, so it is not called here.
        self.send_error(e.http_code)
        return

    buf = self.rfile.read(clen)
    json_string = bytes_to_native_str(buf)

    # The 200 reply is sent before the payload is parsed.
    self.send_response(200)
    self.end_headers()

    # Arguments are handed to the logger instead of being pre-formatted,
    # so the message is only built when DEBUG output is enabled.
    self.logger.debug('Webhook received data: %s', json_string)
    update = Update.de_json(json.loads(json_string), self.server.bot)
    self.logger.debug('Received Update with ID %d on Webhook', update.update_id)
    self.server.update_queue.put(update)
def hash_password(password, encoding=HashEncoding.BINARY, salt_size=SALT_SIZE):
    """
    Returns a salted hash of ``password``.

    A fresh salt of ``salt_size`` random bytes is drawn from ``urandom``.
    The digest is computed over the password bytes followed by the salt,
    using the algorithm named by the ``security.hash_algorithm`` config
    entry, or ``sha256`` when that entry is empty.

    :param password: the password; ``text_type`` values are encoded as
        UTF-8 before hashing, other values are hashed as given.
    :param encoding: a ``HashEncoding`` member selecting the output form.
    :param salt_size: number of random salt bytes to generate.
    :returns: for ``BINARY``, the salt bytes followed by the raw digest;
        for ``HEX``, the hex salt followed by the hex digest as a native
        string; for ``BASE64``, the URL-safe base64 salt followed by the
        URL-safe base64 digest as a native string.
    :raises ValueError: if ``encoding`` is none of the three members above.
    """
    # NOTE(review): this is one pass of a general-purpose hash over
    # password + salt. A dedicated KDF (e.g. hashlib.pbkdf2_hmac) would be
    # harder to brute-force, but switching would change the stored format,
    # so the scheme is deliberately left untouched here.
    salt = urandom(salt_size)
    hasher = hashlib.new(config.get('security.hash_algorithm') or 'sha256')
    if isinstance(password, text_type):
        password = password.encode('utf-8')
    hasher.update(password)
    hasher.update(salt)

    if encoding == HashEncoding.BINARY:
        return salt + hasher.digest()
    if encoding == HashEncoding.HEX:
        return bytes_to_native_str(hexlify(salt)) + hasher.hexdigest()
    if encoding == HashEncoding.BASE64:
        return bytes_to_native_str(
            urlsafe_b64encode(salt) + urlsafe_b64encode(hasher.digest()))

    # Name the rejected value so the caller can see what went wrong.
    raise ValueError('Unsupported hash encoding: %r' % (encoding,))
def n(b):
    """Wrap ``b`` in a read-only property that warns on every access.

    Reading the returned property issues the ``telegram.Emoji`` deprecation
    warning and then yields ``bytes_to_native_str(b)``.
    """
    # The getter intentionally has no docstring: property() would copy it
    # onto the property object.
    def _deprecated_value(_owner):
        warnings.warn("telegram.Emoji is being deprecated, please see https://git.io/v6DeB")
        converted = bytes_to_native_str(b)
        return converted

    emoji_property = property(_deprecated_value)
    return emoji_property
def test_str_encode_decode_with_py2_str_arg(self):
    """Round-trip TEST_UNICODE_STR with the codec name given as a native str."""
    # The codec name is passed as a plain Py2-style string, i.e. what a
    # caller would supply if unicode_literals had not been imported.
    encode_codec = utils.bytes_to_native_str(b'utf-8')
    encoded = str(TEST_UNICODE_STR).encode(encode_codec)
    self.assertTrue(isinstance(encoded, bytes))
    self.assertFalse(isinstance(encoded, str))

    decode_codec = utils.bytes_to_native_str(b'utf-8')
    decoded = encoded.decode(decode_codec)
    self.assertTrue(isinstance(decoded, str))
    self.assertEqual(decoded, TEST_UNICODE_STR)
def bytes_to_native_str(b, encoding=None):
    """Return the result of passing ``b`` through ``native``.

    ``encoding`` is accepted but is not consulted by this implementation.
    """
    native_value = native(b)
    return native_value
def login_for_jwt(self):
    """Log in interactively and return an auth token for ``self.root_url``.

    Depending on ``self.id_provider`` ('udacity', 'gt' or 'developer') the
    user is prompted for credentials, which are handed to the matching
    login helper together with a fresh ``requests`` session. The session
    is then used to POST to ``<root_url>/auth_tokens`` and the
    ``auth_token`` field of the JSON reply is returned.

    Raises:
        NelsonAuthenticationError: if a login helper fails with HTTP 403.
        requests.exceptions.HTTPError: for any other HTTP failure, during
            login or while requesting the token.
    """
    session = requests.Session()
    # The session is only needed until the token has been fetched; close
    # it on every exit path so its pooled connections are released.
    try:
        session.headers.update({'content-type': 'application/json', 'accept': 'application/json'})
        password_prompt = bytes_to_native_str(b"Password :")
        try:
            # NOTE(review): an id_provider outside the three handled values
            # skips login entirely and goes straight to the token request.
            if self.id_provider == 'udacity':
                print("Udacity Login required.")
                email = input('Email :')
                password = getpass.getpass(password_prompt)
                udacity_login(session, self.root_url, email, password)
            elif self.id_provider == 'gt':
                print("GT Login required.")
                username = input('Username :')
                password = getpass.getpass(password_prompt)
                gt_login(session, self.root_url, username, password)
            elif self.id_provider == 'developer':
                print("Developer Login required.")
                username = input('Username :')
                developer_login(session, self.root_url, username)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 403:
                raise NelsonAuthenticationError("Authentication failed")
            # Bare raise re-raises the original error with its traceback.
            raise

        r = session.post(self.root_url + '/auth_tokens')
        r.raise_for_status()
        return r.json()['auth_token']
    finally:
        session.close()
#Helper functions for logins
def cleanup_x509_text(txt):
    """Strip the ``----`` delimiter lines from ``txt`` and indent the rest.

    ``txt`` is a bytes value. Empty lines and lines that both start and end
    with ``----`` are dropped. Each remaining line is prefixed with a space,
    the lines are joined with newline-plus-space, and the result gets a
    leading space and a trailing newline before being passed through
    ``bytes_to_native_str``.
    """
    indent = b' '
    kept = []
    for line in txt.split(b'\n'):
        if not line:
            continue
        if line.startswith(b'----') and line.endswith(b'----'):
            continue
        kept.append(indent + line)
    body = (b'\n' + indent).join(kept)
    return bytes_to_native_str(indent + body + b'\n')
def populate_random(random_file, random_templates=None, saml_info=None):
    """Populate random.ini

    Read the given templates, merge in whatever ``random_file`` already
    holds via ``combine_ini``, then replace the remaining placeholders:
    ``{saml_key}`` on SAML private-key entries, ``{randomN}`` on any other
    entry, and ``{saml_cert}`` on SAML public-cert entries.
    Create missing random values according to the template
    Do not change existing values

    ``random_file`` is rewritten only if at least one placeholder was
    replaced. The combined parser is returned."""
    from base64 import b64encode
    from os import urandom
    from assembl.auth.make_saml import (
        make_saml_key, make_saml_cert, cleanup_x509_text)

    # Both key-name suffixes are 12 characters long; the placeholder
    # prefix is 7 characters long.
    private_suffix = "_PRIVATE_KEY"
    public_suffix = "_PUBLIC_CERT"
    random_marker = '{random'

    base = ConfigParser(interpolation=None)
    assert random_templates, "Please give one or more templates"
    for template_path in random_templates:
        assert exists(template_path), "Cannot find template " + template_path
        base.read(template_path)

    existing = ConfigParser(interpolation=None)
    if exists(random_file):
        existing.read(random_file)
    combine_ini(base, existing)

    saml_keys = {}
    changed = False

    # First pass: private keys and plain random values.
    for section in base.sections():
        for key, value in base.items(section):
            upper_key = key.upper()
            # Too much knowledge of the key naming, but hard to avoid.
            if "SAML" in upper_key and upper_key.endswith(private_suffix):
                prefix = upper_key[:-len(private_suffix)]
                if value != "{saml_key}":
                    # Already set: remember the stored value as-is.
                    saml_keys[prefix] = value
                    continue
                key_text, key_obj = make_saml_key()
                key_text = cleanup_x509_text(key_text)
                base.set(section, key, key_text)
                saml_keys[prefix] = key_obj
                changed = True
                continue

            if not (value.startswith(random_marker) and value.endswith("}")):
                continue
            size = int(value[len(random_marker):-1])
            assert 0 < size < 100
            random_value = bytes_to_native_str(b64encode(urandom(size)))
            base.set(section, key, random_value)
            changed = True

    # Second pass: certificates, done afterwards so that the keys are set.
    for section in base.sections():
        for key, value in base.items(section):
            upper_key = key.upper()
            needs_cert = ("SAML" in upper_key and
                          upper_key.endswith(public_suffix) and
                          value == '{saml_cert}')
            if not needs_cert:
                continue
            assert saml_info
            prefix = upper_key[:-len(public_suffix)]
            # If the key is not there, it IS a mismatch and an error.
            matching_key = saml_keys[prefix]
            cert_text, _ = make_saml_cert(matching_key, **saml_info)
            cert_text = cleanup_x509_text(cert_text)
            base.set(section, key, cert_text)
            changed = True

    if changed:
        with open(random_file, 'w') as f:
            base.write(f)
    return base