def ascii2pet(s):
    """
    Base58-decode ``s`` and hand the decoded bytes to ``decode``.

    >>> ascii2pet('3Xw3vNAdCmDLs')
    EcPt(00)
    """
    raw = b58decode(s)
    return decode(raw)
python类b58decode()的实例源码
def from_base58(cls, base58str):
    """Build an instance of ``cls`` from a base58-encoded string.

    The string is base58-decoded and the decoded value is passed to
    ``cls`` as its only positional argument.
    """
    return cls(b58decode(base58str))
def verifySignature(self, msg: Dict[str, str]):
    """Verify the base58-encoded signature carried inside ``msg``.

    The signature field is stripped from the message, the remainder is
    serialized with ``serializeMsg`` and checked with a ``DidVerifier``.

    :param msg: the received message, including its signature field
    :return: ``True`` when the signature verifies
    :raises SignatureRejected: when the verifier rejects the signature
    """
    sigField = f.SIG.nm
    encodedSig = msg.get(sigField)
    identifier = msg.get(IDENTIFIER)
    unsignedMsg = {
        name: value for name, value in msg.items() if name != sigField
    }
    # TODO This assumes the current key is the cryptonym. This is a BAD
    # ASSUMPTION!!! Sovrin needs to provide the current key.
    serialized = serializeMsg(unsignedMsg)
    rawSig = b58decode(encodedSig.encode())
    isAcceptInvite = msg.get(TYPE) == ACCEPT_INVITE

    # TODO: Maybe keeping ACCEPT_INVITE open is a better option than keeping
    # an if condition here?
    if isAcceptInvite:
        verkey = msg.get(VERKEY)
    else:
        try:
            verkey = self.getVerkeyForLink(self.getLinkForMsg(msg))
        except (LinkNotFound, VerkeyNotFound):
            # This is for verification of `NOTIFY` events
            fallbackLink = self.wallet.getLinkBy(remote=identifier)
            # TODO: If verkey is None, it should be fetched from Sovrin.
            # Assuming CID for now.
            verkey = fallbackLink.remoteVerkey

    verifier = DidVerifier(verkey, identifier=identifier)
    if not verifier.verify(rawSig, serialized):
        raise SignatureRejected

    if isAcceptInvite:
        self.logger.info('Signature accepted.')
    return True
def _getNym(self, nym):
    """Request the identity record for ``nym`` and report its verkey status.

    An identity request is created through the active wallet and submitted
    via the active client. ``self._ensureReqCompleted`` is then scheduled on
    the looper's loop with the request key, the active client and the
    ``getNymReply`` callback as arguments.

    NOTE(review): the indentation of this listing was lost; the nesting
    below is reconstructed from the syntax and should be confirmed against
    the upstream source.
    """
    identity = Identity(identifier=nym)
    req = self.activeWallet.requestIdentity(
        identity, sender=self.activeWallet.defaultId)
    self.activeClient.submitReqs(req)
    self.print("Getting nym {}".format(nym))

    def getNymReply(reply, err, *args):
        # Callback that prints the outcome of the request; any exception
        # raised while processing the reply is printed rather than propagated.
        try:
            if err:
                self.print("Error: {}".format(err), Token.BoldOrange)
                return
            if reply and reply[DATA]:
                # reply[DATA] is a JSON document.
                data=json.loads(reply[DATA])
                if data:
                    idr = base58.b58decode(nym)
                    if data.get(VERKEY) is None:
                        # No verkey entry: a 32-byte decoded identifier is
                        # reported as being the verkey itself.
                        if len(idr) == 32:
                            self.print(
                                "Current verkey is same as identifier {}"
                                .format(nym), Token.BoldBlue)
                        else:
                            self.print(
                                "No verkey ever assigned to the identifier {}".
                                format(nym), Token.BoldBlue)
                        return
                    # An empty-string verkey is reported as "no active verkey".
                    if data.get(VERKEY) == '':
                        self.print("No active verkey found for the identifier {}".
                                   format(nym), Token.BoldBlue)
                    else:
                        self.print("Current verkey for NYM {} is {}".
                                   format(nym, data[VERKEY]), Token.BoldBlue)
            # NOTE(review): this `else` is assumed to pair with
            # `if reply and reply[DATA]` -- confirm against upstream.
            else:
                self.print("NYM {} not found".format(nym), Token.BoldBlue)
        # NOTE(review): BaseException also covers KeyboardInterrupt and
        # SystemExit -- confirm this breadth is intended.
        except BaseException as e:
            self.print("Error during fetching verkey: {}".format(e),
                       Token.BoldOrange)

    # Delay of .2 before the completion check (presumably seconds on an
    # asyncio-style loop -- TODO confirm).
    self.looper.loop.call_later(.2, self._ensureReqCompleted,
                                req.key, self.activeClient, getNymReply)
