def solve(x):
    """Re-encode a base58 string with a freshly computed 4-byte checksum.

    The input is base58-decoded, its trailing four bytes are discarded, and
    the first four bytes of SHA-256(SHA-256(payload)) are appended instead.

    Args:
        x: base58-encoded value whose last four decoded bytes are to be
            replaced.

    Returns:
        Whatever ``base58.b58encode`` returns for payload + new checksum.
    """
    decoded = base58.b58decode(x)
    payload = decoded[:-4]
    # The old trailing bytes are deliberately ignored; only the payload is
    # hashed to derive the replacement checksum.
    digest = hashlib.sha256(hashlib.sha256(payload).digest()).digest()
    return base58.b58encode(payload + digest[:4])
python类b58encode()的实例源码
def testTrustAnchorDisclosesEncryptedAttribute(addedEncryptedAttribute, symEncData,
                                               looper, userSignerA, trustAnchorSigner,
                                               trustAnchor):
    """Submit an ATTRIB operation carrying a boxed copy of the symmetric key.

    A libnacl ``Box`` is built from the trust anchor signer's ``keyraw`` and
    user A's ``verraw``. The JSON document holding the secret key and the
    txn id of the previously added attribute is encrypted with it, and the
    base58-encoded nonce and ciphertext are submitted via ``submitAndCheck``.
    """
    sender_key = trustAnchorSigner.naclSigner.keyraw
    recipient_key = userSignerA.naclSigner.verraw
    sealer = libnacl.public.Box(sender_key, recipient_key)

    disclosure = {SKEY: symEncData.secretKey,
                  TXN_ID: addedEncryptedAttribute[TXN_ID]}
    plaintext = json.dumps(disclosure).encode()
    nonce, ciphertext = sealer.encrypt(plaintext, pack_nonce=False)

    request = {
        TARGET_NYM: userSignerA.verstr,
        TXN_TYPE: ATTRIB,
        NONCE: base58.b58encode(nonce),
        ENC: base58.b58encode(ciphertext),
    }
    submitAndCheck(looper, trustAnchor, request,
                   identifier=trustAnchorSigner.verstr)
def Export(self):
    """
    Export this KeyPair's private key in WIF format.

    A 38-byte buffer is filled with: byte 0x80, the first 32 bytes of
    ``self.PrivateKey``, byte 0x01, and finally the first four bytes of
    ``Crypto.Default().Hash256`` computed over the preceding 34 bytes.

    Returns:
        The key in wif format, i.e. the result of ``base58.b58encode`` on
        that buffer.
    """
    wif_bytes = bytearray(38)
    wif_bytes[0] = 0x80
    wif_bytes[1:33] = self.PrivateKey[0:32]
    wif_bytes[33] = 0x01

    # The checksum covers everything written so far (prefix + key + marker).
    digest = Crypto.Default().Hash256(wif_bytes[0:34])
    wif_bytes[34:38] = digest[0:4]

    return base58.b58encode(bytes(wif_bytes))
def test_should_throw_error_on_invalid_checksum(self):
    """PrivateKeyFromWIF must raise ValueError when the checksum is bogus."""
    # 34 bytes of ASCII 'A' with the first and last byte overwritten
    payload = bytearray(34 * 'A', 'utf8')
    payload[0] = 0x80
    payload[33] = 0x01

    # arbitrary four-byte trailer standing in for the checksum
    payload.extend(b'\xDE\xAD\xBE\xEF')

    encoded = base58.b58encode(bytes(payload))

    with self.assertRaises(ValueError) as raised:
        KeyPair.PrivateKeyFromWIF(encoded)

    self.assertEqual('Invalid WIF Checksum!', str(raised.exception))
def testTrustAnchorDisclosesEncryptedAttribute(
        addedEncryptedAttribute,
        symEncData,
        looper,
        userSignerA,
        trustAnchorSigner,
        trustAnchor):
    """Send an ATTRIB operation that discloses the symmetric key to user A.

    The secret key and the txn id of the already-added attribute are
    serialised to JSON, encrypted with a libnacl ``Box`` (trust anchor
    ``keyraw`` / user A ``verraw``), and submitted with the nonce and the
    ciphertext base58-encoded.
    """
    box = libnacl.public.Box(trustAnchorSigner.naclSigner.keyraw,
                             userSignerA.naclSigner.verraw)

    message = json.dumps({
        SKEY: symEncData.secretKey,
        TXN_ID: addedEncryptedAttribute[TXN_ID],
    })
    nonce, boxed = box.encrypt(message.encode(), pack_nonce=False)

    operation = {}
    operation[TARGET_NYM] = userSignerA.verstr
    operation[TXN_TYPE] = ATTRIB
    operation[NONCE] = base58.b58encode(nonce)
    operation[ENC] = base58.b58encode(boxed)

    submitAndCheck(looper, trustAnchor, operation,
                   identifier=trustAnchorSigner.verstr)
