python类decodebytes()的实例源码

__init__.py 文件源码 项目:Isa-Framework 作者: c0hb1rd 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def load_local_session(self):
        """Populate the in-memory session map from the on-disk session store.

        Does nothing when ``self.__storage_path__`` is ``None``. Otherwise every
        entry of that directory is opened as a file whose name is used as the
        session ID and whose content is base64-encoded JSON; the parsed object
        is stored in ``self.__session_map__`` under that session ID.
        """
        storage_dir = self.__storage_path__
        if storage_dir is None:
            return

        for session_id in os.listdir(storage_dir):
            # The file name doubles as the session ID.
            with open(os.path.join(storage_dir, session_id), 'rb') as handle:
                encoded = handle.read()

            # Strip the base64 layer, then parse the JSON text underneath.
            payload = base64.decodebytes(encoded).decode()
            self.__session_map__[session_id] = json.loads(payload)

    # ??????????
email.py 文件源码 项目:lampost_lib 作者: genzgd 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _open_server(self):
        """Connect to the configured SMTP server and authenticate.

        Opens an ``SMTP`` connection to ``params['smtp_server']`` (port
        ``params['smtp_port']``, default 587), issues EHLO and STARTTLS, then
        authenticates with XOAUTH2 when ``params['use_gmail_oauth2']`` is truthy
        and with the configured sender address and password otherwise.

        :returns: ``True`` when authentication succeeded, ``False`` otherwise.
        """
        self.server = SMTP(params['smtp_server'], params.get("smtp_port", 587))
        self.server.ehlo()
        self.server.starttls()
        if params.get('use_gmail_oauth2'):
            _auth_email, access_token = _get_oauth_info()
            auth_string = (b'user=' + bytes(params['sender_email_address'], 'ascii')
                           + b'\1auth=Bearer ' + access_token + b'\1\1')
            # auth_string is deliberately NOT logged: it embeds the bearer token.
            code, msg = self.server.docmd(
                'AUTH', 'XOAUTH2 ' + base64.b64encode(auth_string).decode('ascii'))
            if code == 235:
                # A success reply is plain text; base64-decoding it can raise
                # binascii.Error and would abort an otherwise successful login.
                log.info("Code {}  Message {}", code, msg)
                return True
            # A non-success reply may carry base64-encoded detail; fall back to
            # the raw reply when it does not decode.
            try:
                detail = base64.decodebytes(msg)
            except ValueError:  # binascii.Error is a ValueError subclass
                detail = msg
            log.info("Code {}  Message {}", code, detail)
            # Send an empty response so the server reports its final status.
            code, msg = self.server.docmd('')
            log.info("code {}, Message {}", code, msg)
            return False
        try:
            self.server.login(params['sender_email_address'], params['sender_password'])
            return True
        except SMTPAuthenticationError:
            # NOTE(review): `ex_info` looks like a typo for `exc_info`; confirm
            # against the project's logger before changing it.
            log.warn("SMTP Password Authentication Failed", ex_info=True)
            return False
constructor.py 文件源码 项目:islam-buddy 作者: hamir 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def construct_yaml_binary(self, node):
            # type: (Any) -> Any
            """Return the bytes encoded as base64 in the scalar held by *node*.

            Raises ``ConstructorError`` (positioned at ``node.start_mark``) when
            the scalar cannot be encoded as ASCII or is not valid base64.
            """
            try:
                encoded = self.construct_scalar(node).encode('ascii')
            except UnicodeEncodeError as exc:
                problem = "failed to convert base64 data into ascii: %s" % exc
                raise ConstructorError(None, None, problem, node.start_mark)
            try:
                # Use decodebytes when the runtime offers it, else the older alias.
                decoder = getattr(base64, 'decodebytes', None)
                if decoder is None:
                    decoder = base64.decodestring
                return decoder(encoded)
            except binascii.Error as exc:
                problem = "failed to decode base64 data: %s" % exc
                raise ConstructorError(None, None, problem, node.start_mark)
