def test_tile_symmetry(self):
    '''
    Make sure that tiles are symmetric

    Uploads a multires cooler fixture as tileset ``aa``, requests tile
    ``aa.0.0.0`` through the tiles API and decodes its base64 ``dense``
    payload into a 256x256 float16 array.
    '''
    import base64

    # Read the fixture inside a context manager so the file handle is closed
    # even if creating the tileset raises.
    with open('data/Dixon2012-J1-NcoI-R1-filtered.100kb.multires.cool', 'rb') as upload_file:
        datafile = dcfu.SimpleUploadedFile(upload_file.name, upload_file.read())

    tm.Tileset.objects.create(
        datafile=datafile,
        filetype='cooler',
        datatype='matrix',
        owner=self.user1,
        uuid='aa')

    ret = self.client.get('/api/v1/tiles/?d=aa.0.0.0')
    contents = json.loads(ret.content.decode('utf-8'))

    # base64.decodestring() was removed in Python 3.9; b64decode() exists on
    # both Python 2 and 3 and yields the same bytes for valid input.
    r = base64.b64decode(contents['aa.0.0.0']['dense'].encode('utf-8'))
    q = np.frombuffer(r, dtype=np.float16)
    # reshape() raises if the payload does not hold exactly 256*256 values.
    # NOTE(review): nothing visible here asserts that q equals its transpose,
    # despite the test name - confirm whether a symmetry assertion is missing.
    q = q.reshape((256, 256))
# Example source code using Python's decodestring()
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))
def asciiDecompress(code):
    """Decompress the result of asciiCompress.

    Args:
        code: Base64 text wrapping a zlib-compressed payload.

    Returns:
        A ``(data, csum)`` tuple: the decompressed payload and the CRC-32 of
        the base64-decoded (still compressed) bytes.
    """
    # base64.decodestring() was removed in Python 3.9; use decodebytes() when
    # it exists and keep the old name as the Python 2 fallback.
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    compressed = decode(code)
    csum = zlib.crc32(compressed)
    data = zlib.decompress(compressed)
    return data, csum
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))
def parse_request(self):
    """Parse the HTTP request and enforce HTTP Basic authentication.

    Delegates parsing to ``BaseHTTPRequestHandler.parse_request`` and then
    checks the ``Authorization`` header via ``self.get_userinfo``. Without
    the header, ``get_userinfo`` is called with ``None`` credentials.

    Returns:
        True when the request parsed and ``get_userinfo`` accepted it.
        False otherwise, after an error response has been sent: 400 for a
        malformed header, 501 for a non-Basic scheme, 401 when
        ``get_userinfo`` rejects the credentials.
    """
    import binascii  # local import: only needed for the decode-error type

    if not BaseHTTPServer.BaseHTTPRequestHandler.parse_request(self):
        return False
    authorization = self.headers.get('Authorization', '')
    if authorization:
        try:
            scheme, credentials = authorization.split()
        except ValueError:
            # Header is not exactly "<scheme> <credentials>".
            self.send_error(400)
            return False
        if scheme != 'Basic':
            self.send_error(501)
            return False
        try:
            credentials = base64.decodestring(credentials)
            # Split on the first colon only: the password may itself
            # contain ':' characters, the user name may not.
            user, password = credentials.split(':', 1)
        except (binascii.Error, ValueError):
            # Invalid base64, or no ':' separator in the decoded value.
            self.send_error(400)
            return False
        if not self.get_userinfo(user, password, self.command):
            self.send_autherror(401, "Authorization Required")
            return False
    else:
        if not self.get_userinfo(None, None, self.command):
            self.send_autherror(401, "Authorization Required")
            return False
    return True