def _fulfillment_to_details(fulfillment):
    """Encode a fulfillment as a details dictionary

    Args:
        fulfillment: Crypto-conditions Fulfillment object

    Returns:
        dict: for ``ed25519-sha-256`` the type and the base58-encoded
        public key; for ``threshold-sha-256`` the type, the threshold and
        the recursively encoded ``body`` of every subcondition.

    Raises:
        UnsupportedTypeError: if ``type_name`` is neither of the above.
    """
    type_name = fulfillment.type_name

    if type_name == 'ed25519-sha-256':
        encoded_key = base58.b58encode(fulfillment.public_key)
        return {'type': 'ed25519-sha-256', 'public_key': encoded_key}

    if type_name == 'threshold-sha-256':
        nested = []
        for cond in fulfillment.subconditions:
            nested.append(_fulfillment_to_details(cond['body']))
        return {
            'type': 'threshold-sha-256',
            'threshold': fulfillment.threshold,
            'subconditions': nested,
        }

    raise UnsupportedTypeError(type_name)
def address_bytes_to_string(proto, buf):
    """Convert the hex-encoded value *buf* of protocol *proto* to text.

    Dispatches on ``proto.code``:

    * ``P_IP4`` / ``P_IP6``: *buf* is parsed as a base-16 integer and
      rendered through ``IPAddress``.
    * ``P_TCP``, ``P_UDP``, ``P_DCCP``, ``P_SCTP``: the unhexlified bytes are
      decoded with ``decode_big_endian_16``.
    * ``P_ONION``: all but the last two bytes are base32-encoded and
      lower-cased, the last two bytes are decoded as the port, and both are
      joined with ``:``.
    * ``P_IPFS``: a varint length prefix is read and checked against the
      remaining bytes, which are then base58-encoded.

    Raises:
        ValueError: on an IPFS length mismatch or an unrecognised code.
    """
    from .util import decode_big_endian_16

    code = proto.code

    if code == P_IP4:
        return str(IPAddress(int(buf, 16), 4).ipv4())

    if code == P_IP6:
        return str(IPAddress(int(buf, 16), 6).ipv6())

    if code in [P_TCP, P_UDP, P_DCCP, P_SCTP]:
        return str(decode_big_endian_16(binascii.unhexlify(buf)))

    if code == P_ONION:
        raw = binascii.unhexlify(buf)
        host_bytes = raw[:-2]
        port_bytes = raw[-2:]
        host = base64.b32encode(host_bytes).decode('ascii').lower()
        port = str(decode_big_endian_16(port_bytes))
        return host + ':' + port

    if code == P_IPFS:
        raw = binascii.unhexlify(buf)
        size, num_bytes_read = read_varint_code(raw)
        body = raw[num_bytes_read:]
        if len(body) != size:
            raise ValueError("inconsistent lengths")
        return base58.b58encode(body)

    raise ValueError("unknown protocol")
def hex_private_key_to_WIF_private_key(hex_key):
    """
    Converts a raw 256-bit hex private key to WIF format

    hex_key => <string> hex-encoded private key
    returns => base58 encoding of: 0x80 prefix + key + 4-byte checksum
    """
    # str.decode("hex") only exists on Python 2; unhexlify runs on 2 and 3.
    from binascii import unhexlify

    hex_key_with_prefix = "80" + hex_key
    # hash_sha256 is used here as returning a hex string, so its output is
    # converted back to raw bytes before the second round.
    h1 = hash_sha256(unhexlify(hex_key_with_prefix))
    h2 = hash_sha256(unhexlify(h1))
    checksum = h2[0:8]  # 8 hex characters == first 4 bytes of the digest
    wif_key_before_base58Check = hex_key_with_prefix + checksum
    return b58encode(unhexlify(wif_key_before_base58Check))
