Merge branch 'hotfix/registrar-check-storage' into hotfix/did-support

This commit is contained in:
Jude Nelson
2017-10-31 16:52:40 -04:00
16 changed files with 1554 additions and 109 deletions

View File

@@ -188,6 +188,7 @@ def queuedb_find( queue_id, fqu, limit=None, path=DEFAULT_QUEUE_PATH ):
db.close()
return ret
def queue_add_error_msg( queue_id, fqu, error_msg, path=DEFAULT_QUEUE_PATH ):
"""
Add an error message for an entry
@@ -384,10 +385,42 @@ def queue_append(queue_id, fqu, tx_hash, payment_address=None,
if zonefile_hash is not None:
new_entry['zonefile_hash'] = zonefile_hash
new_entry['replicated_zonefile'] = False
queuedb_insert( queue_id, fqu, tx_hash, new_entry, path=path )
return True
def queue_set_data(queue_id, fqu, new_data, path=DEFAULT_QUEUE_PATH):
    """
    Update a name's data payload in the queue, in place.

    The entry is identified by (queue_id, fqu).  Fields that are stored in
    their own columns of the `entries` table (tx_hash, type, queue_id, fqu)
    are stripped from the payload before serialization, and any raw
    'zonefile' value is base64-encoded into 'zonefile_b64' so the JSON
    blob stays text-safe.

    @queue_id: name of the queue the entry lives in (e.g. 'update', 'renew')
    @fqu: fully-qualified username of the entry
    @new_data: dict with the entry's new data (a shallow copy is taken;
        the caller's dict is not modified)
    @path: path to the queue database

    Return True on success
    Raise an Exception if the queue database could not be opened
    """
    entry_data = {}
    entry_data.update(new_data)

    # these live in dedicated columns, not in the serialized data blob
    for k in ['tx_hash', 'type', 'queue_id', 'fqu']:
        if k in entry_data:
            del entry_data[k]

    # zone files may contain binary data; store them base64-encoded
    # (use `in` instead of the deprecated dict.has_key())
    if 'zonefile' in entry_data:
        entry_data['zonefile_b64'] = base64.b64encode(entry_data['zonefile'])
        del entry_data['zonefile']

    sql = "UPDATE entries SET data = ? WHERE fqu = ? AND queue_id = ?;"
    args = (json.dumps(entry_data, sort_keys=True), fqu, queue_id)

    db = queuedb_open(path)
    if db is None:
        raise Exception("Failed to open %s" % path)

    # ensure the connection is closed even if the query or commit fails
    try:
        cur = db.cursor()
        queuedb_query_execute(cur, sql, args)
        db.commit()
    finally:
        db.close()

    return True
def extract_entry( rowdata ):
"""
Convert a row into a flat dict that contains everything.

View File

