python类urlsafe_b64encode()的实例源码

pburl_decoder.py 文件源码 项目:pbtk 作者: marin-m 项目源码 文件源码 阅读 64 收藏 0 点赞 0 评论 0
def produce(obj, pb, sep):
    """Flatten the set fields of a message into a list of string tokens.

    Every value yielded by ``pb.ListFields()`` is appended to ``obj`` as
    ``<field number><types_enc entry><value>``.  A nested message is
    flattened recursively and preceded by a ``<field number>m<count>``
    header, where ``count`` is the number of tokens the nested message
    contributed.

    Args:
        obj: list receiving the tokens; it is mutated in place.
        pb: object exposing ``ListFields()`` yielding (descriptor, value).
        sep: only compared against ``'!'`` here; it selects how string
            values are escaped.

    Returns:
        The same ``obj`` list that was passed in.
    """
    for field, raw in pb.ListFields():
        # Wrap scalars so repeated and singular fields share one loop.
        items = raw if field.label == field.LABEL_REPEATED else [raw]

        for item in items:
            if field.cpp_type == field.CPPTYPE_MESSAGE:
                # Children are emitted first; the header is slotted in front
                # of them once their count is known.
                start = len(obj)
                produce(obj, item, sep)
                obj.insert(start, '%dm%d' % (field.number, len(obj) - start))
                continue

            if field.type == field.TYPE_STRING:
                if sep == '!':
                    item = item.replace('*', '*2A').replace('!', '*21')
                else:
                    item = quote(item, safe='~()*!.\'')
            elif field.type == field.TYPE_BYTES:
                item = urlsafe_b64encode(item).decode('ascii').strip('=')
            elif field.type == field.TYPE_BOOL:
                item = int(item)

            obj.append('%d%s%s' % (field.number, types_enc[field.type], item))

    return obj
xsrfutil.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def generate_token(key, user_id, action_id='', when=None):
    """Generates a URL-safe token for the given user, action, time tuple.

    Args:
        key: secret key to use.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
                   authorization for.
        when: the time in seconds since the epoch at which the user was
              authorized for this action. If not set (or falsy) the current
              time is used.

    Returns:
        A string XSRF protection token: the URL-safe base64 encoding of
        ``digest + DELIMITER + when``.
    """
    # Local import keeps this block self-contained.
    import hashlib

    # hmac.new() requires an explicit digestmod on Python 3.8+.  MD5 was the
    # implicit default before that, so pinning it keeps tokens unchanged.
    digester = hmac.new(_to_bytes(key, encoding='utf-8'),
                        digestmod=hashlib.md5)
    digester.update(_to_bytes(str(user_id), encoding='utf-8'))
    digester.update(DELIMITER)
    digester.update(_to_bytes(action_id, encoding='utf-8'))
    digester.update(DELIMITER)
    when = _to_bytes(str(when or int(time.time())), encoding='utf-8')
    digester.update(when)
    digest = digester.digest()

    # The timestamp travels with the digest inside the token.
    return base64.urlsafe_b64encode(digest + DELIMITER + when)
mujson_mgr.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def ssrlink(self, user, encode, muid):
    """Build an ``ssr://`` link string for one user entry.

    Args:
        user: mapping read for ``port``, ``method``, ``passwd`` and the
            optional ``protocol``, ``obfs`` and ``protocol_param`` keys.
        encode: when truthy the link body is base64-encoded.
        muid: port number matched against ``self.data.json`` rows, or
            ``None`` to skip the ``protoparam`` suffix.

    Returns:
        The link, prefixed with ``ssr://``.
    """
    def b64(text):
        # URL-safe base64 with every '=' removed.
        raw = base64.urlsafe_b64encode(common.to_bytes(text))
        return common.to_str(raw).replace("=", "")

    protocol = user.get('protocol', '').replace("_compatible", "")
    obfs = user.get('obfs', '').replace("_compatible", "")

    protocol_param = ''
    if muid is not None:
        pieces = user.get('protocol_param', '').split('#')
        # Only a two-part 'a#b' protocol_param enables the suffix.
        if len(pieces) == 2:
            for row in self.data.json:
                if int(row['port']) == muid:
                    protocol_param = '/?protoparam=' + b64(str(muid) + ':' + row['passwd'])
                    break

    fields = (self.server_addr, user['port'], protocol, user['method'], obfs, b64(user['passwd']))
    link = ("%s:%s:%s:%s:%s:%s" % fields) + protocol_param
    body = b64(link) if encode else link
    return "ssr://" + body