################################################################################################
#
# Bitcoin helper functions
#
################################################################################################
def keypair(seed=None):
    """Create a NaCl signing key and its derived Curve25519 key pair.

    Args:
        seed: seed passed to ``nacl.signing.SigningKey``; when falsy, 32
            random bytes from ``nacl.utils.random`` are used.

    Returns:
        dict with the key objects under ``sign``, ``verify``, ``private``
        and ``public``, their base58-encoded forms under the matching
        ``*_b58`` keys, and the seed actually used under ``seed``.
    """
    seed = seed or nacl.utils.random(32)

    signer = nacl.signing.SigningKey(seed=seed)
    curve_private = signer.to_curve25519_private_key()

    def as_b58(key):
        # every key object exposes its raw bytes through encode()
        return base58.b58encode(key.encode())

    return {
        'sign': signer,
        'sign_b58': as_b58(signer),
        'verify': signer.verify_key,
        'verify_b58': as_b58(signer.verify_key),
        'private': curve_private,
        'private_b58': as_b58(curve_private),
        'public': curve_private.public_key,
        'public_b58': as_b58(curve_private.public_key),
        'seed': seed,
    }
def testSponsorDisclosesEncryptedAttribute(addedEncryptedAttribute, symEncData,
                                           looper, userSignerA, sponsorSigner,
                                           sponsor):
    """Submit an ATTRIB operation carrying a boxed copy of the symmetric key.

    The sponsor encrypts a JSON document (secret key plus the txn id of the
    previously added attribute) with a libnacl ``Box`` built from its own
    ``keyraw`` and user A's ``verraw``, then submits the base58-encoded
    nonce and ciphertext via ``submitAndCheck``.
    """
    sender_key = sponsorSigner.naclSigner.keyraw
    recipient_key = userSignerA.naclSigner.verraw
    sealer = libnacl.public.Box(sender_key, recipient_key)

    disclosure = {SKEY: symEncData.secretKey,
                  TXN_ID: addedEncryptedAttribute[TXN_ID]}
    plaintext = json.dumps(disclosure).encode()
    nonce, ciphertext = sealer.encrypt(plaintext, pack_nonce=False)

    request = {
        TARGET_NYM: userSignerA.verstr,
        TXN_TYPE: ATTRIB,
        NONCE: base58.b58encode(nonce),
        ENC: base58.b58encode(ciphertext),
    }
    submitAndCheck(looper, sponsor, request,
                   identifier=sponsorSigner.verstr)
def __init__(self, private_key=0):
    """Derive signing/verifying keys and a base58 address from a private key.

    Args:
        private_key: hex string of the private key, or ``0`` (the default)
            to draw 32 random bytes from ``os.urandom``.

    Attributes set: ``private_key`` (raw bytes), ``printable_pk`` (hex
    text), ``sk`` / ``vk`` (ecdsa SECP256k1 key objects), ``public_key``
    (hex bytes with a ``04`` prefix), ``hashed_public_key`` (hex bytes: ``00``
    prefix + RIPEMD-160 of the SHA-256 of the public key), ``checksum``
    (hex of the first 4 bytes of the double SHA-256 of that), ``binary_addr``
    (raw prefix + hash + checksum) and ``addr`` (its base58 encoding).
    """
    if private_key == 0:
        self.private_key = os.urandom(32)
        self.printable_pk = binascii.hexlify(self.private_key).decode("ascii")
    else:
        self.printable_pk = private_key
        self.private_key = binascii.unhexlify(private_key.encode('ascii'))

    self.sk = ecdsa.SigningKey.from_string(self.private_key, curve=ecdsa.SECP256k1)
    self.vk = self.sk.verifying_key

    # Work on raw bytes and hexlify only for the stored attributes; this is
    # equivalent to concatenating hex text and unhexlifying it again.
    raw_public = b"\x04" + self.vk.to_string()
    self.public_key = binascii.hexlify(raw_public)

    ripemd160 = hashlib.new('ripemd160')
    ripemd160.update(hashlib.sha256(raw_public).digest())
    raw_hashed = b"\x00" + ripemd160.digest()
    self.hashed_public_key = binascii.hexlify(raw_hashed)

    raw_checksum = hashlib.sha256(hashlib.sha256(raw_hashed).digest()).digest()[:4]
    self.checksum = binascii.hexlify(raw_checksum)

    self.binary_addr = raw_hashed + raw_checksum
    self.addr = base58.b58encode(self.binary_addr)