constructor.py 文件源码 项目:islam-buddy 作者: hamir 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def construct_python_bytes(self, node):
            # type: (Any) -> Any
            """Decode the base64 scalar held by *node* into a bytes object.

            A ``ConstructorError`` pointing at ``node.start_mark`` is raised if
            the scalar contains non-ASCII text or malformed base64.
            """
            try:
                ascii_text = self.construct_scalar(node).encode('ascii')
            except UnicodeEncodeError as exc:
                raise ConstructorError(
                    None, None,
                    "failed to convert base64 data into ascii: %s" % exc,
                    node.start_mark)
            try:
                # decodestring is only consulted when decodebytes is missing.
                decode = (base64.decodebytes if hasattr(base64, 'decodebytes')
                          else base64.decodestring)
                return decode(ascii_text)
            except binascii.Error as exc:
                raise ConstructorError(
                    None, None,
                    "failed to decode base64 data: %s" % exc, node.start_mark)
__init__.py 文件源码 项目:odin 作者: imito 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def load_lre_list():
  """ Fetch the LRE file list and return it as a string array.

  The header includes the following columns:
  * name: LDC2017E22/data/ara-acm/ar-20031215-034005_0-a.sph
  * lre: {'train17', 'eval15', 'train15', 'dev17', 'eval17'}
  * language: {'ara-arb', 'eng-sas', 'fre-hat', 'zho-wuu',
               'eng-gbr', 'ara-ary', 'eng-usg', 'spa-lac',
               'ara-apc', 'qsl-pol', 'spa-eur', 'fre-waf',
               'zho-cdo', 'qsl-rus', 'spa-car', 'ara-arz',
               'zho-cmn', 'por-brz', 'zho-yue', 'zho-nan',
               'ara-acm'}
  * corpus: {'pcm', 'alaw', 'babel', 'ulaw', 'vast', 'mls14'}
  * duration: {'3', '30', '5', '15', '10', '20', '1000', '25'}

  Note
  ----
  Suggested naming scheme:
    `lre/lang/corpus/dur/base_name`
  """
  # The download URL is stored base64-encoded.
  encoded_url = b'aHR0cHM6Ly9zMy5hbWF6b25hd3MuY29tL2FpLWRhdGFzZXRzL2xyZV9saXN0LnR4dA==\n'
  url = base64.decodebytes(encoded_url).decode('utf-8')
  local_path = get_file(fname=os.path.basename(url),
                        origin=url,
                        outdir=get_datasetpath(root='~'))
  # Space-delimited text; the first row is the column header.
  return np.genfromtxt(fname=local_path, dtype=str, delimiter=' ',
                       skip_header=1)
px.py 文件源码 项目:px 作者: genotrance 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_response_sspi(self, challenge=None):
        """Produce the next SSPI authentication token as base64 text.

        :param challenge: base64-encoded challenge (text or bytes), or a falsy
            value when there is no challenge to answer yet.
        :returns: the base64 encoding (no line breaks) of the first output
            buffer returned by ``self.sspi_client.authorize``, or ``None`` when
            that call raises ``pywintypes.error``.
        """
        dprint("pywin32 SSPI")
        if challenge:
            # b64decode accepts both text and bytes, so no bare-except fallback
            # to base64.decodestring (removed in Python 3.9) is required.
            challenge = base64.b64decode(challenge)
        try:
            error_msg, output_buffer = self.sspi_client.authorize(challenge)
        except pywintypes.error:
            traceback.print_exc(file=sys.stdout)
            return None

        response_msg = output_buffer[0].Buffer
        if not isinstance(response_msg, bytes):
            response_msg = response_msg.encode("utf-8")
        # b64encode emits no newlines, matching the old
        # encodebytes(...).replace('\012', '') result without depending on
        # base64.encodestring (also removed in Python 3.9).
        return base64.b64encode(response_msg).decode("utf-8")
