#!/usr/bin/env python # -*- coding: utf-8 -*- """ Blockstack-client ~~~~~ copyright: (c) 2014-2015 by Halfmoon Labs, Inc. copyright: (c) 2016 by Blockstack.org This file is part of Blockstack-client. Blockstack-client is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Blockstack-client is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Blockstack-client. If not, see . """ import virtualchain from binascii import hexlify import collections import json import traceback import keylib from keylib import ECPrivateKey, ECPublicKey from keylib.hashing import bin_hash160 from keylib.address_formatting import bin_hash160_to_address from keylib.key_formatting import compress, decompress from keylib.public_key_encoding import PubkeyType from .backend.crypto.utils import get_address_from_privkey from .backend.crypto.utils import aes_encrypt, aes_decrypt from keychain import PrivateKeychain import fastecdsa import fastecdsa.curve import fastecdsa.keys import fastecdsa.ecdsa import pybitcoin import bitcoin import binascii import jsonschema from jsonschema.exceptions import ValidationError from utilitybelt import is_hex from .config import get_logger from .constants import CONFIG_PATH, BLOCKSTACK_DEBUG, BLOCKSTACK_TEST log = get_logger() # deriving hardened keys is expensive, so cache them once derived. # maps hex_privkey --> {key_index: child_key} KEY_CACHE = {} KEYCHAIN_CACHE = {} # LRU cache of hashes we've verified VERIFIER_CACHE = None class HDWallet(object): """ Initialize a hierarchical deterministic wallet with hex_privkey and get child addresses and private keys """ def __init__(self, hex_privkey=None): """ If @hex_privkey is given, use that to derive keychain otherwise, use a new random seed """ global KEYCHAIN_CACHE assert hex_privkey self.hex_privkey = hex_privkey self.priv_keychain = None self.master_address = None self.child_addresses = None if KEYCHAIN_CACHE.has_key(str(self.hex_privkey)): if BLOCKSTACK_TEST: log.debug("{} keychain is cached".format(self.hex_privkey)) self.priv_keychain = KEYCHAIN_CACHE[str(self.hex_privkey)] else: if BLOCKSTACK_TEST: log.debug("{} keychain is NOT cached".format(self.hex_privkey)) self.priv_keychain = self.get_priv_keychain(self.hex_privkey) KEYCHAIN_CACHE[str(self.hex_privkey)] = self.priv_keychain self.master_address = self.get_master_address() def get_priv_keychain(self, hex_privkey): if hex_privkey: return PrivateKeychain.from_private_key(hex_privkey) log.debug('No privatekey given, starting new wallet') return PrivateKeychain() def get_master_privkey(self): return self.priv_keychain.private_key() def get_child_privkey(self, index=0): """ @index is the child index Returns: child privkey for given @index """ global KEY_CACHE if KEY_CACHE.has_key(self.hex_privkey) and KEY_CACHE[self.hex_privkey].has_key(index): if BLOCKSTACK_TEST: log.debug("Child {} of {} is cached".format(index, self.hex_privkey)) return KEY_CACHE[self.hex_privkey][index] # expensive... child = self.priv_keychain.hardened_child(index) if not KEY_CACHE.has_key(self.hex_privkey): KEY_CACHE[self.hex_privkey] = {} KEY_CACHE[self.hex_privkey][index] = child.private_key() return child.private_key() @classmethod def get_privkey(cls, hex_privkey, index): """ Get a child private key (static method) """ global KEY_CACHE if KEY_CACHE.has_key(hex_privkey) and KEY_CACHE[hex_privkey].has_key(index): if BLOCKSTACK_TEST: log.debug("Child {} of {} is cached".format(index, hex_privkey)) return KEY_CACHE[hex_privkey][index] hdwallet = HDWallet(hex_privkey) return hdwallet.get_child_privkey(index=index) def get_master_address(self): if self.master_address is not None: return self.master_address hex_privkey = self.get_master_privkey() return get_address_from_privkey(hex_privkey) def get_child_address(self, index=0): """ @index is the child index Returns: child address for given @index """ if self.child_addresses is not None: return self.child_addresses[index] hex_privkey = self.get_child_privkey(index) return get_address_from_privkey(hex_privkey) def get_child_keypairs(self, count=1, offset=0, include_privkey=False): """ Returns (privkey, address) keypairs Returns: returns child keypairs @include_privkey: toggles between option to return privkeys along with addresses or not """ keypairs = [] for index in range(offset, offset + count): address = self.get_child_address(index) if include_privkey: hex_privkey = self.get_child_privkey(index) keypairs.append((address, hex_privkey)) else: keypairs.append(address) return keypairs def get_privkey_from_address(self, target_address, count=1): """ Given a child address, return priv key of that address """ addresses = self.get_child_keypairs(count=count) for i, address in enumerate(addresses): if address == target_address: return self.get_child_privkey(i) return None def is_multisig(privkey_info): """ Does the given private key info represent a multisig bundle? """ from .schemas import PRIVKEY_MULTISIG_SCHEMA try: jsonschema.validate(privkey_info, PRIVKEY_MULTISIG_SCHEMA) return True except ValidationError as e: return False def is_encrypted_multisig(privkey_info): """ Does a given encrypted private key info represent an encrypted multisig bundle? """ from .schemas import ENCRYPTED_PRIVKEY_MULTISIG_SCHEMA try: jsonschema.validate(privkey_info, ENCRYPTED_PRIVKEY_MULTISIG_SCHEMA) return True except ValidationError as e: return False def is_singlesig(privkey_info): """ Does the given private key info represent a single signature bundle? (i.e. one private key)? """ from .schemas import PRIVKEY_SINGLESIG_SCHEMA try: jsonschema.validate(privkey_info, PRIVKEY_SINGLESIG_SCHEMA) return True except ValidationError as e: return False def is_encrypted_singlesig(privkey_info): """ Does the given string represent an encrypted single private key? """ from .schemas import ENCRYPTED_PRIVKEY_SINGLESIG_SCHEMA try: jsonschema.validate(privkey_info, ENCRYPTED_PRIVKEY_SINGLESIG_SCHEMA) return True except ValidationError as e: return False def singlesig_privkey_to_string(privkey_info): """ Convert private key to string """ return ECPrivateKey(privkey_info).to_hex() #return virtualchain.BitcoinPrivateKey(privkey_info).to_hex() def multisig_privkey_to_string(privkey_info): """ Convert multisig keys to string """ return ','.join([singlesig_privkey_to_string(pk) for pk in privkey_info['private_keys']]) def privkey_to_string(privkey_info): """ Convert private key to string Return None on invalid """ if is_singlesig(privkey_info): return singlesig_privkey_to_string(privkey_info) if is_multisig(privkey_info): return multisig_privkey_to_string(privkey_info) return None def encrypt_multisig_info(multisig_info, password): """ Given a multisig info dict, encrypt the sensitive fields. Returns {'encrypted_private_keys': ..., 'encrypted_redeem_script': ..., **other_fields} """ enc_info = { 'encrypted_private_keys': None, 'encrypted_redeem_script': None } hex_password = hexlify(password) assert is_multisig(multisig_info), 'Invalid multisig keys' enc_info['encrypted_private_keys'] = [] for pk in multisig_info['private_keys']: pk_ciphertext = aes_encrypt(pk, hex_password) enc_info['encrypted_private_keys'].append(pk_ciphertext) enc_info['encrypted_redeem_script'] = aes_encrypt( multisig_info['redeem_script'], hex_password ) # preserve any other fields for k, v in multisig_info.items(): if k not in ['private_keys', 'redeem_script']: enc_info[k] = v return enc_info def decrypt_multisig_info(enc_multisig_info, password): """ Given an encrypted multisig info dict, decrypt the sensitive fields. Returns {'private_keys': ..., 'redeem_script': ..., **other_fields} Return {'error': ...} on error """ multisig_info = { 'private_keys': None, 'redeem_script': None, } hex_password = hexlify(password) assert is_encrypted_multisig(enc_multisig_info), 'Invalid encrypted multisig keys' multisig_info['private_keys'] = [] for enc_pk in enc_multisig_info['encrypted_private_keys']: pk = None try: pk = aes_decrypt(enc_pk, hex_password) virtualchain.BitcoinPrivateKey(pk) except Exception as e: if BLOCKSTACK_TEST: log.exception(e) return {'error': 'Invalid password; failed to decrypt private key in multisig wallet'} multisig_info['private_keys'].append(ECPrivateKey(pk).to_hex()) redeem_script = None enc_redeem_script = enc_multisig_info['encrypted_redeem_script'] try: redeem_script = aes_decrypt(enc_redeem_script, hex_password) except Exception as e: if BLOCKSTACK_TEST: log.exception(e) return {'error': 'Invalid password; failed to decrypt redeem script in multisig wallet'} multisig_info['redeem_script'] = redeem_script # preserve any other information in the multisig info for k, v in enc_multisig_info.items(): if k not in ['encrypted_private_keys', 'encrypted_redeem_script']: multisig_info[k] = v return multisig_info def encrypt_private_key_info(privkey_info, password): """ Encrypt private key info. Return {'status': True, 'encrypted_private_key_info': {'address': ..., 'private_key_info': ...}} on success Returns {'error': ...} on error """ hex_password = hexlify(password) ret = {} if is_multisig(privkey_info): ret['address'] = virtualchain.make_multisig_address( privkey_info['redeem_script'] ) ret['private_key_info'] = encrypt_multisig_info( privkey_info, password ) return {'status': True, 'encrypted_private_key_info': ret} if is_singlesig(privkey_info): ret['address'] = virtualchain.BitcoinPrivateKey( privkey_info).public_key().address() ret['private_key_info'] = aes_encrypt(privkey_info, hex_password) return {'status': True, 'encrypted_private_key_info': ret} return {'error': 'Invalid private key info'} def decrypt_private_key_info(privkey_info, password): """ Decrypt a particular private key info bundle. It can be either a single-signature private key, or a multisig key bundle. Return {'address': ..., 'private_key_info': ...} on success. Return {'error': ...} on error. """ hex_password = hexlify(password) ret = {} if is_encrypted_multisig(privkey_info): ret = decrypt_multisig_info(privkey_info, password) if 'error' in ret: return {'error': 'Failed to decrypt multisig wallet: {}'.format(ret['error'])} # sanity check if 'redeem_script' not in ret: return {'error': 'Invalid multisig wallet: missing redeem_script'} if 'private_keys' not in ret: return {'error': 'Invalid multisig wallet: missing private_keys'} return {'address': virtualchain.make_p2sh_address(ret['redeem_script']), 'private_key_info': ret} if is_encrypted_singlesig(privkey_info): try: pk = aes_decrypt(privkey_info, hex_password) pk = ECPrivateKey(pk).to_hex() except Exception as e: if BLOCKSTACK_TEST: log.exception(e) return {'error': 'Invalid password'} return {'address': virtualchain.BitcoinPrivateKey(pk).public_key().address(), 'private_key_info': pk} return {'error': 'Invalid encrypted private key info'} def make_wallet_keys(data_privkey=None, owner_privkey=None, payment_privkey=None): """ For testing. DO NOT USE """ ret = { 'owner_privkey': None, 'data_privkey': None, 'payment_privkey': None, } if data_privkey is not None: if not is_singlesig(data_privkey): raise ValueError('Invalid data key info') pk_data = virtualchain.BitcoinPrivateKey(data_privkey).to_hex() ret['data_privkey'] = pk_data if owner_privkey is not None: if is_multisig(owner_privkey): pks = [virtualchain.BitcoinPrivateKey(pk).to_hex() for pk in owner_privkey['private_keys']] m, pubs = virtualchain.parse_multisig_redeemscript(owner_privkey['redeem_script']) ret['owner_privkey'] = virtualchain.make_multisig_info(m, pks) elif is_singlesig(owner_privkey): pk_owner = virtualchain.BitcoinPrivateKey(owner_privkey).to_hex() ret['owner_privkey'] = pk_owner else: raise ValueError('Invalid owner key info') if payment_privkey is None: return ret if is_multisig(payment_privkey): pks = [virtualchain.BitcoinPrivateKey(pk).to_hex() for pk in payment_privkey['private_keys']] m, pubs = virtualchain.parse_multisig_redeemscript(payment_privkey['redeem_script']) ret['payment_privkey'] = virtualchain.make_multisig_info(m, pks) elif is_singlesig(payment_privkey): pk_payment = virtualchain.BitcoinPrivateKey(payment_privkey).to_hex() ret['payment_privkey'] = pk_payment else: raise ValueError('Invalid payment key info') return ret def get_data_privkey(user_zonefile, wallet_keys=None, config_path=CONFIG_PATH): """ Get the user's data private key. Use the private key that corresponds to the data public key in their zonefile. (If the have a designated data public key, use the data private key. If they don't, use the owner private key). Return None if not set """ from .wallet import get_wallet from .user import user_zonefile_data_pubkey try: data_pubkey = user_zonefile_data_pubkey(user_zonefile) except ValueError: log.error('Multiple pubkeys defined') return if data_pubkey is None: log.error('No data public key defined') return wallet_keys = {} if wallet_keys is None else wallet_keys if wallet_keys.get('data_privkey', None) is None: log.error('No data private key set') return wallet = get_wallet(config_path=CONFIG_PATH) if wallet_keys is None else wallet_keys assert wallet, 'Failed to get wallet' data_privkey = wallet.get('data_privkey', None) is_matching_keys = ECPrivateKey(data_privkey).public_key().to_hex() == data_pubkey if data_privkey is None or not is_matching_keys: # data private key doesn't match zonefile log.error('Data private key does not match zonefile') return # zonefile matches data privkey return ECPrivateKey(data_privkey).to_hex() def get_data_or_owner_privkey(user_zonefile, owner_address, wallet_keys=None, config_path=CONFIG_PATH): """ Get the data private key if it is set in the zonefile, or if not, fall back to the owner private key. Useful for signing mutable data when no explicit data key is set. Returns {'status': True, 'privatekey': ...} on success Returns {'error': ...} on error Raise on invalid data """ # generate the mutable zonefile data_privkey = get_data_privkey(user_zonefile, wallet_keys=wallet_keys, config_path=config_path) if data_privkey is not None: return {'status': True, 'privatekey': data_privkey} # This is legacy code here. The only time this should happen is # when the user has a single owner key, and does not have a # separate data key. log.warn('No data private key set. Falling back to owner keypair.') owner_privkey_info = get_owner_privkey_info(wallet_keys=wallet_keys, config_path=config_path) if owner_privkey_info is None: raise Exception('No owner private key info') # sanity check: must be a single private key. # if it isn't, then use the *first* private key in the multisig bundle. if not is_singlesig(owner_privkey_info): if is_multisig(owner_privkey_info): owner_privkey_info = owner_privkey_info['private_keys'][0] else: raise Exception('Invalid owner private key info') # sanity check: must match profile address owner_pubkey = virtualchain.BitcoinPrivateKey(owner_privkey_info).public_key().to_hex() compressed_addr, uncompressed_addr = get_pubkey_addresses(owner_pubkey) if owner_address not in [compressed_addr, uncompressed_addr]: raise Exception('{} not in [{},{}]'.format(owner_address, compressed_addr, uncompressed_addr)) data_privkey = virtualchain.BitcoinPrivateKey(owner_privkey_info).to_hex() return {'status': True, 'privatekey': data_privkey} def get_data_privkey_info(user_zonefile, wallet_keys=None, config_path=CONFIG_PATH): """ Get the user's data private key info """ privkey = get_data_privkey(user_zonefile, wallet_keys=wallet_keys, config_path=config_path) return privkey def get_owner_privkey_info(wallet_keys=None, config_path=CONFIG_PATH): """ Get the user's owner private key info """ from .wallet import get_wallet wallet = get_wallet(config_path=CONFIG_PATH) if wallet_keys is None else wallet_keys assert wallet is not None, 'Failed to get wallet' owner_privkey_info = wallet.get('owner_privkey', None) assert owner_privkey_info is not None, 'No owner private key set' return owner_privkey_info def get_payment_privkey_info(wallet_keys=None, config_path=CONFIG_PATH): """ Get the user's payment private key info """ from .wallet import get_wallet wallet = get_wallet(config_path=CONFIG_PATH) if wallet_keys is None else wallet_keys assert wallet is not None, 'Failed to get wallet' payment_privkey_info = wallet.get('payment_privkey', None) assert payment_privkey_info is not None, 'No payment private key set' return payment_privkey_info def get_privkey_info_address(privkey_info): """ Get the address of private key information: * if it's a single private key, then calculate the address. * if it's a multisig info dict, then get the p2sh address """ if privkey_info is None: return if is_singlesig(privkey_info): return virtualchain.BitcoinPrivateKey(privkey_info).public_key().address() if is_multisig(privkey_info): return virtualchain.make_multisig_address(privkey_info['redeem_script']) raise ValueError('Invalid private key info') def get_privkey_info_params(privkey_info, config_path=CONFIG_PATH): """ Get the parameters that characterize a private key info bundle: the number of private keys, and the number of signatures required to make a valid transaction. * for single private keys, this is (1, 1) * for multisig info dicts, this is (m, n) Return (m, n) on success Return (None, None) on failure """ if privkey_info is None: from .backend.blockchain import get_block_height key_config = (2, 3) log.warning('No private key info given, assuming {} key config'.format(key_config)) return key_config if is_singlesig( privkey_info ): return (1, 1) elif is_multisig( privkey_info ): m, pubs = virtualchain.parse_multisig_redeemscript(privkey_info['redeem_script']) if m is None or pubs is None: return None, None return m, len(pubs) return None, None def get_pubkey_addresses(pubkey): """ Get the compressed and uncompressed addresses for a public key. Useful for verifying signatures by key address. If we're running in testnet mode, then use the testnet version byte. Return (compressed address, uncompressed address) """ version_byte = virtualchain.version_byte compressed_address, uncompressed_address = None, None pubkey = ECPublicKey(pubkey, version_byte=version_byte) pubkey_bin = pubkey.to_bin() if pubkey._type == PubkeyType.compressed: compressed_address = pubkey.address() uncompressed_address = decompress(pubkey_bin) hashed_address = bin_hash160(uncompressed_address) uncompressed_address = bin_hash160_to_address(hashed_address, version_byte=version_byte) elif pubkey._type == PubkeyType.uncompressed: uncompressed_address = pubkey.address() compressed_address = compress(pubkey_bin) hashed_address = bin_hash160(compressed_address) compressed_address = bin_hash160_to_address(hashed_address, version_byte=version_byte) else: raise Exception('Invalid public key') return compressed_address, uncompressed_address def get_pubkey_hex( privatekey_hex ): """ Get the uncompressed hex form of a private key """ if len(privatekey_hex) > 64: assert privatekey_hex[-2:] == '01' privatekey_hex = privatekey_hex[:64] # get hex public key privatekey_int = int(privatekey_hex, 16) pubkey_parts = fastecdsa.keys.get_public_key( privatekey_int, curve=fastecdsa.curve.secp256k1 ) pubkey_hex = "04{:064x}{:064x}".format(pubkey_parts[0], pubkey_parts[1]) return pubkey_hex def get_uncompressed_private_and_public_keys( privkey_str ): """ Get the private and public keys from a private key string. Make sure the both are *uncompressed* """ pk = virtualchain.BitcoinPrivateKey(str(privkey_str)) pk_hex = pk.to_hex() # force uncompressed if len(pk_hex) > 64: assert pk_hex[-2:] == '01' pk_hex = pk_hex[:64] pubk_hex = virtualchain.BitcoinPrivateKey(pk_hex).public_key().to_hex() return pk_hex, pubk_hex