def get_pwd_rsa(self, pwd, servertime, nonce):
"""
Get rsa2 encrypted password, using RSA module from https://pypi.python.org/pypi/rsa/3.1.1, documents can be accessed at
http://stuvel.eu/files/python-rsa-doc/index.html
"""
#n, n parameter of RSA public key, which is published by WEIBO.COM
#hardcoded here but you can also find it from values return from prelogin status above
weibo_rsa_n = 'EB2A38568661887FA180BDDB5CABD5F21C7BFD59C090CB2D245A87AC253062882729293E5506350508E7F9AA3BB77F4333231490F915F6D63C55FE2F08A49B353F444AD3993CACC02DB784ABBB8E42A9B1BBFFFB38BE18D78E87A0E41B9B8F73A928EE0CCEE1F6739884B9777E4FE9E88A1BBE495927AC4A799B3181D6442443'
#e, exponent parameter of RSA public key, WEIBO uses 0x10001, which is 65537 in Decimal
weibo_rsa_e = 65537
message = str(servertime) + '\t' + str(nonce) + '\n' + str(pwd)
#construct WEIBO RSA Publickey using n and e above, note that n is a hex string
key = rsa.PublicKey(int(weibo_rsa_n, 16), weibo_rsa_e)
#get encrypted password
encropy_pwd = rsa.encrypt(message, key)
#trun back encrypted password binaries to hex string
return binascii.b2a_hex(encropy_pwd)
python类b2a_hex()的实例源码
def readnode(self, lnum, offs):
"""
read a node from a lnum + offset.
"""
ch = UbiFsCommonHeader()
hdrdata = self.vol.read(lnum, offs, ch.hdrsize)
ch.parse(hdrdata)
ch.lnum = lnum
ch.offs = offs
node = ch.getnode()
nodedata = self.vol.read(lnum, offs + ch.hdrsize, ch.len - ch.hdrsize)
if crc32(hdrdata[8:] + nodedata) != ch.crc:
print(ch, node)
print(" %s + %s = %08x -> want = %08x" % ( b2a_hex(hdrdata), b2a_hex(nodedata), crc32(hdrdata[8:] + nodedata), ch.crc))
raise Exception("invalid node crc")
node.parse(nodedata)
return node
def yandex(url):
try:
cookie = client.request(url, output='cookie')
r = client.request(url, cookie=cookie)
r = re.sub(r'[^\x00-\x7F]+', ' ', r)
sk = re.findall('"sk"\s*:\s*"([^"]+)', r)[0]
idstring = re.findall('"id"\s*:\s*"([^"]+)', r)[0]
idclient = binascii.b2a_hex(os.urandom(16))
post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring}
post = urllib.urlencode(post)
r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie)
r = json.loads(r)
url = r['models'][0]['data']['file']
return url
except:
return
def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_REQUEST_TOKEN_URL
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
if callback_uri == "oob":
args["oauth_callback"] = "oob"
elif callback_uri:
args["oauth_callback"] = urlparse.urljoin(
self.request.full_url(), callback_uri)
if extra_params:
args.update(extra_params)
signature = _oauth10a_signature(consumer_token, "GET", url, args)
else:
signature = _oauth_signature(consumer_token, "GET", url, args)
args["oauth_signature"] = signature
return url + "?" + urllib_parse.urlencode(args)
def _oauth_access_token_url(self, request_token):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_token=escape.to_basestring(request_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if "verifier" in request_token:
args["oauth_verifier"] = request_token["verifier"]
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, "GET", url, args,
request_token)
else:
signature = _oauth_signature(consumer_token, "GET", url, args,
request_token)
args["oauth_signature"] = signature
return url + "?" + urllib_parse.urlencode(args)
def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_REQUEST_TOKEN_URL
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
if callback_uri == "oob":
args["oauth_callback"] = "oob"
elif callback_uri:
args["oauth_callback"] = urlparse.urljoin(
self.request.full_url(), callback_uri)
if extra_params:
args.update(extra_params)
signature = _oauth10a_signature(consumer_token, "GET", url, args)
else:
signature = _oauth_signature(consumer_token, "GET", url, args)
args["oauth_signature"] = signature
return url + "?" + urllib_parse.urlencode(args)
def _oauth_access_token_url(self, request_token):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_token=escape.to_basestring(request_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if "verifier" in request_token:
args["oauth_verifier"] = request_token["verifier"]
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, "GET", url, args,
request_token)
else:
signature = _oauth_signature(consumer_token, "GET", url, args,
request_token)
args["oauth_signature"] = signature
return url + "?" + urllib_parse.urlencode(args)
def _oauth_access_token_url(self, request_token):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_token=escape.to_basestring(request_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if "verifier" in request_token:
args["oauth_verifier"] = request_token["verifier"]
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, "GET", url, args,
request_token)
else:
signature = _oauth_signature(consumer_token, "GET", url, args,
request_token)
args["oauth_signature"] = signature
return url + "?" + urllib_parse.urlencode(args)
def encrypt(self, passwd=None, length=32):
"""
encrypt gen password
???????????
"""
if not passwd:
passwd = self.gen_rand_pass()
cryptor = AES.new(self.key, self.mode, b'8122ca7d906ad5e1')
try:
count = len(passwd)
except TypeError:
raise ServerError('Encrypt password error, TYpe error.')
add = (length - (count % length))
passwd += ('\0' * add)
cipher_text = cryptor.encrypt(passwd)
return b2a_hex(cipher_text)
def music_files():
"""
Get a list of music files and file identifier hashes as JSON; also refresh
internal cache of music files and hashes.
"""
global music_files_dict
file_paths = sorted(glob.glob(path.join(settings.MUSIC_ROOT, '*')))
out = []
music_files_dict = dict()
for file_path in file_paths:
file_name = path.split(file_path)[1]
file_hash = music_file_hash(file_name)
out.append(dict(name=file_name,
hash=binascii.b2a_hex(file_hash)))
music_files_dict[file_hash] = file_name
# set music files dict in RFID handler
rfid_handler.set_music_files_dict(music_files_dict)
return json.dumps(out)
def test_FortunaPool(self):
"""FortunaAccumulator.FortunaPool"""
pool = FortunaAccumulator.FortunaPool()
self.assertEqual(0, pool.length)
self.assertEqual("5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456", pool.hexdigest())
pool.append(b('abc'))
self.assertEqual(3, pool.length)
self.assertEqual("4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358", pool.hexdigest())
pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
self.assertEqual(56, pool.length)
self.assertEqual(b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af'), b2a_hex(pool.digest()))
pool.reset()
self.assertEqual(0, pool.length)
pool.append(b('a') * 10**6)
self.assertEqual(10**6, pool.length)
self.assertEqual(b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688'), b2a_hex(pool.digest()))
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
ct1 = b2a_hex(self._new().encrypt(plaintext))
pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
ct2 = b2a_hex(self._new().encrypt(plaintext))
pt2 = b2a_hex(self._new(1).decrypt(ciphertext))
if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
# In PGP mode, data returned by the first encrypt()
# is prefixed with the encrypted IV.
# Here we check it and then remove it from the ciphertexts.
eilen = len(self.encrypted_iv)
self.assertEqual(self.encrypted_iv, ct1[:eilen])
self.assertEqual(self.encrypted_iv, ct2[:eilen])
ct1 = ct1[eilen:]
ct2 = ct2[eilen:]
self.assertEqual(self.ciphertext, ct1) # encrypt
self.assertEqual(self.ciphertext, ct2) # encrypt (second time)
self.assertEqual(self.plaintext, pt1) # decrypt
self.assertEqual(self.plaintext, pt2) # decrypt (second time)
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
# The cipher should work like a stream cipher
# Test counter mode encryption, 3 bytes at a time
ct3 = []
cipher = self._new()
for i in range(0, len(plaintext), 3):
ct3.append(cipher.encrypt(plaintext[i:i+3]))
ct3 = b2a_hex(b("").join(ct3))
self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time)
# Test counter mode decryption, 3 bytes at a time
pt3 = []
cipher = self._new()
for i in range(0, len(ciphertext), 3):
pt3.append(cipher.encrypt(ciphertext[i:i+3]))
# PY3K: This is meant to be text, do not change to bytes (data)
pt3 = b2a_hex(b("").join(pt3))
self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)
def runTest(self):
from Crypto.Cipher import DES
from binascii import b2a_hex
X = []
X[0:] = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]
for i in range(16):
c = DES.new(X[i],DES.MODE_ECB)
if not (i&1): # (num&1) returns 1 for odd numbers
X[i+1:] = [c.encrypt(X[i])] # even
else:
X[i+1:] = [c.decrypt(X[i])] # odd
self.assertEqual(b2a_hex(X[16]),
b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
def home():
form = SendToDeviceForm(request.form)
if form.validate_on_submit():
device = PushDevice.query.filter_by(device_uid=form.device_uid.data).first()
if device is None:
flash('Device not found (please check id)', "danger")
else:
otp = binascii.b2a_hex(os.urandom(4)).decode()
push_status_txt = sendpush(device.push_id, otp)
push_json = json.loads(push_status_txt)
if "status" in push_json:
if push_json['status'] == "OK":
flash("One Time Password Sent To Device", "success")
else:
flash("Could Not Communicate With Device ( " + push_status_txt + " )", "danger")
return render_template('main/home.html', form=form)
def register():
ret_dict = {}
if 'push_id' in request.form :
device = PushDevice()
device.push_id = request.form.get('push_id')
device.device_uid = binascii.b2a_hex(os.urandom(4)).decode()
db.session.add(device)
db.session.commit()
ret_dict['device_uid'] = device.device_uid
else :
ret_dict['error'] = 'could not register push device'
return json.dumps(ret_dict)
def test_FortunaPool(self):
"""FortunaAccumulator.FortunaPool"""
pool = FortunaAccumulator.FortunaPool()
self.assertEqual(0, pool.length)
self.assertEqual("5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456", pool.hexdigest())
pool.append(b('abc'))
self.assertEqual(3, pool.length)
self.assertEqual("4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358", pool.hexdigest())
pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
self.assertEqual(56, pool.length)
self.assertEqual(b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af'), b2a_hex(pool.digest()))
pool.reset()
self.assertEqual(0, pool.length)
pool.append(b('a') * 10**6)
self.assertEqual(10**6, pool.length)
self.assertEqual(b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688'), b2a_hex(pool.digest()))
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
ct1 = b2a_hex(self._new().encrypt(plaintext))
pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
ct2 = b2a_hex(self._new().encrypt(plaintext))
pt2 = b2a_hex(self._new(1).decrypt(ciphertext))
if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
# In PGP mode, data returned by the first encrypt()
# is prefixed with the encrypted IV.
# Here we check it and then remove it from the ciphertexts.
eilen = len(self.encrypted_iv)
self.assertEqual(self.encrypted_iv, ct1[:eilen])
self.assertEqual(self.encrypted_iv, ct2[:eilen])
ct1 = ct1[eilen:]
ct2 = ct2[eilen:]
self.assertEqual(self.ciphertext, ct1) # encrypt
self.assertEqual(self.ciphertext, ct2) # encrypt (second time)
self.assertEqual(self.plaintext, pt1) # decrypt
self.assertEqual(self.plaintext, pt2) # decrypt (second time)
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
# The cipher should work like a stream cipher
# Test counter mode encryption, 3 bytes at a time
ct3 = []
cipher = self._new()
for i in range(0, len(plaintext), 3):
ct3.append(cipher.encrypt(plaintext[i:i+3]))
ct3 = b2a_hex(b("").join(ct3))
self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time)
# Test counter mode decryption, 3 bytes at a time
pt3 = []
cipher = self._new()
for i in range(0, len(ciphertext), 3):
pt3.append(cipher.encrypt(ciphertext[i:i+3]))
# PY3K: This is meant to be text, do not change to bytes (data)
pt3 = b2a_hex(b("").join(pt3))
self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)
def runTest(self):
from Crypto.Cipher import DES
from binascii import b2a_hex
X = []
X[0:] = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]
for i in range(16):
c = DES.new(X[i],DES.MODE_ECB)
if not (i&1): # (num&1) returns 1 for odd numbers
X[i+1:] = [c.encrypt(X[i])] # even
else:
X[i+1:] = [c.decrypt(X[i])] # odd
self.assertEqual(b2a_hex(X[16]),
b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
def handle_msg(self, msg_pmt):
msg = pmt.cdr(msg_pmt)
if not pmt.is_u8vector(msg):
print "[ERROR] Received invalid message type. Expected u8vector"
return
packet = bytearray(pmt.u8vector_elements(msg))
if len(packet) <= 3:
return
if self.verbose:
print 'Spacecraft ID', binascii.b2a_hex(packet[:2])
if packet[2] == 0x50:
print 'CSP downlink, protocol version 0'
else:
print 'Unknown packet type'
data = packet[3:]
self.message_port_pub(pmt.intern('out'),
pmt.cons(pmt.PMT_NIL,
pmt.init_u8vector(len(data), data)))
def test_FortunaPool(self):
"""FortunaAccumulator.FortunaPool"""
pool = FortunaAccumulator.FortunaPool()
self.assertEqual(0, pool.length)
self.assertEqual("5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456", pool.hexdigest())
pool.append(b('abc'))
self.assertEqual(3, pool.length)
self.assertEqual("4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358", pool.hexdigest())
pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
self.assertEqual(56, pool.length)
self.assertEqual(b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af'), b2a_hex(pool.digest()))
pool.reset()
self.assertEqual(0, pool.length)
pool.append(b('a') * 10**6)
self.assertEqual(10**6, pool.length)
self.assertEqual(b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688'), b2a_hex(pool.digest()))
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
ct1 = b2a_hex(self._new().encrypt(plaintext))
pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
ct2 = b2a_hex(self._new().encrypt(plaintext))
pt2 = b2a_hex(self._new(1).decrypt(ciphertext))
if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
# In PGP mode, data returned by the first encrypt()
# is prefixed with the encrypted IV.
# Here we check it and then remove it from the ciphertexts.
eilen = len(self.encrypted_iv)
self.assertEqual(self.encrypted_iv, ct1[:eilen])
self.assertEqual(self.encrypted_iv, ct2[:eilen])
ct1 = ct1[eilen:]
ct2 = ct2[eilen:]
self.assertEqual(self.ciphertext, ct1) # encrypt
self.assertEqual(self.ciphertext, ct2) # encrypt (second time)
self.assertEqual(self.plaintext, pt1) # decrypt
self.assertEqual(self.plaintext, pt2) # decrypt (second time)
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
# The cipher should work like a stream cipher
# Test counter mode encryption, 3 bytes at a time
ct3 = []
cipher = self._new()
for i in range(0, len(plaintext), 3):
ct3.append(cipher.encrypt(plaintext[i:i+3]))
ct3 = b2a_hex(b("").join(ct3))
self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time)
# Test counter mode decryption, 3 bytes at a time
pt3 = []
cipher = self._new()
for i in range(0, len(ciphertext), 3):
pt3.append(cipher.encrypt(ciphertext[i:i+3]))
# PY3K: This is meant to be text, do not change to bytes (data)
pt3 = b2a_hex(b("").join(pt3))
self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)
def runTest(self):
from Crypto.Cipher import DES
from binascii import b2a_hex
X = []
X[0:] = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]
for i in range(16):
c = DES.new(X[i],DES.MODE_ECB)
if not (i&1): # (num&1) returns 1 for odd numbers
X[i+1:] = [c.encrypt(X[i])] # even
else:
X[i+1:] = [c.decrypt(X[i])] # odd
self.assertEqual(b2a_hex(X[16]),
b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
def _convert_host_to_hex(host):
"""
Convert the provided host to the format in /proc/net/tcp*
/proc/net/tcp uses little-endian four byte hex for ipv4
/proc/net/tcp6 uses little-endian per 4B word for ipv6
Args:
host: String with either hostname, IPv4, or IPv6 address
Returns:
List of tuples containing address family and the
little-endian converted host
"""
ips = []
if host is not None:
for family, ip in _convert_host_to_ip(host):
hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip))
hexip_hf = ""
for i in range(0, len(hexip_nf), 8):
ipgroup_nf = hexip_nf[i:i+8]
ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16))
hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf)
ips.append((family, hexip_hf))
return ips
def run(self, objHmac, mk, ssid, fast=False):
#Requires Python 2.7.8 or later
#Ubuntu 14.04 LTS generally only updates to 2.7.6 :(
#if fast==True:
# return hashlib.pbkdf2_hmac('sha1', mk, ssid, 4095)
x1 = objHmac.load(mk, ssid + '\0\0\0\1', fast)
x2 = objHmac.load(mk, ssid + '\0\0\0\2', fast)
f1 = a2b_hex(x1)
f2 = a2b_hex(x2)
for x in xrange(4095):
x1 = objHmac.load(mk, a2b_hex(x1), fast)
x2 = objHmac.load(mk, a2b_hex(x2), fast)
f1 = self.xorString(a2b_hex(x1), f1)
f2 = self.xorString(a2b_hex(x2), f2)
out = b2a_hex(f1) + b2a_hex(f2)
return out[0:64]
test_FortunaAccumulator.py 文件源码
项目:git_intgrtn_aws_s3
作者: droidlabour
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_FortunaPool(self):
"""FortunaAccumulator.FortunaPool"""
pool = FortunaAccumulator.FortunaPool()
self.assertEqual(0, pool.length)
self.assertEqual("5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456", pool.hexdigest())
pool.append(b('abc'))
self.assertEqual(3, pool.length)
self.assertEqual("4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358", pool.hexdigest())
pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
self.assertEqual(56, pool.length)
self.assertEqual(b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af'), b2a_hex(pool.digest()))
pool.reset()
self.assertEqual(0, pool.length)
pool.append(b('a') * 10**6)
self.assertEqual(10**6, pool.length)
self.assertEqual(b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688'), b2a_hex(pool.digest()))
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
ct1 = b2a_hex(self._new().encrypt(plaintext))
pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
ct2 = b2a_hex(self._new().encrypt(plaintext))
pt2 = b2a_hex(self._new(1).decrypt(ciphertext))
if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
# In PGP mode, data returned by the first encrypt()
# is prefixed with the encrypted IV.
# Here we check it and then remove it from the ciphertexts.
eilen = len(self.encrypted_iv)
self.assertEqual(self.encrypted_iv, ct1[:eilen])
self.assertEqual(self.encrypted_iv, ct2[:eilen])
ct1 = ct1[eilen:]
ct2 = ct2[eilen:]
self.assertEqual(self.ciphertext, ct1) # encrypt
self.assertEqual(self.ciphertext, ct2) # encrypt (second time)
self.assertEqual(self.plaintext, pt1) # decrypt
self.assertEqual(self.plaintext, pt2) # decrypt (second time)
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
# The cipher should work like a stream cipher
# Test counter mode encryption, 3 bytes at a time
ct3 = []
cipher = self._new()
for i in range(0, len(plaintext), 3):
ct3.append(cipher.encrypt(plaintext[i:i+3]))
ct3 = b2a_hex(b("").join(ct3))
self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time)
# Test counter mode decryption, 3 bytes at a time
pt3 = []
cipher = self._new()
for i in range(0, len(ciphertext), 3):
pt3.append(cipher.encrypt(ciphertext[i:i+3]))
# PY3K: This is meant to be text, do not change to bytes (data)
pt3 = b2a_hex(b("").join(pt3))
self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)