def sendAsset(pubKey, privKey, recipient, assetId, amount, txfee):
    """Sign an asset transfer and POST it to the node's broadcast endpoint.

    Args:
        pubKey: base58-encoded sender public key.
        privKey: base58-encoded sender private key used for signing.
        recipient: base58-encoded recipient.
        assetId: base58-encoded asset id.
        amount: amount, packed as an unsigned 64-bit big-endian integer.
        txfee: fee, packed as an unsigned 64-bit big-endian integer.

    The request goes to ``http://NODE_IP:NODE_PORT/assets/broadcast/transfer``.
    Nothing is returned; pycurl errors propagate to the caller.
    """
    timestamp = int(time.time() * 1000)

    # Bytes literals keep the concatenation valid when b58decode/struct.pack
    # return bytes (Python 3) and are identical to plain literals on Python 2.
    sData = (b'\4' + base58.b58decode(pubKey) +
             b'\1' + base58.b58decode(assetId) +
             b'\0' +
             struct.pack(">Q", timestamp) +
             struct.pack(">Q", amount) +
             struct.pack(">Q", txfee) +
             base58.b58decode(recipient) +
             b'\0\0')

    random64 = os.urandom(64)
    signature = base58.b58encode(
        curve.calculateSignature(random64, base58.b58decode(privKey), sData))

    data = json.dumps({
        "assetId": assetId,
        "senderPublicKey": pubKey,
        "recipient": recipient,
        "amount": amount,
        "fee": txfee,
        "timestamp": timestamp,
        "attachment": "",
        "signature": signature
    })

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, "http://%s:%s/assets/broadcast/transfer" % (NODE_IP, NODE_PORT))
        c.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json', 'Accept: application/json'])
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.POSTFIELDS, data)
        c.perform()
    finally:
        # Release the handle even when setopt/perform raises.
        c.close()
def invalid_verkey_tdir(tdir_for_func):
    """Populate a ledger in *tdir_for_func* with three txns, one of them bad.

    Each txn has type ``'0'``, a fixed identifier and per-index name/alias.
    The TARGET_NYM is the base58 encoding of ``b'whatever'``, except for the
    txn at index 1, where it is replaced by the literal ``"invalid===="``.
    The ledger is stopped after all three txns are added.
    """
    ledger = Ledger(CompactMerkleTree(), dataDir=tdir_for_func)

    for index in range(3):
        txn = {
            TXN_TYPE: '0',
            TARGET_NYM: base58.b58encode(b'whatever'),
            IDENTIFIER: "Th7MpTaRZVRYnPiabds81Y",
            DATA: {
                NAME: str(index),
                ALIAS: 'test' + str(index),
                SERVICES: [VALIDATOR],
            },
        }
        if index == 1:
            txn[TARGET_NYM] = "invalid===="
        ledger.add(txn)

    ledger.stop()
def __init__(self, verkey, identifier=None):
    """Resolve the effective verkey and store it through ``self.verkey``.

    Args:
        verkey: base58 verkey; a leading ``~`` marks an abbreviated verkey
            that is completed with the decoded identifier bytes.
        identifier: optional base58 identifier. If it decodes to 32 bytes
            and no verkey is given, the identifier itself is used as verkey.

    Raises:
        AssertionError: an identifier was given but no verkey could be
            determined.
        InvalidKey: assigning ``self.verkey`` raised; the message carries the
            verkey as originally passed in.
    """
    supplied_verkey = verkey
    self._verkey = None
    self._vr = None

    if identifier:
        identifier_bytes = b58decode(identifier)
        if not verkey and len(identifier_bytes) == 32:
            # assume cryptonym
            verkey = identifier

        assert verkey, 'verkey must be provided'

        if verkey[0] == '~':
            # abbreviated: full key = identifier bytes + remainder bytes
            remainder = b58decode(verkey[1:])
            verkey = b58encode(identifier_bytes + remainder)

    try:
        self.verkey = verkey
    except Exception as ex:
        raise InvalidKey("verkey {}".format(supplied_verkey)) from ex