validators.py 文件源码 项目:azure-cli 作者: Azure 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _is_valid_ssh_rsa_public_key(openssh_pubkey):
    # http://stackoverflow.com/questions/2494450/ssh-rsa-public-key-validation-using-a-regular-expression
    # A "good enough" check is to see if the key starts with the correct header.
    import struct
    try:
        from base64 import decodebytes as base64_decode
    except ImportError:
        # deprecated and redirected to decodebytes in Python 3
        from base64 import decodestring as base64_decode
    parts = openssh_pubkey.split()
    if len(parts) < 2:
        return False
    key_type = parts[0]
    key_string = parts[1]

    data = base64_decode(key_string.encode())  # pylint:disable=deprecated-method
    int_len = 4
    str_len = struct.unpack('>I', data[:int_len])[0]  # this should return 7
    return data[int_len:int_len + str_len] == key_type.encode()
keys.py 文件源码 项目:azure-cli 作者: Azure 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def is_valid_ssh_rsa_public_key(openssh_pubkey):
    """Return whether *openssh_pubkey* starts with a consistent key header.

    The key text is split on whitespace into a key type and a base64 blob. The
    blob must begin with a 4-byte big-endian length followed by that many bytes
    equal to the key type. Input with fewer than two fields, undecodable
    base64, or a blob too short to hold the length prefix yields ``False``
    rather than an exception.
    """
    # http://stackoverflow.com/questions/2494450/ssh-rsa-public-key-validation-using-a-regular-expression # pylint: disable=line-too-long
    # A "good enough" check is to see if the key starts with the correct header.
    import struct
    try:
        from base64 import decodebytes as base64_decode
    except ImportError:
        # deprecated and redirected to decodebytes in Python 3
        from base64 import decodestring as base64_decode

    parts = openssh_pubkey.split()
    if len(parts) < 2:
        return False
    key_type = parts[0]
    key_string = parts[1]

    int_len = 4
    try:
        expected_header = key_type.encode()
        data = base64_decode(key_string.encode())  # pylint:disable=deprecated-method
        str_len = struct.unpack('>I', data[:int_len])[0]  # 7 for "ssh-rsa"
    except (ValueError, struct.error):
        # ValueError covers binascii.Error (bad base64) and Unicode errors;
        # struct.error means the blob is shorter than the length prefix.
        return False
    return data[int_len:int_len + str_len] == expected_header
__init__.py 文件源码 项目:commissaire-http 作者: projectatomic 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def decode_basic_auth(logger, http_auth):
    """
    Decodes basic auth from the header string.

    The credentials are split on the first colon only, so a passphrase may
    itself contain colons. Undecodable or colon-less credentials are treated
    as absent. The passphrase is never written to the log.

    :param logger: logging instance
    :type logger: Logger
    :param http_auth: Basic authentication header
    :type http_auth: string
    :returns: tuple -- (username, passphrase) or (None, None) if empty.
    :rtype: tuple
    """
    if http_auth is not None:
        if http_auth.lower().startswith('basic '):
            try:
                credentials = base64.decodebytes(
                    http_auth[6:].encode('utf-8')).decode()
            except ValueError:
                # binascii.Error (bad base64) and UnicodeDecodeError (payload
                # is not UTF-8 text) are both ValueError subclasses.
                if logger:
                    logger.info(
                        'Bad base64 data sent. Setting to no user/pass.')
            else:
                username, separator, passphrase = credentials.partition(':')
                if separator:
                    if logger:
                        logger.debug(
                            'Credentials given for user: %s', username)
                    return (username, passphrase)
                if logger:
                    logger.info(
                        'Malformed basic auth credentials. '
                        'Setting to no user/pass.')
    # Default meaning no user or password
    return (None, None)