def parse_request(self):
    """Parse an XML-RPC request and authenticate it with HTTP Basic auth.

    The database name is the request path without its leading character.
    With no database name, ``self.tryton`` is set to an anonymous
    user/session and the parse result is returned unchanged. Otherwise the
    ``Authorization`` header is decoded and passed to ``security.login``.

    Returns:
        The parent class's parse result on success or when parsing failed;
        False after a 401 response when authentication did not succeed.
    """
    parsed = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.parse_request(self)
    if not parsed:
        return parsed

    db_name = self.path[1:]
    if not db_name:
        self.tryton = {'user': None, 'session': None}
        return parsed

    try:
        scheme, encoded = self.headers['Authorization'].split(None, 1)
        if scheme.strip().lower() == 'basic':
            decoded = base64.decodestring(encoded)
            user, password = decoded.split(':', 1)
            user_id, session = security.login(db_name, user, password)
            self.tryton = {'user': user_id, 'session': session}
            return parsed
    except Exception:
        # Best effort: any failure (missing header, bad encoding, login
        # error) falls through to the 401 response below.
        pass

    self.send_error(401, 'Unauthorized')
    self.send_header("WWW-Authenticate", 'Basic realm="Tryton"')
    return False
def parse_basic_auth(src_ip_port, dst_ip_port, headers, authorization_header):
    '''
    Parse basic authentication over HTTP

    Looks up the header named by the ``authorization_header`` match in
    ``headers``; if its value is a Basic credential, base64-decodes it and
    reports it through ``printer``. Returns None in every case.
    '''
    if not authorization_header:
        return
    # authorization_header sometimes is triggered by failed ftp
    try:
        header_val = headers[authorization_header.group()]
    except KeyError:
        return
    b64_auth_re = re.match('basic (.+)', header_val, re.IGNORECASE)
    if b64_auth_re is None:
        return
    basic_auth_b64 = b64_auth_re.group(1)
    try:
        # a2b_base64 is the routine base64.decodestring() wrapped; unlike
        # decodestring (removed in Python 3.9) it exists on Python 2 and 3.
        basic_auth_creds = binascii.a2b_base64(basic_auth_b64)
    except (binascii.Error, ValueError):
        # Undecodable credentials: skip them, as the NTLM parser does.
        return
    msg = 'Basic Authentication: %s' % basic_auth_creds
    printer(src_ip_port, dst_ip_port, msg)
def parse_netntlm_resp_msg(headers, resp_header, seq):
    '''
    Parse the client response to the challenge

    Returns the result of ``parse_ntlm_resp`` for the decoded message, or
    None when the header is absent, uses another scheme, carries no
    payload, or is not valid base64.
    '''
    try:
        header_val3 = headers[resp_header]
    except KeyError:
        return
    header_val3 = header_val3.split(' ', 1)
    # The header value can either start with NTLM or Negotiate
    if header_val3[0] not in ('NTLM', 'Negotiate'):
        return
    if len(header_val3) < 2:
        # Scheme token with no payload after it; indexing [1] would raise.
        return
    try:
        # a2b_base64 is the routine base64.decodestring() wrapped; unlike
        # decodestring (removed in Python 3.9) it exists on Python 2 and 3.
        msg3 = binascii.a2b_base64(header_val3[1])
    except (binascii.Error, ValueError):
        return
    return parse_ntlm_resp(msg3, seq)
def tocRVOUS_PROPOSE(self, data):
    """
    Handle a message that looks like::
    RVOUS_PROPOSE:<user>:<uuid>:<cookie>:<seq>:<rip>:<pip>:<vip>:<port>
    [:tlv tag1:tlv value1[:tlv tag2:tlv value2[:...]]]

    The cookie and every TLV value are base64-decoded, the port is converted
    to an int, and the message is dispatched to the ``toc<name>`` method
    selected by looking the uuid up in ``UUIDS``.
    """
    user, uid, cookie, seq, rip, pip, vip, port = data[:8]
    cookie = base64.decodestring(cookie)
    port = int(port)
    # Everything after the eight fixed fields is alternating tag/value pairs.
    tlvs = {}
    for i in range(8, len(data), 2):
        tlvs[data[i]] = base64.decodestring(data[i + 1])
    name = UUIDS[uid]
    # Look the handler up with a default instead of a bare ``except:``, which
    # would also swallow KeyboardInterrupt and SystemExit.
    func = getattr(self, "toc%s" % name, None)
    if func is None:
        self._debug("no function for UID %s" % uid)
        return
    func(user, cookie, seq, pip, vip, port, tlvs)
