def runTest(self):
from Crypto.Cipher import DES
from binascii import b2a_hex
X = []
X[0:] = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]
for i in range(16):
c = DES.new(X[i],DES.MODE_ECB)
if not (i&1): # (num&1) returns 1 for odd numbers
X[i+1:] = [c.encrypt(X[i])] # even
else:
X[i+1:] = [c.decrypt(X[i])] # odd
self.assertEqual(b2a_hex(X[16]),
b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
python类b2a_hex()的实例源码
def request_hex_nonce(self):
self.open_send_socket()
# build nonce request payload string
nonce_request = self.authentication.buildNonceRequestPayloadString()
self.logger.debug("Sending nonce request with body of length %d", len(nonce_request))
self.logger.debug('Send: %s', nonce_request)
self.sock.send(nonce_request)
self.logger.debug('Nonce request sent.')
resultbuf_hex = binascii.b2a_hex(self.receive_send_socket(max_receive_bytes=32))
if resultbuf_hex is None:
raise HologramError('Internal nonce error')
return resultbuf_hex
def encrypt(self,text):
cryptor = AES.new(self.key,self.mode,b'0000000000000000')
#????key ?????16?AES-128?,
#24?AES-192?,??32 ?AES-256?Bytes ??
#??AES-128 ??????
length = 16
count = len(text)
if count < length:
add = (length-count)
#\0 backspace
text = text + ('\0' * add)
elif count > length:
add = (length-(count % length))
text = text + ('\0' * add)
self.ciphertext = cryptor.encrypt(text)
#??AES??????????????ascii??????????????????????
#?????????????????16?????
return b2a_hex(self.ciphertext)
#????????????strip() ??
def yandex(url):
try:
cookie = client.request(url, output='cookie')
r = client.request(url, cookie=cookie)
r = re.sub(r'[^\x00-\x7F]+', ' ', r)
sk = re.findall('"sk"\s*:\s*"([^"]+)', r)[0]
idstring = re.findall('"id"\s*:\s*"([^"]+)', r)[0]
idclient = binascii.b2a_hex(os.urandom(16))
post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring}
post = urllib.urlencode(post)
r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie)
r = json.loads(r)
url = r['models'][0]['data']['file']
return url
except:
return
def connect(self):
"""
Connects the sphero with the address given in the constructor
"""
self._device = bluepy.btle.Peripheral(self._addr, addrType=bluepy.btle.ADDR_TYPE_RANDOM)
self._notifier = DelegateObj(self, self._notification_lock)
#set notifier to be notified
self._device.withDelegate(self._notifier)
self._devModeOn()
self._connected = True #Might need to change to be a callback format
#get the command service
cmd_service = self._device.getServiceByUUID(RobotControlService)
self._cmd_characteristics = {}
characteristic_list = cmd_service.getCharacteristics()
for characteristic in characteristic_list:
uuid_str = binascii.b2a_hex(characteristic.uuid.binVal).decode('utf-8')
self._cmd_characteristics[uuid_str] = characteristic
self._listening_flag = True
self._listening_thread = threading.Thread(target=self._listening_loop)
self._listening_thread.start()
def _devModeOn(self):
"""
A sequence of read/write that enables the developer mode
"""
service = self._device.getServiceByUUID(BLEService)
characteristic_list = service.getCharacteristics()
#make it into a dict
characteristic_dict = {}
for characteristic in characteristic_list:
uuid_str = binascii.b2a_hex(characteristic.uuid.binVal).decode('utf-8')
characteristic_dict[uuid_str] = characteristic
characteristic = characteristic_dict[AntiDosCharacteristic]
characteristic.write("011i3".encode(),True)
characteristic = characteristic_dict[TXPowerCharacteristic]
characteristic.write((7).to_bytes(1, 'big'),True)
characteristic = characteristic_dict[WakeCharacteristic]
characteristic.write((1).to_bytes(1, 'big'),True)
def test_FortunaPool(self):
"""FortunaAccumulator.FortunaPool"""
pool = FortunaAccumulator.FortunaPool()
self.assertEqual(0, pool.length)
self.assertEqual("5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456", pool.hexdigest())
pool.append(b('abc'))
self.assertEqual(3, pool.length)
self.assertEqual("4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358", pool.hexdigest())
pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
self.assertEqual(56, pool.length)
self.assertEqual(b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af'), b2a_hex(pool.digest()))
pool.reset()
self.assertEqual(0, pool.length)
pool.append(b('a') * 10**6)
self.assertEqual(10**6, pool.length)
self.assertEqual(b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688'), b2a_hex(pool.digest()))
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
ct1 = b2a_hex(self._new().encrypt(plaintext))
pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
ct2 = b2a_hex(self._new().encrypt(plaintext))
pt2 = b2a_hex(self._new(1).decrypt(ciphertext))
if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
# In PGP mode, data returned by the first encrypt()
# is prefixed with the encrypted IV.
# Here we check it and then remove it from the ciphertexts.
eilen = len(self.encrypted_iv)
self.assertEqual(self.encrypted_iv, ct1[:eilen])
self.assertEqual(self.encrypted_iv, ct2[:eilen])
ct1 = ct1[eilen:]
ct2 = ct2[eilen:]
self.assertEqual(self.ciphertext, ct1) # encrypt
self.assertEqual(self.ciphertext, ct2) # encrypt (second time)
self.assertEqual(self.plaintext, pt1) # decrypt
self.assertEqual(self.plaintext, pt2) # decrypt (second time)
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
# The cipher should work like a stream cipher
# Test counter mode encryption, 3 bytes at a time
ct3 = []
cipher = self._new()
for i in range(0, len(plaintext), 3):
ct3.append(cipher.encrypt(plaintext[i:i+3]))
ct3 = b2a_hex(b("").join(ct3))
self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time)
# Test counter mode decryption, 3 bytes at a time
pt3 = []
cipher = self._new()
for i in range(0, len(ciphertext), 3):
pt3.append(cipher.encrypt(ciphertext[i:i+3]))
# PY3K: This is meant to be text, do not change to bytes (data)
pt3 = b2a_hex(b("").join(pt3))
self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)
def runTest(self):
from Crypto.Cipher import DES
from binascii import b2a_hex
X = []
X[0:] = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]
for i in range(16):
c = DES.new(X[i],DES.MODE_ECB)
if not (i&1): # (num&1) returns 1 for odd numbers
X[i+1:] = [c.encrypt(X[i])] # even
else:
X[i+1:] = [c.decrypt(X[i])] # odd
self.assertEqual(b2a_hex(X[16]),
b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
def encrypt(self, passwd=None, length=32):
"""
encrypt gen password
???????????
"""
if not passwd:
passwd = self.gen_rand_pass()
cryptor = AES.new(self.key, self.mode, b'8122ca7d906ad5e1')
try:
count = len(passwd)
except TypeError:
raise ServerError('Encrypt password error, TYpe error.')
add = (length - (count % length))
passwd += ('\0' * add)
cipher_text = cryptor.encrypt(passwd)
return b2a_hex(cipher_text)
def _convert_host_to_hex(host):
"""
Convert the provided host to the format in /proc/net/tcp*
/proc/net/tcp uses little-endian four byte hex for ipv4
/proc/net/tcp6 uses little-endian per 4B word for ipv6
Args:
host: String with either hostname, IPv4, or IPv6 address
Returns:
List of tuples containing address family and the
little-endian converted host
"""
ips = []
if host is not None:
for family, ip in _convert_host_to_ip(host):
hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip))
hexip_hf = ""
for i in range(0, len(hexip_nf), 8):
ipgroup_nf = hexip_nf[i:i+8]
ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16))
hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf)
ips.append((family, hexip_hf))
return ips
def oauth_request_parameters(consumer_token, url, access_token, parameters={},
method="GET", oauth_version="1.0a",
override_version=""):
base_args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_token=access_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=override_version or oauth_version,
)
args = base_args.copy()
args.update(parameters)
if oauth_version == "1.0a":
signature = tornado.auth._oauth10a_signature(consumer_token, method, url, args, access_token)
else:
signature = tornado.auth._oauth_signature(consumer_token, method, url, args, access_token)
base_args["oauth_signature"] = signature
return base_args
def __init__(self, name, resultdir, vt_type, test=None, mux=None):
self.id = binascii.b2a_hex(os.urandom(20))
self.name = str(name)
self.shortname = "_".join(self.name.split('_')[1:])
self.job_dir = None
self.type = str(name.split('_')[0])
self.resultdir = resultdir
self.conf = None
self.test = test
self.mux = mux
self.run = "Not_Run"
self.runsummary = None
self.runlink = None
if self.type == 'guest':
self.vt_type = vt_type
else:
self.vt_type = None
def parse_xbee_rf_data(data):
if DEBUG:
print data
node_addr = binascii.b2a_hex(data['source_addr_long']) + binascii.b2a_hex((data['source_addr']))
readings = re.findall("{.*?}", data['rf_data']) # returns a list of K-V pair matches
payload = {'node': node_addr}
for reading in readings:
item_dict = json.loads(reading)
for k, v in item_dict.iteritems():
sensor_name = k.encode('utf-8')
sensor_value = v.encode('utf-8')
if sensor_name == "temp:":
sensor_name = "temp"
payload['sensor'] = sensor_name
payload['val'] = sensor_value
# now pass the payload to the RequestBuilder and send it
print payload
if PRODUCTION:
sendHTTPPost(payload)
def get_sample_name(input_sample_name):
"""
Return sample name, only alphabet and digits are allowed. (no underscore, no space)
If input_sample_name is None or empty string, return a random string
starts with 'sample'
"""
if input_sample_name is None or str(input_sample_name) == "None":
input_sample_name = ""
else:
input_sample_name = str(input_sample_name)
input_sample_name = "".join([x for x in input_sample_name
if x.isalpha() or x.isdigit()])
if len(input_sample_name) == 0:
import binascii
return str("sample"+binascii.b2a_hex(os.urandom(3)))
else:
return input_sample_name
def seed_callback(bot, update, args):
user_id = update.message.from_user.id
chat_id = update.message.chat_id
lang_id = mysql_select_language(user_id)
seed = mysql_select_seed(user_id)
if (seed is False):
seed = binascii.b2a_hex(os.urandom(8)).upper()
mysql_set_seed(user_id, seed)
seed_split = [seed[i:i+4] for i in range(0, len(seed), 4)]
seed_text = seed_split[0] + '-' + seed_split[1] + '-' + seed_split[2] + '-' + seed_split[3]
check = mysql_check_password(user_id)
if ((len(args) > 0) and (check is not False)):
password = args[0]
hex = password_check(update, password)
if (check == hex):
message_markdown(bot, chat_id, lang_text('seed_creation', lang_id).format(seed_text))
else:
text_reply(update, lang_text('password_error', lang_id))
elif (check is not False):
text_reply(update, lang_text('password_error', lang_id))
else:
message_markdown(bot, chat_id, lang_text('seed_creation', lang_id).format(seed_text))
def read_body(self, f, body_checksum):
checksum = 0
data = f.read(32)
while len(data) == 32:
meta = dict()
segment = ""
segment,
meta['incremental'],
meta['base'],
meta['encryption'],
meta['compression'],
sha1 = unpack('<2IH2B20s', data)
meta['sha1_hash'] = b2a_hex(sha1)
self.segments[segment] = meta
checksum = crc32(data, checksum)
data = f.read(32)
if checksum != body_checksum:
raise Exception('Body checksum does not match')
def main():
args = parse_args()
manifest = driver.DriverManager(
namespace='ekko.manifest.drivers',
name=args.driver,
invoke_on_load=True,
invoke_args=[args.manifest]
).driver
for segment in manifest.get_segments(manifest.get_metadata()):
print(binascii.b2a_hex(segment.segment_hash))
def hex_encode(input, errors='strict'):
assert errors == 'strict'
return (binascii.b2a_hex(input), len(input))