sankey_widget.py 文件源码 项目:ipysankeywidget 作者: ricklupton 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def save_png(self, filename):
        """Write the diagram's PNG data to *filename*.

        ``self.png`` is expected to hold the image as base64 ASCII text; it is
        only populated once the widget has been displayed. When it is empty a
        warning is issued and nothing is written. To display the widget and
        save an image in one go, use `auto_save_png`.

        Parameters
        ----------
        filename : string
        """
        if not self.png:
            warnings.warn('No png image available! Try auto_save_png() instead?')
            return

        png_bytes = base64.decodebytes(bytes(self.png, 'ascii'))
        with open(filename, 'wb') as out:
            out.write(png_bytes)
djangoMongoCache.py 文件源码 项目:scripts 作者: vulnersCom 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get(self, key, default=None, version=None):
        """
            Look up one key in the cache collection.

            Returns *default* (``None`` unless given) when no document matches;
            otherwise the document's ``data`` field is base64-decoded and
            unpickled.
        """
        cache_key = self.make_key(key, version)
        self.validate_key(cache_key)

        record = self._coll.find_one({'key': cache_key})
        if not record:
            return default

        # NOTE: pickle.loads trusts whatever is stored in the collection.
        return pickle.loads(base64.decodebytes(record['data']))
djangoMongoCache.py 文件源码 项目:scripts 作者: vulnersCom 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_many(self, keys, version=None):
        """
            Fetch a bunch of keys from the cache with a single ``$in`` query.

            Returns a dict mapping each key in keys to its value. If the given
            key is missing, it will be missing from the response dict.
        """
        # Map each stored (versioned) key back to the key the caller passed.
        parsed_keys = {}
        for key in keys:
            pkey = self.make_key(key, version)
            self.validate_key(pkey)
            parsed_keys[pkey] = key

        # "$in" needs a real list: on Python 3 dict.keys() is a view object,
        # which cannot be BSON-encoded.
        cursor = self._coll.find({'key': {'$in': list(parsed_keys)}})

        out = {}
        for result in cursor:
            unencoded = base64.decodebytes(result['data'])
            # NOTE: pickle.loads trusts whatever is stored in the collection.
            out[parsed_keys[result['key']]] = pickle.loads(unencoded)

        return out
AuthServer.py 文件源码 项目:PyWebDAV3 作者: andrewleech 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def parse_request(self):
        """Parse the request and, when ``DO_AUTH`` is set, enforce Basic auth.

        Returns ``False`` (after sending an error response) when the base
        class fails to parse the request, when no ``Authorization`` header is
        present (401), when the scheme is not Basic (501), when the header or
        its credentials are malformed (400), or when ``get_userinfo`` rejects
        the user (401). Returns ``True`` otherwise.
        """
        if not six.moves.BaseHTTPServer.BaseHTTPRequestHandler.parse_request(self):
            return False

        if self.DO_AUTH:
            authorization = self.headers.get('Authorization', '')
            if not authorization:
                self.send_autherror(401, b"Authorization Required")
                return False
            parts = authorization.split()
            if not parts:
                # Header contained nothing but whitespace.
                self.send_error(400)
                return False
            # Authentication scheme names are case-insensitive.
            if parts[0].lower() != 'basic':
                self.send_error(501)
                return False
            if len(parts) != 2:
                self.send_error(400)
                return False
            try:
                credentials = base64.decodebytes(parts[1].encode()).decode()
            except ValueError:
                # Bad base64 (binascii.Error) or non-text payload
                # (UnicodeDecodeError); both are ValueError subclasses.
                self.send_error(400)
                return False
            # Split on the first colon only: the password may contain colons.
            user, separator, password = credentials.partition(':')
            if not separator:
                self.send_error(400)
                return False
            if not self.get_userinfo(user, password, self.command):
                self.send_autherror(401, b"Authorization Required")
                return False
        return True