def _authorize(self):
# Authorization, (mostly) per the RFC
try:
authh = self.getHeader("Authorization")
if not authh:
self.user = self.password = ''
return
bas, upw = authh.split()
if bas.lower() != "basic":
raise ValueError
upw = base64.decodestring(upw)
self.user, self.password = upw.split(':', 1)
except (binascii.Error, ValueError):
self.user = self.password = ""
except:
log.err()
self.user = self.password = ""
def state_AUTH(self, line):
    """Handle the client's base64-encoded reply to an AUTH challenge.

    Resets the state to ``"COMMAND"``, decodes *line* and splits it into a
    user name and a response, then starts a portal login for ``IMailbox``.
    A failure response is sent when the line is not valid base64 or does
    not split into exactly two parts.
    """
    self.state = "COMMAND"
    try:
        decoded = base64.decodestring(line)
        parts = decoded.split(None, 1)
    except binascii.Error:
        self.failResponse("Invalid BASE64 encoding")
        return

    if len(parts) != 2:
        self.failResponse("Invalid AUTH response")
        return

    username, response = parts
    self._auth.username = username
    self._auth.response = response
    deferred = self.portal.login(self._auth, None, IMailbox)
    deferred.addCallback(self._cbMailbox, username)
    deferred.addErrback(self._ebMailbox)
    deferred.addErrback(self._ebUnexpected)
def _authResponse(self, auth, challenge):
    """Respond to a base64-encoded AUTH challenge using *auth*.

    On a decodable challenge, sends the authenticator's base64-encoded
    response and expects code 235. On an undecodable one, sends ``'*'`` and
    routes both outcomes to the malformed-challenge handler. A LOGIN
    authenticator whose challenge is ``"Username:"`` expects a further 334
    challenge instead.
    """
    self._failresponse = self.esmtpAUTHDeclined
    try:
        challenge = base64.decodestring(challenge)
    except binascii.Error:
        # Illegal challenge, give up, then quit
        # (The unused ``, e`` binding was dropped: it is Python-2-only
        # syntax and a SyntaxError on Python 3.)
        self.sendLine('*')
        self._okresponse = self.esmtpAUTHMalformedChallenge
        self._failresponse = self.esmtpAUTHMalformedChallenge
    else:
        resp = auth.challengeResponse(self.secret, challenge)
        self._expected = [235]
        self._okresponse = self.smtpState_from
        self.sendLine(encode_base64(resp, eol=""))
    if auth.getName() == "LOGIN" and challenge == "Username:":
        self._expected = [334]
        self._authinfo = auth
        self._okresponse = self.esmtpState_challenge