def doAttrDisclose(self, origin, target, txnId, key):
    """Submit a ``DISCLO`` operation carrying an encrypted txnId/key pair.

    ``txnId`` and ``key`` are JSON-encoded and encrypted with a
    ``libnacl.public.Box`` built from the base58-decoded ``origin`` and
    ``target``. The nonce and ciphertext are base58-encoded into the
    operation, which is submitted with ``origin`` as the identifier.
    """
    box = libnacl.public.Box(b58decode(origin), b58decode(target))
    payload = json.dumps({TXN_ID: txnId, SKEY: key}).encode()
    # pack_nonce=False makes encrypt return the nonce and message separately.
    nonce, ciphertext = box.encrypt(payload, pack_nonce=False)
    operation = {TARGET_NYM: target, TXN_TYPE: DISCLO}
    operation[NONCE] = b58encode(nonce)
    operation[DATA] = b58encode(ciphertext)
    self.submit(operation, identifier=origin)
def PrivateKeyFromWIF(wif):
    """
    Get the private key from a WIF key

    Args:
        wif (str): The wif key

    Returns:
        bytes: The private key

    Raises:
        ValueError: if ``wif`` is None or not 52 characters long, if the
            decoded data does not have the expected layout, or if the
            checksum does not match.
    """
    # Values are compared with ==/!= rather than identity: `is` on integers
    # only works by accident of CPython's small-int cache.
    if wif is None or len(wif) != 52:
        # Report a length of 0 for None so the error itself cannot fail.
        wif_len = 0 if wif is None else len(wif)
        raise ValueError('Please provide a wif with a length of 52 bytes (LEN: {0:d})'.format(wif_len))

    data = base58.b58decode(wif)

    # Expected layout: 0x80 marker, 32 key bytes, 0x01 marker, 4 checksum bytes.
    if len(data) != 38 or data[0] != 0x80 or data[33] != 0x01:
        raise ValueError("Invalid format!")

    checksum = Crypto.Hash256(data[0:34])[0:4]
    if checksum != data[34:]:
        raise ValueError("Invalid WIF Checksum!")

    return data[1:33]
def AddrStrToScriptHash(address):
    """Convert a base58 address string into a ``UInt160`` script hash.

    The decoded address must be 25 bytes: a version byte equal to
    ``settings.ADDRESS_VERSION``, 20 payload bytes and a 4-byte checksum
    taken from ``Hash256`` of the first 21 bytes.

    Raises:
        ValueError: on a wrong decoded length or version byte.
        Exception: when the checksum does not match.
    """
    raw = b58decode(address)
    if len(raw) != 25:
        raise ValueError('Not correct Address, wrong length.')

    version = raw[0]
    if version != settings.ADDRESS_VERSION:
        raise ValueError('Not correct Coin Version')

    versioned_hash, trailing = raw[:21], raw[21:]
    if Crypto.Default().Hash256(versioned_hash)[:4] != trailing:
        raise Exception('Address format error')

    return UInt160(data=raw[1:21])
def verifySignature(self, msg: Dict[str, str]):
    """Check the base58-encoded signature that accompanies ``msg``.

    Everything except the signature field is serialized with
    ``serialize_msg_for_signing`` and verified through a ``DidVerifier``.

    :param msg: the received message, including its signature field
    :return: ``True`` when verification succeeds
    :raises SignatureRejected: when verification fails
    """
    sigName = f.SIG.nm
    sigText = msg.get(sigName)
    identifier = msg.get(IDENTIFIER)
    body = {k: v for k, v in msg.items() if k != sigName}
    # TODO This assumes the current key is the cryptonym. This is a BAD
    # ASSUMPTION!!! Indy needs to provide the current key.
    signedBytes = serialize_msg_for_signing(body)
    sigBytes = b58decode(sigText.encode())
    acceptingInvite = msg.get(TYPE) == ACCEPT_INVITE

    # TODO: Maybe keeping ACCEPT_INVITE open is a better option than keeping
    # an if condition here?
    if acceptingInvite:
        verkey = msg.get(VERKEY)
    else:
        try:
            connection = self.getLinkForMsg(msg)
            verkey = self.getVerkeyForLink(connection)
        except (ConnectionNotFound, VerkeyNotFound):
            # This is for verification of `NOTIFY` events
            connection = self.wallet.getConnectionBy(remote=identifier)
            # TODO: If verkey is None, it should be fetched from Indy.
            # Assuming CID for now.
            verkey = connection.remoteVerkey

    didVerifier = DidVerifier(verkey, identifier=identifier)
    if not didVerifier.verify(sigBytes, signedBytes):
        raise SignatureRejected

    if acceptingInvite:
        self.logger.info('Signature accepted.')
    return True