def solve(x):
    """Brute-force a base58 string until its 4-byte checksum verifies.

    First every single-character substitution is tried, then every
    substitution at two distinct positions. A candidate is accepted when the
    last four decoded bytes equal the first four bytes of
    SHA-256(SHA-256(rest)). Every candidate is printed before it is checked.

    Args:
        x: base58 text to repair.

    Returns:
        The first candidate that verifies, or ``None`` if none does.
    """
    base58char = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'

    def checksum_matches(candidate):
        # Shared by both passes: decode, split payload/checksum, re-hash.
        clear = base58.b58decode(str(candidate))
        ori = clear[:-4]
        chk = clear[-4:]
        rechk = hashlib.sha256(hashlib.sha256(ori).digest()).digest()
        return chk == rechk[:4]

    # Pass 1: replace one character.
    for i in range(len(x)):
        for char in base58char:
            cry = x[:i] + char + x[i + 1:]
            print(cry)
            if checksum_matches(cry):
                return cry

    # Pass 2: replace two characters at different positions.
    for i in range(len(x)):
        for j in range(len(x)):
            if i == j:
                continue
            for charI in base58char:
                for charJ in base58char:
                    cry = x[:i] + charI + x[i + 1:]
                    cry = cry[:j] + charJ + cry[j + 1:]
                    print(cry)
                    if checksum_matches(cry):
                        return cry

    return None
#print solve(cry)
#print solve('15GJF8Do1NDSMy4eV8H82dfFtTvKaqYyhg')
def hexb58(hex):
    """Base58-encode the bytes written as the hex string *hex*."""
    # str.decode('hex_codec') only exists on Python 2; unhexlify performs the
    # same conversion on both Python 2 and Python 3.
    from binascii import unhexlify

    return base58.b58encode(unhexlify(hex))
def hash_to_address(version, hash):
    """Base58-encode *version* + *hash* followed by a 4-byte checksum.

    The checksum is the first four bytes of ``double_sha256`` applied to the
    concatenated version and hash.
    """
    payload = version + hash
    checksum = double_sha256(payload)[:4]
    return base58.b58encode(payload + checksum)
def hexb58(hex):
    """Return the base58 encoding of the bytes given as hex string *hex*."""
    # The 'hex_codec' str.decode is Python-2-only; binascii.unhexlify does
    # the same conversion and is available on Python 2 and 3 alike.
    from binascii import unhexlify

    raw = unhexlify(hex)
    return base58.b58encode(raw)
def hash_to_address(version, hash):
    """Return base58(version + hash + checksum).

    ``checksum`` is the leading four bytes of ``double_sha256(version + hash)``.
    """
    versioned_hash = version + hash
    with_checksum = versioned_hash + double_sha256(versioned_hash)[:4]
    return base58.b58encode(with_checksum)