def testValidLogin(self):
    """Log in to a POP3 server with CRAM-MD5 and check that it succeeds.

    Builds a server backed by an in-memory checker holding one user, checks
    that CAPA advertises CRAM-MD5, answers the AUTH challenge with the
    HMAC of the password, and expects a mailbox plus a ``+OK`` reply.
    """
    server = pop3.POP3()
    server.factory = TestServerFactory()
    server.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials}
    server.portal = cred.portal.Portal(TestRealm())
    checker = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
    checker.addUser('testuser', 'testpassword')
    server.portal.registerChecker(checker)

    # Capture everything the server writes to its transport.
    output = StringIO.StringIO()
    server.transport = internet.protocol.FileWrapper(output)
    server.connectionMade()

    server.lineReceived("CAPA")
    self.failUnless(output.getvalue().find("SASL CRAM-MD5") >= 0)

    server.lineReceived("AUTH CRAM-MD5")
    # The challenge is the last line written, minus its first two characters.
    last_line = output.getvalue().splitlines()[-1]
    challenge = base64.decodestring(last_line[2:])
    digest = hmac.HMAC('testpassword', challenge).hexdigest()
    reply = base64.encodestring('testuser ' + digest).rstrip('\n')
    server.lineReceived(reply)

    self.failUnless(server.mbox)
    self.failUnless(output.getvalue().splitlines()[-1].find("+OK") >= 0)
    server.connectionLost(failure.Failure(Exception("Test harness disconnect")))
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))
def main():
    """Unpack the embedded pip archive into a temp directory and bootstrap.

    The base64 payload in ``ZIPFILE`` is written to ``pip.zip`` inside a
    fresh temporary directory, that zip is put at the front of ``sys.path``
    and ``bootstrap`` is run. The directory is removed afterwards, whether
    or not the bootstrap succeeded.
    """
    tmpdir = None
    try:
        # Create a temporary working directory
        tmpdir = tempfile.mkdtemp()

        # Unpack the zipfile into the temporary directory
        pip_zip = os.path.join(tmpdir, "pip.zip")
        # base64.decodestring() was removed in Python 3.9; use decodebytes()
        # when it exists and keep the old name as the Python 2 fallback.
        # Assumes ZIPFILE is a bytes literal - TODO confirm.
        decode = getattr(base64, "decodebytes", None) or base64.decodestring
        with open(pip_zip, "wb") as fp:
            fp.write(decode(ZIPFILE))

        # Add the zipfile to sys.path so that we can import it
        sys.path = [pip_zip] + sys.path

        # Run the bootstrap
        bootstrap(tmpdir=tmpdir)
    finally:
        # Clean up our temporary working directory
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)
def hash_host(hostname, salt=None):
    """
    Produce the "hashed" form of a hostname, in the format openssh uses when
    it stores hashed hostnames in the known_hosts file.

    @param hostname: the hostname to hash
    @type hostname: str
    @param salt: optional salt to use when hashing (must be 20 bytes long)
    @type salt: str
    @return: the hashed hostname
    @rtype: str
    """
    if salt is not None:
        if salt.startswith('|1|'):
            # Value carries the '|1|' marker: the salt is its third
            # '|'-separated field.
            salt = salt.split('|')[2]
        salt = base64.decodestring(salt)
    else:
        # No salt supplied: draw one the size of a SHA digest.
        salt = rng.read(SHA.digest_size)
    assert len(salt) == SHA.digest_size

    digest = HMAC.HMAC(salt, hostname, SHA).digest()
    encoded_salt = base64.encodestring(salt)
    encoded_digest = base64.encodestring(digest)
    # encodestring() appends newlines; strip them from the final entry.
    return ('|1|%s|%s' % (encoded_salt, encoded_digest)).replace('\n', '')
