python类b2a_hex()的实例源码

test_DES.py 文件源码 项目:git_intgrtn_aws_s3 作者: droidlabour 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def runTest(self):
        from Crypto.Cipher import DES
        from binascii import b2a_hex

        X = []
        X[0:] = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]

        for i in range(16):
            c = DES.new(X[i],DES.MODE_ECB)
            if not (i&1): # (num&1) returns 1 for odd numbers 
                X[i+1:] = [c.encrypt(X[i])] # even
            else:
                X[i+1:] = [c.decrypt(X[i])] # odd

        self.assertEqual(b2a_hex(X[16]),
            b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
HologramCloud.py 文件源码 项目:hologram-python 作者: hologram-io 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def request_hex_nonce(self):

        self.open_send_socket()

        # build nonce request payload string
        nonce_request = self.authentication.buildNonceRequestPayloadString()

        self.logger.debug("Sending nonce request with body of length %d", len(nonce_request))
        self.logger.debug('Send: %s', nonce_request)

        self.sock.send(nonce_request)
        self.logger.debug('Nonce request sent.')

        resultbuf_hex = binascii.b2a_hex(self.receive_send_socket(max_receive_bytes=32))

        if resultbuf_hex is None:
            raise HologramError('Internal nonce error')

        return resultbuf_hex
aes_decryptor.py 文件源码 项目:wizard 作者: honor100 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def encrypt(self,text):
        cryptor = AES.new(self.key,self.mode,b'0000000000000000')
        #????key ?????16?AES-128?,
        #24?AES-192?,??32 ?AES-256?Bytes ??
        #??AES-128 ??????
        length = 16
        count = len(text)
        if count < length:
            add = (length-count)
            #\0 backspace
            text = text + ('\0' * add)
        elif count > length:
            add = (length-(count % length))
            text = text + ('\0' * add)
        self.ciphertext = cryptor.encrypt(text)
        #??AES??????????????ascii??????????????????????
        #?????????????????16?????
        return b2a_hex(self.ciphertext)

    #????????????strip() ??
directstream.py 文件源码 项目:plugin.video.lastship 作者: lastship 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def yandex(url):
    try:
        cookie = client.request(url, output='cookie')

        r = client.request(url, cookie=cookie)
        r = re.sub(r'[^\x00-\x7F]+', ' ', r)

        sk = re.findall('"sk"\s*:\s*"([^"]+)', r)[0]

        idstring = re.findall('"id"\s*:\s*"([^"]+)', r)[0]

        idclient = binascii.b2a_hex(os.urandom(16))

        post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring}
        post = urllib.urlencode(post)

        r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie)
        r = json.loads(r)

        url = r['models'][0]['data']['file']

        return url
    except:
        return
sphero.py 文件源码 项目:sphero_sprk 作者: CMU-ARM 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def connect(self):
        """
        Connects the sphero with the address given in the constructor
        """
        self._device = bluepy.btle.Peripheral(self._addr, addrType=bluepy.btle.ADDR_TYPE_RANDOM)
        self._notifier = DelegateObj(self, self._notification_lock)
        #set notifier to be notified
        self._device.withDelegate(self._notifier)

        self._devModeOn()
        self._connected = True #Might need to change to be a callback format
        #get the command service
        cmd_service = self._device.getServiceByUUID(RobotControlService)
        self._cmd_characteristics = {}
        characteristic_list = cmd_service.getCharacteristics()
        for characteristic in characteristic_list:
            uuid_str = binascii.b2a_hex(characteristic.uuid.binVal).decode('utf-8')
            self._cmd_characteristics[uuid_str] = characteristic

        self._listening_flag = True
        self._listening_thread = threading.Thread(target=self._listening_loop)
        self._listening_thread.start()
sphero.py 文件源码 项目:sphero_sprk 作者: CMU-ARM 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _devModeOn(self):
        """
        A sequence of read/write that enables the developer mode
        """
        service = self._device.getServiceByUUID(BLEService)
        characteristic_list = service.getCharacteristics()
        #make it into a dict
        characteristic_dict = {}
        for characteristic in characteristic_list:
            uuid_str = binascii.b2a_hex(characteristic.uuid.binVal).decode('utf-8')
            characteristic_dict[uuid_str] = characteristic

        characteristic = characteristic_dict[AntiDosCharacteristic]
        characteristic.write("011i3".encode(),True)
        characteristic = characteristic_dict[TXPowerCharacteristic]
        characteristic.write((7).to_bytes(1, 'big'),True)
        characteristic = characteristic_dict[WakeCharacteristic]
        characteristic.write((1).to_bytes(1, 'big'),True)