gemail.py 文件源码 项目:cloud-site 作者: Mengjianhua-c 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def CreateMessage(sender, to, subject, message_text):
    """Create a message for an email.

    Args:
      sender: Email address of the sender.
      to: Email address of the receiver.
      subject: The subject of the email message.
      message_text: The text of the email message.

    Returns:
      A dict whose 'raw' entry holds the base64url encoded email as a
      native string.
    """
    message = MIMEText(message_text)
    message['to'] = to
    message['from'] = sender
    message['subject'] = subject

    # On Python 3 as_string() returns text, but urlsafe_b64encode() only
    # accepts bytes; encode first so the call works on both major versions.
    serialized = message.as_string()
    if not isinstance(serialized, bytes):
        serialized = serialized.encode('utf-8')

    encoded = base64.urlsafe_b64encode(serialized)
    # Return a native str so the payload can be JSON-serialised.
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    return {'raw': encoded}
xsrf.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def generate_token_string(self, action=None):
    """Generate a verifiable hash of the token contents.

    :param action:
        A string representing the action that the generated hash is valid
        for. This string is usually a URL.
    :returns:
        The base64 (URL-safe) encoding of the hex digest and the current
        time joined by the delimiter. Can be verified with
        `verify_token_string`, and is safe to use in HTML forms without
        escaping.
    """
    hasher = self._digest_maker()

    # Feed order matters: user id, [action], then the timestamp, with the
    # delimiter after each of the first two.
    pieces = [self.user_id, self._DELIMITER]
    if action:
        pieces += [action, self._DELIMITER]
    pieces.append(str(self.current_time))
    for piece in pieces:
        hasher.update(piece)

    payload = self._DELIMITER.join([hasher.hexdigest(),
                                    str(self.current_time)])
    return base64.urlsafe_b64encode(payload)
fernet.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt ``data`` and return the signed, base64-encoded token.

    The token is ``0x80 || big-endian 64-bit current_time || iv ||
    AES-CBC ciphertext`` followed by an HMAC-SHA256 over those bytes, all
    URL-safe base64 encoded.

    Raises:
        TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # PKCS7-pad the plaintext to the AES block size.
    pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
    padded = pkcs7.update(data) + pkcs7.finalize()

    cipher = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    )
    aes = cipher.encryptor()
    ciphertext = aes.update(padded) + aes.finalize()

    header = b"\x80" + struct.pack(">Q", current_time)
    unsigned_token = header + iv + ciphertext

    # The signature covers everything that precedes it in the token.
    signer = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    signer.update(unsigned_token)
    signature = signer.finalize()
    return base64.urlsafe_b64encode(unsigned_token + signature)
cryptolib.py 文件源码 项目:python-confidant-client 作者: lyft 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def create_datakey(encryption_context, keyid, client=None):
    '''
    Create a datakey from KMS.

    Returns a dict with the KMS-encrypted key under 'ciphertext' and the
    base64-encoded key itself under 'plaintext'. When no client is given
    a 'kms' boto client is obtained from confidant_client.services.
    '''
    kms = client if client else confidant_client.services.get_boto_client('kms')

    # Fernet key; from spec and cryptography implementation, but using
    # random from KMS, rather than os.urandom:
    #   https://github.com/fernet/spec/blob/master/Spec.md#key-format
    #   https://cryptography.io/en/latest/_modules/cryptography/fernet/#Fernet.generate_key
    random_bytes = kms.generate_random(NumberOfBytes=32)['Plaintext']
    key = base64.urlsafe_b64encode(random_bytes)

    encrypted = kms.encrypt(
        KeyId='{0}'.format(keyid),
        Plaintext=key,
        EncryptionContext=encryption_context,
    )
    return {
        'ciphertext': encrypted['CiphertextBlob'],
        'plaintext': key,
    }