service_provider.py 文件源码 项目:skp_edu_docker 作者: TensorMSA 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _internal_service_call(self, share_data) :
        """Handle an internal service request carried by *share_data*.

        The request data is base64-decoded and wrapped as an uploaded JPEG
        named ``test.jpg``. For request type ``"image"`` the CNN predictor is
        run on it and the output is set from the name mapped to the first
        predicted key; any other request type gets a fixed output message.
        Returns *share_data*. Any exception is re-raised wrapped in a plain
        ``Exception``.
        """
        try:
            # internal : IMAGE
            print("?????????? ??? ?? ?? ?? ?? ?????????? ")

            request_type = share_data.get_request_type()
            image_bytes = base64.decodebytes(str.encode(share_data.get_request_data()))
            upload = InMemoryUploadedFile(io.BytesIO(image_bytes), None, 'test.jpg', 'image/jpeg', len(image_bytes), None)
            ml = MultiValueDict({'test': [upload]})

            if request_type != "image":
                share_data.set_output_data("??? ?? ??? ????")
                return share_data

            # CNN Prediction
            return_val = PredictNetCnn().run('nn00004', None, ml )
            name_tag = {"KYJ" : "???", "KSW" : "???", "LTY" : "???", "LSH" : "???", "PJH" : "???", "KSS" : "???", "PSC" : "???"}
            predicted_key = return_val['test.jpg']['key'][0]
            print("?????????? ??? ?? ?? ?? ?? : " + predicted_key)
            share_data.set_output_data(name_tag[predicted_key] + "?? ??? ????")
            return share_data
        except Exception as e:
            raise Exception(e)
config.py 文件源码 项目:paperwork-backend 作者: openpaperwork 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def load(self, config):
        """Read this setting from *config* and store it in ``self.value``.

        The raw option is stripped. The literal text ``"None"`` yields
        ``None``. Any other text is base64-decoded as UTF-8 (a failed decode
        is logged and the stripped text is kept), then passed through
        ``FS.safe``. A missing section or option falls back to
        ``self.default_value_func()``.
        """
        try:
            raw = config.get(self.section, self.token).strip()
            if raw == "None":
                self.value = None
                return
            try:
                raw = base64.decodebytes(raw.encode("utf-8")).decode('utf-8')
            except Exception as exc:
                # Keep going with the undecoded text.
                logger.warning(
                    "Failed to decode work dir path ({})".format(raw),
                    exc_info=exc
                )
            self.value = FS.safe(raw)
            return
        except (configparser.NoOptionError, configparser.NoSectionError):
            pass
        self.value = self.default_value_func()
api_twisted.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _retrieve_content(self, compression, encoding, content):
        """Extract the content of the sent file."""
        # Select the appropriate decompressor.
        if compression is None:
            decompress = lambda s: s
        elif compression == 'bzip2':
            decompress = bz2.decompress
        else:
            raise ValueError('Invalid compression: %s' % compression)

        # Select the appropriate decoder.
        if encoding == 'base64':
            decode = base64.decodebytes
        else:
            raise ValueError('Invalid encoding: %s' % encoding)

        return decompress(decode(content.encode("ascii")))
base64_codec.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def base64_decode(input, errors='strict'):
    """Return ``(decoded_bytes, consumed_length)`` for base64 *input*.

    Only the ``'strict'`` error mode is accepted.
    """
    assert errors == 'strict'
    decoded = base64.decodebytes(input)
    consumed = len(input)
    return (decoded, consumed)
base64_codec.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def decode(self, input, final=False):
        """Base64-decode *input*; only the ``'strict'`` error mode is accepted."""
        assert self.errors == 'strict'
        decoded = base64.decodebytes(input)
        return decoded