def main():
    """Unpack the embedded pip archive into a temp directory and bootstrap.

    The base64 payload in ``ZIPFILE`` is written to ``pip.zip`` inside a
    fresh temporary directory, that zip is put at the front of ``sys.path``
    and ``bootstrap`` is run. The directory is removed afterwards, whether
    or not the bootstrap succeeded.
    """
    tmpdir = None
    try:
        # Create a temporary working directory
        tmpdir = tempfile.mkdtemp()

        # Unpack the zipfile into the temporary directory
        pip_zip = os.path.join(tmpdir, "pip.zip")
        # base64.decodestring() was removed in Python 3.9; use decodebytes()
        # when it exists and keep the old name as the Python 2 fallback.
        # Assumes ZIPFILE is a bytes literal - TODO confirm.
        decode = getattr(base64, "decodebytes", None) or base64.decodestring
        with open(pip_zip, "wb") as fp:
            fp.write(decode(ZIPFILE))

        # Add the zipfile to sys.path so that we can import it
        sys.path = [pip_zip] + sys.path

        # Run the bootstrap
        bootstrap(tmpdir=tmpdir)
    finally:
        # Clean up our temporary working directory
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)
def _prepare_bucket_entry_hmac(self, shard_array):
    """Fold the hashes of all shards into one chained HMAC.

    Each shard's base64-decoded ``hash`` is concatenated with the HMAC from
    the previous step and passed to ``_calculate_hmac`` together with the
    keyring's ``'test'`` encryption key.

    Args:
        shard_array: Iterable of shard objects exposing a ``hash`` attribute.

    Returns:
        The result of the last ``_calculate_hmac`` call, or ``''`` when
        ``shard_array`` is empty.
    """
    keyring = model.Keyring()
    key = keyring.get_encryption_key('test')

    chained = ''
    for shard in shard_array:
        payload = '%s%s' % (base64.decodestring(shard.hash), chained)
        chained = self._calculate_hmac(payload, key)

    self.__logger.debug('current_hmac=%s' % chained)
    return chained
def main():
    """Unpack the embedded pip archive into a temp directory and bootstrap.

    The base64 payload in ``ZIPFILE`` is written to ``pip.zip`` inside a
    fresh temporary directory, that zip is put at the front of ``sys.path``
    and ``bootstrap`` is run. The directory is removed afterwards, whether
    or not the bootstrap succeeded.
    """
    tmpdir = None
    try:
        # Create a temporary working directory
        tmpdir = tempfile.mkdtemp()

        # Unpack the zipfile into the temporary directory
        pip_zip = os.path.join(tmpdir, "pip.zip")
        # base64.decodestring() was removed in Python 3.9; use decodebytes()
        # when it exists and keep the old name as the Python 2 fallback.
        # Assumes ZIPFILE is a bytes literal - TODO confirm.
        decode = getattr(base64, "decodebytes", None) or base64.decodestring
        with open(pip_zip, "wb") as fp:
            fp.write(decode(ZIPFILE))

        # Add the zipfile to sys.path so that we can import it
        sys.path = [pip_zip] + sys.path

        # Run the bootstrap
        bootstrap(tmpdir=tmpdir)
    finally:
        # Clean up our temporary working directory
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))
def main():
    """Unpack the embedded pip archive into a temp directory and bootstrap.

    The base64 payload in ``ZIPFILE`` is written to ``pip.zip`` inside a
    fresh temporary directory, that zip is put at the front of ``sys.path``
    and ``bootstrap`` is run. The directory is removed afterwards, whether
    or not the bootstrap succeeded.
    """
    tmpdir = None
    try:
        # Create a temporary working directory
        tmpdir = tempfile.mkdtemp()

        # Unpack the zipfile into the temporary directory
        pip_zip = os.path.join(tmpdir, "pip.zip")
        # base64.decodestring() was removed in Python 3.9; use decodebytes()
        # when it exists and keep the old name as the Python 2 fallback.
        # Assumes ZIPFILE is a bytes literal - TODO confirm.
        decode = getattr(base64, "decodebytes", None) or base64.decodestring
        with open(pip_zip, "wb") as fp:
            fp.write(decode(ZIPFILE))

        # Add the zipfile to sys.path so that we can import it
        sys.path = [pip_zip] + sys.path

        # Run the bootstrap
        bootstrap(tmpdir=tmpdir)
    finally:
        # Clean up our temporary working directory
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)
def parse_basic_auth(src_ip_port, dst_ip_port, headers, authorization_header):
    '''
    Parse basic authentication over HTTP

    Looks up the header named by the ``authorization_header`` match in
    ``headers``; if its value is a Basic credential, base64-decodes it and
    reports it through ``printer``. Returns None in every case.
    '''
    if not authorization_header:
        return
    # authorization_header sometimes is triggered by failed ftp
    try:
        header_val = headers[authorization_header.group()]
    except KeyError:
        return
    b64_auth_re = re.match('basic (.+)', header_val, re.IGNORECASE)
    if b64_auth_re is None:
        return
    basic_auth_b64 = b64_auth_re.group(1)
    try:
        # a2b_base64 is the routine base64.decodestring() wrapped; unlike
        # decodestring (removed in Python 3.9) it exists on Python 2 and 3.
        basic_auth_creds = binascii.a2b_base64(basic_auth_b64)
    except (binascii.Error, ValueError):
        # Undecodable credentials: skip them, as the NTLM parser does.
        return
    msg = 'Basic Authentication: %s' % basic_auth_creds
    printer(src_ip_port, dst_ip_port, msg)
