remove dead code; have registrar thread track pending transfers; do transfers after updates if requested

This commit is contained in:
Jude Nelson
2017-02-10 17:38:06 -05:00
parent dfd2aba3e7
commit c842082a55

View File

@@ -39,6 +39,7 @@ import socket
import threading
import time
import tempfile
import hashlib
import keylib
from keylib import ECPrivateKey
@@ -53,9 +54,8 @@ from .blockchain import get_block_height
from ..keys import get_data_privkey_info, is_singlesig, is_singlesig_hex, is_multisig, get_privkey_info_address, get_privkey_info_params, encrypt_private_key_info, decrypt_private_key_info
from ..proxy import is_name_registered, is_zonefile_hash_current, is_name_owner, get_default_proxy, get_name_blockchain_record, get_name_cost, get_atlas_peers, json_is_error
from ..profile import get_and_migrate_profile
from ..zonefile import zonefile_data_replicate, make_empty_zonefile
from ..user import is_user_zonefile
from ..user import is_user_zonefile, make_empty_user_profile
from ..storage import put_mutable_data, put_immutable_data, hash_zonefile, get_zonefile_data_hash
from ..data import get_profile_timestamp, set_profile_timestamp
@@ -180,7 +180,9 @@ class RegistrarWorker(threading.Thread):
owner_privkey_params = get_privkey_info_params( owner_privkey_info )
log.debug('Send async register for {}'.format(name_data['fqu']))
res = async_register( name_data['fqu'], payment_privkey_info, owner_address, owner_privkey_params=owner_privkey_params, proxy=proxy, config_path=config_path, queue_path=queue_path )
log.debug("async_register({}, zonefile={}, profile={}, transfer_address={})".format(name_data['fqu'], name_data.get('zonefile'), name_data.get('profile'), name_data.get('transfer_address')))
res = async_register( name_data['fqu'], payment_privkey_info, owner_address, name_data=name_data,
owner_privkey_params=owner_privkey_params, proxy=proxy, config_path=config_path, queue_path=queue_path )
return res
else:
# already queued
@@ -201,10 +203,11 @@ class RegistrarWorker(threading.Thread):
@classmethod
def init_profile( cls, name_data, proxy=None, config_path=CONFIG_PATH, queue_path=DEFAULT_QUEUE_PATH ):
def set_zonefile( cls, name_data, proxy=None, config_path=CONFIG_PATH, queue_path=DEFAULT_QUEUE_PATH ):
"""
Given a newly-registered name, go broadcast the hash of its empty zonefile.
Idempotent--if the name is already migrated, then return the result of the pending transaction
Given a newly-registered name, go broadcast the hash of its zonefile.
Idempotent--if the name is already migrated, then return the result of the pending transaction.
Return {'status': True, 'transaction_hash': ..., 'zonefile_hash': ...} on success
Return {'error': ...} on error
"""
@@ -223,24 +226,28 @@ class RegistrarWorker(threading.Thread):
else:
raise Exception("Queue inconsistency: name '%s' is and is not pending update" % up_result['fqu'])
res = migrate( conf['rpc_token'], name_data['fqu'], config_path=config_path, proxy=proxy )
log.debug("update({}, zonefile={}, profile={}, transfer_address={})".format(name_data['fqu'], name_data.get('zonefile'), name_data.get('profile'), name_data.get('transfer_address')))
res = update( conf['rpc_token'], name_data['fqu'], base64.b64encode(name_data.get('zonefile')), name_data.get('profile'),
name_data.get('zonefile_hash'), name_data.get('transfer_address'), config_path=config_path, proxy=proxy )
assert 'success' in res
if not res['success']:
log.error("migrate %s: %s" % (name_data['fqu'], res['error']))
return {'error': res['error']}
else:
try:
assert 'transaction_hash' in res
assert 'zonefile_hash' in res
assert 'value_hash' in res
except:
raise Exception("Invalid response\n%s\n" % json.dumps(res, indent=4, sort_keys=True))
return {'status': True, 'transaction_hash': res['transaction_hash'], 'zonefile_hash': res['zonefile_hash']}
return {'status': True, 'transaction_hash': res['transaction_hash'], 'zonefile_hash': res['value_hash']}
@classmethod
def init_profiles( cls, queue_path, config_path=CONFIG_PATH, proxy=None ):
def set_zonefiles( cls, queue_path, config_path=CONFIG_PATH, proxy=None ):
"""
Find all confirmed registrations, create empty zonefiles for them and broadcast their hashes to the blockchain.
Queue up the zonefiles and profiles for subsequent replication.
@@ -261,7 +268,7 @@ class RegistrarWorker(threading.Thread):
continue
log.debug("Register for '%s' (%s) is confirmed!" % (register['fqu'], register['tx_hash']))
res = cls.init_profile( register, proxy=proxy, queue_path=queue_path, config_path=config_path )
res = cls.set_zonefile( register, proxy=proxy, queue_path=queue_path, config_path=config_path )
if 'error' in res:
log.error("Failed to make name profile for %s: %s" % (register['fqu'], res['error']))
ret = {'error': 'Failed to set up name profile'}
@@ -356,11 +363,11 @@ class RegistrarWorker(threading.Thread):
@classmethod
def clear_confirmed( cls, config_path, queue_path, proxy=None ):
"""
Find all confirmed update, transfer, etc. transactions, and clear them out
Find all confirmed transactions besides preorder, register, and update, and remove them from the queue.
Return {'status': true} on success
Return {'error': ...} on failure
"""
for queue_name in ['transfer']:
for queue_name in ['transfer', 'revoke', 'renew']:
accepted = queue_find_accepted( queue_name, path=queue_path, config_path=config_path )
if len(accepted) > 0:
@@ -371,7 +378,7 @@ class RegistrarWorker(threading.Thread):
@classmethod
def replicate_profile_data( cls, name_data, atlas_servers, wallet_data, storage_drivers, config_path, proxy=None, replicated_zonefiles=[] ):
def replicate_name_data( cls, name_data, atlas_servers, wallet_data, storage_drivers, config_path, proxy=None, replicated_zonefiles=[], replicated_profile_hashes=[] ):
"""
Given an update queue entry,
replicate the zonefile to as many
@@ -429,13 +436,21 @@ class RegistrarWorker(threading.Thread):
log.warning("Not a zone file; not replicating profile for %s" % name_data['fqu'])
return {'status': True}
data_privkey = get_data_privkey_info( zonefile, wallet_keys=wallet_data, config_path=config_path )
assert data_privkey is not None and not json_is_error(data_privkey), "No data private key"
log.info("Replicate profile data for %s to %s" % (name_data['fqu'], ",".join(storage_drivers)))
profile_payload = copy.deepcopy(name_data['profile'])
profile_hash = hashlib.sha256(json.dumps(profile_payload, sort_keys=True)).hexdigest()
# did we replicate this profile already?
if profile_hash in replicated_profile_hashes:
# already replicated
log.debug("Already replicated profile for {}".format(name_data['fqu']))
return {'status': True}
profile_payload = set_profile_timestamp(profile_payload)
rc = put_mutable_data( name_data['fqu'], profile_payload, data_privkey, required=storage_drivers, profile=True, blockchain_id=name_data['fqu'] )
@@ -444,6 +459,9 @@ class RegistrarWorker(threading.Thread):
return {'error': 'Failed to store profile'}
else:
log.info("Replicated profile for %s" % (name_data['fqu']))
# don't do this again
replicated_profile_hashes.append(profile_hash)
return {'status': True}
else:
@@ -452,13 +470,19 @@ class RegistrarWorker(threading.Thread):
@classmethod
def replicate_profiles( cls, queue_path, wallet_data, storage_drivers, config_path=CONFIG_PATH, proxy=None ):
def replicate_names_data( cls, queue_path, wallet_data, storage_drivers, config_path=CONFIG_PATH, proxy=None ):
"""
Replicate all zonefiles for each confirmed update.
Remove successfully-replicated updates
Replicate all zonefiles and profiles for each confirmed update.
@atlas_servers should be a list of (host, port)
Do NOT remove items from the queue.
Return {'status': True} on success
Return {'error': ..., 'names': [...]} on failure. 'names' refers to the list of names that failed
"""
ret = {'status': True}
failed_names = []
updates = cls.get_confirmed_updates( config_path, queue_path )
if len(updates) == 0:
return ret
@@ -470,17 +494,65 @@ class RegistrarWorker(threading.Thread):
for update in updates:
log.debug("Zonefile update on '%s' (%s) is confirmed! New hash is %s" % (update['fqu'], update['tx_hash'], update['zonefile_hash']))
res = cls.replicate_profile_data( update, atlas_servers, wallet_data, storage_drivers, config_path, proxy=proxy )
res = cls.replicate_name_data( update, atlas_servers, wallet_data, storage_drivers, config_path, proxy=proxy )
if 'error' in res:
log.error("Failed to update %s: %s" % (update['fqu'], res['error']))
ret = {'error': 'Failed to finish an update'}
failed_names.append( update['fqu'] )
if 'error' in ret:
ret['names'] = failed_names
return ret
@classmethod
def transfer_names( cls, queue_path, skip=[], config_path=CONFIG_PATH, proxy=None ):
"""
Find all confirmed updates and if they have a transfer address, transfer them.
Otherwise, clear them from the update queue.
Return {'status': True} on success
Return {'error': ...} on failure
"""
if proxy is None:
proxy = get_default_proxy(config_path=config_path)
ret = {'status': True}
conf = get_config(config_path)
assert conf
updates = cls.get_confirmed_updates( config_path, queue_path )
if len(updates) == 0:
return ret
for update in updates:
if update['fqu'] in skip:
continue
if update.get("transfer_address") is not None:
log.debug("Transfer {} to {}".format(update['fqu'], update['transfer_address']))
res = transfer(conf['rpc_token'], update['fqu'], update['transfer_address'], config_path=config_path, proxy=proxy )
assert 'success' in res
if res['success']:
# clear from update queue
queue_removeall( [update], path=queue_path )
else:
# will try again
log.error("Failed to transfer {} to {}: {}".format(update['fqu'], update['transfer_address'], res.get('error')))
ret = {'error': 'Not all names transferred'}
else:
# clear
# nothing more to do
log.debug("Done working on {}".format(update['fqu']))
log.debug("Final name output: {}".format(update))
queue_removeall( [update], path=queue_path )
return ret
@classmethod
def get_atlas_server_list( cls, config_path ):
@@ -613,6 +685,7 @@ class RegistrarWorker(threading.Thread):
"""
failed = False
poll_interval = self.poll_interval
failed_names = []
log.info("Registrar worker entered")
@@ -677,6 +750,7 @@ class RegistrarWorker(threading.Thread):
except Exception, e:
log.exception(e)
break
poll_interval = 1.0
try:
# see if we can complete any registrations
@@ -693,12 +767,13 @@ class RegistrarWorker(threading.Thread):
except Exception, e:
log.exception(e)
failed = True
poll_interval = 1.0
try:
# see if we can put any zonefiles
# clear out any confirmed registers
log.debug("put zonefile hashes for registered names in %s" % (self.queue_path))
res = RegistrarWorker.init_profiles( self.queue_path, config_path=self.config_path, proxy=proxy )
res = RegistrarWorker.set_zonefiles( self.queue_path, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn('zonefile hash broadcast failed: %s' % res['error'])
@@ -709,22 +784,42 @@ class RegistrarWorker(threading.Thread):
except Exception, e:
log.exception(e)
failed = True
poll_interval = 1.0
try:
# see if we can replicate any zonefiles and profiles
# clear out any confirmed updates
log.debug("replicate all pending zonefiles and profiles in %s" % (self.queue_path))
res = RegistrarWorker.replicate_profiles( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
failed_names = []
res = RegistrarWorker.replicate_names_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zonefile/profile replication failed: %s" % res['error'])
# try exponential backoff
failed = True
poll_interval = 1.0
failed_names = res['names']
except Exception, e:
log.exception(e)
failed = True
poll_interval = 1.0
try:
# see if we can transfer any names to their new owners
log.debug("transfer all names in {}".format(self.queue_path))
res = RegistrarWorker.transfer_names( self.queue_path, skip=failed_names, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Transfer failed: {}".format(res['error']))
# try exponential backoff
failed = True
poll_interval = 1.0
except Exception as e:
log.exception(e)
failed = True
poll_interval = 1.0
try:
# see if we can remove any other confirmed operations, besides preorders, registers, and updates
@@ -740,6 +835,7 @@ class RegistrarWorker(threading.Thread):
except Exception, e:
log.exception(e)
failed = True
poll_interval = 1.0
# if we failed a step, then try again quickly with exponential backoff
if failed:
@@ -992,9 +1088,10 @@ def get_wallet(rpc_token=None, config_path=None, proxy=None):
# RPC method: backend_preorder
def preorder(rpc_token, fqu, config_path=None, proxy=None):
def preorder(rpc_token, fqu, zonefile_data, profile, transfer_address, config_path=None, proxy=None):
"""
Send preorder transaction and enter it in queue.
Queue up additional state so we can update and transfer it as well.
The entered registration is picked up
by the monitor process.
Return {'success': True, ...} on success
@@ -1006,6 +1103,7 @@ def preorder(rpc_token, fqu, config_path=None, proxy=None):
valid_rpc_token = get_rpc_token(config_path=config_path)
if str(valid_rpc_token) != str(rpc_token):
data['success'] = False
data['error'] = "Incorrect RPC token"
return data
@@ -1028,15 +1126,24 @@ def preorder(rpc_token, fqu, config_path=None, proxy=None):
data['error'] = "Failed to look up name cost: %s" % cost_info['error']
return data
if is_name_registered( fqu, proxy=proxy ):
return {'success': False, 'error': "Name is already registered"}
payment_privkey_info = get_wallet_payment_privkey_info(rpc_token, config_path=config_path, proxy=proxy)
owner_privkey_info = get_wallet_owner_privkey_info(rpc_token, config_path=config_path, proxy=proxy)
owner_privkey_params = get_privkey_info_params( owner_privkey_info )
owner_address = get_privkey_info_address( owner_privkey_info )
if not is_name_registered(fqu, proxy=proxy):
resp = async_preorder(fqu, payment_privkey_info, owner_address, cost_info['satoshis'], owner_privkey_params=owner_privkey_params, proxy=proxy, config_path=config_path, queue_path=state.queue_path)
else:
return {'success': False, 'error': "Name is already registered"}
name_data = {
'transfer_address': transfer_address,
'zonefile': zonefile_data,
'profile': profile
}
log.debug("async_preorder({}, zonefile_data={}, profile={}, transfer_address={})".format(fqu, zonefile_data, profile, transfer_address))
resp = async_preorder(fqu, payment_privkey_info, owner_address, cost_info['satoshis'],
owner_privkey_params=owner_privkey_params, name_data=name_data,
proxy=proxy, config_path=config_path, queue_path=state.queue_path)
if 'error' not in resp:
data['success'] = True
@@ -1054,9 +1161,10 @@ def preorder(rpc_token, fqu, config_path=None, proxy=None):
# RPC method: backend_update
def update( rpc_token, fqu, zonefile_txt_b64, profile, zonefile_hash, config_path=None, proxy=None ):
def update( rpc_token, fqu, zonefile_txt_b64, profile, zonefile_hash, transfer_address, config_path=None, proxy=None ):
"""
Send a new zonefile hash. Queue the zonefile data for subsequent replication.
zonefile_txt_b64 must be b64-encoded so we can send it over RPC sanely
"""
state, config_path, proxy = get_plugin_state(config_path=config_path, proxy=proxy)
@@ -1064,6 +1172,7 @@ def update( rpc_token, fqu, zonefile_txt_b64, profile, zonefile_hash, config_pat
valid_rpc_token = get_rpc_token(config_path=config_path)
if str(valid_rpc_token) != str(rpc_token):
data['success'] = False
data['error'] = "Incorrect RPC token"
return data
@@ -1098,9 +1207,15 @@ def update( rpc_token, fqu, zonefile_txt_b64, profile, zonefile_hash, config_pat
if not is_zonefile_hash_current(fqu, zonefile_hash, proxy=proxy ):
# new zonefile data
name_data = {
'transfer_address': transfer_address
}
log.debug("async_update({}, zonefile_data={}, profile={}, transfer_address={})".format(fqu, zonefile_txt, profile, transfer_address))
resp = async_update(fqu, zonefile_txt, profile,
owner_privkey_info,
payment_privkey_info,
name_data=name_data,
zonefile_hash=zonefile_hash,
proxy=proxy,
config_path=config_path,
@@ -1136,6 +1251,9 @@ def transfer(rpc_token, fqu, transfer_address, config_path=None, proxy=None ):
"""
Send transfer transaction.
Keeps the zonefile data.
Return {'success': True, 'transaction_hash': ..., 'message': ...} on success
Return {'success': False, 'error': ...}
"""
state, config_path, proxy = get_plugin_state(config_path=config_path, proxy=proxy)
@@ -1143,6 +1261,7 @@ def transfer(rpc_token, fqu, transfer_address, config_path=None, proxy=None ):
valid_rpc_token = get_rpc_token(config_path=config_path)
if str(valid_rpc_token) != str(rpc_token):
data['success'] = False
data['error'] = "Incorrect RPC token"
return data
@@ -1180,110 +1299,7 @@ def transfer(rpc_token, fqu, transfer_address, config_path=None, proxy=None ):
data['transaction_hash'] = resp['transaction_hash']
else:
data['success'] = False
data['message'] = "Couldn't broadcast transaction. You can try again."
return data
# RPC method: backend_migrate
def migrate( rpc_token, fqu, config_path=None, proxy=None ):
"""
Create an empty profile/zonefile for a name, and send the hash of the
zonefile to the blockchain. Queue up the zonefile and profile for replication.
Return {'success': True, 'transaciton_hash': ..., 'zonefile_hash': ...} on success
Return {'success': True} if the profile has already been migrated
Return {'success': False, 'error': ...} on failure
"""
state, config_path, proxy = get_plugin_state(config_path=config_path, proxy=proxy)
data = {}
valid_rpc_token = get_rpc_token(config_path=config_path)
if str(valid_rpc_token) != str(rpc_token):
data['error'] = "Incorrect RPC token"
return data
if state.payment_address is None or state.owner_address is None:
data['success'] = False
data['error'] = "Wallet is not unlocked."
return data
if in_queue("update", fqu, path=state.queue_path):
data['success'] = False
data['error'] = "Already in queue."
return data
if state.data_pubkey is None or state.data_privkey_info is None:
data['success'] = False
data['error'] = "No data key set"
return data
rpc_token = get_rpc_token(config_path)
wallet_keys = get_wallet(rpc_token=rpc_token, config_path=config_path, proxy=proxy )
if 'error' in wallet_keys:
log.error("Failed to get wallet: %s" % wallet_keys['error'])
data['success'] = False
data['error'] = 'Failed to load wallet'
return data
# we need data keys
if not wallet_keys.has_key('data_pubkey') or not wallet_keys.has_key('data_privkey') or wallet_keys['data_pubkey'] is None or wallet_keys['data_privkey'] is None:
log.error("No data key set in the wallet")
return {'success': False, 'error': 'No data keys in wallet. Please run `upgrade_wallet` first.'}
user_profile, user_zonefile, legacy = get_and_migrate_profile( fqu, create_if_absent=True, proxy=proxy, wallet_keys=wallet_keys )
if 'error' in user_profile:
log.debug("Unable to load user zonefile for '%s': %s" % (fqu, user_profile['error']))
return {'success': False, 'error': 'Unable to load user zonefile: %s' % user_profile['error']}
if not legacy:
return {'success': True}
user_zonefile = user_zonefile['zonefile']
user_profile = user_profile['profile']
resp = None
payment_privkey_info = get_wallet_payment_privkey_info(rpc_token, config_path=config_path, proxy=proxy)
owner_privkey_info = get_wallet_owner_privkey_info(rpc_token, config_path=config_path, proxy=proxy)
replication_error = None
zonefile_txt = blockstack_zones.make_zone_file( user_zonefile )
zonefile_hash = get_zonefile_data_hash( zonefile_txt )
if not is_zonefile_hash_current(fqu, zonefile_hash, proxy=proxy ):
resp = async_update(fqu, zonefile_txt, user_profile,
owner_privkey_info,
payment_privkey_info,
zonefile_hash=zonefile_hash,
proxy=proxy,
config_path=config_path,
queue_path=state.queue_path)
else:
return {'success': True, 'warning': "The zonefile has not changed, so no update sent."}
if 'error' not in resp:
data['success'] = True
data['message'] = "The name has been queued up for update and"
data['message'] += " will take ~1 hour to process. You can"
data['message'] += " check on the status at any time by running"
data['message'] += " 'blockstack info'."
data['transaction_hash'] = resp['transaction_hash']
data['zonefile_hash'] = resp['zonefile_hash']
data['zonefile'] = user_zonefile
data['profile'] = user_profile
else:
log.error("async_update failed with: '%s'" % resp['error'])
data['success'] = False
data['message'] = "Couldn't broadcast transaction. You can try again."
data['error'] = resp['error']
if replication_error is not None:
data['warning'] = "Failed to replicate the zonefile ('%s')" % replication_error
data['error'] = "Couldn't broadcast transaction. You can try again."
return data
@@ -1291,6 +1307,9 @@ def migrate( rpc_token, fqu, config_path=None, proxy=None ):
def renew( rpc_token, fqu, renewal_fee, config_path=None, proxy=None ):
"""
Renew a name
Return {'success': True, 'message': ..., 'transaction_hash': ...} on success
Return {'error': ...} on error
"""
state, config_path, proxy = get_plugin_state(config_path=config_path, proxy=proxy)
@@ -1298,6 +1317,7 @@ def renew( rpc_token, fqu, renewal_fee, config_path=None, proxy=None ):
valid_rpc_token = get_rpc_token(config_path=config_path)
if str(valid_rpc_token) != str(rpc_token):
data['success'] = False
data['error'] = "Incorrect RPC token"
return data
@@ -1342,6 +1362,9 @@ def renew( rpc_token, fqu, renewal_fee, config_path=None, proxy=None ):
def revoke( rpc_token, fqu, config_path=None, proxy=None ):
"""
Revoke a name
Return {'success': True, 'message': ..., 'transaction_hash': ...} on success
Return {'error': ...} on error
"""
state, config_path, proxy = get_plugin_state(config_path=config_path, proxy=proxy)
@@ -1349,6 +1372,7 @@ def revoke( rpc_token, fqu, config_path=None, proxy=None ):
valid_rpc_token = get_rpc_token(config_path=config_path)
if str(valid_rpc_token) != str(rpc_token):
data['success'] = False
data['error'] = "Incorrect RPC token"
return data
@@ -1401,7 +1425,6 @@ RPC_METHODS = [
preorder,
update,
transfer,
migrate,
renew,
revoke
]