def doAttrDisclose(self, origin, target, txnId, key):
    """Encrypt a txnId/key pair and submit it as a ``DISCLO`` operation.

    The pair is serialized to JSON and encrypted using a
    ``libnacl.public.Box`` constructed from the base58-decoded ``origin``
    and ``target``. The resulting nonce and ciphertext are base58-encoded
    and submitted with ``origin`` as the identifier.
    """
    cipherBox = libnacl.public.Box(b58decode(origin), b58decode(target))
    plaintext = json.dumps({TXN_ID: txnId, SKEY: key}).encode()
    # The nonce is returned separately because pack_nonce is False.
    nonce, sealed = cipherBox.encrypt(plaintext, pack_nonce=False)
    request = dict([
        (TARGET_NYM, target),
        (TXN_TYPE, DISCLO),
        (NONCE, b58encode(nonce)),
        (DATA, b58encode(sealed)),
    ])
    self.submit(request, identifier=origin)
def verifySig(identifier, signature, msg) -> bool:
    """Verify a base58-encoded ``signature`` over ``msg``.

    ``identifier`` is used as the key directly when ``isHex`` accepts it,
    otherwise it is first converted with ``cryptonymToHex``. The message is
    serialized with ``serialize_msg_for_signing`` before verification.
    """
    if isHex(identifier):
        hexKey = identifier
    else:
        hexKey = cryptonymToHex(identifier)
    serialized = serialize_msg_for_signing(msg)
    # The signature text is base58, despite the original local's "b64" name.
    rawSig = b58decode(signature.encode('utf-8'))
    return Verifier(hexKey).verify(rawSig, serialized)
def commit(self, txnCount, stateRoot, txnRoot) -> List:
    """Commit via the parent class, then notify the identifier cache.

    After the parent's ``commit`` returns, the base58 ``stateRoot`` string
    is decoded and passed to ``self.idrCache.onBatchCommitted``. The
    parent's result is returned unchanged.
    """
    committed = super().commit(txnCount, stateRoot, txnRoot)
    decodedRoot = base58.b58decode(stateRoot.encode())
    self.idrCache.onBatchCommitted(decodedRoot)
    return committed
def verifyMsg(verifier, sig):
    """Base58-decode ``sig`` and verify it against ``MsgForSigning``.

    Returns whatever ``verifier.verifyMsg`` returns.
    """
    rawSig = base58.b58decode(sig)
    return verifier.verifyMsg(rawSig, MsgForSigning)
def _sign_simple_signature_fulfillment(cls, input_, message, key_pairs):
    """Sign the fulfillment of a copy of ``input_`` and return that copy.

        Args:
            input_ (:class:`~bigchaindb.common.transaction.
                Input`) The input to be signed.
            message (str): The message to be signed
            key_pairs (dict): The keys to sign the Transaction with.

        Raises:
            KeypairMismatchException: when a ``KeyError`` is raised while
                looking up the private key or signing.
    """
    # NOTE: Work on a deep copy so the caller's input is never signed by
    #       reference.
    signed_input = deepcopy(input_)
    owner = signed_input.owners_before[0]
    mismatch_template = ('Public key {} is not a pair to '
                         'any of the private keys')
    try:
        # cryptoconditions makes no assumptions of the encoding of the
        # message to sign or verify. It only accepts bytestrings
        signed_input.fulfillment.sign(
            message.encode(),
            base58.b58decode(key_pairs[owner].encode()),
        )
    except KeyError:
        raise KeypairMismatchException(mismatch_template.format(owner))
    return signed_input
def _sign_threshold_signature_fulfillment(cls, input_, message, key_pairs):
    """Sign every matching sub-fulfillment of a copy of ``input_``.

        Args:
            input_ (:class:`~bigchaindb.common.transaction.
                Input`) The Input to be signed.
            message (str): The message to be signed
            key_pairs (dict): The keys to sign the Transaction with.

        Returns:
            The signed deep copy of ``input_``.

        Raises:
            KeypairMismatchException: when an owner's public key is not
                found in the fulfillment or has no entry in ``key_pairs``.
    """
    signed_input = deepcopy(input_)
    # Each distinct owner is handled once.
    for owner in set(signed_input.owners_before):
        # TODO: CC should throw a KeypairMismatchException, instead of
        #       our manual mapping here
        # TODO FOR CC: Naming wise this is not so smart,
        #              `get_subcondition` in fact doesn't return a
        #              condition but a fulfillment
        # TODO FOR CC: `get_subcondition` is singular. One would not
        #              expect to get a list back.
        fulfillment = signed_input.fulfillment
        matches = fulfillment.get_subcondition_from_vk(
            base58.b58decode(owner))
        if not matches:
            raise KeypairMismatchException(
                'Public key {} cannot be found '
                'in the fulfillment'.format(owner))

        try:
            owner_private_key = key_pairs[owner]
        except KeyError:
            raise KeypairMismatchException(
                'Public key {} is not a pair '
                'to any of the private keys'.format(owner))

        # cryptoconditions makes no assumptions of the encoding of the
        # message to sign or verify. It only accepts bytestrings
        for match in matches:
            match.sign(message.encode(),
                       base58.b58decode(owner_private_key.encode()))
    return signed_input