gam.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def send_email(msg_subj, msg_txt, msg_rcpt, i=0, count=0):
  """Send a text message through the Gmail service and report the outcome.

  The sender is the user id returned by buildGAPIServiceObject(); nothing is
  sent when no service object is returned. The current action is saved
  before the call and restored afterwards.
  """
  from email.mime.text import MIMEText
  userId, gmail = buildGAPIServiceObject(API.GMAIL, _getValueFromOAuth(u'email'))
  if not gmail:
    return
  mime = MIMEText(msg_txt)
  mime[u'Subject'] = msg_subj
  mime[u'From'] = userId
  mime[u'To'] = msg_rcpt
  # Same entity description is used for both the success and failure report.
  entity = [Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj]
  previous_action = Act.Get()
  Act.Set(Act.SENDEMAIL)
  try:
    callGAPI(gmail.users().messages(), u'send',
             userId=userId, body={u'raw': base64.urlsafe_b64encode(mime.as_string())}, fields=u'')
  except googleapiclient.errors.HttpError as e:
    entityActionFailedWarning(entity, str(e), i, count)
  else:
    entityActionPerformed(entity, i, count)
  Act.Set(previous_action)

# Write a CSV file
_pkce.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def code_verifier(n_bytes=64):
    """
    Generates a 'code_verifier' as described in section 4.1 of RFC 7636.

    This is a 'high-entropy cryptographic random string' that will be
    impractical for an attacker to guess.

    Args:
        n_bytes: integer between 32 and 96, inclusive. default: 64
            number of bytes of entropy to include in verifier.
            (31 bytes encode to only 42 characters, one short of the
            RFC minimum.)

    Returns:
        Bytestring, representing urlsafe base64-encoded random data,
        without '=' padding.

    Raises:
        ValueError: if the encoded verifier is shorter than 43 or longer
            than 128 characters.
    """
    verifier = base64.urlsafe_b64encode(os.urandom(n_bytes)).rstrip(b'=')
    # https://tools.ietf.org/html/rfc7636#section-4.1
    # minimum length of 43 characters and a maximum length of 128 characters.
    if len(verifier) < 43:
        raise ValueError("Verifier too short. n_bytes must be > 31.")
    if len(verifier) > 128:
        raise ValueError("Verifier too long. n_bytes must be < 97.")
    return verifier
_pkce.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 55 收藏 0 点赞 0 评论 0
def code_challenge(verifier):
    """
    Derives the 'code_challenge' of RFC 7636 section 4.2 from a verifier:
    the sha256 digest of the verifier, urlsafe base64-encoded.

    Args:
        verifier: bytestring, representing a code_verifier as generated by
            code_verifier().

    Returns:
        Bytestring, representing a urlsafe base64-encoded sha256 hash digest,
            without '=' padding.
    """
    hasher = hashlib.sha256()
    hasher.update(verifier)
    encoded = base64.urlsafe_b64encode(hasher.digest())
    return encoded.rstrip(b'=')
xsrfutil.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def generate_token(key, user_id, action_id='', when=None):
    """Generates a URL-safe token for the given user, action, time tuple.

    Args:
        key: secret key to use.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
                   authorization for.
        when: the time in seconds since the epoch at which the user was
              authorized for this action. If not set (or falsy) the current
              time is used.

    Returns:
        A string XSRF protection token: the URL-safe base64 encoding of
        ``digest + DELIMITER + when``.
    """
    # Local import keeps this block self-contained.
    import hashlib

    # hmac.new() requires an explicit digestmod on Python 3.8+.  MD5 was the
    # implicit default before that, so pinning it keeps tokens unchanged.
    digester = hmac.new(_helpers._to_bytes(key, encoding='utf-8'),
                        digestmod=hashlib.md5)
    digester.update(_helpers._to_bytes(str(user_id), encoding='utf-8'))
    digester.update(DELIMITER)
    digester.update(_helpers._to_bytes(action_id, encoding='utf-8'))
    digester.update(DELIMITER)
    when = _helpers._to_bytes(str(when or int(time.time())), encoding='utf-8')
    digester.update(when)
    digest = digester.digest()

    # The timestamp travels with the digest inside the token.
    return base64.urlsafe_b64encode(digest + DELIMITER + when)