test_FortunaAccumulator.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def test_FortunaPool(self):
        """FortunaAccumulator.FortunaPool"""
        pool = FortunaAccumulator.FortunaPool()
        self.assertEqual(0, pool.length)
        self.assertEqual("5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456", pool.hexdigest())

        pool.append(b('abc'))

        self.assertEqual(3, pool.length)
        self.assertEqual("4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358", pool.hexdigest())

        pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))

        self.assertEqual(56, pool.length)
        self.assertEqual(b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af'), b2a_hex(pool.digest()))

        pool.reset()

        self.assertEqual(0, pool.length)

        pool.append(b('a') * 10**6)

        self.assertEqual(10**6, pool.length)
        self.assertEqual(b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688'), b2a_hex(pool.digest()))
common.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def runTest(self):
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)

        ct1 = b2a_hex(self._new().encrypt(plaintext))
        pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
        ct2 = b2a_hex(self._new().encrypt(plaintext))
        pt2 = b2a_hex(self._new(1).decrypt(ciphertext))

        if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
            # In PGP mode, data returned by the first encrypt()
            # is prefixed with the encrypted IV.
            # Here we check it and then remove it from the ciphertexts.
            eilen = len(self.encrypted_iv)
            self.assertEqual(self.encrypted_iv, ct1[:eilen])
            self.assertEqual(self.encrypted_iv, ct2[:eilen])
            ct1 = ct1[eilen:]
            ct2 = ct2[eilen:]

        self.assertEqual(self.ciphertext, ct1)  # encrypt
        self.assertEqual(self.ciphertext, ct2)  # encrypt (second time)
        self.assertEqual(self.plaintext, pt1)   # decrypt
        self.assertEqual(self.plaintext, pt2)   # decrypt (second time)
common.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def runTest(self):
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)

        # The cipher should work like a stream cipher

        # Test counter mode encryption, 3 bytes at a time
        ct3 = []
        cipher = self._new()
        for i in range(0, len(plaintext), 3):
            ct3.append(cipher.encrypt(plaintext[i:i+3]))
        ct3 = b2a_hex(b("").join(ct3))
        self.assertEqual(self.ciphertext, ct3)  # encryption (3 bytes at a time)

        # Test counter mode decryption, 3 bytes at a time
        pt3 = []
        cipher = self._new()
        for i in range(0, len(ciphertext), 3):
            pt3.append(cipher.encrypt(ciphertext[i:i+3]))
        # PY3K: This is meant to be text, do not change to bytes (data)
        pt3 = b2a_hex(b("").join(pt3))
        self.assertEqual(self.plaintext, pt3)  # decryption (3 bytes at a time)
test_DES.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def runTest(self):
        from Crypto.Cipher import DES
        from binascii import b2a_hex

        X = []
        X[0:] = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]

        for i in range(16):
            c = DES.new(X[i],DES.MODE_ECB)
            if not (i&1): # (num&1) returns 1 for odd numbers 
                X[i+1:] = [c.encrypt(X[i])] # even
            else:
                X[i+1:] = [c.decrypt(X[i])] # odd

        self.assertEqual(b2a_hex(X[16]),
            b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
api.py 文件源码 项目:geekcloud 作者: GeekCloud-Team 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def encrypt(self, passwd=None, length=32):
        """
        encrypt gen password
        ???????????
        """
        if not passwd:
            passwd = self.gen_rand_pass()

        cryptor = AES.new(self.key, self.mode, b'8122ca7d906ad5e1')
        try:
            count = len(passwd)
        except TypeError:
            raise ServerError('Encrypt password error, TYpe error.')

        add = (length - (count % length))
        passwd += ('\0' * add)
        cipher_text = cryptor.encrypt(passwd)
        return b2a_hex(cipher_text)
wait_for.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _convert_host_to_hex(host):
    """
    Convert the provided host to the format in /proc/net/tcp*

    /proc/net/tcp uses little-endian four byte hex for ipv4
    /proc/net/tcp6 uses little-endian per 4B word for ipv6

    Args:
        host: String with either hostname, IPv4, or IPv6 address

    Returns:
        List of tuples containing address family and the
        little-endian converted host
    """
    ips = []
    if host is not None:
        for family, ip in _convert_host_to_ip(host):
            hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip))
            hexip_hf = ""
            for i in range(0, len(hexip_nf), 8):
                ipgroup_nf = hexip_nf[i:i+8]
                ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16))
                hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf)
            ips.append((family, hexip_hf))
    return ips
sdstream.py 文件源码 项目:slidoc 作者: mitotic 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def oauth_request_parameters(consumer_token, url, access_token, parameters={},
                             method="GET", oauth_version="1.0a",
                             override_version=""):
    base_args = dict(
        oauth_consumer_key=consumer_token["key"],
        oauth_token=access_token["key"],
        oauth_signature_method="HMAC-SHA1",
        oauth_timestamp=str(int(time.time())),
        oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
        oauth_version=override_version or oauth_version,
    )
    args = base_args.copy()
    args.update(parameters)
    if oauth_version == "1.0a":
        signature = tornado.auth._oauth10a_signature(consumer_token, method, url, args, access_token)
    else:
        signature = tornado.auth._oauth_signature(consumer_token, method, url, args, access_token)
    base_args["oauth_signature"] = signature
    return base_args