__init__.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def decrypt(self, source):
        """Decrypt URL-quoted, base64-encoded ciphertext into text.

        *source* is URL-unquoted and base64-decoded, run through a decryptor
        obtained from ``self._cipher``, stripped of trailing NUL bytes and
        finally decoded with ``self._encoding``.
        """
        decrypter = self._cipher.decryptor()
        ciphertext = base64.decodebytes(
            urllib.parse.unquote(source).encode("utf-8"))
        padded = decrypter.update(ciphertext) + decrypter.finalize()
        plaintext = padded.rstrip(b"\x00")
        return plaintext.decode(self._encoding)
constructor.py 文件源码 项目:petronia 作者: groboclown 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def construct_yaml_binary(self, node):
        """Return the bytes encoded as base64 in the scalar held by *node*.

        Raises ``ConstructorError`` (positioned at ``node.start_mark``) when
        the scalar cannot be encoded as ASCII or is not valid base64.
        """
        try:
            ascii_data = self.construct_scalar(node).encode('ascii')
        except UnicodeEncodeError as exc:
            problem = "failed to convert base64 data into ascii: %s" % exc
            raise ConstructorError(None, None, problem, node.start_mark)
        try:
            # Fall back to the older alias only when decodebytes is absent.
            if not hasattr(base64, 'decodebytes'):
                return base64.decodestring(ascii_data)
            return base64.decodebytes(ascii_data)
        except binascii.Error as exc:
            problem = "failed to decode base64 data: %s" % exc
            raise ConstructorError(None, None, problem, node.start_mark)
constructor.py 文件源码 项目:petronia 作者: groboclown 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def construct_python_bytes(self, node):
        """Decode the base64 scalar held by *node* into a bytes object.

        A ``ConstructorError`` pointing at ``node.start_mark`` is raised if the
        scalar contains non-ASCII text or malformed base64.
        """
        try:
            encoded = self.construct_scalar(node).encode('ascii')
        except UnicodeEncodeError as exc:
            raise ConstructorError(
                None, None,
                "failed to convert base64 data into ascii: %s" % exc,
                node.start_mark)
        try:
            # decodestring is only consulted when decodebytes is missing.
            decoder = getattr(base64, 'decodebytes', None)
            if decoder is None:
                decoder = base64.decodestring
            return decoder(encoded)
        except binascii.Error as exc:
            raise ConstructorError(
                None, None,
                "failed to decode base64 data: %s" % exc, node.start_mark)
test.py 文件源码 项目:jgscm 作者: src-d 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_get_base64(self):
        """A pickled blob fetched with format="base64" round-trips intact."""
        bucket = self.bucket
        expected = {"one": 1, "two": [2, 3]}
        bucket.blob("test.pickle").upload_from_string(pickle.dumps(expected))

        model = self.contents_manager.get(
            self.path("test.pickle"), format="base64")

        # Metadata describes a generic binary file served as base64.
        for field, wanted in (("type", "file"),
                              ("mimetype", "application/octet-stream"),
                              ("format", "base64")):
            self.assertEqual(model[field], wanted)

        content = model["content"]
        self.assertIsInstance(content, str)
        decoded = base64.decodebytes(content.encode())
        self.assertEqual(expected, pickle.loads(decoded))
__init__.py 文件源码 项目:jgscm 作者: src-d 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _save_file(self, path, content, format):
        """Uploads content of a generic file to GCS.
        :param: path blob path.
        :param: content file contents string.
        :param: format the description of the input format, can be either
                "text" or "base64".
        :return: created :class:`google.cloud.storage.Blob`.
        """
        bucket_name, bucket_path = self._parse_path(path)
        bucket = self._get_bucket(bucket_name, throw=True)

        if format not in {"text", "base64"}:
            raise web.HTTPError(
                400,
                u"Must specify format of file contents as \"text\" or "
                u"\"base64\"",
            )
        try:
            if format == "text":
                bcontent = content.encode("utf8")
            else:
                b64_bytes = content.encode("ascii")
                bcontent = base64.decodebytes(b64_bytes)
        except Exception as e:
            raise web.HTTPError(
                400, u"Encoding error saving %s: %s" % (path, e)
            )
        blob = bucket.blob(bucket_path)
        blob.upload_from_string(bcontent)
        return blob