@@ -44,7 +44,7 @@ from virtualchain.lib.ecdsalib import ecdsa_private_key
from .queue import get_queue_state, in_queue, cleanup_preorder_queue, queue_removeall
from .queue import queue_find_accepted, queuedb_find
from .queue import queue_add_error_msg
from .queue import queue_add_error_msg, queue_set_data
from .nameops import async_preorder, async_register, async_update, async_transfer, async_renew, async_revoke
@@ -457,8 +457,21 @@ class RegistrarWorker(threading.Thread):
accepted = queue_find_accepted( queue_name, path=queue_path, config_path=config_path )
if len(accepted) > 0:
log.debug("Clear %s confirmed %s operations" % (len(accepted), queue_name))
queue_removeall( accepted, path=queue_path )
# if this is a renew or name_import, and we have a zone file, then don't clear it until it's replicated
to_clear = accepted
if queue_name in ['renew', 'name_import']:
to_clear = []
for acc in accepted:
if acc.has_key('replicated_zonefile') and not acc['replicated_zonefile']:
if acc.has_key('zonefile') and acc['zonefile']:
log.debug("Do NOT remove {} ({}) just yet--it still has a zonefile to replicate".format(acc['fqu'], acc['tx_hash']))
continue
to_clear.append(acc)
log.debug("Clear %s (out of %s) confirmed %s operations" % (len(to_clear), len(accepted), queue_name))
queue_removeall( to_clear, path=queue_path )
# remove expired preorders
cleanup_preorder_queue(path=queue_path, config_path=config_path)
@@ -466,7 +479,7 @@ class RegistrarWorker(threading.Thread):
@classmethod
def replicate_name_data( cls, name_data, atlas_servers, wallet_data, storage_drivers, config_path, proxy=None, replicated_zonefiles=[], replicated_profile_hashes=[] ):
def replicate_name_data( cls, name_data, atlas_servers, wallet_data, storage_drivers, config_path, queue_path, proxy=None, replicated_zonefiles=[], replicated_profile_hashes=[] ):
"""
Given an update queue entry,
replicate the zonefile to as many
@@ -510,6 +523,10 @@ class RegistrarWorker(threading.Thread):
log.info("Replicated zonefile data for %s to %s server(s)" % (name_data['fqu'], len(res['servers'])))
replicated_zonefiles.append(zonefile_hash)
# remember that we replicated the zone file
name_data['replicated_zonefile'] = True
queue_set_data(name_data['type'], name_data['fqu'], name_data, path=queue_path)
# replicate profile to storage, if given
# use the data keypair
@@ -576,7 +593,7 @@ class RegistrarWorker(threading.Thread):
atlas_servers = cls.get_atlas_server_list( config_path )
if 'error' in atlas_servers:
log.warn('Failed to get server list: {}'.format(atlas_servers['error']))
return {'error': 'Failed to get Atlas server list'}
return {'error': 'Failed to get Atlas server list', 'names': [u['fqu'] for u in updates]}
for update in updates:
if update['fqu'] in skip:
@@ -584,10 +601,10 @@ class RegistrarWorker(threading.Thread):
continue
log.debug("Zone file update on '%s' (%s) is confirmed! New hash is %s" % (update['fqu'], update['tx_hash'], update.get('zonefile_hash', None)))
res = cls.replicate_name_data( update, atlas_servers, wallet_data, storage_drivers, config_path, proxy=proxy )
res = cls.replicate_name_data( update, atlas_servers, wallet_data, storage_drivers, config_path, queue_path, proxy=proxy )
if 'error' in res:
log.error("Failed to update %s: %s" % (update['fqu'], res['error']))
queue_add_error_msg('update', update['fqu'], res['error'], path=queue_path)
log.error("Failed to replicate zone file and/or profile for %s: %s" % (update['fqu'], res['error']))
queue_add_error_msg(update['type'], update['fqu'], res['error'], path=queue_path)
ret = {'error': 'Failed to finish an update'}
failed_names.append( update['fqu'] )
@@ -673,7 +690,7 @@ class RegistrarWorker(threading.Thread):
def transfer_names( cls, queue_path, skip=[], config_path=CONFIG_PATH, proxy=None ):
"""
Find all confirmed updates and regups, and if they have a transfer address, transfer them.
Otherwise, clear them from the update queue.
Otherwise, clear them from the update queue if their zonefiles have been replicated.
Return {'status': True} on success
Return {'error': ..., 'names': ...} on failure
@@ -725,10 +742,14 @@ class RegistrarWorker(threading.Thread):
failed.append(update['fqu'])
else:
# nothing more to do
log.debug("Done working on {}".format(update['fqu']))
log.debug("Final name output: {}".format(update))
queue_removeall( [update], path=queue_path )
# nothing more to do, unless we have a zonefile to replicate still
if update.has_key('replicated_zonefile') and not update['replicated_zonefile'] and update.has_key('zonefile') and update['zonefile']:
log.debug("Do not clear {} ({}) just yet--it still has a zonefile to replicate".format(update['fqu'], update['tx_hash']))
else:
log.debug("Done working on {}".format(update['fqu']))
log.debug("Final name output: {}".format(update))
queue_removeall( [update], path=queue_path )
if 'error' in ret:
ret['names'] = failed
@@ -750,7 +771,7 @@ class RegistrarWorker(threading.Thread):
atlas_peers_res = {}
try:
atlas_peers_res = get_atlas_peers( server_hostport, proxy = get_default_proxy(config_path) )
assert 'error' not in atlas_peers_res
assert 'error' not in atlas_peers_res, atlas_peers_res['error']
servers += atlas_peers_res['peers']
@@ -941,121 +962,146 @@ class RegistrarWorker(threading.Thread):
break
poll_interval = 1.0
try:
# see if we can complete any registrations
# clear out any confirmed preorders
# log.debug("register all pending preorders in %s" % (self.queue_path))
res = RegistrarWorker.register_preorders( self.queue_path, wallet_data, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Registration failed: %s" % res['error'])
if os.environ.get("BLOCKSTACK_TEST_REGISTRAR_FAULT_INJECTION_SKIP_PREORDERS", '0') != '1':
try:
# see if we can complete any registrations
# clear out any confirmed preorders
# log.debug("register all pending preorders in %s" % (self.queue_path))
res = RegistrarWorker.register_preorders( self.queue_path, wallet_data, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Registration failed: %s" % res['error'])
# try exponential backoff
# try exponential backoff
failed = True
except Exception, e:
log.exception(e)
failed = True
except Exception, e:
log.exception(e)
failed = True
else:
log.debug("Skipping register_preorders step due to injected fault")
try:
# see if we can put any zonefiles via NAME_UPDATE
# clear out any confirmed registers
# log.debug("put zonefile hashes for registered names in %s" % (self.queue_path))
res = RegistrarWorker.set_zonefiles( self.queue_path, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn('zonefile hash broadcast failed: %s' % res['error'])
if os.environ.get("BLOCKSTACK_TEST_REGISTRAR_FAULT_INJECTION_SKIP_UPDATES", '0') != '1':
try:
# see if we can put any zonefiles via NAME_UPDATE
# clear out any confirmed registers
# log.debug("put zonefile hashes for registered names in %s" % (self.queue_path))
res = RegistrarWorker.set_zonefiles( self.queue_path, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn('zonefile hash broadcast failed: %s' % res['error'])
failed = True
except Exception, e:
log.exception(e)
failed = True
else:
log.debug("Skipping set_zonefiles step due to injected fault")
except Exception, e:
log.exception(e)
failed = True
if os.environ.get("BLOCKSTACK_TEST_REGISTRAR_FAULT_INJECTION_SKIP_REGUP_REPLICATION", '0') != '1':
try:
# see if we can replicate any zonefiles and key files for confirmed NAME_REGISTERs with zone file hashes (post F-day 2017)
# clear out any confirmed registers
# log.debug("replicate all pending zone files and key files for register/updates %s" % (self.queue_path))
res = RegistrarWorker.replicate_register_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zone file/key file replication failed for register: %s" % res['error'])
try:
# see if we can replicate any zonefiles and key files for confirmed NAME_REGISTERs with zone file hashes (post F-day 2017)
# clear out any confirmed registers
# log.debug("replicate all pending zone files and key files for register/updates %s" % (self.queue_path))
res = RegistrarWorker.replicate_register_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zone file/key file replication failed for register: %s" % res['error'])
failed = True
failed_names += res['names']
except Exception, e:
log.exception(e)
failed = True
failed_names += res['names']
else:
log.debug("Skipping replicate_register_data step due to injected fault")
except Exception, e:
log.exception(e)
failed = True
if os.environ.get("BLOCKSTACK_TEST_REGISTRAR_FAULT_INJECTION_SKIP_UPDATE_REPLICATION", '0') != '1':
try:
# see if we can replicate any zonefiles and key files for confirmed NAME_UPDATEs
# clear out any confirmed updates
# log.debug("replicate all pending zone files and profiles for updates %s" % (self.queue_path))
res = RegistrarWorker.replicate_update_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zone file/profile replication failed for update: %s" % res['error'])
try:
# see if we can replicate any zonefiles and key files for confirmed NAME_UPDATEs
# clear out any confirmed updates
# log.debug("replicate all pending zone files and profiles for updates %s" % (self.queue_path))
res = RegistrarWorker.replicate_update_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zone file/profile replication failed for update: %s" % res['error'])
failed = True
failed_names += res['names']
except Exception, e:
log.exception(e)
failed = True
failed_names += res['names']
else:
log.debug("Skipping replicate_update_data step due to injected fault")
except Exception, e:
log.exception(e)
failed = True
if os.environ.get("BLOCKSTACK_TEST_REGISTRAR_FAULT_INJECTION_SKIP_RENEWAL_REPLICATION", '0') != '1':
try:
# see if we can replicate any zonefiles and key files for confirmed NAME_RENEWs (post F-day 2017)
# clear out any confirmed renewals
# log.debug("replicate all pending zone files and key files for renewals %s" % (self.queue_path))
res = RegistrarWorker.replicate_renewal_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zone file/key file replication failed for renewal: %s" % res['error'])
try:
# see if we can replicate any zonefiles and key files for confirmed NAME_RENEWs (post F-day 2017)
# clear out any confirmed renewals
# log.debug("replicate all pending zone files and key files for renewals %s" % (self.queue_path))
res = RegistrarWorker.replicate_renewal_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zone file/key file replication failed for renewal: %s" % res['error'])
failed = True
failed_names += res['names']
except Exception, e:
log.exception(e)
failed = True
failed_names += res['names']
else:
log.debug("Skipping replicate_renewal_data step due to injected fault")
except Exception, e:
log.exception(e)
failed = True
if os.environ.get("BLOCKSTACK_TEST_REGISTRAR_FAULT_INJECTION_SKIP_TRANSFER_NAMES", '0') != '1':
try:
# see if we can transfer any names to their new owners
# log.debug("transfer all names in {}".format(self.queue_path))
res = RegistrarWorker.transfer_names( self.queue_path, skip=failed_names, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Transfer failed: {}".format(res['error']))
try:
# see if we can transfer any names to their new owners
# log.debug("transfer all names in {}".format(self.queue_path))
res = RegistrarWorker.transfer_names( self.queue_path, skip=failed_names, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Transfer failed: {}".format(res['error']))
failed = True
failed_names += res['names']
except Exception as e:
log.exception(e)
failed = True
failed_names += res['names']
else:
log.debug("Skipping replicate_renewal_data step due to injected fault")
except Exception as e:
log.exception(e)
failed = True
if os.environ.get("BLOCKSTACK_TEST_REGISTRAR_FAULT_INJECTION_SKIP_IMPORT_REPLICATION", '0') != '1':
try:
# see if we can replicate any zonefiles for name imports
# clear out any confirmed imports
# log.debug("replicate all pending zone files for name imports in {}".format(self.queue_path))
res = RegistrarWorker.replicate_name_import_data( self.queue_path, wallet_data, self.required_storage_drivers, skip=failed_names, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zone file replication failed: {}".format(res['error']))
try:
# see if we can replicate any zonefiles for name imports
# clear out any confirmed imports
# log.debug("replicate all pending zone files for name imports in {}".format(self.queue_path))
res = RegistrarWorker.replicate_name_import_data( self.queue_path, wallet_data, self.required_storage_drivers, skip=failed_names, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zone file replication failed: {}".format(res['error']))
failed = True
failed_names += res['names']
except Exception, e:
log.exception(e)
failed = True
failed_names += res['names']
else:
log.debug("Skipping replicate_name_import_data due to injected fault")
except Exception, e:
log.exception(e)
failed = True
if os.environ.get("BLOCKSTACK_TEST_REGISTRAR_FAULT_INJECTION_SKIP_CLEAR_CONFIRMED", '0') != '1':
try:
# see if we can remove any other confirmed operations, besides preorders, registers, and updates
# log.debug("clean out other confirmed operations")
res = RegistrarWorker.clear_confirmed( self.config_path, self.queue_path, proxy=proxy )
if 'error' in res:
log.warn("Failed to clear out some operations: %s" % res['error'])
try:
# see if we can remove any other confirmed operations, besides preorders, registers, and updates
# log.debug("clean out other confirmed operations")
res = RegistrarWorker.clear_confirmed( self.config_path, self.queue_path, proxy=proxy )
if 'error' in res:
log.warn("Failed to clear out some operations: %s" % res['error'])
failed = True
except Exception, e:
log.exception(e)
failed = True
except Exception, e:
log.exception(e)
failed = True
else:
log.debug("Skipping clear_confirmed due to injected fault")
# if we failed a step, then try again quickly with exponential backoff
if failed: