def produce(obj, pb, sep):
for ds, val in pb.ListFields():
for val in (val if ds.label == ds.LABEL_REPEATED else [val]):
if ds.cpp_type == ds.CPPTYPE_MESSAGE:
origlen = len(obj)
produce(obj, val, sep)
obj.insert(origlen, '%dm%d' % (ds.number, len(obj) - origlen))
continue
elif ds.type == ds.TYPE_STRING:
if sep == '!':
val = val.replace('*', '*2A').replace('!', '*21')
else:
val = quote(val, safe='~()*!.\'')
elif ds.type == ds.TYPE_BYTES:
val = urlsafe_b64encode(val).decode('ascii').strip('=')
elif ds.type == ds.TYPE_BOOL:
val = int(val)
obj.append('%d%s%s' % (ds.number, types_enc[ds.type], val))
return obj
python类urlsafe_b64encode()的实例源码
def generate_token(key, user_id, action_id='', when=None):
"""Generates a URL-safe token for the given user, action, time tuple.
Args:
key: secret key to use.
user_id: the user ID of the authenticated user.
action_id: a string identifier of the action they requested
authorization for.
when: the time in seconds since the epoch at which the user was
authorized for this action. If not set the current time is used.
Returns:
A string XSRF protection token.
"""
digester = hmac.new(_to_bytes(key, encoding='utf-8'))
digester.update(_to_bytes(str(user_id), encoding='utf-8'))
digester.update(DELIMITER)
digester.update(_to_bytes(action_id, encoding='utf-8'))
digester.update(DELIMITER)
when = _to_bytes(str(when or int(time.time())), encoding='utf-8')
digester.update(when)
digest = digester.digest()
token = base64.urlsafe_b64encode(digest + DELIMITER + when)
return token
def ssrlink(self, user, encode, muid):
protocol = user.get('protocol', '')
obfs = user.get('obfs', '')
protocol = protocol.replace("_compatible", "")
obfs = obfs.replace("_compatible", "")
protocol_param = ''
if muid is not None:
protocol_param_ = user.get('protocol_param', '')
param = protocol_param_.split('#')
if len(param) == 2:
for row in self.data.json:
if int(row['port']) == muid:
param = str(muid) + ':' + row['passwd']
protocol_param = '/?protoparam=' + common.to_str(base64.urlsafe_b64encode(common.to_bytes(param))).replace("=", "")
break
link = ("%s:%s:%s:%s:%s:%s" % (self.server_addr, user['port'], protocol, user['method'], obfs, common.to_str(base64.urlsafe_b64encode(common.to_bytes(user['passwd']))).replace("=", ""))) + protocol_param
return "ssr://" + (encode and common.to_str(base64.urlsafe_b64encode(common.to_bytes(link))).replace("=", "") or link)
def CreateMessage(sender, to, subject, message_text):
"""Create a message for an email.
Args:
sender: Email address of the sender.
to: Email address of the receiver.
subject: The subject of the email message.
message_text: The text of the email message.
Returns:
An object containing a base64url encoded email object.
"""
message = MIMEText(message_text)
message['to'] = to
message['from'] = sender
message['subject'] = subject
return {'raw': base64.urlsafe_b64encode(message.as_string())}
def generate_token_string(self, action=None):
"""Generate a hash of the given token contents that can be verified.
:param action:
A string representing the action that the generated hash is valid
for. This string is usually a URL.
:returns:
A string containing the hash contents of the given `action` and the
contents of the `XSRFToken`. Can be verified with
`verify_token_string`. The string is base64 encoded so it is safe
to use in HTML forms without escaping.
"""
digest_maker = self._digest_maker()
digest_maker.update(self.user_id)
digest_maker.update(self._DELIMITER)
if action:
digest_maker.update(action)
digest_maker.update(self._DELIMITER)
digest_maker.update(str(self.current_time))
return base64.urlsafe_b64encode(
self._DELIMITER.join([digest_maker.hexdigest(),
str(self.current_time)]))
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def create_datakey(encryption_context, keyid, client=None):
'''
Create a datakey from KMS.
'''
if not client:
client = confidant_client.services.get_boto_client('kms')
# Fernet key; from spec and cryptography implementation, but using
# random from KMS, rather than os.urandom:
# https://github.com/fernet/spec/blob/master/Spec.md#key-format
# https://cryptography.io/en/latest/_modules/cryptography/fernet/#Fernet.generate_key
key = base64.urlsafe_b64encode(
client.generate_random(NumberOfBytes=32)['Plaintext']
)
response = client.encrypt(
KeyId='{0}'.format(keyid),
Plaintext=key,
EncryptionContext=encryption_context
)
return {'ciphertext': response['CiphertextBlob'],
'plaintext': key}
def send_email(msg_subj, msg_txt, msg_rcpt, i=0, count=0):
from email.mime.text import MIMEText
userId, gmail = buildGAPIServiceObject(API.GMAIL, _getValueFromOAuth(u'email'))
if not gmail:
return
msg = MIMEText(msg_txt)
msg[u'Subject'] = msg_subj
msg[u'From'] = userId
msg[u'To'] = msg_rcpt
action = Act.Get()
Act.Set(Act.SENDEMAIL)
try:
callGAPI(gmail.users().messages(), u'send',
userId=userId, body={u'raw': base64.urlsafe_b64encode(msg.as_string())}, fields=u'')
entityActionPerformed([Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], i, count)
except googleapiclient.errors.HttpError as e:
entityActionFailedWarning([Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], str(e), i, count)
Act.Set(action)
# Write a CSV file
def code_verifier(n_bytes=64):
"""
Generates a 'code_verifier' as described in section 4.1 of RFC 7636.
This is a 'high-entropy cryptographic random string' that will be
impractical for an attacker to guess.
Args:
n_bytes: integer between 31 and 96, inclusive. default: 64
number of bytes of entropy to include in verifier.
Returns:
Bytestring, representing urlsafe base64-encoded random data.
"""
verifier = base64.urlsafe_b64encode(os.urandom(n_bytes)).rstrip(b'=')
# https://tools.ietf.org/html/rfc7636#section-4.1
# minimum length of 43 characters and a maximum length of 128 characters.
if len(verifier) < 43:
raise ValueError("Verifier too short. n_bytes must be > 30.")
elif len(verifier) > 128:
raise ValueError("Verifier too long. n_bytes must be < 97.")
else:
return verifier
def code_challenge(verifier):
"""
Creates a 'code_challenge' as described in section 4.2 of RFC 7636
by taking the sha256 hash of the verifier and then urlsafe
base64-encoding it.
Args:
verifier: bytestring, representing a code_verifier as generated by
code_verifier().
Returns:
Bytestring, representing a urlsafe base64-encoded sha256 hash digest,
without '=' padding.
"""
digest = hashlib.sha256(verifier).digest()
return base64.urlsafe_b64encode(digest).rstrip(b'=')
def generate_token(key, user_id, action_id='', when=None):
"""Generates a URL-safe token for the given user, action, time tuple.
Args:
key: secret key to use.
user_id: the user ID of the authenticated user.
action_id: a string identifier of the action they requested
authorization for.
when: the time in seconds since the epoch at which the user was
authorized for this action. If not set the current time is used.
Returns:
A string XSRF protection token.
"""
digester = hmac.new(_helpers._to_bytes(key, encoding='utf-8'))
digester.update(_helpers._to_bytes(str(user_id), encoding='utf-8'))
digester.update(DELIMITER)
digester.update(_helpers._to_bytes(action_id, encoding='utf-8'))
digester.update(DELIMITER)
when = _helpers._to_bytes(str(when or int(time.time())), encoding='utf-8')
digester.update(when)
digest = digester.digest()
token = base64.urlsafe_b64encode(digest + DELIMITER + when)
return token
def client_encode(self, buf):
if self.raw_trans_sent:
return buf
self.send_buffer += buf
if not self.has_sent_header:
port = b''
if self.server_info.port != 80:
port = b':' + common.to_bytes(str(self.server_info.port))
self.has_sent_header = True
http_head = b"GET / HTTP/1.1\r\n"
http_head += b"Host: " + (self.server_info.param or self.server_info.host) + port + b"\r\n"
http_head += b"Connection: Upgrade, HTTP2-Settings\r\nUpgrade: h2c\r\n"
http_head += b"HTTP2-Settings: " + base64.urlsafe_b64encode(buf) + b"\r\n"
return http_head + b"\r\n"
if self.has_recv_header:
ret = self.send_buffer
self.send_buffer = b''
self.raw_trans_sent = True
return ret
return b''
def send(To, Subject, Body, Cc=[], Bcc=[], html=False, files=[]):
"""Send an email
"""
subtype = 'html' if html else 'plain'
message = MIMEMultipart()
message['To'] = ', '.join(To)
message['Subject'] = Subject
message['Cc'] = ', '.join(Cc)
message['Bcc'] = ', '.join(Bcc)
message.attach(MIMEText(Body, subtype))
for f in files:
with open(f, "rb") as In:
part = MIMEApplication(In.read(), Name=basename(f))
part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
message.attach(part)
message = {'raw': base64.urlsafe_b64encode(message.as_string())}
credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
Http = credentials.authorize(httplib2.Http())
service = discovery.build('gmail', 'v1', http=Http)
message = service.users().messages().send(userId='me', body=message).execute()
querystringsafe_base64.py 文件源码
项目:querystringsafe_base64
作者: ClearcodeHQ
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def encode(to_encode):
"""
Encode an arbitrary string as a base64 that is safe to put as a URL query value.
urllib.quote and urllib.quote_plus do not have any effect on the
result of querystringsafe_base64.encode.
:param (str, bytes) to_encode:
:rtype: str
:return: a string that is safe to put as a value in an URL query
string - like base64, except characters ['+', '/', '='] are
replaced with ['-', '_', '.'] consequently
"""
encoded = urlsafe_b64encode(to_encode).replace(b'=', b'.')
if PY2:
return encoded
return encoded.decode()
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def create_message(sender, to, subject, message_text, style='html'):
"""Create a message for an email.
Args:
sender: Email address of the sender.
to: Email address of the receiver.
subject: The subject of the email message.
message_text: The text of the email message.
Returns:
An object containing a base64url encoded email object.
"""
if python_version == 2:
message_text = unicode(message_text).encode('utf-8')
message = MIMEText(message_text, _subtype=style)
message['to'] = to
message['from'] = sender
message['subject'] = subject
message = message.as_string()
if python_version == 2:
message = base64.urlsafe_b64encode(message)
else:
message = base64.urlsafe_b64encode(
message.encode('utf-8')).decode('utf-8')
return {'raw': message}
def make_id():
"""Make uniq short id"""
return urlsafe_b64encode(uuid4().bytes).rstrip(b'=')
def get_hash(self, data, hasher=None):
"""
Get the hash of some data, using a particular hash algorithm, if
specified.
:param data: The data to be hashed.
:type data: bytes
:param hasher: The name of a hash implementation, supported by hashlib,
or ``None``. Examples of valid values are ``'sha1'``,
``'sha224'``, ``'sha384'``, '``sha256'``, ``'md5'`` and
``'sha512'``. If no hasher is specified, the ``hasher``
attribute of the :class:`InstalledDistribution` instance
is used. If the hasher is determined to be ``None``, MD5
is used as the hashing algorithm.
:returns: The hash of the data. If a hasher was explicitly specified,
the returned hash will be prefixed with the specified hasher
followed by '='.
:rtype: str
"""
if hasher is None:
hasher = self.hasher
if hasher is None:
hasher = hashlib.md5
prefix = ''
else:
hasher = getattr(hashlib, hasher)
prefix = '%s=' % self.hasher
digest = hasher(data).digest()
digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
return '%s%s' % (prefix, digest)
def get_hash(self, data, hash_kind=None):
if hash_kind is None:
hash_kind = self.hash_kind
try:
hasher = getattr(hashlib, hash_kind)
except AttributeError:
raise DistlibException('Unsupported hash algorithm: %r' % hash_kind)
result = hasher(data).digest()
result = base64.urlsafe_b64encode(result).rstrip(b'=').decode('ascii')
return hash_kind, result
def rehash(path, algo='sha256', blocksize=1 << 20):
"""Return (hash, length) for path using hashlib.new(algo)"""
h = hashlib.new(algo)
length = 0
with open(path, 'rb') as f:
for block in read_chunks(f, size=blocksize):
length += len(block)
h.update(block)
digest = 'sha256=' + urlsafe_b64encode(
h.digest()
).decode('latin1').rstrip('=')
return (digest, length)
def urlsafe_b64encode(data):
"""urlsafe_b64encode without padding"""
return base64.urlsafe_b64encode(data).rstrip(binary('='))
def digest(self):
if self.hashtype == 'md5':
return self.hash.hexdigest()
digest = self.hash.digest()
return self.hashtype + '=' + native(urlsafe_b64encode(digest))
def get_hash(self, data, hasher=None):
"""
Get the hash of some data, using a particular hash algorithm, if
specified.
:param data: The data to be hashed.
:type data: bytes
:param hasher: The name of a hash implementation, supported by hashlib,
or ``None``. Examples of valid values are ``'sha1'``,
``'sha224'``, ``'sha384'``, '``sha256'``, ``'md5'`` and
``'sha512'``. If no hasher is specified, the ``hasher``
attribute of the :class:`InstalledDistribution` instance
is used. If the hasher is determined to be ``None``, MD5
is used as the hashing algorithm.
:returns: The hash of the data. If a hasher was explicitly specified,
the returned hash will be prefixed with the specified hasher
followed by '='.
:rtype: str
"""
if hasher is None:
hasher = self.hasher
if hasher is None:
hasher = hashlib.md5
prefix = ''
else:
hasher = getattr(hashlib, hasher)
prefix = '%s=' % self.hasher
digest = hasher(data).digest()
digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
return '%s%s' % (prefix, digest)
def get_hash(self, data, hash_kind=None):
if hash_kind is None:
hash_kind = self.hash_kind
try:
hasher = getattr(hashlib, hash_kind)
except AttributeError:
raise DistlibException('Unsupported hash algorithm: %r' % hash_kind)
result = hasher(data).digest()
result = base64.urlsafe_b64encode(result).rstrip(b'=').decode('ascii')
return hash_kind, result
def rehash(path, algo='sha256', blocksize=1 << 20):
"""Return (hash, length) for path using hashlib.new(algo)"""
h = hashlib.new(algo)
length = 0
with open(path, 'rb') as f:
for block in read_chunks(f, size=blocksize):
length += len(block)
h.update(block)
digest = 'sha256=' + urlsafe_b64encode(
h.digest()
).decode('latin1').rstrip('=')
return (digest, length)
def urlsafe_b64encode(data):
"""urlsafe_b64encode without padding"""
return base64.urlsafe_b64encode(data).rstrip(binary('='))
def digest(self):
if self.hashtype == 'md5':
return self.hash.hexdigest()
digest = self.hash.digest()
return self.hashtype + '=' + native(urlsafe_b64encode(digest))
def start(self, url_state=None):
"""
Starts the OAuth 2 authorization process.
This function builds an "authorization URL". You should redirect your
user's browser to this URL, which will give them an opportunity to
grant your app access to their Dropbox account. When the user
completes this process, they will be automatically redirected to the
``redirect_uri`` you passed in to the constructor.
This function will also save a CSRF token to
``session[csrf_token_session_key]`` (as provided to the constructor).
This CSRF token will be checked on :meth:`finish()` to prevent request
forgery.
:param str url_state: Any data that you would like to keep in the URL
through the authorization process. This exact value will be
returned to you by :meth:`finish()`.
:return: The URL for a page on Dropbox's website. This page will let
the user "approve" your app, which gives your app permission to
access the user's Dropbox account. Tell the user to visit this URL
and approve your app.
"""
csrf_token = base64.urlsafe_b64encode(os.urandom(16)).decode('ascii')
state = csrf_token
if url_state is not None:
state += "|" + url_state
self.session[self.csrf_token_session_key] = csrf_token
return self._get_authorize_url(self.redirect_uri, state)
def pack(self, message, ascii_encoded=False):
"""Packs a message for transport as described in y/cep342.
Use :func:`unpack` to decode the packed message.
Args:
message (data_pipeline.message.Message): The message to pack
ascii_decoded (Optional[bool]): Set to True if message is not valid ASCII
Returns:
bytes: Avro byte string prepended by magic envelope version byte
The initial "magic byte" is meant to specify the envelope schema version.
See y/cep342 for details. In other words, the version number of the current
schema is the null byte. In the event we need to add additional envelope
versions, we'll use this byte to identify it.
In addition, the "magic byte" is used as a protocol to encode the serialized
message in base64. See DATAPIPE-1350 for more detail. This option has been
added because as of now, yelp_clog only supports sending valid ASCII strings.
Producer/Consumer registration will make use of this to instead send base64
encoded strings.
"""
msg = bytes(0) + self._avro_string_writer.encode(message.avro_repr)
if ascii_encoded:
return self.ASCII_MAGIC_BYTE + base64.urlsafe_b64encode(msg)
else:
return msg
def _pack_up(credentials):
return base64.urlsafe_b64encode(
bytes(json.dumps(credentials), encoding="UTF-8")
)