base64_codec.py 文件源码 项目:ivaochdoc 作者: ivaoch 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def base64_decode(input, errors='strict'):
    """Base64-decode *input*, returning the bytes and the input length.

    Only ``errors='strict'`` is accepted.
    """
    assert errors == 'strict'
    result = base64.decodebytes(input)
    return (result, len(input))
base64_codec.py 文件源码 项目:ivaochdoc 作者: ivaoch 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def decode(self, input, final=False):
        """Return the base64-decoded form of *input* (strict errors only)."""
        assert self.errors == 'strict'
        output = base64.decodebytes(input)
        return output
test_websocket.py 文件源码 项目:Treehacks 作者: andrewsy97 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testNonce(self):
        """ The generated WebSocket key must base64-decode to 16 bytes.
        """
        encoded_key = _create_sec_websocket_key().encode("utf-8")
        self.assertEqual(16, len(base64decode(encoded_key)))
test_websocket.py 文件源码 项目:Treehacks 作者: andrewsy97 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testNonce(self):
        """ Decoding the generated WebSocket key must yield a 16-byte nonce.
        """
        key_bytes = _create_sec_websocket_key().encode("utf-8")
        decoded_length = len(base64decode(key_bytes))
        self.assertEqual(16, decoded_length)
ingest_message.py 文件源码 项目:ndr 作者: SecuredByTHEM 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def load_from_base64(self, b64_msg):
        '''Base64-decode *b64_msg* and hand the result to ``load_from_yaml``.'''
        yaml_bytes = base64.decodebytes(b64_msg)
        return self.load_from_yaml(yaml_bytes)
util.py 文件源码 项目:aliyun-log-python-sdk 作者: aliyun 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def base64_decodestring(s):
    """Base64-decode *s* on either Python major version.

    On Python 2 the value goes straight to ``base64.decodestring``. On
    Python 3 a text argument is first encoded as UTF-8, and the decoded bytes
    are returned as UTF-8 text.
    """
    if not six.PY2:
        raw = s.encode('utf8') if isinstance(s, str) else s
        return base64.decodebytes(raw).decode('utf8')
    return base64.decodestring(s)
request.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def open_data(self, url, data=None):
        """Use "data" URL."""
        if not isinstance(url, str):
            raise URLError('data error: proxy support for data protocol currently not implemented')
        # POSTed data is ignored.
        #
        # syntax of data URLs:
        # dataurl   := "data:" [ mediatype ] [ ";base64" ] "," data
        # mediatype := [ type "/" subtype ] *( ";" parameter )
        # data      := *urlchar
        # parameter := attribute "=" value
        try:
            media_type, payload = url.split(',', 1)
        except ValueError:
            raise IOError('data error', 'bad data URL')
        media_type = media_type or 'text/plain;charset=US-ASCII'

        # A trailing ";token" without "=" names the transfer encoding.
        encoding = ''
        semi = media_type.rfind(';')
        if semi >= 0 and '=' not in media_type[semi:]:
            encoding = media_type[semi+1:]
            media_type = media_type[:semi]

        timestamp = time.strftime('%a, %d %b %Y %H:%M:%S GMT',
                                  time.gmtime(time.time()))
        if encoding == 'base64':
            # XXX is this encoding/decoding ok?
            payload = base64.decodebytes(payload.encode('ascii')).decode('latin-1')
        else:
            payload = unquote(payload)

        msg = '\n'.join([
            'Date: %s' % timestamp,
            'Content-type: %s' % media_type,
            'Content-Length: %d' % len(payload),
            '',
            payload,
        ])
        headers = email.message_from_string(msg)
        f = io.StringIO(msg)
        #f.fileno = None     # needed for addinfourl
        return addinfourl(f, headers, url)


问题


面经


文章

微信
公众号

扫码关注公众号