def load_local_session(self):
# ????? Session ?????????????????
if self.__storage_path__ is not None:
# ??????????? Session ???????????????? Session ID
session_path_list = os.listdir(self.__storage_path__)
# ??????????
for session_id in session_path_list:
# ??????????
path = os.path.join(self.__storage_path__, session_id)
# ????????
with open(path, 'rb') as f:
content = f.read()
# ??????? base64 ??
content = base64.decodebytes(content)
# ? Session ID ???????????????????
self.__session_map__[session_id] = json.loads(content.decode())
# ??????????
python类decodebytes()的实例源码
def _open_server(self):
self.server = SMTP(params['smtp_server'], params.get("smtp_port", 587))
self.server.ehlo()
self.server.starttls()
if params.get('use_gmail_oauth2'):
auth_email, access_token = _get_oauth_info()
auth_string = b'user=' + bytes(params['sender_email_address'],
'ascii') + b'\1auth=Bearer ' + access_token + b'\1\1'
log.info(auth_string)
code, msg = self.server.docmd('AUTH', 'XOAUTH2 ' + (base64.b64encode(auth_string)).decode('ascii'))
log.info("Code {} Message {}", code, base64.decodebytes(msg))
if code == 235:
return True
code, msg = self.server.docmd('')
log.info("code {}, Message {}", code, msg)
return False
try:
self.server.login(params['sender_email_address'], params['sender_password'])
return True
except SMTPAuthenticationError:
log.warn("SMTP Password Authentication Failed", ex_info=True)
return False
def construct_yaml_binary(self, node):
# type: (Any) -> Any
try:
value = self.construct_scalar(node).encode('ascii')
except UnicodeEncodeError as exc:
raise ConstructorError(
None, None,
"failed to convert base64 data into ascii: %s" % exc,
node.start_mark)
try:
if hasattr(base64, 'decodebytes'):
return base64.decodebytes(value)
else:
return base64.decodestring(value)
except binascii.Error as exc:
raise ConstructorError(
None, None,
"failed to decode base64 data: %s" % exc, node.start_mark)
def construct_python_bytes(self, node):
# type: (Any) -> Any
try:
value = self.construct_scalar(node).encode('ascii')
except UnicodeEncodeError as exc:
raise ConstructorError(
None, None,
"failed to convert base64 data into ascii: %s" % exc,
node.start_mark)
try:
if hasattr(base64, 'decodebytes'):
return base64.decodebytes(value)
else:
return base64.decodestring(value)
except binascii.Error as exc:
raise ConstructorError(
None, None,
"failed to decode base64 data: %s" % exc, node.start_mark)
def load_lre_list():
""" The header include following column:
* name: LDC2017E22/data/ara-acm/ar-20031215-034005_0-a.sph
* lre: {'train17', 'eval15', 'train15', 'dev17', 'eval17'}
* language: {'ara-arb', 'eng-sas', 'fre-hat', 'zho-wuu',
'eng-gbr', 'ara-ary', 'eng-usg', 'spa-lac',
'ara-apc', 'qsl-pol', 'spa-eur', 'fre-waf',
'zho-cdo', 'qsl-rus', 'spa-car', 'ara-arz',
'zho-cmn', 'por-brz', 'zho-yue', 'zho-nan',
'ara-acm'}
* corpus: {'pcm', 'alaw', 'babel', 'ulaw', 'vast', 'mls14'}
* duration: {'3', '30', '5', '15', '10', '20', '1000', '25'}
Note
----
Suggested namming scheme:
`lre/lang/corpus/dur/base_name`
"""
link = b'aHR0cHM6Ly9zMy5hbWF6b25hd3MuY29tL2FpLWRhdGFzZXRzL2xyZV9saXN0LnR4dA==\n'
link = str(base64.decodebytes(link), 'utf-8')
path = get_file(fname=os.path.basename(link),
origin=link,
outdir=get_datasetpath(root='~'))
return np.genfromtxt(fname=path, dtype=str, delimiter=' ',
skip_header=1)
def get_response_sspi(self, challenge=None):
dprint("pywin32 SSPI")
if challenge:
try:
challenge = base64.decodebytes(challenge.encode("utf-8"))
except:
challenge = base64.decodestring(challenge)
output_buffer = None
try:
error_msg, output_buffer = self.sspi_client.authorize(challenge)
except pywintypes.error:
traceback.print_exc(file=sys.stdout)
return None
response_msg = output_buffer[0].Buffer
try:
response_msg = base64.encodebytes(response_msg.encode("utf-8"))
except:
response_msg = base64.encodestring(response_msg)
response_msg = response_msg.decode("utf-8").replace('\012', '')
return response_msg
def _is_valid_ssh_rsa_public_key(openssh_pubkey):
# http://stackoverflow.com/questions/2494450/ssh-rsa-public-key-validation-using-a-regular-expression
# A "good enough" check is to see if the key starts with the correct header.
import struct
try:
from base64 import decodebytes as base64_decode
except ImportError:
# deprecated and redirected to decodebytes in Python 3
from base64 import decodestring as base64_decode
parts = openssh_pubkey.split()
if len(parts) < 2:
return False
key_type = parts[0]
key_string = parts[1]
data = base64_decode(key_string.encode()) # pylint:disable=deprecated-method
int_len = 4
str_len = struct.unpack('>I', data[:int_len])[0] # this should return 7
return data[int_len:int_len + str_len] == key_type.encode()
def is_valid_ssh_rsa_public_key(openssh_pubkey):
# http://stackoverflow.com/questions/2494450/ssh-rsa-public-key-validation-using-a-regular-expression # pylint: disable=line-too-long
# A "good enough" check is to see if the key starts with the correct header.
import struct
try:
from base64 import decodebytes as base64_decode
except ImportError:
# deprecated and redirected to decodebytes in Python 3
from base64 import decodestring as base64_decode
parts = openssh_pubkey.split()
if len(parts) < 2:
return False
key_type = parts[0]
key_string = parts[1]
data = base64_decode(key_string.encode()) # pylint:disable=deprecated-method
int_len = 4
str_len = struct.unpack('>I', data[:int_len])[0] # this should return 7
return data[int_len:int_len + str_len] == key_type.encode()
def decode_basic_auth(logger, http_auth):
"""
Decodes basic auth from the header string.
:param logger: logging instance
:type logger: Logger
:param http_auth: Basic authentication header
:type http_auth: string
:returns: tuple -- (username, passphrase) or (None, None) if empty.
:rtype: tuple
"""
if http_auth is not None:
if http_auth.lower().startswith('basic '):
try:
decoded = tuple(base64.decodebytes(
http_auth[6:].encode('utf-8')).decode().split(':'))
if logger:
logger.debug('Credentials given: %s', decoded)
return decoded
except base64.binascii.Error:
if logger:
logger.info(
'Bad base64 data sent. Setting to no user/pass.')
# Default meaning no user or password
return (None, None)
def save_png(self, filename):
"""Save the diagram to a PNG file.
The widget must be displayed first before the PNG data is available. To
display the widget and save an image at the same time, use
`auto_save_png`.
Parameters
----------
filename : string
"""
if self.png:
data = base64.decodebytes(bytes(self.png, 'ascii'))
with open(filename, 'wb') as f:
f.write(data)
else:
warnings.warn('No png image available! Try auto_save_png() instead?')
def get(self, key, default=None, version=None):
"""
Fetch a given key from the cache. If the key does not exist, return
default, which itself defaults to None.
"""
key = self.make_key(key, version)
self.validate_key(key)
data = self._coll.find_one({'key': key})
if not data:
return default
unencoded = base64.decodebytes(data['data'])
unpickled = pickle.loads(unencoded)
return unpickled
def get_many(self, keys, version=None):
"""
Fetch a bunch of keys from the cache. For certain backends (memcached,
pgsql) this can be *much* faster when fetching multiple values.
Returns a dict mapping each key in keys to its value. If the given
key is missing, it will be missing from the response dict.
"""
out = {}
parsed_keys = {}
for key in keys:
pkey = self.make_key(key, version)
self.validate_key(pkey)
parsed_keys[pkey] = key
data = self._coll.find({'key': {'$in': parsed_keys.keys()}})
for result in data:
unencoded = base64.decodebytes(result['data'])
unpickled = pickle.loads(unencoded)
out[parsed_keys[result['key']]] = unpickled
return out
def parse_request(self):
if not six.moves.BaseHTTPServer.BaseHTTPRequestHandler.parse_request(self):
return False
if self.DO_AUTH:
authorization = self.headers.get('Authorization', '')
if not authorization:
self.send_autherror(401, b"Authorization Required")
return False
scheme, credentials = authorization.split()
if scheme != 'Basic':
self.send_error(501)
return False
credentials = base64.decodebytes(credentials.encode()).decode()
user, password = credentials.split(':')
if not self.get_userinfo(user, password, self.command):
self.send_autherror(401, b"Authorization Required")
return False
return True
def _internal_service_call(self, share_data) :
try:
# internal : IMAGE
print("?????????? ??? ?? ?? ?? ?? ?????????? ")
temp = {}
request_type = share_data.get_request_type()
decode_text = base64.decodebytes(str.encode(share_data.get_request_data()))
temp['test'] = [InMemoryUploadedFile(io.BytesIO(decode_text), None, 'test.jpg', 'image/jpeg', len(decode_text), None)]
ml = MultiValueDict(temp)
# fp = open("/hoya_src_root/nn00004/1/test1.jpg", 'wb')
# fp.write(decode_text)
# fp.close()
# CNN Prediction
if(request_type == "image"):
return_val = PredictNetCnn().run('nn00004', None, ml )
name_tag = {"KYJ" : "???", "KSW" : "???", "LTY" : "???", "LSH" : "???", "PJH" : "???", "KSS" : "???", "PSC" : "???"}
print("?????????? ??? ?? ?? ?? ?? : " + return_val['test.jpg']['key'][0])
share_data.set_output_data(name_tag[return_val['test.jpg']['key'][0]] + "?? ??? ????")
else :
share_data.set_output_data("??? ?? ??? ????")
return share_data
except Exception as e:
raise Exception(e)
def load(self, config):
try:
value = config.get(self.section, self.token)
value = value.strip()
if value != "None":
try:
value = base64.decodebytes(value.encode("utf-8")).decode(
'utf-8')
except Exception as exc:
logger.warning(
"Failed to decode work dir path ({})".format(value),
exc_info=exc
)
value = FS.safe(value)
else:
value = None
self.value = value
return
except (configparser.NoOptionError, configparser.NoSectionError):
pass
self.value = self.default_value_func()
def _retrieve_content(self, compression, encoding, content):
"""Extract the content of the sent file."""
# Select the appropriate decompressor.
if compression is None:
decompress = lambda s: s
elif compression == 'bzip2':
decompress = bz2.decompress
else:
raise ValueError('Invalid compression: %s' % compression)
# Select the appropriate decoder.
if encoding == 'base64':
decode = base64.decodebytes
else:
raise ValueError('Invalid encoding: %s' % encoding)
return decompress(decode(content.encode("ascii")))
def base64_decode(input, errors='strict'):
assert errors == 'strict'
return (base64.decodebytes(input), len(input))
def decode(self, input, final=False):
assert self.errors == 'strict'
return base64.decodebytes(input)
def decrypt(self, source):
decrypter = self._cipher.decryptor()
source = urllib.parse.unquote(source)
source = base64.decodebytes(source.encode("utf-8"))
data_bytes = decrypter.update(source) + decrypter.finalize()
return data_bytes.rstrip(b"\x00").decode(self._encoding)
def construct_yaml_binary(self, node):
try:
value = self.construct_scalar(node).encode('ascii')
except UnicodeEncodeError as exc:
raise ConstructorError(None, None,
"failed to convert base64 data into ascii: %s" % exc,
node.start_mark)
try:
if hasattr(base64, 'decodebytes'):
return base64.decodebytes(value)
else:
return base64.decodestring(value)
except binascii.Error as exc:
raise ConstructorError(None, None,
"failed to decode base64 data: %s" % exc, node.start_mark)
def construct_python_bytes(self, node):
try:
value = self.construct_scalar(node).encode('ascii')
except UnicodeEncodeError as exc:
raise ConstructorError(None, None,
"failed to convert base64 data into ascii: %s" % exc,
node.start_mark)
try:
if hasattr(base64, 'decodebytes'):
return base64.decodebytes(value)
else:
return base64.decodestring(value)
except binascii.Error as exc:
raise ConstructorError(None, None,
"failed to decode base64 data: %s" % exc, node.start_mark)
def test_get_base64(self):
bucket = self.bucket
blob = bucket.blob("test.pickle")
obj = {"one": 1, "two": [2, 3]}
blob.upload_from_string(pickle.dumps(obj))
model = self.contents_manager.get(
self.path("test.pickle"), format="base64")
self.assertEqual(model["type"], "file")
self.assertEqual(model["mimetype"], "application/octet-stream")
self.assertEqual(model["format"], "base64")
content = model["content"]
self.assertIsInstance(content, str)
bd = base64.decodebytes(content.encode())
self.assertEqual(obj, pickle.loads(bd))
def _save_file(self, path, content, format):
"""Uploads content of a generic file to GCS.
:param: path blob path.
:param: content file contents string.
:param: format the description of the input format, can be either
"text" or "base64".
:return: created :class:`google.cloud.storage.Blob`.
"""
bucket_name, bucket_path = self._parse_path(path)
bucket = self._get_bucket(bucket_name, throw=True)
if format not in {"text", "base64"}:
raise web.HTTPError(
400,
u"Must specify format of file contents as \"text\" or "
u"\"base64\"",
)
try:
if format == "text":
bcontent = content.encode("utf8")
else:
b64_bytes = content.encode("ascii")
bcontent = base64.decodebytes(b64_bytes)
except Exception as e:
raise web.HTTPError(
400, u"Encoding error saving %s: %s" % (path, e)
)
blob = bucket.blob(bucket_path)
blob.upload_from_string(bcontent)
return blob
def base64_decode(input, errors='strict'):
assert errors == 'strict'
return (base64.decodebytes(input), len(input))
def decode(self, input, final=False):
assert self.errors == 'strict'
return base64.decodebytes(input)
def testNonce(self):
""" WebSocket key should be a random 16-byte nonce.
"""
key = _create_sec_websocket_key()
nonce = base64decode(key.encode("utf-8"))
self.assertEqual(16, len(nonce))
def testNonce(self):
""" WebSocket key should be a random 16-byte nonce.
"""
key = _create_sec_websocket_key()
nonce = base64decode(key.encode("utf-8"))
self.assertEqual(16, len(nonce))
def load_from_base64(self, b64_msg):
'''Deserializes a base64-encoded YAML message'''
return self.load_from_yaml(base64.decodebytes(b64_msg))
def base64_decodestring(s):
if six.PY2:
return base64.decodestring(s)
else:
if isinstance(s, str):
s = s.encode('utf8')
return base64.decodebytes(s).decode('utf8')
def open_data(self, url, data=None):
"""Use "data" URL."""
if not isinstance(url, str):
raise URLError('data error: proxy support for data protocol currently not implemented')
# ignore POSTed data
#
# syntax of data URLs:
# dataurl := "data:" [ mediatype ] [ ";base64" ] "," data
# mediatype := [ type "/" subtype ] *( ";" parameter )
# data := *urlchar
# parameter := attribute "=" value
try:
[type, data] = url.split(',', 1)
except ValueError:
raise IOError('data error', 'bad data URL')
if not type:
type = 'text/plain;charset=US-ASCII'
semi = type.rfind(';')
if semi >= 0 and '=' not in type[semi:]:
encoding = type[semi+1:]
type = type[:semi]
else:
encoding = ''
msg = []
msg.append('Date: %s'%time.strftime('%a, %d %b %Y %H:%M:%S GMT',
time.gmtime(time.time())))
msg.append('Content-type: %s' % type)
if encoding == 'base64':
# XXX is this encoding/decoding ok?
data = base64.decodebytes(data.encode('ascii')).decode('latin-1')
else:
data = unquote(data)
msg.append('Content-Length: %d' % len(data))
msg.append('')
msg.append(data)
msg = '\n'.join(msg)
headers = email.message_from_string(msg)
f = io.StringIO(msg)
#f.fileno = None # needed for addinfourl
return addinfourl(f, headers, url)