def test_fulfill(self, driver, alice_keypair, unsigned_transaction):
    """Compare ``driver.transactions.fulfill`` with a locally built signature.

    The expected fulfillment URI is produced by signing the canonical JSON
    form of the transaction (first input's fulfillment set to ``None``)
    with an ``Ed25519Sha256`` built from Alice's keys.
    """
    signed_transaction = driver.transactions.fulfill(
        unsigned_transaction, private_keys=alice_keypair.sk)

    # The signed message is the transaction with the fulfillment cleared.
    unsigned_transaction['inputs'][0]['fulfillment'] = None
    serialized = json.dumps(
        unsigned_transaction,
        sort_keys=True,
        separators=(',', ':'),
        ensure_ascii=False,
    )
    message = serialized.encode()

    signer = Ed25519Sha256(public_key=base58.b58decode(alice_keypair.vk))
    signer.sign(message, base58.b58decode(alice_keypair.sk))
    expected_uri = signer.serialize_uri()

    actual_uri = signed_transaction['inputs'][0]['fulfillment']
    assert actual_uri == expected_uri
def make_ed25519_condition(public_key, *, amount=1):
    """Build an output dict holding an Ed25519 condition for ``public_key``.

    The result has three keys: ``'amount'`` (``amount`` as a string),
    ``'condition'`` (details and URI of an ``Ed25519Sha256`` built from the
    base58-decoded key) and ``'public_keys'`` (a one-element tuple).
    """
    fulfillment = Ed25519Sha256(public_key=base58.b58decode(public_key))
    amount_text = str(amount)
    condition = {
        'details': _fulfillment_to_details(fulfillment),
        'uri': fulfillment.condition_uri,
    }
    return {
        'amount': amount_text,
        'condition': condition,
        'public_keys': (public_key,),
    }
def sign_transaction(transaction, *, public_key, private_key):
    """Sign ``transaction`` and return the serialized fulfillment URI.

    The transaction is dumped to compact JSON with sorted keys, encoded to
    bytes and signed by an ``Ed25519Sha256`` built from the base58-decoded
    ``public_key``, using the base58-decoded ``private_key``.
    """
    signer = Ed25519Sha256(public_key=base58.b58decode(public_key))
    dump_options = {
        'sort_keys': True,
        'separators': (',', ':'),
        'ensure_ascii': False,
    }
    canonical = json.dumps(transaction, **dump_options)
    signer.sign(canonical.encode(), base58.b58decode(private_key))
    return signer.serialize_uri()
def verifySig(identifier, signature, msg) -> bool:
    """Verify a base58-encoded ``signature`` over ``msg``.

    The key is ``identifier`` itself when ``isHex`` accepts it, otherwise
    the result of ``cryptonymToHex(identifier)``. The message is serialized
    with ``serializeMsg`` before being handed to the ``Verifier``.
    """
    keyIsHex = isHex(identifier)
    key = identifier if keyIsHex else cryptonymToHex(identifier)
    payload = serializeMsg(msg)
    # The signature text is base58, despite the original local's "b64" name.
    sigBytes = b58decode(signature.encode('utf-8'))
    verifier = Verifier(key)
    return verifier.verify(sigBytes, payload)
def WIFkey_to_binPrivateKey(wif):
    """Extract the private-key bytes from a 52-character WIF string.

    Args:
        wif (str): the WIF-encoded key.

    Returns:
        Bytes 1..32 of the base58-decoded value, or ``None`` (after
        printing ``Error``) when ``wif`` is not 52 characters long.

    NOTE(review): the decoded marker bytes and checksum are not validated
    here -- confirm callers do not rely on this function for validation.
    """
    if len(wif) != 52:
        # Function-call form of print works on both Python 2 and Python 3.
        print('Error')
        return None
    return b58decode(wif)[1:33]
def address_to_scriptHash(address):
    """Return the hex form of a base58 address minus its first byte and last four bytes."""
    decoded = b58decode(address)
    # Drop the leading byte and the trailing four bytes before hex-encoding.
    return hexlify(decoded[1:-4])