def parse_netntlm_resp_msg(headers, resp_header, seq):
    '''
    Parse the client response to the challenge

    Returns the result of ``parse_ntlm_resp`` for the decoded message, or
    None when the header is absent, uses another scheme, carries no
    payload, or is not valid base64.
    '''
    try:
        header_val3 = headers[resp_header]
    except KeyError:
        return
    header_val3 = header_val3.split(' ', 1)
    # The header value can either start with NTLM or Negotiate
    if header_val3[0] not in ('NTLM', 'Negotiate'):
        return
    if len(header_val3) < 2:
        # Scheme token with no payload after it; indexing [1] would raise.
        return
    try:
        # a2b_base64 is the routine base64.decodestring() wrapped; unlike
        # decodestring (removed in Python 3.9) it exists on Python 2 and 3.
        msg3 = binascii.a2b_base64(header_val3[1])
    except (binascii.Error, ValueError):
        return
    return parse_ntlm_resp(msg3, seq)
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))
def decode_param(param):
    """Split a ``name=value`` parameter and decode encoded-word values.

    The value is split on newlines. Every line containing a
    ``=?charset?Q?...?=`` or ``=?charset?B?...?=`` token has that token
    decoded (quoted-printable or base64) and passed through ``str_encode``
    with the charset. If at least one line was decoded, the value becomes
    the concatenation of the decoded pieces; otherwise it is left as is.

    Args:
        param: A string of the form ``name=value``.

    Returns:
        A ``(name, value)`` tuple.
    """
    name, v = param.split('=', 1)
    value_results = []
    for value in v.split('\n'):
        match = re.search(r'=\?((?:\w|-)+)\?(Q|B)\?(.+)\?=', value)
        if not match:
            continue
        encoding, type_, code = match.groups()
        if type_ == 'Q':
            value = quopri.decodestring(code)
        elif type_ == 'B':
            # base64.decodestring() was removed in Python 3.9 and its
            # replacement rejects text input. b64decode() accepts the ASCII
            # text captured by the regex on Python 2 and 3.
            value = base64.b64decode(code)
        value_results.append(str_encode(value, encoding))

    # Join once after the loop; the result is the same as re-joining after
    # every decoded line.
    if value_results:
        v = ''.join(value_results)

    logger.debug("Decoded parameter {} - {}".format(name, v))
    return name, v
def base64_decode(input, errors='strict'):
    """Decode base64 data and report how much input was consumed.

    Args:
        input: Base64-encoded data as a bytes-like object (a plain ``str``
            on Python 2).
        errors: Error-handling scheme. Only ``'strict'`` is supported.

    Returns:
        A ``(output, length)`` tuple: the decoded bytes and ``len(input)``;
        the whole input is always reported as consumed.

    Raises:
        AssertionError: If *errors* is anything other than ``'strict'``.
    """
    assert errors == 'strict'
    # base64.decodestring() was deprecated in Python 3.1 and removed in 3.9;
    # decodebytes() is its direct replacement. Fall back to the old name only
    # where the new one does not exist (Python 2).
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return (decode(input), len(input))