have the registrar track unconfirmed name imports and replicate zone files once they are confirmed

This commit is contained in:
Jude Nelson
2017-04-13 10:59:34 -04:00
parent 0fa6030da8
commit 0df007409c

View File

@@ -301,6 +301,15 @@ class RegistrarWorker(threading.Thread):
return accepted
@classmethod
def get_confirmed_name_imports( cls, config_path, queue_path ):
"""
Find all confirmed name imports
"""
accepted = queue_find_accepted( "name_import", path=queue_path, config_path=config_path )
return accepted
@classmethod
def register_preorders( cls, queue_path, wallet_data, config_path=CONFIG_PATH, proxy=None ):
"""
@@ -347,11 +356,11 @@ class RegistrarWorker(threading.Thread):
@classmethod
def clear_confirmed( cls, config_path, queue_path, proxy=None ):
"""
Find all confirmed transactions besides preorder, register, and update, and remove them from the queue.
Find all confirmed transactions besides preorder, register, update, and remove them from the queue.
Return {'status': true} on success
Return {'error': ...} on failure
"""
for queue_name in ['transfer', 'revoke', 'renew']:
for queue_name in ['transfer', 'revoke', 'renew', 'name_import']:
accepted = queue_find_accepted( queue_name, path=queue_path, config_path=config_path )
if len(accepted) > 0:
@@ -454,9 +463,9 @@ class RegistrarWorker(threading.Thread):
@classmethod
def replicate_names_data( cls, queue_path, wallet_data, storage_drivers, config_path=CONFIG_PATH, proxy=None ):
def replicate_names_data( cls, queue_path, updates, wallet_data, storage_drivers, config_path=CONFIG_PATH, proxy=None ):
"""
Replicate all zonefiles and profiles for each confirmed update.
Replicate all zonefiles and profiles for each confirmed update or name import.
@atlas_servers should be a list of (host, port)
Do NOT remove items from the queue.
@@ -467,17 +476,13 @@ class RegistrarWorker(threading.Thread):
ret = {'status': True}
failed_names = []
updates = cls.get_confirmed_updates( config_path, queue_path )
if len(updates) == 0:
return ret
atlas_servers = cls.get_atlas_server_list( config_path )
if 'error' in atlas_servers:
log.warn('Failed to get server list: {}'.format(servers['error']))
return {'error': 'Failed to get Atlas server list'}
for update in updates:
log.debug("Zonefile update on '%s' (%s) is confirmed! New hash is %s" % (update['fqu'], update['tx_hash'], update['zonefile_hash']))
log.debug("Zone file update on '%s' (%s) is confirmed! New hash is %s" % (update['fqu'], update['tx_hash'], update['zonefile_hash']))
res = cls.replicate_name_data( update, atlas_servers, wallet_data, storage_drivers, config_path, proxy=proxy )
if 'error' in res:
log.error("Failed to update %s: %s" % (update['fqu'], res['error']))
@@ -489,6 +494,41 @@ class RegistrarWorker(threading.Thread):
return ret
@classmethod
def replicate_update_data( cls, queue_path, wallet_data, storage_drivers, config_path=CONFIG_PATH, proxy=None ):
"""
Replicate all zone files and profiles for each confirmed NAME_UPDATE
@atlas_servers should be a list of (host, port)
Do NOT remove items from the queue.
Return {'status': True} on success
Return {'error': ..., 'names': [...]} on failure. 'names' refers to the list of names that failed
"""
updates = cls.get_confirmed_updates( config_path, queue_path )
if len(updates) == 0:
return {'status': True}
return cls.replicate_names_data(queue_path, updates, wallet_data, storage_drivers, config_path=config_path, proxy=proxy)
@classmethod
def replicate_name_import_data( cls, queue_path, wallet_data, storage_drivers, config_path=CONFIG_PATH, proxy=None ):
"""
Replicate all zone files and profiles for each confirmed NAME_UPDATE
@atlas_servers should be a list of (host, port)
Do NOT remove items from the queue.
Return {'status': True} on success
Return {'error': ..., 'names': [...]} on failure. 'names' refers to the list of names that failed
"""
name_imports = cls.get_confirmed_name_imports( config_path, queue_path )
if len(name_imports) == 0:
return {'status': True}
return cls.replicate_names_data(queue_path, name_imports, wallet_data, storage_drivers, config_path=config_path, proxy=proxy)
@classmethod
def transfer_names( cls, queue_path, skip=[], config_path=CONFIG_PATH, proxy=None ):
@@ -778,11 +818,11 @@ class RegistrarWorker(threading.Thread):
try:
# see if we can replicate any zonefiles and profiles
# clear out any confirmed updates
log.debug("replicate all pending zonefiles and profiles in %s" % (self.queue_path))
log.debug("replicate all pending zone files and profiles for updates %s" % (self.queue_path))
failed_names = []
res = RegistrarWorker.replicate_names_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
res = RegistrarWorker.replicate_update_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zonefile/profile replication failed: %s" % res['error'])
log.warn("Zone file/profile replication failed for update: %s" % res['error'])
# try exponential backoff
failed = True
@@ -810,6 +850,25 @@ class RegistrarWorker(threading.Thread):
failed = True
poll_interval = 1.0
try:
# see if we can replicate any zonefiles for name imports
# clear out any confirmed imports
log.debug("replicate all pending zone files for name imports in {}".format(self.queue_path))
failed_names = []
res = RegistrarWorker.replicate_name_import_data( self.queue_path, wallet_data, self.required_storage_drivers, config_path=self.config_path, proxy=proxy )
if 'error' in res:
log.warn("Zone file replication failed: {}".format(res['error']))
# try exponential backoff
failed = True
poll_interval = 1.0
failed_names = res['names']
except Exception, e:
log.exception(e)
failed = True
poll_interval = 1.0
try:
# see if we can remove any other confirmed operations, besides preorders, registers, and updates
log.debug("clean out other confirmed operations")