avocado-setup.py 文件源码 项目:tests 作者: open-power-host-os 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, name, resultdir, vt_type, test=None, mux=None):
        self.id = binascii.b2a_hex(os.urandom(20))
        self.name = str(name)
        self.shortname = "_".join(self.name.split('_')[1:])
        self.job_dir = None
        self.type = str(name.split('_')[0])
        self.resultdir = resultdir
        self.conf = None
        self.test = test
        self.mux = mux
        self.run = "Not_Run"
        self.runsummary = None
        self.runlink = None
        if self.type == 'guest':
            self.vt_type = vt_type
        else:
            self.vt_type = None
parser.py 文件源码 项目:fewsn-zigbee-base-station 作者: sethmbaker 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def parse_xbee_rf_data(data):

    if DEBUG:
        print data

    node_addr = binascii.b2a_hex(data['source_addr_long']) + binascii.b2a_hex((data['source_addr']))
    readings = re.findall("{.*?}", data['rf_data'])        # returns a list of K-V pair matches
    payload = {'node': node_addr}

    for reading in readings:
        item_dict = json.loads(reading)
        for k, v in item_dict.iteritems():
            sensor_name = k.encode('utf-8')
            sensor_value = v.encode('utf-8')
            if sensor_name == "temp:":
                sensor_name = "temp"
            payload['sensor'] = sensor_name
            payload['val'] = sensor_value
            # now pass the payload to the RequestBuilder and send it
            print payload
            if PRODUCTION:
                sendHTTPPost(payload)
Utils.py 文件源码 项目:pbtranscript 作者: PacificBiosciences 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_sample_name(input_sample_name):
    """
    Return sample name, only alphabet and digits are allowed. (no underscore, no space)
    If input_sample_name is None or empty string, return a random string
    starts with 'sample'
    """
    if input_sample_name is None or str(input_sample_name) == "None":
        input_sample_name = ""
    else:
        input_sample_name = str(input_sample_name)

    input_sample_name = "".join([x for x in input_sample_name
                                 if x.isalpha() or x.isdigit()])

    if len(input_sample_name) == 0:
        import binascii
        return str("sample"+binascii.b2a_hex(os.urandom(3)))
    else:
        return input_sample_name
raiwalletbot.py 文件源码 项目:RaiWalletBot 作者: SergiySW 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def seed_callback(bot, update, args):
    user_id = update.message.from_user.id
    chat_id = update.message.chat_id
    lang_id = mysql_select_language(user_id)
    seed = mysql_select_seed(user_id)
    if (seed is False):
        seed = binascii.b2a_hex(os.urandom(8)).upper()
        mysql_set_seed(user_id, seed)
    seed_split = [seed[i:i+4] for i in range(0, len(seed), 4)]
    seed_text = seed_split[0] + '-' + seed_split[1] + '-' + seed_split[2] + '-' + seed_split[3]
    check = mysql_check_password(user_id)
    if ((len(args) > 0) and (check is not False)):
        password = args[0]
        hex = password_check(update, password)
        if (check == hex):
            message_markdown(bot, chat_id, lang_text('seed_creation', lang_id).format(seed_text))
        else:
            text_reply(update, lang_text('password_error', lang_id))
    elif (check is not False):
        text_reply(update, lang_text('password_error', lang_id))
    else:
        message_markdown(bot, chat_id, lang_text('seed_creation', lang_id).format(seed_text))
osdk.py 文件源码 项目:ekko 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def read_body(self, f, body_checksum):
        checksum = 0

        data = f.read(32)
        while len(data) == 32:
            meta = dict()
            segment = ""

            segment,
            meta['incremental'],
            meta['base'],
            meta['encryption'],
            meta['compression'],
            sha1 = unpack('<2IH2B20s', data)

            meta['sha1_hash'] = b2a_hex(sha1)

            self.segments[segment] = meta
            checksum = crc32(data, checksum)
            data = f.read(32)

        if checksum != body_checksum:
            raise Exception('Body checksum does not match')
dump_manifest.py 文件源码 项目:ekko 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main():
    args = parse_args()
    manifest = driver.DriverManager(
        namespace='ekko.manifest.drivers',
        name=args.driver,
        invoke_on_load=True,
        invoke_args=[args.manifest]
    ).driver

    for segment in manifest.get_segments(manifest.get_metadata()):
        print(binascii.b2a_hex(segment.segment_hash))
hex_codec.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def hex_encode(input, errors='strict'):
    assert errors == 'strict'
    return (binascii.b2a_hex(input), len(input))


问题


面经


文章

微信
公众号

扫码关注公众号