def _generate(self, privateKey='', seed=''):
    """Populate seed, address and base58 key attributes of this object.

    Args:
        privateKey: base58-encoded private key; when empty the key is
            generated from the hashed seed instead.
        seed: seed phrase; when both *privateKey* and *seed* are empty, a
            15-word phrase is drawn from ``wordList`` using ``os.urandom``.

    Sets ``self.seed``, ``self.address``, ``self.publicKey`` and
    ``self.privateKey``.
    """
    self.seed = seed

    if not (privateKey or seed):
        word_count = 2048
        phrase = []
        for _ in range(5):
            r = crypto.bytes2str(os.urandom(4))
            # combine the four characters into one big-endian 32-bit value
            x = (ord(r[3])) + (ord(r[2]) << 8) + (ord(r[1]) << 16) + (ord(r[0]) << 24)
            # x is non-negative and below 2**32, so floor division gives the
            # same result as int(x / word_count)
            quotient = x // word_count
            w1 = x % word_count
            w2 = (quotient + w1) % word_count
            w3 = (quotient // word_count + w2) % word_count
            phrase.extend((wordList[w1], wordList[w2], wordList[w3]))
        self.seed = ' '.join(phrase)

    seed_hash = crypto.hashChain(('\0\0\0\0' + self.seed).encode('utf-8'))
    account_seed_hash = crypto.sha256(seed_hash)

    if privateKey:
        priv_key = base58.b58decode(privateKey)
    else:
        priv_key = curve.generatePrivateKey(account_seed_hash)
    pub_key = curve.generatePublicKey(priv_key)

    # address body: chr(1) + chain id + first 20 characters of the key hash,
    # followed by the first 4 characters of the hash of that body
    unhashed_address = chr(1) + str(pywaves.CHAIN_ID) + crypto.hashChain(pub_key)[0:20]
    address_hash = crypto.hashChain(crypto.str2bytes(unhashed_address))[0:4]

    self.address = base58.b58encode(crypto.str2bytes(unhashed_address + address_hash))
    self.publicKey = base58.b58encode(pub_key)
    self.privateKey = base58.b58encode(priv_key)
def sendWaves(self, recipient, amount, attachment='', txFee=pywaves.DEFAULT_TX_FEE, timestamp=0):
    """Sign a transfer to *recipient* and hand it to ``pywaves.wrapper``.

    Args:
        recipient: object whose ``address`` is the base58 target address.
        amount: amount to send; must be greater than zero.
        attachment: optional attachment text.
        txFee: transaction fee.
        timestamp: timestamp in milliseconds; ``0`` means "now".

    Returns:
        The result of ``pywaves.wrapper`` for the broadcast request, or
        ``None`` (after logging an error) when the private key is missing,
        the amount is not positive, or - unless ``pywaves.OFFLINE`` is set -
        the balance is below ``amount + txFee``.
    """
    if not self.privateKey:
        logging.error('Private key required')
        return None
    if amount <= 0:
        logging.error('Amount must be > 0')
        return None
    if not pywaves.OFFLINE and self.balance() < amount + txFee:
        logging.error('Insufficient Waves balance')
        return None

    if timestamp == 0:
        timestamp = int(time.time() * 1000)

    # Byte layout that gets signed, in order.
    sData = b''.join([
        b'\4',
        base58.b58decode(self.publicKey),
        b'\0\0',
        struct.pack(">Q", timestamp),
        struct.pack(">Q", amount),
        struct.pack(">Q", txFee),
        base58.b58decode(recipient.address),
        struct.pack(">H", len(attachment)),
        crypto.str2bytes(attachment),
    ])
    signature = crypto.sign(self.privateKey, sData)

    payload = {
        "senderPublicKey": self.publicKey,
        "recipient": recipient.address,
        "amount": amount,
        "fee": txFee,
        "timestamp": timestamp,
        "attachment": base58.b58encode(crypto.str2bytes(attachment)),
        "signature": signature
    }
    return pywaves.wrapper('/assets/broadcast/transfer', json.dumps(payload))
def sendAsset(self, recipient, asset, amount, attachment='', txFee=pywaves.DEFAULT_TX_FEE, timestamp=0):
    """Sign an asset transfer to *recipient* and pass it to ``pywaves.wrapper``.

    Args:
        recipient: object whose ``address`` is the base58 target address.
        asset: object providing ``assetId`` and ``status()``.
        amount: amount of the asset to send; must be greater than zero.
        attachment: optional attachment text.
        txFee: transaction fee.
        timestamp: timestamp in milliseconds; ``0`` means "now".

    Returns:
        The result of ``pywaves.wrapper`` for the broadcast request, or
        ``None`` (after logging an error) when a precondition fails. The
        asset status and both balance checks are skipped when
        ``pywaves.OFFLINE`` is set.
    """
    online = not pywaves.OFFLINE

    if not self.privateKey:
        logging.error('Private key required')
        return None
    if online and not asset.status():
        logging.error('Asset not issued')
        return None
    if amount <= 0:
        logging.error('Amount must be > 0')
        return None
    if online and self.balance(asset.assetId) < amount:
        logging.error('Insufficient asset balance')
        return None
    if online and self.balance() < txFee:
        logging.error('Insufficient Waves balance')
        return None

    if timestamp == 0:
        timestamp = int(time.time() * 1000)

    # Byte layout that gets signed, in order.
    sData = b''.join([
        b'\4',
        base58.b58decode(self.publicKey),
        b'\1',
        base58.b58decode(asset.assetId),
        b'\0',
        struct.pack(">Q", timestamp),
        struct.pack(">Q", amount),
        struct.pack(">Q", txFee),
        base58.b58decode(recipient.address),
        struct.pack(">H", len(attachment)),
        crypto.str2bytes(attachment),
    ])
    signature = crypto.sign(self.privateKey, sData)

    payload = {
        "assetId": asset.assetId,
        "senderPublicKey": self.publicKey,
        "recipient": recipient.address,
        "amount": amount,
        "fee": txFee,
        "timestamp": timestamp,
        "attachment": base58.b58encode(crypto.str2bytes(attachment)),
        "signature": signature
    }
    return pywaves.wrapper('/assets/broadcast/transfer', json.dumps(payload))
def sign(privateKey, message):
    """Sign *message* and return the base58-encoded signature.

    Args:
        privateKey: base58-encoded private key.
        message: data handed to ``curve.calculateSignature``.

    64 bytes from ``os.urandom`` are passed as the first argument of
    ``curve.calculateSignature``.
    """
    random_bytes = os.urandom(64)
    key_bytes = base58.b58decode(privateKey)
    raw_signature = curve.calculateSignature(random_bytes, key_bytes, message)
    return base58.b58encode(raw_signature)
def id(message):
    """Return the base58 encoding of the SHA-256 digest of *message*."""
    digest = hashlib.sha256(message).digest()
    return base58.b58encode(digest)
def simpleEncrypt(self, passphrase, data):
    """Encrypt data.

    The key and IV are obtained from ``self.EVP_BytesToKey(passphrase, 32,
    16)`` and passed to ``self.encrypt``.

    Args:
        passphrase (str): passphrase to use for encryption.
        data (str/bytes): original data.

    Returns:
        (str): base58-encoded encrypted data.
    """
    derived = self.EVP_BytesToKey(passphrase, 32, 16)
    key, iv = derived
    ciphertext = self.encrypt(data, key, iv)
    return base58.b58encode(ciphertext)
def bytes2ascii(s):
    """Return the base58 encoding of the byte string *s*.

    >>> bytes2ascii(b"test")
    '3yZe7d'
    """
    encoded = b58encode(s)
    return encoded
def pet2ascii(p):
    """Serialise *p* with ``encode`` and return the base58 form of the result.

    >>> from petlib.ec import EcGroup, EcPt
    >>> G = EcGroup()
    >>> pt = EcPt(G)
    >>> pet2ascii(pt)
    '3Xw3vNAdCmDLs'
    """
    packed = encode(p)
    return b58encode(packed)
def apply_creation_cell(acc, update):
    """Return a deep copy of *acc* with ``'entity'`` taken from *update*.

    ``update['entity']['@link']`` is base58-encoded and stored under
    ``'entity'`` in the copy. When a key is missing (KeyError) the copy is
    returned unchanged. *acc* itself is never modified.
    """
    updated = copy.deepcopy(acc)
    try:
        entity_link = update['entity']['@link']
        updated['entity'] = base58.b58encode(entity_link)
    except KeyError:
        # no entity link in this update: keep the accumulator as copied
        pass
    return updated
def get_object_chain(reference, acc):
    """Follow ``chain`` links from *reference*, prepending each object to *acc*.

    Args:
        reference: key of the first object to fetch, or ``None`` to stop.
        acc: list of objects collected so far.

    Returns:
        *acc* itself when *reference* is ``None``; otherwise a new list in
        which each fetched object precedes the objects fetched before it.
    """
    if reference is None:
        return acc

    record = get_db().get(reference, retry_if_missing=True)
    try:
        parent_ref = base58.b58encode(record['chain']['@link'])
    except KeyError:
        # no further chain link: the recursion ends on the next call
        parent_ref = None

    return get_object_chain(parent_ref, [record] + acc)
def stringify_refs(obj):
    """Return a copy of *obj* with string ``@link`` values base58-encoded.

    Nested dicts are processed recursively. A value stored under the key
    ``'@link'`` is replaced by ``base58.b58encode(value)`` when it is a
    string; if encoding raises ValueError, AttributeError or TypeError the
    value is kept as is. All other entries are copied unchanged.

    Args:
        obj: dict to convert; it is not modified.

    Returns:
        A new dict with the same keys.
    """
    # dict.iteritems() and basestring exist only on Python 2. Fall back to
    # the Python 3 equivalents so the function runs on both.
    try:
        string_types = basestring
    except NameError:
        string_types = (str, bytes)

    res = {}
    for k, v in obj.items():
        if isinstance(v, dict):
            v = stringify_refs(v)
        if k == u'@link' and isinstance(v, string_types):
            try:
                v = base58.b58encode(v)
            except (ValueError, AttributeError, TypeError):
                # best effort: leave values that cannot be encoded untouched
                pass
        res[k] = v
    return res