http_simple.py 文件源码 项目:shadowsocksr 作者: shadowsocksr-backup 项目源码 文件源码 阅读 90 收藏 0 点赞 0 评论 0
def client_encode(self, buf):
    """Wrap outgoing client data in a fake HTTP upgrade request.

    The first call returns an HTTP GET header carrying ``buf`` base64-encoded
    in the ``HTTP2-Settings`` line, while the raw bytes are kept in
    ``self.send_buffer``. Until ``self.has_recv_header`` is set, later calls
    only buffer and return ``b''``. Once it is set the whole buffer is
    returned and every subsequent call passes ``buf`` through unchanged.
    """
    if self.raw_trans_sent:
        return buf

    self.send_buffer += buf

    if not self.has_sent_header:
        info = self.server_info
        # The port is only spelled out when it is not the default 80.
        if info.port != 80:
            port = b':' + common.to_bytes(str(info.port))
        else:
            port = b''
        self.has_sent_header = True
        header_lines = [
            b"GET / HTTP/1.1\r\n",
            b"Host: " + (info.param or info.host) + port + b"\r\n",
            b"Connection: Upgrade, HTTP2-Settings\r\nUpgrade: h2c\r\n",
            b"HTTP2-Settings: " + base64.urlsafe_b64encode(buf) + b"\r\n",
            b"\r\n",
        ]
        return b"".join(header_lines)

    if not self.has_recv_header:
        return b''

    # Flush everything buffered so far and switch to pass-through mode.
    pending = self.send_buffer
    self.send_buffer = b''
    self.raw_trans_sent = True
    return pending
gmail.py 文件源码 项目:libmozdata 作者: mozilla 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def send(To, Subject, Body, Cc=[], Bcc=[], html=False, files=[]):
    """Send an email through the Gmail API.

    Args:
        To: iterable of recipient addresses.
        Subject: subject line.
        Body: message body.
        Cc: iterable of carbon-copy addresses.
        Bcc: iterable of blind-carbon-copy addresses.
        html: when true the body is attached as 'html', otherwise 'plain'.
        files: iterable of paths attached to the message.

    The list defaults are only read, never mutated, so sharing them between
    calls is harmless.
    """
    subtype = 'html' if html else 'plain'
    message = MIMEMultipart()
    message['To'] = ', '.join(To)
    message['Subject'] = Subject
    message['Cc'] = ', '.join(Cc)
    message['Bcc'] = ', '.join(Bcc)

    message.attach(MIMEText(Body, subtype))

    for path in files:
        with open(path, "rb") as fh:
            part = MIMEApplication(fh.read(), Name=basename(path))
            part['Content-Disposition'] = 'attachment; filename="%s"' % basename(path)
            message.attach(part)

    # On Python 3 as_string() returns text, but urlsafe_b64encode() only
    # accepts bytes; encode first so the call works on both major versions.
    serialized = message.as_string()
    if not isinstance(serialized, bytes):
        serialized = serialized.encode('utf-8')
    raw = base64.urlsafe_b64encode(serialized)
    # The request body is JSON, which needs text rather than bytes.
    if not isinstance(raw, str):
        raw = raw.decode('ascii')
    payload = {'raw': raw}

    credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
    Http = credentials.authorize(httplib2.Http())
    service = discovery.build('gmail', 'v1', http=Http)

    service.users().messages().send(userId='me', body=payload).execute()
querystringsafe_base64.py 文件源码 项目:querystringsafe_base64 作者: ClearcodeHQ 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def encode(to_encode):
    """
    Encode data as base64 that is safe to use as a URL query value.

    urllib.quote and urllib.quote_plus do not have any effect on the
    result of querystringsafe_base64.encode.

    :param (str, bytes) to_encode:
    :rtype: str
    :return: a string that is safe to put as a value in an URL query
        string - like base64, except characters ['+', '/', '='] are
        replaced with ['-', '_', '.'] respectively
    """
    # The urlsafe alphabet already covers '+' and '/'; only the padding
    # character still needs swapping.
    dotted = urlsafe_b64encode(to_encode).replace(b'=', b'.')
    return dotted if PY2 else dotted.decode()
fernet.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt ``data`` and return the signed, base64-encoded token.

    The token is ``0x80 || big-endian 64-bit current_time || iv ||
    AES-CBC ciphertext`` followed by an HMAC-SHA256 over those bytes, all
    URL-safe base64 encoded.

    Raises:
        TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # PKCS7-pad the plaintext to the AES block size.
    pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
    padded = pkcs7.update(data) + pkcs7.finalize()

    cipher = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    )
    aes = cipher.encryptor()
    ciphertext = aes.update(padded) + aes.finalize()

    header = b"\x80" + struct.pack(">Q", current_time)
    unsigned_token = header + iv + ciphertext

    # The signature covers everything that precedes it in the token.
    signer = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    signer.update(unsigned_token)
    signature = signer.finalize()
    return base64.urlsafe_b64encode(unsigned_token + signature)
gmailsendapi.py 文件源码 项目:arxiv-feed-mailer 作者: basnijholt 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def create_message(sender, to, subject, message_text, style='html'):
    """Build the Gmail API payload for an email.

    Args:
      sender: Email address of the sender.
      to: Email address of the receiver.
      subject: The subject of the email message.
      message_text: The text of the email message.
      style: MIME subtype handed to MIMEText (default 'html').

    Returns:
      An object containing a base64url encoded email object.
    """
    is_py2 = python_version == 2

    if is_py2:
        message_text = unicode(message_text).encode('utf-8')

    mime = MIMEText(message_text, _subtype=style)
    mime['to'] = to
    mime['from'] = sender
    mime['subject'] = subject
    serialized = mime.as_string()

    if is_py2:
        raw = base64.urlsafe_b64encode(serialized)
    else:
        # Python 3: encode to bytes for base64, then back to text.
        raw = base64.urlsafe_b64encode(serialized.encode('utf-8')).decode('utf-8')
    return {'raw': raw}
utils.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def make_id():
    """Return a short unique id: a UUID4 as unpadded URL-safe base64 bytes."""
    encoded = urlsafe_b64encode(uuid4().bytes)
    return encoded.rstrip(b'=')
database.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_hash(self, data, hasher=None):
    """
    Get the hash of some data, using a particular hash algorithm, if
    specified.

    :param data: The data to be hashed.
    :type data: bytes
    :param hasher: The name of a hash implementation, supported by hashlib,
                   or ``None``. Examples of valid values are ``'sha1'``,
                   ``'sha224'``, ``'sha384'``, '``sha256'``, ``'md5'`` and
                   ``'sha512'``. If no hasher is specified, the ``hasher``
                   attribute of the instance is used. If the hasher is
                   determined to be ``None``, MD5 is used as the hashing
                   algorithm.
    :returns: The hash of the data as unpadded URL-safe base64. Whenever a
              hasher name is in effect (passed in or taken from the
              instance), the hash is prefixed with that name followed by
              '='.
    :rtype: str
    """
    if hasher is None:
        hasher = self.hasher
    if hasher is None:
        hash_func = hashlib.md5
        prefix = ''
    else:
        hash_func = getattr(hashlib, hasher)
        # Label the digest with the algorithm that actually produced it;
        # using self.hasher here mislabelled explicitly requested hashers.
        prefix = '%s=' % hasher
    digest = hash_func(data).digest()
    digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
    return '%s%s' % (prefix, digest)
