move storage pusher to blockstackd.py; remove need for names when pushing zonefiles; have the zonefile-pushing logic inform the Atlas logic that the zonefile has been replicated.

This commit is contained in:
Jude Nelson
2016-11-22 14:25:22 -05:00
parent 06da510c1f
commit 51e953400c
2 changed files with 169 additions and 150 deletions

View File

@@ -224,14 +224,14 @@ def remove_cached_zonefile_data( zonefile_hash, zonefile_dir=None ):
return True
def store_zonefile_data_to_storage( name, zonefile_text, txid, required=[], cache=False, zonefile_dir=None, tx_required=True ):
def store_zonefile_data_to_storage( zonefile_text, txid, required=[], cache=False, zonefile_dir=None, tx_required=True ):
"""
Upload a zonefile to our storage providers.
Return True if at least one provider got it.
Return False otherwise.
"""
if tx_required and txid is None:
log.error("No txid for zonefile hash '%s' (for '%s')" % (zonefile_hash, name))
log.error("No txid for zonefile hash '%s'" % (zonefile_hash))
return False
zonefile_hash = get_zonefile_data_hash( zonefile_text )
@@ -244,7 +244,7 @@ def store_zonefile_data_to_storage( name, zonefile_text, txid, required=[], cach
# NOTE: this can fail if one of the required drivers needs a non-null txid
res = blockstack_client.storage.put_immutable_data( None, txid, data_hash=zonefile_hash, data_text=zonefile_text, required=required )
if res is None:
log.error("Failed to store zonefile '%s' (%s) for '%s'" % (zonefile_hash, txid, name))
log.error("Failed to store zonefile '%s' for '%s'" % (zonefile_hash, txid))
return False
return True
@@ -294,143 +294,3 @@ def store_profile_data_to_storage( name, profile_txt, required=None ):
return successes
class StoragePusher( threading.Thread ):
"""
worker thread to push data into storage providers,
so we don't block the RPC server.
"""
def __init__(self, conf, queue_path):
threading.Thread.__init__(self)
self.running = False
self.accepting = True
self.queue_path = queue_path
self.config = conf
self.zonefile_dir = conf.get('zonefile_dir', None)
self.zonefile_storage_drivers = conf['zonefile_storage_drivers'].split(",")
self.profile_storage_drivers = conf['profile_storage_drivers'].split(",")
self.zonefile_queue_id = "push-zonefile"
self.profile_queue_id = "push-profile"
def enqueue_zonefile( self, name, txid, zonefile_hash, zonefile_data ):
"""
Enqueue a zonefile for replication
"""
try:
existing = queue_findone( self.zonefile_queue_id, name, path=self.queue_path )
if len(existing) > 0:
log.error("Already queued {}.{}".format(name, zonefile_hash))
return False
log.debug("Queue {}-byte zonefile for {}".format(len(zonefile_data), name))
res = queue_append( self.zonefile_queue_id, name, txid, block_height=0, zonefile_hash=zonefile_hash, zonefile_data=zonefile_data, path=self.queue_path )
assert res
return True
except Exception as e:
log.exception(e)
return False
def enqueue_profile( self, name, profile_data ):
"""
Enqueue a profile for replication
"""
try:
existing = queue_findone( self.profile_queue_id, name, path=self.queue_path )
if len(existing) > 0:
log.error("Already queued {}.{}".format(name, zonefile_hash))
return False
log.debug("Queue {}-byte profile for {}".format(len(profile_data), name))
res = queue_append( self.profile_queue_id, None, block_height=0, profile=profile_data )
assert res
return True
except Exception as e:
log.exception(e)
return False
def store_one_zonefile(self):
"""
Find and store one zonefile
"""
# find a zonefile
entries = queue_findall( self.zonefile_queue_id, limit=1, path=self.queue_path )
if entries is None or len(entries) == 0:
# empty
return False
entry = entries[0]
res = store_zonefile_data_to_storage( entry['fqu'], str(entry['zonefile']), entry['tx_hash'], required=self.zonefile_storage_drivers, cache=False, zonefile_dir=self.zonefile_dir, tx_required=False )
if not res:
log.error("Failed to store zonefile {} for {} ({} bytes)".format(entry['zoenfile_hash'], entry['fqu'], len(entry['zonefile'])))
return False
log.debug("Replicated zonefile {} for {} ({} bytes)".format(entry['zonefile_hash'], entry['fqu'], len(entry['zonefile'])))
queue_removeall( entries, path=self.queue_path )
return res
def store_one_profile(self):
"""
Find and store one profile
"""
entries = queue_findall( self.profile_queue_id, limit=1, path=self.queue_path )
if entries is None or len(entries) == 0:
# empty
return False
entry = entries[0]
num_successes = store_profile_data_to_storage( entry['fqu'], str(entry['profile']), required=self.profile_storage_drivers )
if num_successes == 0:
log.error("Failed to store profile for {} ({} bytes)".format(entry['fqu'], len(entry['profile'])))
return False
log.debug("Replicated profile for {} ({} bytes)".format(entry['fqu'], len(entry['profile'])))
queue_removeall( entries, path=self.queue_path )
return True
def run(self):
"""
Push zonefiles and profiles
"""
self.running = True
while self.running:
res_zonefile = self.store_one_zonefile()
res_profile = self.store_one_profile()
if not res_zonefile and not res_profile:
time.sleep(1.0)
continue
log.debug("StoragePusher thread exit")
self.running = False
def signal_stop(self):
self.running = False
log.debug("StoragePusher signal stop")
def drain(self):
"""
Stop taking requests and wait for the queue to drain
"""
self.accepting = False
while True:
zonefile_entries = queue_findall( self.zonefile_queue_id, limit=1, path=self.queue_path )
profile_entries = queue_findall( self.profile_queue_id, limit=1, path=self.queue_path )
if len(zonefile_entries) > 0 or len(profile_entries) > 0:
log.debug("Still have data remaining")
time.sleep(1.0)
continue
else:
break