refactor legacy wallet compatibility code into version-specifc methods

This commit is contained in:
Jude Nelson
2017-03-21 11:39:40 -04:00
parent 9cca1e9e80
commit 1955785e24

View File

@@ -26,8 +26,10 @@ from __future__ import print_function
import time
import json
import os
import sys
import shutil
import virtualchain
import copy
from keylib import ECPrivateKey
from socket import error as socket_error
@@ -47,10 +49,9 @@ logging.disable(logging.CRITICAL)
import requests
requests.packages.urllib3.disable_warnings()
from .backend.crypto.utils import get_address_from_privkey
from .backend.crypto.utils import aes_decrypt
from .backend.blockchain import get_balance, get_block_height
from .utils import satoshis_to_btc, print_result
from .backend.crypto.utils import aes_decrypt, aes_encrypt
from .backend.blockchain import get_balance
from .utils import print_result
from .keys import *
@@ -58,7 +59,6 @@ import config
from .constants import (
WALLET_PATH, WALLET_PASSWORD_LENGTH, CONFIG_PATH,
CONFIG_DIR, CONFIG_FILENAME, WALLET_FILENAME,
WALLET_DECRYPT_MAX_TRIES, WALLET_DECRYPT_BACKOFF_RESET,
BLOCKSTACK_DEBUG, BLOCKSTACK_TEST, SERIES_VERSION
)
@@ -68,72 +68,8 @@ from .schemas import *
log = config.get_logger()
DECRYPT_ATTEMPTS = 0
LAST_DECRYPT_ATTEMPT = 0
def _make_encrypted_wallet_data(password, payment_privkey_info, owner_privkey_info, data_privkey_info, test_legacy=False):
"""
Lowlevel method to make the encrypted wallet's data,
given the decrypted data.
"""
data = {}
enc_payment_info = None
enc_owner_info = None
enc_data_info = None
if not test_legacy:
# legacy wallets (which we test for) may omit these.
# when running in production, this is prohibited.
assert payment_privkey_info
assert owner_privkey_info
assert data_privkey_info
if payment_privkey_info is not None:
enc_payment_info = encrypt_private_key_info(payment_privkey_info, password)
if 'error' in enc_payment_info:
log.error('failed to encrypt payment private key info')
return {'error': enc_payment_info['error']}
if owner_privkey_info is not None:
enc_owner_info = encrypt_private_key_info(owner_privkey_info, password)
if 'error' in enc_owner_info:
log.error('failed to encrypt owner private key info')
return {'error': enc_owner_info['error']}
if data_privkey_info is not None:
enc_data_info = encrypt_private_key_info(data_privkey_info, password)
if 'error' in enc_data_info:
log.error('failed to encrypt data private key info')
return {'error': enc_data_info['error']}
if enc_payment_info is not None:
payment_addr = enc_payment_info['encrypted_private_key_info']['address']
enc_payment_info = enc_payment_info['encrypted_private_key_info']['private_key_info']
data['encrypted_payment_privkey'] = enc_payment_info
data['payment_addresses'] = [payment_addr]
if enc_owner_info is not None:
owner_addr = enc_owner_info['encrypted_private_key_info']['address']
enc_owner_info = enc_owner_info['encrypted_private_key_info']['private_key_info']
data['encrypted_owner_privkey'] = enc_owner_info
data['owner_addresses'] = [owner_addr]
if enc_data_info is not None:
enc_data_info = enc_data_info['encrypted_private_key_info']['private_key_info']
data['encrypted_data_privkey'] = enc_data_info
data['data_pubkeys'] = [ECPrivateKey(data_privkey_info).public_key().to_hex()]
data['data_pubkey'] = data['data_pubkeys'][0]
data['version'] = SERIES_VERSION
return data
def encrypt_wallet(wallet, password, test_legacy=False):
def encrypt_wallet(decrypted_wallet, password, test_legacy=False):
"""
Encrypt the wallet.
Return the encrypted dict on success
@@ -145,29 +81,56 @@ def encrypt_wallet(wallet, password, test_legacy=False):
# must be conformant to the current schema
if not test_legacy:
jsonschema.validate(wallet, WALLET_SCHEMA_CURRENT)
jsonschema.validate(decrypted_wallet, WALLET_SCHEMA_CURRENT)
payment_privkey_info = wallet.get('payment_privkey', None)
owner_privkey_info = wallet.get('owner_privkey', None)
data_privkey_info = wallet.get('data_privkey', None)
wallet = {
'owner_addresses': decrypted_wallet['owner_addresses'],
'payment_addresses': decrypted_wallet['payment_addresses'],
'data_pubkey': decrypted_wallet['data_pubkey'],
'data_pubkeys': decrypted_wallet['data_pubkeys'],
'version': decrypted_wallet['version'],
'enc': None, # to be filled in
}
# make sure data key is hex encoded
data_privkey_info = decrypted_wallet.get('data_privkey', None)
if not test_legacy:
assert data_privkey_info
if not is_singlesig(data_privkey_info):
log.error('Invalid data private key')
return {'error': 'Invalid data private key'}
if data_privkey_info:
if not is_singlesig_hex(data_privkey_info):
data_privkey_info = ECPrivateKey(data_privkey_info).to_hex()
if not is_singlesig_hex(data_privkey_info):
data_privkey_info = ECPrivateKey(data_privkey_info).to_hex()
if not is_singlesig(data_privkey_info):
log.error('Invalid data private key')
return {'error': 'Invalid data private key'}
encrypted_wallet = _make_encrypted_wallet_data(password, payment_privkey_info, owner_privkey_info, data_privkey_info, test_legacy=test_legacy)
wallet_enc = {
'owner_privkey': decrypted_wallet['owner_privkey'],
'payment_privkey': decrypted_wallet['payment_privkey'],
'data_privkey': data_privkey_info
}
if 'error' in encrypted_wallet:
return encrypted_wallet
# extra sanity check: make sure that when re-combined with the wallet,
# we're still valid
recombined_wallet = copy.deepcopy(wallet)
recombined_wallet.update(wallet_enc)
jsonschema.validate(recombined_wallet, WALLET_SCHEMA_CURRENT)
# good to go!
# encrypt secrets
wallet_secret_str = json.dumps(wallet_enc, sort_keys=True)
password_hex = hexlify(password)
encrypted_secret_str = aes_encrypt(wallet_secret_str, password_hex)
# fulfill wallet
wallet['enc'] = encrypted_secret_str
# sanity check
if not test_legacy:
jsonschema.validate(encrypted_wallet, ENCRYPTED_WALLET_SCHEMA_CURRENT)
jsonschema.validate(wallet, ENCRYPTED_WALLET_SCHEMA_CURRENT)
return encrypted_wallet
return wallet
def make_wallet(password, config_path=CONFIG_PATH, payment_privkey_info=None, owner_privkey_info=None, data_privkey_info=None, test_legacy=False, encrypt=True):
@@ -188,104 +151,36 @@ def make_wallet(password, config_path=CONFIG_PATH, payment_privkey_info=None, ow
owner_privkey_info = virtualchain.make_multisig_wallet(2, 3) if owner_privkey_info is None and not test_legacy else owner_privkey_info
data_privkey_info = ECPrivateKey().to_wif() if data_privkey_info is None and not test_legacy else data_privkey_info
new_wallet = _make_encrypted_wallet_data(password, payment_privkey_info, owner_privkey_info, data_privkey_info, test_legacy=test_legacy)
if 'error' in new_wallet:
return new_wallet
# sanity check
decrypted_wallet = {
'owner_addresses': [get_privkey_info_address(owner_privkey_info)],
'owner_privkey': owner_privkey_info,
'payment_addresses': [get_privkey_info_address(payment_privkey_info)],
'payment_privkey': payment_privkey_info,
'data_pubkey': ECPrivateKey(data_privkey_info).public_key().to_hex(),
'data_pubkeys': [ECPrivateKey(data_privkey_info).public_key().to_hex()],
'data_privkey': data_privkey_info,
'version': SERIES_VERSION,
}
if not test_legacy:
jsonschema.validate(new_wallet, ENCRYPTED_WALLET_SCHEMA_CURRENT)
jsonschema.validate(decrypted_wallet, WALLET_SCHEMA_CURRENT)
if encrypt:
return new_wallet
encrypted_wallet = encrypt_wallet(decrypted_wallet, password, test_legacy=test_legacy)
if 'error' in encrypted_wallet:
return encrypted_wallet
# decrypt and return
wallet_info = decrypt_wallet(new_wallet, password, config_path=config_path)
assert 'error' not in wallet_info, "Failed to decrypt new wallet: {}".format(wallet_info['error'])
# sanity check
jsonschema.validate(encrypted_wallet, ENCRYPTED_WALLET_SCHEMA_CURRENT)
return encrypted_wallet
return wallet_info['wallet']
else:
return decrypted_wallet
def log_failed_decrypt(max_tries=WALLET_DECRYPT_MAX_TRIES):
def make_legacy_wallet_keys(data, password):
"""
Record that we tried (and failed)
to decrypt a wallet. Determine
how long we should wait before
allowing another attempt.
If we tried many times, then use
exponential backoff to limit brute-forces
Return the interval of time to sleep
"""
global DECRYPT_ATTEMPTS
global LAST_DECRYPT_ATTEMPT
global NEXT_DECRYPT_ATTEMPT
if LAST_DECRYPT_ATTEMPT + WALLET_DECRYPT_BACKOFF_RESET < time.time():
# haven't tried in a while
DECRYPT_ATTEMPTS = 0
NEXT_DECRYPT_ATTEMPT = 0
return
DECRYPT_ATTEMPTS += 1
LAST_DECRYPT_ATTEMPT = time.time()
if DECRYPT_ATTEMPTS > max_tries:
interval = 2 ** (DECRYPT_ATTEMPTS - max_tries + 1)
NEXT_DECRYPT_ATTEMPT = time.time() + interval
return
def can_attempt_decrypt(max_tries=WALLET_DECRYPT_MAX_TRIES):
"""
Can we attempt a decryption?
Has enough time passed since the last guess?
"""
global DECRYPT_ATTEMPTS
global LAST_DECRYPT_ATTEMPT
global NEXT_DECRYPT_ATTEMPT
if LAST_DECRYPT_ATTEMPT + WALLET_DECRYPT_BACKOFF_RESET < time.time():
# haven't tried in a while
DECRYPT_ATTEMPTS = 0
NEXT_DECRYPT_ATTEMPT = 0
return True
return NEXT_DECRYPT_ATTEMPT < time.time()
def time_until_next_decrypt_attempt():
"""
When can we try to decrypt next?
"""
global NEXT_DECRYPT_ATTEMPT
if NEXT_DECRYPT_ATTEMPT == 0:
return 0
return max(0, NEXT_DECRYPT_ATTEMPT - time.time())
def decrypt_error(max_tries):
"""
Generate an appropriate error response, based on
why we failed to decrypt data
"""
ret = {'error': 'Incorrect password'}
log_failed_decrypt(max_tries=max_tries)
if not can_attempt_decrypt(max_tries=max_tries):
log.debug('Incorrect password; using exponential backoff')
msg = 'Incorrect password. Try again in {} seconds'
ret['error'] = msg.format(time_until_next_decrypt_attempt())
return ret
def make_legacy_wallet_keys(data, password, max_tries=WALLET_DECRYPT_MAX_TRIES):
"""
Given a legacy wallet with a "master private key",
Given a legacy wallet with a "master private key" (i.e. pre-0.13),
generate the owner, payment, and data key values
Return {'payment': priv, 'owner': priv, 'data': priv} on success
Return {'error': ...} on error
@@ -299,7 +194,6 @@ def make_legacy_wallet_keys(data, password, max_tries=WALLET_DECRYPT_MAX_TRIES):
if BLOCKSTACK_DEBUG is not None:
log.exception(e)
ret = decrypt_error(max_tries)
log.debug('Failed to decrypt encrypted_master_private_key: {}'.format(ret['error']))
return ret
@@ -322,7 +216,7 @@ def make_legacy_wallet_keys(data, password, max_tries=WALLET_DECRYPT_MAX_TRIES):
return key_defaults
def make_legacy_wallet_013_keys(data, password, max_tries=WALLET_DECRYPT_MAX_TRIES):
def make_legacy_wallet_013_keys(data, password):
"""
Given a legacy 0.13 wallet with "owner private key" and "payment private key"
defined, generate the owner, payment, and data values.
@@ -347,7 +241,7 @@ def make_legacy_wallet_013_keys(data, password, max_tries=WALLET_DECRYPT_MAX_TRI
owner_privkey = owner_privkey.pop('private_key_info')
if err:
ret = decrypt_error(max_tries)
ret = {'error': "Failed to decrypt owner and payment keys"}
log.debug("Failed to decrypt owner or payment keys: {}".format(err))
return ret
@@ -366,76 +260,15 @@ def make_legacy_wallet_013_keys(data, password, max_tries=WALLET_DECRYPT_MAX_TRI
return key_defaults
def decrypt_wallet(data, password, config_path=CONFIG_PATH,
max_tries=WALLET_DECRYPT_MAX_TRIES):
def decrypt_wallet_legacy(data, key_defaults, password):
"""
Decrypt a wallet's encrypted fields. The wallet will be migrated to the current schema.
Decrypt 0.14.1 and earlier wallets, given the wallet data, the default key values,
and the password.
After WALLET_DECRYPT_MAX_TRIES failed attempts, start doing exponential backoff
to prevent brute-force attacks.
Migrate the wallet from a legacy format to the latest format, if needed.
* By default, generate a new data key if there is no data key set (unless owner_key_is_data_key=True,
in which case, the first owner private key will be set)
Return {'status': True, 'migrated': True|False, 'wallet': wallet} on success.
Return {'error': ...} on failure
Return {'status': True, 'wallet': wallet} on success
Raise on error
"""
legacy = False
legacy_013 = False
is_legacy = False
migrated = False
# must match either current schema or legacy schema
try:
jsonschema.validate(data, ENCRYPTED_WALLET_SCHEMA_CURRENT)
except ValidationError as ve:
# maybe legacy?
try:
jsonschema.validate(data, ENCRYPTED_WALLET_SCHEMA_LEGACY)
legacy = True
except ValidationError, ve2:
try:
jsonschema.validate(data, ENCRYPTED_WALLET_SCHEMA_LEGACY_013)
legacy_013 = True
except ValidationError, ve3:
if not BLOCKSTACK_TEST:
# if in production, this is fatal
log.exception(ve2)
log.error('Invalid wallet data')
return {'error': 'Invalid wallet data'}
is_legacy = (legacy or legacy_013)
legacy_hdwallet = None
key_defaults = {}
new_wallet = {}
ret = {}
if not can_attempt_decrypt(max_tries=max_tries):
msg = 'Cannot decrypt at this time. Try again in {} seconds'
return {'error': msg.format(time_until_next_decrypt_attempt())}
if legacy:
# legacy wallets use a hierarchical deterministic private key for owner, payment, and data keys.
# get that key first, if needed.
key_defaults = make_legacy_wallet_keys(data, password, max_tries=max_tries)
if 'error' in key_defaults:
log.error("Failed to migrate legacy wallet: {}".format(key_defaults['error']))
return key_defaults
migrated = True
elif legacy_013:
# legacy 0.13 wallets have an owner_privkey and a payment_privkey, but not a data_privkey
key_defaults = make_legacy_wallet_013_keys(data, password, max_tries=max_tries)
if 'error' in key_defaults:
log.error("Failed to migrate legacy 0.13 wallet: {}".format(key_defaults['error']))
return key_defaults
migrated = True
# NOTE: 'owner' must come before 'data', since we may use it to generate the data key
keynames = ['payment', 'owner', 'data']
@@ -453,8 +286,8 @@ def decrypt_wallet(data, password, config_path=CONFIG_PATH,
field = decrypt_private_key_info(data[encrypted_keyname], password)
if 'error' in field:
ret = decrypt_error(max_tries)
log.debug('Failed to decrypt {}: {}'.format(encrypted_keyname, ret['error']))
ret = {'error': "Failed to decrypt {}: {}".format(encrypted_keyname, field['error'])}
log.debug('Failed to decrypt {}: {}'.format(encrypted_keyname, field['error']))
return ret
new_wallet[keyname_privkey] = field['private_key_info']
@@ -464,7 +297,6 @@ def decrypt_wallet(data, password, config_path=CONFIG_PATH,
# Legacy migration: this key is not defined in the wallet
# use the appopriate default key
assert is_legacy
assert keyname in key_defaults, 'BUG: no legacy private key for {}'.format(keyname)
default_privkey = key_defaults[keyname]
@@ -473,10 +305,142 @@ def decrypt_wallet(data, password, config_path=CONFIG_PATH,
virtualchain.BitcoinPrivateKey(default_privkey).public_key().address()
]
migrated = True
return {'status': True, 'wallet': new_wallet}
# add data keys. Make sure it's *uncompressed*
def decrypt_wallet_current(data, password):
"""
Given a JSON blob that represents a known-current wallet format,
decrypt it.
Return {'status': True, 'wallet': wallet} on success
Return {'error': ...} on error
"""
hex_password = hexlify(password)
payload = aes_decrypt(data['enc'], hex_password)
wallet_secrets = None
if payload is None:
return {'error': 'Failed to decrypt encrypted wallet portions'}
try:
wallet_secrets = json.loads(payload)
except ValueError:
return {'error': 'Failed to deserialize wallet secrets'}
# should be mergeable into the wallet's public components
new_wallet = copy.deepcopy(data)
del new_wallet['enc']
new_wallet.update(wallet_secrets)
try:
jsonschema.validate(new_wallet, WALLET_SCHEMA_CURRENT)
except ValidationError, ve:
if BLOCKSTACK_DEBUG:
log.exception(ve)
return {'error': 'Wallet secrets do not match wallet schema'}
return {'status': True, 'wallet': new_wallet}
def decrypt_wallet(data, password, config_path=CONFIG_PATH):
"""
Decrypt a wallet's encrypted fields. The wallet will be migrated to the current schema.
Migrate the wallet from a legacy format to the latest format, if needed.
Return {'status': True, 'migrated': True|False, 'wallet': wallet} on success.
Return {'error': ...} on failure
"""
legacy = False
legacy_013 = False
legacy_014 = False
is_legacy = False
migrated = False
# must match either current schema or legacy schema
try:
jsonschema.validate(data, ENCRYPTED_WALLET_SCHEMA_CURRENT)
except ValidationError as ve:
# maybe legacy?
try:
jsonschema.validate(data, ENCRYPTED_WALLET_SCHEMA_LEGACY)
legacy = True
except ValidationError, ve2:
try:
jsonschema.validate(data, ENCRYPTED_WALLET_SCHEMA_LEGACY_013)
legacy_013 = True
except ValidationError, ve3:
try:
jsonschema.validate(data, ENCRYPTED_WALLET_SCHEMA_LEGACY_014)
legacy_014 = True
except ValidationError, ve4:
if BLOCKSTACK_TEST:
log.exception(ve2)
log.exception(ve3)
log.exception(ve4)
log.error('Invalid wallet data')
return {'error': 'Invalid wallet data'}
any_legacy = (legacy or legacy_013 or legacy_014)
legacy_hdwallet = None
key_defaults = {}
new_wallet = {}
ret = {}
# version check
# if the version has changed, we'll need to potentially migrate
# to e.g. trigger a re-encryption
if not data.has_key('version'):
log.debug("Wallet has no version; triggering migration")
migrated = True
elif data['version'] != SERIES_VERSION:
log.debug("Wallet series has changed from {} to {}; triggerring migration".format(data['version'], SERIES_VERSION))
migrated = True
# legacy check
if legacy:
# legacy wallets use a hierarchical deterministic private key for owner, payment, and data keys.
# get that key first, if needed.
key_defaults = make_legacy_wallet_keys(data, password)
if 'error' in key_defaults:
log.error("Failed to migrate legacy wallet: {}".format(key_defaults['error']))
return key_defaults
migrated = True
elif legacy_013:
# legacy 0.13 wallets have an owner_privkey and a payment_privkey, but not a data_privkey
key_defaults = make_legacy_wallet_013_keys(data, password)
if 'error' in key_defaults:
log.error("Failed to migrate legacy 0.13 wallet: {}".format(key_defaults['error']))
return key_defaults
migrated = True
elif legacy_014:
# legacy 0.14 wallets have encrypted owner, data, and payment private keys (they're all separate)
migrated = True
if any_legacy:
wallet_info = decrypt_wallet_legacy(data, key_defaults, password)
else:
wallet_info = decrypt_wallet_current(data, password)
if 'error' in wallet_info:
log.error("Failed to decrypt wallet; {}".format(wallet_info['error']))
return {'error': 'Failed to decrypt wallet'}
new_wallet = wallet_info['wallet']
# post-decryption formatting
# make sure data key is an uncompressed public key
assert new_wallet.has_key('data_privkey')
data_pubkey = ECPrivateKey(str(new_wallet['data_privkey'])).public_key().to_hex()
if keylib.key_formatting.get_pubkey_format(data_pubkey) == 'hex_compressed':
@@ -486,7 +450,8 @@ def decrypt_wallet(data, password, config_path=CONFIG_PATH,
new_wallet['data_pubkeys'] = [data_pubkey]
new_wallet['data_pubkey'] = data_pubkey
# pass along version
new_wallet['version'] = SERIES_VERSION
# sanity check--must be decrypted properly
@@ -710,12 +675,10 @@ def backup_wallet(wallet_path):
return legacy_path
def migrate_wallet(password=None, config_path=CONFIG_PATH, dry_run=False):
def migrate_wallet(password=None, config_path=CONFIG_PATH):
"""
Migrate the wallet to the latest format.
Back up the old wallet.
Optionally do a dry-run to see what would happen.
Optionally use the owner key as the data key
Return {'status': True, 'backup_wallet': ..., 'wallet': ..., 'wallet_password': ..., 'migrated': True} on success
Return {'status': True, 'wallet': ..., 'wallet_password': ..., 'migrated': False} if no migration was necessary.
@@ -739,16 +702,14 @@ def migrate_wallet(password=None, config_path=CONFIG_PATH, dry_run=False):
return encrypted_wallet
# back up
old_path = None
if not dry_run:
old_path = backup_wallet(wallet_path)
old_path = backup_wallet(wallet_path)
# store
res = write_wallet(encrypted_wallet, path=wallet_path)
if not res:
# try to restore
shutil.copy(old_path, wallet_path)
return {'error': 'Failed to store migrated wallet.'}
# store
res = write_wallet(encrypted_wallet, path=wallet_path)
if not res:
# try to restore
shutil.copy(old_path, wallet_path)
return {'error': 'Failed to store migrated wallet.'}
return {'status': True, 'migrated': True, 'backup_wallet': old_path, 'wallet': wallet, 'wallet_password': password}
@@ -1039,15 +1000,15 @@ def get_total_balance(config_path=CONFIG_PATH, wallet_path=WALLET_PATH):
return total_balance, payment_addresses
def wallet_setup(config_path=CONFIG_PATH, interactive=True, wallet_data=None, wallet_path=None, password=None, dry_run=False, test_legacy=False):
def wallet_setup(config_path=CONFIG_PATH, interactive=True, wallet_data=None, wallet_path=None, password=None, test_legacy=False):
"""
Do one-time wallet setup.
* make sure the wallet exists (creating it if need be)
* migrate the wallet if it is in legacy format
Return {'status': True, 'created': False, 'migrated': False, 'password': ..., 'wallet'; ... } on success
Return {'status': True, 'created'; True, 'migrated': False, 'password': ..., 'wallet': ...} if we had to create the wallet
Return {'status': True, 'created': False, 'migrated': True, 'password': ..., 'wallet': ...} if we had to migrate the wallet
Return {'status': True, 'created': False, 'migrated': False, 'password': ..., 'wallet'; ...} on success
Return {'status': True, 'created'; True, 'migrated': False, 'password': ..., 'wallet': ...} if we had to create the wallet
Return {'status': True, 'created': False, 'migrated': True, 'password': ..., 'wallet': ...} if we had to migrate the wallet
Optionally also include 'backup_wallet': ... if the wallet was migrated
"""
@@ -1091,7 +1052,7 @@ def wallet_setup(config_path=CONFIG_PATH, interactive=True, wallet_data=None, wa
if not created:
# try to migrate
res = migrate_wallet(password=password, config_path=config_path, dry_run=dry_run)
res = migrate_wallet(password=password, config_path=config_path)
if 'error' in res:
return res