wheel.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def get_hash(self, data, hash_kind=None):
    """Hash ``data`` and return ``(algorithm name, unpadded base64 digest)``.

    ``hash_kind`` falls back to ``self.hash_kind`` when omitted; a name that
    hashlib does not provide raises DistlibException.
    """
    kind = self.hash_kind if hash_kind is None else hash_kind
    try:
        factory = getattr(hashlib, kind)
    except AttributeError:
        raise DistlibException('Unsupported hash algorithm: %r' % kind)
    raw = factory(data).digest()
    encoded = base64.urlsafe_b64encode(raw).rstrip(b'=').decode('ascii')
    return kind, encoded
wheel.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def rehash(path, algo='sha256', blocksize=1 << 20):
    """Return (hash, length) for path using hashlib.new(algo).

    The file is read in chunks of ``blocksize`` bytes. ``hash`` has the form
    ``<algo>=<unpadded urlsafe base64 digest>`` and ``length`` is the total
    number of bytes read.
    """
    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    encoded = urlsafe_b64encode(h.digest()).decode('latin1').rstrip('=')
    # Label with the algorithm actually used; the prefix was hard-coded to
    # 'sha256=' and mislabelled digests for any other algo.
    digest = '%s=%s' % (algo, encoded)
    return (digest, length)
util.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def urlsafe_b64encode(data):
    """Return the URL-safe base64 encoding of data with '=' padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    pad = binary('=')
    return encoded.rstrip(pad)
util.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def digest(self):
    """Return the accumulated hash as a string.

    MD5 is reported as a plain hex digest; any other hash type is reported
    as ``<hashtype>=<unpadded urlsafe base64 digest>``.
    """
    if self.hashtype != 'md5':
        raw = self.hash.digest()
        return self.hashtype + '=' + native(urlsafe_b64encode(raw))
    return self.hash.hexdigest()
database.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def get_hash(self, data, hasher=None):
    """
    Get the hash of some data, using a particular hash algorithm, if
    specified.

    :param data: The data to be hashed.
    :type data: bytes
    :param hasher: The name of a hash implementation, supported by hashlib,
                   or ``None``. Examples of valid values are ``'sha1'``,
                   ``'sha224'``, ``'sha384'``, '``sha256'``, ``'md5'`` and
                   ``'sha512'``. If no hasher is specified, the ``hasher``
                   attribute of the instance is used. If the hasher is
                   determined to be ``None``, MD5 is used as the hashing
                   algorithm.
    :returns: The hash of the data as unpadded URL-safe base64. Whenever a
              hasher name is in effect (passed in or taken from the
              instance), the hash is prefixed with that name followed by
              '='.
    :rtype: str
    """
    if hasher is None:
        hasher = self.hasher
    if hasher is None:
        hash_func = hashlib.md5
        prefix = ''
    else:
        hash_func = getattr(hashlib, hasher)
        # Label the digest with the algorithm that actually produced it;
        # using self.hasher here mislabelled explicitly requested hashers.
        prefix = '%s=' % hasher
    digest = hash_func(data).digest()
    digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
    return '%s%s' % (prefix, digest)
wheel.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def get_hash(self, data, hash_kind=None):
    """Hash ``data`` and return ``(algorithm name, unpadded base64 digest)``.

    ``hash_kind`` falls back to ``self.hash_kind`` when omitted; a name that
    hashlib does not provide raises DistlibException.
    """
    kind = self.hash_kind if hash_kind is None else hash_kind
    try:
        factory = getattr(hashlib, kind)
    except AttributeError:
        raise DistlibException('Unsupported hash algorithm: %r' % kind)
    raw = factory(data).digest()
    encoded = base64.urlsafe_b64encode(raw).rstrip(b'=').decode('ascii')
    return kind, encoded
wheel.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def rehash(path, algo='sha256', blocksize=1 << 20):
    """Return (hash, length) for path using hashlib.new(algo).

    The file is read in chunks of ``blocksize`` bytes. ``hash`` has the form
    ``<algo>=<unpadded urlsafe base64 digest>`` and ``length`` is the total
    number of bytes read.
    """
    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    encoded = urlsafe_b64encode(h.digest()).decode('latin1').rstrip('=')
    # Label with the algorithm actually used; the prefix was hard-coded to
    # 'sha256=' and mislabelled digests for any other algo.
    digest = '%s=%s' % (algo, encoded)
    return (digest, length)
util.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def urlsafe_b64encode(data):
    """Return the URL-safe base64 encoding of data with '=' padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    pad = binary('=')
    return encoded.rstrip(pad)
