guard access to in-RAM zonefile indexes with a mutex (now that multiple threads can write to it); have Atlas call a "storage callback" each time it processes a zonefile it did not previously have (which we leverage in the subdomain indexer)

This commit is contained in:
Jude Nelson
2018-01-29 19:07:26 -05:00
parent fcb0555c8a
commit 7fd00dce46

View File

@@ -37,7 +37,7 @@ import gc
import virtualchain
from nameset.virtualchain_hooks import get_last_block, get_snapshots, get_valid_transaction_window
from .util import url_to_host_port, atlas_inventory_to_string
from .util import url_to_host_port, atlas_inventory_to_string, db_query_execute
from .storage.auth import get_zonefile_data_hash
from .client import \
@@ -81,6 +81,7 @@ NUM_NEIGHBORS = 80 # number of neighbors a peer can report
ZONEFILE_INV = None # this atlas peer's current zonefile inventory
NUM_ZONEFILES = 0 # cache-coherent count of the number of zonefiles present
ZONEFILE_INV_LOCK = threading.Lock() # lock to guard the above
MAX_QUEUED_ZONEFILES = 1000 # maximum number of queued zonefiles
@@ -555,28 +556,7 @@ def atlasdb_query_execute(cur, query, values):
Abort on failure.
"""
with DB_LOCK:
timeout = 1.0
while True:
try:
ret = cur.execute(query, values)
return ret
except sqlite3.OperationalError as oe:
if oe.message == "database is locked":
timeout = timeout * 2 + timeout * random.random()
log.error("Query timed out due to lock; retrying in %s: %s" % (timeout, atlasdb_format_query(query, values)))
time.sleep(timeout)
else:
log.exception(oe)
log.error("FATAL: failed to execute query (%s, %s)" % (query, values))
log.error("\n".join(traceback.format_stack()))
os.abort()
except Exception, e:
log.exception(e)
log.error("FATAL: failed to execute query (%s, %s)" % (query, values))
log.error("\n".join(traceback.format_stack()))
os.abort()
return db_query_execute(cur, query, values)
def atlasdb_open( path ):
@@ -600,7 +580,7 @@ def atlasdb_add_zonefile_info( name, zonefile_hash, txid, present, tried_storage
Mark it as present or absent.
Keep our in-RAM inventory vector up-to-date
"""
global ZONEFILE_INV, NUM_ZONEFILES
global ZONEFILE_INV, NUM_ZONEFILES, ZONEFILE_INV_LOCK
with AtlasDBOpen( con=con, path=path ) as dbcon:
if present:
@@ -628,19 +608,21 @@ def atlasdb_add_zonefile_info( name, zonefile_hash, txid, present, tried_storage
atlasdb_query_execute( cur, sql, args )
dbcon.commit()
# keep in-RAM zonefile inv coherent
zfbits = atlasdb_get_zonefile_bits( zonefile_hash, con=dbcon, path=path )
with ZONEFILE_INV_LOCK:
inv_vec = None
if ZONEFILE_INV is None:
inv_vec = ""
else:
inv_vec = ZONEFILE_INV[:]
# keep in-RAM zonefile inv coherent
zfbits = atlasdb_get_zonefile_bits( zonefile_hash, con=dbcon, path=path )
ZONEFILE_INV = atlas_inventory_flip_zonefile_bits( inv_vec, zfbits, present )
inv_vec = None
if ZONEFILE_INV is None:
inv_vec = ""
else:
inv_vec = ZONEFILE_INV[:]
# keep in-RAM zonefile count coherent
NUM_ZONEFILES = atlasdb_zonefile_inv_length( con=dbcon, path=path )
ZONEFILE_INV = atlas_inventory_flip_zonefile_bits( inv_vec, zfbits, present )
# keep in-RAM zonefile count coherent
NUM_ZONEFILES = atlasdb_zonefile_inv_length( con=dbcon, path=path )
return True
@@ -657,7 +639,6 @@ def atlasdb_get_lastblock( con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
row = {}
for r in res:
@@ -672,17 +653,17 @@ def atlasdb_get_zonefile( zonefile_hash, con=None, path=None ):
"""
Look up all information on this zonefile.
Returns {'zonefile_hash': ..., 'indexes': [...], etc}
Zonefile information will be ordered by inv_index
"""
ret = None
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = "SELECT * FROM zonefiles WHERE zonefile_hash = ?;"
sql = "SELECT * FROM zonefiles WHERE zonefile_hash = ? ORDER BY inv_index;"
args = (zonefile_hash,)
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
ret = {
'zonefile_hash': zonefile_hash,
@@ -700,10 +681,11 @@ def atlasdb_get_zonefile( zonefile_hash, con=None, path=None ):
return ret
def atlasdb_get_zonefiles_by_block( from_block, to_block, offset, count, con=None, path=None ):
def atlasdb_get_zonefiles_by_block( from_block, to_block, offset, count, name=None, con=None, path=None ):
"""
Look up all zonefile hashes in a block range.
Returns {'zonefile_hash': ..., 'indexes': [...], etc}
Look up all zonefile hashes in a block range. Optionally filter by name.
Returns [{'name': ..., 'zonefile_hash': ..., 'block_height': ..., 'txid': ..., 'inv_index': ...}]
"""
ret = None
@@ -712,14 +694,18 @@ def atlasdb_get_zonefiles_by_block( from_block, to_block, offset, count, con=Non
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = """SELECT name, zonefile_hash, txid, block_height FROM zonefiles
WHERE block_height >= ? and block_height <= ?
ORDER BY inv_index LIMIT ? OFFSET ?;"""
args = (from_block, to_block, count, offset)
sql = 'SELECT name,zonefile_hash,txid,block_height,inv_index FROM zonefiles WHERE block_height >= ? AND block_height <= ?'
args = (from_block, to_block)
if name:
sql += ' AND name = ?'
args += (name,)
sql += ' ORDER BY inv_index LIMIT ? OFFSET ?;'
args += (count, offset)
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
ret = []
@@ -729,12 +715,13 @@ def atlasdb_get_zonefiles_by_block( from_block, to_block, offset, count, con=Non
'zonefile_hash' : zfinfo['zonefile_hash'],
'block_height' : zfinfo['block_height'],
'txid' : zfinfo['txid'],
'inv_index': zfinfo['inv_index'],
})
return ret
def atlasdb_find_zonefile_by_txid( txid, con=None, path=None ):
def atlasdb_get_zonefile_by_txid( txid, con=None, path=None ):
"""
Look up a zonefile by txid
Returns {'zonefile_hash': ..., 'name': ..., etc.}
@@ -748,7 +735,6 @@ def atlasdb_find_zonefile_by_txid( txid, con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
for zfinfo in res:
ret = {}
@@ -758,13 +744,78 @@ def atlasdb_find_zonefile_by_txid( txid, con=None, path=None ):
return ret
def atlasdb_get_zonefiles_by_name(name, max_index=None, con=None, path=None):
    """
    Look up the sequence of zone file records for a name, optionally only up
    to (and including) a specific zonefile index.

    Returns [{'zonefile_hash': ..., 'txid': ..., 'inv_index': ...,
              'block_height': ..., 'present': ..., 'tried_storage': ...}],
    in blockchain order (ordered by inv_index).
    """
    ret = []
    with AtlasDBOpen(con=con, path=path) as dbcon:
        # BUG FIX: the WHERE clause must be complete before ORDER BY is
        # appended.  The original appended ' AND inv_index <= ?' *after*
        # 'ORDER BY inv_index', yielding
        # '... ORDER BY inv_index AND inv_index <= ?;' whenever max_index
        # was given -- no filtering and a nonsense ordering expression.
        sql = 'SELECT * FROM zonefiles WHERE name = ?'
        args = (name,)

        if max_index:
            sql += ' AND inv_index <= ?'
            args += (max_index,)

        sql += ' ORDER BY inv_index;'

        cur = dbcon.cursor()
        res = atlasdb_query_execute(cur, sql, args)

        for zfinfo in res:
            row = {}
            row.update(zfinfo)
            ret.append(row)

    return ret
def atlasdb_get_zonefiles_by_hash(zonefile_hash, block_height=None, con=None, path=None):
    """
    Find all instances of this zone file in the atlasdb, optionally
    filtering on block height.

    Returns [{'name': ..., 'zonefile_hash': ..., 'txid': ..., 'inv_index': ...,
              'block_height': ..., 'present': ..., 'tried_storage': ...}],
    in blockchain order (ordered by inv_index).
    Returns None if the zone file is not in the db, or (if block_height is
    set) if the zone file is not present at that block height.
    """
    # build the query up-front; the WHERE clause is complete before ORDER BY
    query = 'SELECT * FROM zonefiles WHERE zonefile_hash = ?'
    params = (zonefile_hash,)

    if block_height:
        query += ' AND block_height = ?'
        params += (block_height,)

    query += ' ORDER BY inv_index;'

    with AtlasDBOpen(con=con, path=path) as dbcon:
        cursor = dbcon.cursor()
        results = atlasdb_query_execute(cursor, query, params)
        rows = [dict(zfinfo) for zfinfo in results]

    if not rows:
        return None

    return rows
def atlasdb_set_zonefile_present( zonefile_hash, present, con=None, path=None ):
"""
Mark a zonefile as present (i.e. we stored it).
Keep our in-RAM zonefile inventory coherent.
Return the previous state.
Returns True if the zone file was already present
Returns False if it was not
"""
global ZONEFILE_INV
global ZONEFILE_INV, ZONEFILE_INV_LOCK
was_present = None
with AtlasDBOpen(con=con, path=path) as dbcon:
@@ -779,20 +830,21 @@ def atlasdb_set_zonefile_present( zonefile_hash, present, con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
zfbits = atlasdb_get_zonefile_bits( zonefile_hash, con=dbcon, path=path )
inv_vec = None
if ZONEFILE_INV is None:
inv_vec = ""
else:
inv_vec = ZONEFILE_INV[:]
with ZONEFILE_INV_LOCK:
zfbits = atlasdb_get_zonefile_bits( zonefile_hash, con=dbcon, path=path )
inv_vec = None
if ZONEFILE_INV is None:
inv_vec = ""
else:
inv_vec = ZONEFILE_INV[:]
# did we know about this?
was_present = atlas_inventory_test_zonefile_bits( inv_vec, zfbits )
# did we know about this?
was_present = atlas_inventory_test_zonefile_bits( inv_vec, zfbits )
# keep our inventory vector coherent.
ZONEFILE_INV = atlas_inventory_flip_zonefile_bits( inv_vec, zfbits, present )
# keep our inventory vector coherent.
ZONEFILE_INV = atlas_inventory_flip_zonefile_bits( inv_vec, zfbits, present )
return was_present
@@ -801,8 +853,6 @@ def atlasdb_set_zonefile_tried_storage( zonefile_hash, tried_storage, con=None,
"""
Make a note that we tried to get the zonefile from storage
"""
global ZONEFILE_INV
with AtlasDBOpen(con=con, path=path) as dbcon:
if tried_storage:
tried_storage = 1
@@ -840,13 +890,16 @@ def atlasdb_cache_zonefile_info( con=None, path=None ):
"""
Load up and cache our zonefile inventory
"""
global ZONEFILE_INV, NUM_ZONEFILES
global ZONEFILE_INV, NUM_ZONEFILES, ZONEFILE_INV_LOCK
inv = None
with ZONEFILE_INV_LOCK:
inv_len = atlasdb_zonefile_inv_length( con=con, path=path )
inv = atlas_make_zonefile_inventory( 0, inv_len, con=con, path=path )
inv_len = atlasdb_zonefile_inv_length( con=con, path=path )
inv = atlas_make_zonefile_inventory( 0, inv_len, con=con, path=path )
ZONEFILE_INV = inv
NUM_ZONEFILES = inv_len
ZONEFILE_INV = inv
NUM_ZONEFILES = inv_len
return inv
@@ -862,7 +915,6 @@ def atlasdb_get_zonefile_bits( zonefile_hash, con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
# NOTE: zero-indexed
ret = []
@@ -876,12 +928,18 @@ def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir, validate=True,
"""
Queue all zonefile hashes in the BlockstackDB
to the zonefile queue
NOT THREAD SAFE
Returns the list of zonefile infos queued, and whether or not they are present.
"""
# populate zonefile queue
total = 0
if end_block is None:
end_block = db.lastblock+1
ret = [] # map zonefile hash to zfinfo
for block_height in range(start_block, end_block, 1):
# TODO: can we do this transactionally?
@@ -901,19 +959,31 @@ def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir, validate=True,
atlasdb_add_zonefile_info( name, zfhash, txid, present, tried_storage, block_height, con=con )
total += 1
ret.append({
'name': name,
'zonefile_hash': zfhash,
'txid': txid,
'block_height': block_height,
'present': present,
'tried_storage': tried_storage
})
log.debug("Queued %s zonefiles from %s-%s" % (total, start_block, db.lastblock))
return True
return ret
def atlasdb_sync_zonefiles( db, start_block, zonefile_dir, validate=True, end_block=None, path=None, con=None ):
"""
Synchronize atlas DB with name db
NOT THREAD SAFE
"""
ret = None
with AtlasDBOpen(con=con, path=path) as dbcon:
atlasdb_queue_zonefiles( dbcon, db, start_block, zonefile_dir, validate=validate, end_block=end_block )
ret = atlasdb_queue_zonefiles( dbcon, db, start_block, zonefile_dir, validate=validate, end_block=end_block )
atlasdb_cache_zonefile_info( con=dbcon )
return True
return ret
def atlasdb_add_peer( peer_hostport, discovery_time=None, peer_table=None, con=None, path=None, ping_on_evict=True ):
@@ -965,7 +1035,6 @@ def atlasdb_add_peer( peer_hostport, discovery_time=None, peer_table=None, con=N
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
old_hostports = []
for row in res:
@@ -1035,7 +1104,6 @@ def atlasdb_num_peers( con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
ret = []
for row in res:
@@ -1069,7 +1137,7 @@ def atlasdb_get_random_peer( con=None, path=None ):
with AtlasDBOpen(con=con, path=path) as dbcon:
num_peers = atlasdb_num_peers( con=con )
num_peers = atlasdb_num_peers( con=con, path=path )
if num_peers is None or num_peers == 0:
# no peers
ret['peer_hostport'] = None
@@ -1082,7 +1150,6 @@ def atlasdb_get_random_peer( con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
ret = {'peer_hostport': None}
for row in res:
@@ -1108,7 +1175,6 @@ def atlasdb_get_old_peers( now, con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
rows = []
for row in res:
@@ -1150,7 +1216,6 @@ def atlasdb_load_peer_table( con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
# build it up
count = 0
@@ -1204,7 +1269,7 @@ def atlasdb_init( path, zonefile_dir, db, peer_seeds, peer_blacklist, validate=F
# load up peer table from the db
log.debug("Loading peer table")
peer_table = atlasdb_load_peer_table( con )
peer_table = atlasdb_load_peer_table( con=con, path=path )
# cache zonefile inventory and count
atlasdb_cache_zonefile_info( con=con )
@@ -1283,7 +1348,6 @@ def atlasdb_zonefile_inv_list( bit_offset, bit_length, con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
ret = []
for row in res:
@@ -1305,7 +1369,6 @@ def atlasdb_zonefile_inv_length( con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
ret = []
for row in res:
@@ -1343,7 +1406,6 @@ def atlasdb_zonefile_find_missing( bit_offset, bit_count, con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
ret = []
for row in res:
@@ -1367,7 +1429,6 @@ def atlasdb_zonefile_find_present( bit_offset, bit_count, con=None, path=None ):
cur = dbcon.cursor()
res = atlasdb_query_execute( cur, sql, args )
dbcon.commit()
ret = []
for row in res:
@@ -1420,28 +1481,29 @@ def atlas_get_zonefile_inventory( offset=None, length=None ):
"""
Get the in-RAM zonefile inventory vector.
"""
global ZONEFILE_INV
global ZONEFILE_INV, ZONEFILE_INV_LOCK
try:
assert ZONEFILE_INV is not None
except AssertionError:
log.error("FATAL: zonefile inventory not loaded")
os.abort()
with ZONEFILE_INV_LOCK:
try:
assert ZONEFILE_INV is not None
except AssertionError:
log.error("FATAL: zonefile inventory not loaded")
os.abort()
if offset is None:
offset = 0
if offset is None:
offset = 0
if length is None:
length = len(ZONEFILE_INV) - offset
if length is None:
length = len(ZONEFILE_INV) - offset
if offset >= len(ZONEFILE_INV):
return ""
if offset >= len(ZONEFILE_INV):
return ""
if offset + length > len(ZONEFILE_INV):
length = len(ZONEFILE_INV) - offset
ret = ZONEFILE_INV[offset:offset+length]
return ret
if offset + length > len(ZONEFILE_INV):
length = len(ZONEFILE_INV) - offset
ret = ZONEFILE_INV[offset:offset+length]
return ret
def atlas_get_num_zonefiles():
@@ -2068,7 +2130,7 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
{
'zonefile hash': {
'names': [names],
'txid': last txid,
'txid': first txid that set it,
'indexes': [...],
'popularity': ...,
'peers': [...],
@@ -2115,6 +2177,7 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
'names': [],
'txid': zfinfo['txid'],
'indexes': [],
'block_heights': [],
'popularity': 0,
'peers': [],
'tried_storage': False
@@ -2136,6 +2199,7 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
ret[zfinfo['zonefile_hash']]['names'].append( zfinfo['name'] )
ret[zfinfo['zonefile_hash']]['indexes'].append( zfinfo['inv_index']-1 )
ret[zfinfo['zonefile_hash']]['block_heights'].append( zfinfo['block_height'] )
ret[zfinfo['zonefile_hash']]['popularity'] += popularity
ret[zfinfo['zonefile_hash']]['peers'] += peers
ret[zfinfo['zonefile_hash']]['tried_storage'] = zfinfo['tried_storage']
@@ -3120,9 +3184,23 @@ class AtlasZonefileCrawler( threading.Thread ):
self.zonefile_dir = zonefile_dir
self.last_storage_reset = time_now()
self.atlasdb_path = path
def set_store_zonefile_callback(self, cb):
self.store_zonefile_cb = cb
def store_zonefile_data( self, fetched_zfhash, txid, zonefile_data, peer_hostport, con, path ):
def set_zonefile_present(self, zfhash, block_height, con=None, path=None):
    """
    Mark a zonefile as present in the atlas DB, and if it was previously
    absent, inform the storage listener (registered via
    set_store_zonefile_callback), passing it the zonefile hash and the
    block height.

    Returns True if the zone file was already present, False if not.
    """
    was_present = atlasdb_set_zonefile_present( zfhash, True, con=con, path=path )

    # tell anyone who cares that we got this zone file, if it was new.
    # NOTE(review): fixed from the original, which (1) misspelled the method
    # name as 'set_zoenfile_present' while call sites use
    # 'self.set_zonefile_present', (2) omitted the 'self' parameter despite
    # reading instance state, (3) tested the nonexistent attribute
    # 'self.store_zonefile_db' instead of 'self.store_zonefile_cb', and
    # (4) passed the undefined name 'fetched_zfhash' instead of 'zfhash'.
    if not was_present and self.store_zonefile_cb:
        self.store_zonefile_cb(zfhash, block_height)

    return was_present
def store_zonefile_data( self, fetched_zfhash, zonefile_data, min_block_height, peer_hostport, con, path ):
"""
Store the fetched zonefile (as a serialized string) to storage and cache it locally.
Update internal state to mark it present
@@ -3138,12 +3216,12 @@ class AtlasZonefileCrawler( threading.Thread ):
log.debug("%s: got %s from %s" % (self.hostport, fetched_zfhash, peer_hostport))
# update internal state
atlasdb_set_zonefile_present( fetched_zfhash, True, con=con, path=path )
self.set_zonefile_present(fetched_zfhash, min_block_height, con=con, path=path)
return rc
def store_zonefiles( self, zonefile_names, zonefiles, zonefile_txids, peer_zonefile_hashes, peer_hostport, path, con=None ):
def store_zonefiles( self, zonefile_names, zonefiles, zonefile_txids, zonefile_block_heights, peer_zonefile_hashes, peer_hostport, path, con=None ):
"""
Store a list of RPC-fetched zonefiles (but only ones in peer_zonefile_hashes) from the given peer_hostport
Return the list of zonefile hashes stored.
@@ -3154,31 +3232,12 @@ class AtlasZonefileCrawler( threading.Thread ):
for fetched_zfhash, zonefile_txt in zonefiles.items():
if fetched_zfhash not in peer_zonefile_hashes:
if fetched_zfhash not in peer_zonefile_hashes or fetched_zfhash not in zonefile_block_heights:
# unsolicited
log.warn("%s: Unsolicited zonefile %s" % (self.hostport, fetched_zfhash))
continue
zfnames = zonefile_names.get(fetched_zfhash, None)
if zfnames is None:
# unsolicited
log.warn("%s: Unknown zonefile %s" % (self.hostport, fetched_zfhash))
continue
zftxid = zonefile_txids.get(fetched_zfhash, None)
if zftxid is None:
# not paid for
log.warn("%s: Unpaid zonefile %s" % (self.hostport, fetched_zfhash))
continue
# pick a name
zfinfo = atlasdb_find_zonefile_by_txid( zftxid, path=path, con=dbcon )
if zfinfo is None:
# don't know about this txid
log.warn("%s: Unknown txid %s for %s" % (self.hostport, zftxid, fetched_zfhash))
continue
rc = self.store_zonefile_data( fetched_zfhash, zftxid, zonefile_txt, peer_hostport, dbcon, path )
rc = self.store_zonefile_data( fetched_zfhash, zonefile_txt, min(zonefile_block_heights[fetched_zfhash]), peer_hostport, dbcon, path )
if rc:
# don't ask for it again
ret.append( fetched_zfhash )
@@ -3239,6 +3298,7 @@ class AtlasZonefileCrawler( threading.Thread ):
zonefile_hashes = list(set([zfhash for (_, zfhash) in zonefile_ranking]))
zonefile_names = dict([(zfhash, missing_zfinfo[zfhash]['names']) for zfhash in zonefile_hashes])
zonefile_txids = dict([(zfhash, missing_zfinfo[zfhash]['txid']) for zfhash in zonefile_hashes])
zonefile_block_heights = dict([(zfhash, missing_zfinfo[zfhash]['block_heights']) for zfhash in zonefile_hashes])
zonefile_origins = self.find_zonefile_origins( missing_zfinfo, peer_hostports )
# filter out the ones that are already cached
@@ -3251,8 +3311,7 @@ class AtlasZonefileCrawler( threading.Thread ):
zonefile_hashes[i] = None
# mark it as present
res = atlasdb_set_zonefile_present( zfhash, True, path=path )
self.set_zonefile_present(zfhash, min(zonefile_block_heights[zfhash]), path=path)
zonefile_hashes = filter( lambda zfh: zfh is not None, zonefile_hashes )
@@ -3265,8 +3324,9 @@ class AtlasZonefileCrawler( threading.Thread ):
zfnames = zonefile_names[zfhash]
zftxid = zonefile_txids[zfhash]
peers = missing_zfinfo[zfhash]['peers']
zfinfo = atlasdb_find_zonefile_by_txid( zftxid, path=path )
'''
zfinfo = atlasdb_get_zonefile_by_txid( zftxid, path=path )
if zfinfo is None:
# not known to us
log.warn("%s: unknown zonefile %s" % (self.hostport, zfhash))
@@ -3276,9 +3336,9 @@ class AtlasZonefileCrawler( threading.Thread ):
# unavailable
if not missing_zfinfo[zfhash]['tried_storage']:
log.debug("%s: zonefile %s is unavailable" % (self.hostport, zfhash))
zonefile_hashes.pop(0)
continue
'''
# try this zonefile's hosts in order by perceived availability
peers = atlas_rank_peers_by_health( peer_list=peers, with_zero_requests=True )
@@ -3309,7 +3369,7 @@ class AtlasZonefileCrawler( threading.Thread ):
if zonefiles is not None:
# got zonefiles!
stored_zfhashes = self.store_zonefiles( zonefile_names, zonefiles, zonefile_txids, peer_zonefile_hashes, peer_hostport, path )
stored_zfhashes = self.store_zonefiles( zonefile_names, zonefiles, zonefile_txids, zonefile_block_heights, peer_zonefile_hashes, peer_hostport, path )
# don't ask again
log.debug("Stored %s zonefiles" % len(stored_zfhashes))
@@ -3365,7 +3425,7 @@ class AtlasZonefileCrawler( threading.Thread ):
# re-try storage periodically for missing zonefiles
if self.last_storage_reset + PEER_CRAWL_ZONEFILE_STORAGE_RETRY_INTERVAL < time_now():
log.debug("%s: Re-trying storage on missing zonefiles" % self.hostport)
atlasdb_reset_zonefile_tried_storage()
atlasdb_reset_zonefile_tried_storage(path=self.atlasdb_path)
self.last_storage_reset = time_now()
@@ -3467,8 +3527,7 @@ class AtlasZonefilePusher(threading.Thread):
self.running = False
def atlas_node_start(my_hostname, my_portnum, atlasdb_path, zonefile_dir, working_dir):
def atlas_node_init(my_hostname, my_portnum, atlasdb_path, zonefile_dir, working_dir):
"""
Start up the atlas node.
Return a bundle of atlas state
@@ -3479,12 +3538,26 @@ def atlas_node_start(my_hostname, my_portnum, atlasdb_path, zonefile_dir, workin
atlas_state['zonefile_crawler'] = AtlasZonefileCrawler(my_hostname, my_portnum, atlasdb_path, zonefile_dir)
# atlas_state['zonefile_pusher'] = AtlasZonefilePusher(my_hostname, my_portnum, atlasdb_path, zonefile_dir)
# start them all up
return atlas_state
def atlas_node_start(atlas_state):
    """
    Start up every thread in an initialized atlas state bundle.
    Returns the same atlas state bundle.
    """
    for name, component in atlas_state.items():
        log.debug("Starting Atlas component '%s'" % name)
        component.start()

    return atlas_state
def atlas_node_add_callback(atlas_state, callback_name, callback):
    """
    Register a callback on the initialized atlas state.
    Only 'store_zonefile' is recognized; any other name raises ValueError.
    """
    # guard clause: reject unknown callback names up-front
    if callback_name != 'store_zonefile':
        raise ValueError("Unrecognized callback {}".format(callback_name))

    atlas_state['zonefile_crawler'].set_store_zonefile_callback(callback)
def atlas_node_stop( atlas_state ):