def encode_fallback(fallback, currency):
""" Encode all supported fallback addresses.
"""
if currency == 'bc' or currency == 'tb':
fbhrp, witness = bech32_decode(fallback)
if fbhrp:
if fbhrp != currency:
raise ValueError("Not a bech32 address for this currency")
wver = witness[0]
if wver > 16:
raise ValueError("Invalid witness version {}".format(witness[0]))
wprog = u5_to_bitarray(witness[1:])
else:
addr = base58.b58decode_check(fallback)
if is_p2pkh(currency, addr[0]):
wver = 17
elif is_p2sh(currency, addr[0]):
wver = 18
else:
raise ValueError("Unknown address type for {}".format(currency))
wprog = addr[1:]
return tagged('f', bitstring.pack("uint:5", wver) + wprog)
else:
raise NotImplementedError("Support for currency {} not implemented".format(currency))
python类b58decode_check()的实例源码
def validate_sig(body, sig_str, pkh):
"""
Validate the signature on the body of the request - throws exception if not valid.
"""
# Check permission to update
if (pkh is None):
raise PermissionError("pkh not found.")
# Validate the pkh format
base58.b58decode_check(pkh)
if (len(pkh) < 20) or (len(pkh) > 40):
raise PermissionError("Invalid pkh")
try:
if not sig_str:
raise PermissionError("X-Bitcoin-Sig header not found.")
if not wallet.verify_bitcoin_message(body, sig_str, pkh):
raise PermissionError("X-Bitcoin-Sig header not valid.")
except Exception as err:
logger.error("Failure: {0}".format(err))
raise PermissionError("X-Bitcoin-Sig header validation failed.")
return True
def encode_fallback(fallback, currency):
""" Encode all supported fallback addresses.
"""
if currency == 'bc' or currency == 'tb':
fbhrp, witness = bech32_decode(fallback)
if fbhrp:
if fbhrp != currency:
raise ValueError("Not a bech32 address for this currency")
wver = witness[0]
if wver > 16:
raise ValueError("Invalid witness version {}".format(witness[0]))
wprog = u5_to_bitarray(witness[1:])
else:
addr = base58.b58decode_check(fallback)
if is_p2pkh(currency, addr[0]):
wver = 17
elif is_p2sh(currency, addr[0]):
wver = 18
else:
raise ValueError("Unknown address type for {}".format(currency))
wprog = addr[1:]
return tagged('f', bitstring.pack("uint:5", wver) + wprog)
else:
raise NotImplementedError("Support for currency {} not implemented".format(currency))
def from_b58check(private_key):
""" Decodes a Base58Check encoded private-key.
Args:
private_key (str): A Base58Check encoded private key.
Returns:
PrivateKey: A PrivateKey object
"""
b58dec = base58.b58decode_check(private_key)
version = b58dec[0]
assert version in [PrivateKey.TESTNET_VERSION,
PrivateKey.MAINNET_VERSION]
return PrivateKey(int.from_bytes(b58dec[1:], 'big'))
def from_b58check(key):
""" Decodes a Base58Check encoded key.
The encoding must conform to the description in:
https://github.com/bitcoin/bips/blob/master/bip-0032.mediawiki#serialization-format
Args:
key (str): A Base58Check encoded key.
Returns:
HDPrivateKey or HDPublicKey:
Either an HD private or
public key object, depending on what was serialized.
"""
return HDKey.from_bytes(base58.b58decode_check(key))
def address_to_key_hash(s):
""" Given a Bitcoin address decodes the version and
RIPEMD-160 hash of the public key.
Args:
s (bytes): The Bitcoin address to decode
Returns:
(version, h160) (tuple): A tuple containing the version and
RIPEMD-160 hash of the public key.
"""
n = base58.b58decode_check(s)
version = n[0]
h160 = n[1:]
return version, h160
def from_b58check(private_key):
""" Decodes a Base58Check encoded private-key.
Args:
private_key (str): A Base58Check encoded private key.
Returns:
PrivateKey: A PrivateKey object
"""
b58dec = base58.b58decode_check(private_key)
version = b58dec[0]
assert version in [PrivateKey.TESTNET_VERSION,
PrivateKey.MAINNET_VERSION]
return PrivateKey(int.from_bytes(b58dec[1:], 'big'))
def from_b58check(key):
""" Decodes a Base58Check encoded key.
The encoding must conform to the description in:
https://github.com/bitcoin/bips/blob/master/bip-0032.mediawiki#serialization-format
Args:
key (str): A Base58Check encoded key.
Returns:
HDPrivateKey or HDPublicKey:
Either an HD private or
public key object, depending on what was serialized.
"""
return HDKey.from_bytes(base58.b58decode_check(key))
def coinbaseMessage(previous_output, receiver_address, my_address, private_key):
receiver_hashed_pubkey= base58.b58decode_check(receiver_address)[1:].encode("hex")
my_hashed_pubkey = base58.b58decode_check(my_address)[1:].encode("hex")
# Transaction stuff
version = struct.pack("<L", 1)
lock_time = struct.pack("<L", 0)
hash_code = struct.pack("<L", 1)
# Transactions input
tx_in_count = struct.pack("<B", 1)
tx_in = {}
tx_in["outpoint_hash"] = previous_output.decode('hex')[::-1]
tx_in["outpoint_index"] = struct.pack("<L", 4294967295)
tx_in["script"] = ("76a914%s88ac" % my_hashed_pubkey).decode("hex")
tx_in["script_bytes"] = struct.pack("<B", (len(tx_in["script"])))
tx_in["sequence"] = "ffffffff".decode("hex")
# Transaction output
tx_out_count = struct.pack("<B", 1)
tx_out = {}
tx_out["value"]= struct.pack("<Q", 100000000 * 12.50033629)
tx_out["pk_script"]= ("76a914%s88ac" % receiver_hashed_pubkey).decode("hex")
tx_out["pk_script_bytes"]= struct.pack("<B", (len(tx_out["pk_script"])))
tx_to_sign = (version + tx_in_count + tx_in["outpoint_hash"] + tx_in["outpoint_index"] +
tx_in["script_bytes"] + tx_in["script"] + tx_in["sequence"] + tx_out_count +
tx_out["value"] + tx_out["pk_script_bytes"] + tx_out["pk_script"] + lock_time + hash_code)
# Signing txn
hashed_raw_tx = hashlib.sha256(hashlib.sha256(tx_to_sign).digest()).digest()
sk = ecdsa.SigningKey.from_string(private_key.decode("hex"), curve = ecdsa.SECP256k1)
vk = sk.verifying_key
public_key = ('\04' + vk.to_string()).encode("hex")
sign = sk.sign_digest(hashed_raw_tx, sigencode=ecdsa.util.sigencode_der)
# Complete txn
sigscript = sign + "\01" + struct.pack("<B", len(public_key.decode("hex"))) + public_key.decode("hex")
real_tx = (version + tx_in_count + tx_in["outpoint_hash"] + tx_in["outpoint_index"] +
struct.pack("<B", (len(sigscript) + 1)) + struct.pack("<B", len(sign) + 1) + sigscript +
tx_in["sequence"] + tx_out_count + tx_out["value"] + tx_out["pk_script_bytes"] + tx_out["pk_script"] + lock_time)
return real_tx
def _flush(client, wallet, machine_auth, amount=None, payout_address=None, silent=False,
to_primary=False):
""" Flushes current off-chain buffer to the blockchain.
Args:
client (two1.server.rest_client.TwentyOneRestClient) an object for
sending authenticated requests to the TwentyOne backend.
wallet (two1.wallet.Wallet): a user's wallet instance.
amount (int): The amount to be flushed. Should be more than 10k.
payout_address (string): The address to flush the Bitcoins to.
silent (boolean): If True, disables the confirmation prompt.
to_primary (boolean): If True, flushes to the primary wallet.
Raises:
ServerRequestError: if server returns an error code other than 401.
"""
# check the payout address
if payout_address:
try:
base58.b58decode_check(payout_address)
except ValueError:
logger.error(uxstring.UxString.flush_invalid_address)
return
# select the wallet and the associated payout address for the flush
all_wallets = client.list_wallets()
wallet_payout_address, wallet_name = _select_flush_wallet(client, machine_auth, payout_address,
all_wallets, to_primary)
# ask user for confirmation
if not silent:
# if the user has not specified a payout_address then the buffer will be flushed to the
# primary wallet.
is_local = payout_address is None
should_continue = _show_confirmation(machine_auth, amount, all_wallets,
wallet_payout_address, wallet_name, to_primary,
is_local)
if not should_continue:
return
# perform flush
try:
response = client.flush_earnings(amount=amount, payout_address=wallet_payout_address)
if response.ok:
success_msg = uxstring.UxString.flush_success.format(wallet_payout_address)
logger.info(success_msg)
except exceptions.ServerRequestError as ex:
if ex.status_code == 401:
logger.info(ex.message)
elif ex.status_code == 400 and ex.data.get("error") == "TO500":
logger.info(uxstring.UxString.flush_not_enough_earnings.format(amount), fg="red")
elif ex.status_code == 403 and ex.data.get("error") == "social_account_required_to_flush":
logger.info("You must connect a social account with your 21.co account before you can flush.", fg="red")
else:
raise ex
def PrivateKeyFromNEP2(nep2_key, passphrase):
"""
Gets the private key from a NEP-2 encrypted private key
Args:
nep2_key (str): The nep-2 encrypted private key
passphrase (str): The password to encrypt the private key with, as unicode string
Returns:
bytes: The private key
"""
if not nep2_key or len(nep2_key) != 58:
raise ValueError('Please provide a nep2_key with a length of 58 bytes (LEN: {0:d})'.format(len(nep2_key)))
ADDRESS_HASH_SIZE = 4
ADDRESS_HASH_OFFSET = len(NEP_FLAG) + len(NEP_HEADER)
try:
decoded_key = base58.b58decode_check(nep2_key)
except Exception as e:
raise ValueError("Invalid nep2_key")
address_hash = decoded_key[ADDRESS_HASH_OFFSET:ADDRESS_HASH_OFFSET + ADDRESS_HASH_SIZE]
encrypted = decoded_key[-32:]
pwd_normalized = bytes(unicodedata.normalize('NFC', passphrase), 'utf-8')
derived = scrypt.hash(pwd_normalized, address_hash,
N=SCRYPT_ITERATIONS,
r=SCRYPT_BLOCKSIZE,
p=SCRYPT_PARALLEL_FACTOR,
buflen=SCRYPT_KEY_LEN_BYTES)
derived1 = derived[:32]
derived2 = derived[32:]
cipher = AES.new(derived2, AES.MODE_ECB)
decrypted = cipher.decrypt(encrypted)
private_key = xor_bytes(decrypted, derived1)
# Now check that the address hashes match. If they don't, the password was wrong.
kp_new = KeyPair(priv_key=private_key)
kp_new_address = kp_new.GetAddress()
kp_new_address_hash_tmp = hashlib.sha256(kp_new_address.encode('utf-8')).digest()
kp_new_address_hash_tmp2 = hashlib.sha256(kp_new_address_hash_tmp).digest()
kp_new_address_hash = kp_new_address_hash_tmp2[:4]
if (kp_new_address_hash != address_hash):
raise ValueError("Wrong passphrase")
return private_key