util.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 261 收藏 0 点赞 0 评论 0
def digest(self):
    """Return the accumulated hash as a string.

    MD5 is reported as a plain hex digest; any other hash type is reported
    as ``<hashtype>=<unpadded urlsafe base64 digest>``.
    """
    if self.hashtype != 'md5':
        raw = self.hash.digest()
        return self.hashtype + '=' + native(urlsafe_b64encode(raw))
    return self.hash.hexdigest()
oauth.py 文件源码 项目:DropboxConnect 作者: raguay 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def start(self, url_state=None):
    """
    Begin the OAuth 2 authorization process.

    Builds the "authorization URL" the user's browser should be redirected
    to so they can grant the app access to their Dropbox account; when
    they finish they are redirected to the ``redirect_uri`` given to the
    constructor.

    A fresh random CSRF token is stored in
    ``session[csrf_token_session_key]`` so that :meth:`finish()` can check
    it to prevent request forgery.

    :param str url_state: Any data that you would like to keep in the URL
        through the authorization process.  This exact value will be
        returned to you by :meth:`finish()`.
    :return: The URL for a page on Dropbox's website where the user can
        "approve" your app.
    """
    token_bytes = base64.urlsafe_b64encode(os.urandom(16))
    csrf_token = token_bytes.decode('ascii')

    # The state carries the token, optionally followed by '|' + url_state.
    if url_state is None:
        state = csrf_token
    else:
        state = csrf_token + "|" + url_state

    self.session[self.csrf_token_session_key] = csrf_token
    return self._get_authorize_url(self.redirect_uri, state)
envelope.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def pack(self, message, ascii_encoded=False):
    """Packs a message for transport as described in y/cep342.

    Use :func:`unpack` to decode the packed message.

    Args:
        message (data_pipeline.message.Message): The message to pack
        ascii_encoded (Optional[bool]): Set to True if message is not valid ASCII

    Returns:
        bytes: Avro byte string prepended by magic envelope version byte

    The initial "magic byte" is meant to specify the envelope schema version.
    See y/cep342 for details.  In other words, the version number of the current
    schema is the null byte.  In the event we need to add additional envelope
    versions, we'll use this byte to identify it.

    In addition, the "magic byte" is used as a protocol to encode the serialized
    message in base64. See DATAPIPE-1350 for more detail. This option has been
    added because as of now, yelp_clog only supports sending valid ASCII strings.
    Producer/Consumer registration will make use of this to instead send base64
    encoded strings.
    """
    # NOTE(review): bytes(0) is '0' on Python 2 but an empty byte string on
    # Python 3 -- neither is a literal null byte; confirm against unpack().
    version_prefix = bytes(0)
    payload = self._avro_string_writer.encode(message.avro_repr)
    msg = version_prefix + payload

    if not ascii_encoded:
        return msg
    return self.ASCII_MAGIC_BYTE + base64.urlsafe_b64encode(msg)
views.py 文件源码 项目:openbare 作者: openbare 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _pack_up(credentials):
    """Serialise credentials to JSON and return it URL-safe base64 encoded."""
    as_json = json.dumps(credentials)
    return base64.urlsafe_b64encode(as_json.encode("UTF-8"))


问题


面经


文章

微信
公众号

扫码关注公众号