bugfixes found in testing; add top-level startup/shutdown logic; add db sync logic
This commit is contained in:
Jude Nelson
2016-08-24 17:41:53 -04:00
parent 848cfff660
commit d23bdf3c16

View File

@@ -40,6 +40,9 @@ import hashlib
import blockstack_zones
import virtualchain
from blockstack_client.config import url_to_host_port
log = virtualchain.get_logger("blockstack-server")
from lib.config import *
@@ -76,7 +79,7 @@ if os.environ.get("BLOCKSTACK_ATLAS_NUM_NEIGHBORS") is not None:
if os.environ.get("BLOCKSTACK_ATLAS_MAX_NEIGHBORS") is not None:
NUM_NEIGHBORS = int(os.environ.get("BLOCKSTACK_ATLAS_MAX_NEIGHBORS"))
if os.environ.get("BLOCKSTACK_TEST", None) == "1" and os.environ.get("BLOCKSTACK_ATLAS_UNIT_TEST", None) is None:
if os.environ.get("BLOCKSTACK_TEST", None) == "1" and os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION", None) == "1":
# use test client
from blockstack_integration_tests import AtlasRPCTestClient as BlockstackRPCClient
from blockstack_integration_tests import time_now, time_sleep, atlas_max_neighbors, atlas_peer_lifetime_interval, atlas_peer_ping_interval, atlas_peer_max_age, atlas_peer_clean_interval
@@ -109,7 +112,7 @@ else:
ATLASDB_SQL = """
CREATE TABLE zonefiles( inv_index INTEGER PRIMARY KEY AUTOINCREMENT,
zonefile_hash STRING NOT NULL,
zonefile_hash TEXT NOT NULL,
present INTEGER NOT NULL,
block_height INTEGER NOT NULL );
@@ -380,6 +383,38 @@ def atlasdb_add_zonefile_info( zonefile_hash, present, block_height, con=None, p
return True
def atlasdb_get_lastblock( con=None, path=None ):
    """
    Get the highest block height recorded in the atlas db's
    zonefiles table.

    @con: optional open connection to the atlas db
    @path: path to the atlas db; used only when con is None

    Return the maximum block_height (None if the table is empty).
    """
    if path is None:
        path = atlasdb_path()

    # open our own connection if the caller did not supply one
    opened_here = (con is None)
    if opened_here:
        con = atlasdb_open( path )
        assert con is not None

    cursor = con.cursor()
    result = atlasdb_query_execute( cursor, "SELECT MAX(block_height) FROM zonefiles;", () )
    con.commit()

    # MAX() always yields exactly one row
    last_block = None
    for record in result:
        last_block = record['MAX(block_height)']
        break

    if opened_here:
        con.close()

    return last_block
def atlasdb_get_zonefile( zonefile_hash, con=None, path=None ):
"""
Look up all information on this zonefile.
@@ -405,13 +440,13 @@ def atlasdb_get_zonefile( zonefile_hash, con=None, path=None ):
'zonefile_hash': zonefile_hash,
'indexes': [],
'block_heights': [],
'present': None
'present': False
}
for zfinfo in res:
ret['indexes'].append( zfinfo['inv_index'] )
ret['block_heights'].append( zfinfo['block_height'] )
ret['present'] = zfinfo['present']
ret['present'] = ret['present'] or zfinfo['present']
if close:
con.close()
@@ -492,6 +527,35 @@ def atlasdb_get_zonefile_bits( zonefile_hash, con=None, path=None ):
return ret
def atlasdb_zonefile_block_reset( con, db, drop_block, path=None ):
    """
    Drop all zonefile rows for a particular block.
    Used for queueing zonefiles at a particular start block
    (i.e. we don't want to deal with "partial" blocks represented
    in the db).

    @con: optional open connection to the atlas db
    @db: blockstack db handle (unused here; kept for interface symmetry with siblings)
    @drop_block: block height whose zonefile rows are purged
    @path: path to the atlas db; used only when con is None

    Return True on success.
    """
    # BUG FIX: 'path' was referenced below but was never a parameter,
    # raising NameError whenever con was None.  Added as a keyword
    # argument with a None default, so existing callers are unaffected.
    if path is None:
        path = atlasdb_path()

    close = False
    if con is None:
        close = True
        con = atlasdb_open( path )
        assert con is not None

    sql = "DELETE FROM zonefiles WHERE block_height = ?;"
    args = (drop_block,)

    cur = con.cursor()
    res = atlasdb_query_execute( cur, sql, args )
    con.commit()

    if close:
        con.close()

    return True
def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir=None, validate=True ):
"""
Queue all zonefile hashes in the BlockstackDB
@@ -503,7 +567,10 @@ def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir=None, validate=T
zonefile_hashes = db.get_value_hashes_at( block_height )
for zfhash in zonefile_hashes:
zfhash = str(zfhash)
present = is_zonefile_cached( zfhash, zonefile_dir=zonefile_dir, validate=validate )
log.debug("Add %s at %s (present: %s)" % (zfhash, block_height, present) )
atlasdb_add_zonefile_info( zfhash, present, block_height, con=con )
total += 1
@@ -511,7 +578,28 @@ def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir=None, validate=T
return True
def atlasdb_add_peer( peer_hostport, discovery_time=None, peer_table=None, con=None, path=None ):
def atlasdb_sync_zonefiles( db, start_block, zonefile_dir=None, validate=True, path=None, con=None ):
    """
    Synchronize the atlas DB with the name db: queue every zonefile
    hash in @db from @start_block onward.

    @db: blockstack name database handle
    @start_block: first block height to synchronize from
    @zonefile_dir: optional directory of cached zonefiles
    @validate: whether to validate cached zonefiles
    @path: path to the atlas db; used only when con is None
    @con: optional open connection to the atlas db

    Return True.
    """
    if path is None:
        path = atlasdb_path()

    opened_here = (con is None)
    if opened_here:
        con = atlasdb_open( path )
        assert con is not None

    atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir=zonefile_dir, validate=validate )

    if opened_here:
        con.close()

    return True
def atlasdb_add_peer( peer_hostport, discovery_time=None, peer_table=None, con=None, path=None, ping_on_evict=True ):
"""
Add a peer to the peer table.
If the peer conflicts with another peer, ping it first, and only insert
@@ -524,8 +612,13 @@ def atlasdb_add_peer( peer_hostport, discovery_time=None, peer_table=None, con=N
"""
# bound the number of peers we add to PEER_MAX_DB
assert len(peer_hostport) > 0
sk = random.randint(0, 2**32)
peer_host, peer_port = url_to_host_port( peer_hostport )
assert len(peer_host) > 0
peer_slot = int( hashlib.sha256("%s%s" % (sk, peer_host)).hexdigest(), 16 ) % PEER_MAX_DB
locked = False
@@ -557,33 +650,37 @@ def atlasdb_add_peer( peer_hostport, discovery_time=None, peer_table=None, con=N
discovery_time = int(time.time())
# not in the table yet. See if we can evict someone
sql = "SELECT peer_hostport FROM peers WHERE peer_slot = ?;"
args = (peer_slot,)
if ping_on_evict:
cur = con.cursor()
res = atlasdb_query_execute( cur, sql, args )
con.commit()
sql = "SELECT peer_hostport FROM peers WHERE peer_slot = ?;"
args = (peer_slot,)
old_hostports = []
for row in res:
old_hostport = res['peer_hostport']
old_hostports.append( old_hostport )
cur = con.cursor()
res = atlasdb_query_execute( cur, sql, args )
con.commit()
for old_hostport in old_hostports:
# is this other peer still alive?
res = atlas_peer_ping( old_hostport )
if res:
log.debug("Peer %s is still alive; will not replace" % (old_hostport))
if close:
con.close()
old_hostports = []
for row in res:
old_hostport = res['peer_hostport']
old_hostports.append( old_hostport )
if locked:
atlas_peer_table_unlock()
for old_hostport in old_hostports:
# is this other peer still alive?
res = atlas_peer_ping( old_hostport )
if res:
log.debug("Peer %s is still alive; will not replace" % (old_hostport))
if close:
con.close()
return False
if locked:
atlas_peer_table_unlock()
# peer is dead. Can insert or update
return False
log.debug("Add peer '%s' discovered at %s (slot %s)" % (peer_hostport, discovery_time, peer_slot))
# peer is dead (or we don't care). Can insert or update
sql = "INSERT OR REPLACE INTO peers (peer_hostport, peer_slot, discovery_time) VALUES (?,?,?);"
args = (peer_hostport, peer_slot, discovery_time)
@@ -595,10 +692,11 @@ def atlasdb_add_peer( peer_hostport, discovery_time=None, peer_table=None, con=N
con.close()
# add to peer table as well
atlas_init_peer_info( peer_table, peer_hostport, False )
atlas_init_peer_info( peer_table, peer_hostport, blacklisted=False, whitelisted=False )
if locked:
atlas_peer_table_unlock()
peer_table = None
return True
@@ -617,7 +715,7 @@ def atlasdb_remove_peer( peer_hostport, con=None, path=None, peer_table=None ):
# nothing to do
if locked:
atlas_peer_table_unlock()
locked = False
peer_table = None
return True
@@ -641,9 +739,9 @@ def atlasdb_remove_peer( peer_hostport, con=None, path=None, peer_table=None ):
if close:
con.close()
# remove from the peer table as well
# remove from the peer table as well, unless blacklisted or whitelisted
if peer_table.has_key(peer_hostport):
if peer_table[peer_hostport].get("blacklisted", False):
if not atlas_peer_is_whitelisted( peer_hostport, peer_table=peer_table ) and not atlas_peer_is_blacklisted( peer_hostport, peer_table=peer_table ):
log.debug("Forget peer '%s'" % dead_peers)
del peer_table[peer_hostport]
@@ -688,6 +786,24 @@ def atlasdb_num_peers( con=None, path=None ):
return ret[0]['MAX(peer_index)']
def atlas_get_peer( peer_hostport, peer_table=None ):
    """
    Get the given peer's info from the peer table.

    @peer_hostport: "host:port" key of the peer
    @peer_table: optional peer table; if None, the global table is
    locked for the duration of the lookup.

    Return the peer's info dict, or None if not present.
    """
    must_unlock = (peer_table is None)
    if must_unlock:
        peer_table = atlas_peer_table_lock()

    peer_info = peer_table.get(peer_hostport, None)

    if must_unlock:
        atlas_peer_table_unlock()
        peer_table = None

    return peer_info
def atlasdb_get_random_peer( con=None, path=None ):
"""
Select a peer from the db at random
@@ -732,7 +848,7 @@ def atlasdb_get_random_peer( con=None, path=None ):
def atlasdb_get_old_peers( now, con=None, path=None ):
"""
Get peers older than now - LIFETIME
Get peers older than now - PEER_LIFETIME
"""
if path is None:
path = atlasdb_path()
@@ -801,9 +917,49 @@ def atlasdb_delete_peer( peer_hostport, con=None, path=None, peer_table=None ):
atlas_peer_table_unlock()
peer_table = None
if closed:
con.close()
return
def atlasdb_load_peer_table( peer_hostport, con=None, path=None ):
    """
    Create a peer table from the peer DB.

    @peer_hostport: NOTE(review): this parameter is never used in the
    body; the caller in atlasdb_init passes a db connection positionally
    into this slot — confirm whether the first parameter should be
    removed or renamed to con.
    @con: optional open connection to the atlas db
    @path: path to the atlas db; used only when con is None

    Return the newly-built peer table (dict keyed by "host:port").
    """
    peer_table = {}

    if path is None:
        path = atlasdb_path()

    close = False
    if con is None:
        close = True
        con = atlasdb_open( path )
        assert con is not None

    sql = "SELECT * FROM peers;"
    args = ()

    cur = con.cursor()
    # NOTE(review): every sibling method runs queries via
    # atlasdb_query_execute; 'atlasdb_query_commit' looks like a typo —
    # confirm that this helper actually exists.
    res = atlasdb_query_commit( cur, sql, args )
    con.commit()

    # build it up
    count = 0
    for row in res:
        if count > 0 and count % 100 == 0:
            log.debug("Loaded %s peers..." % count)

        atlas_init_peer_info( peer_table, row['peer_hostport'] )
        count += 1

    if close:
        con.close()

    return peer_table
def atlasdb_init( path, db, peer_seeds, peer_blacklist, validate=False, zonefile_dir=None ):
"""
Set up the atlas node:
@@ -824,9 +980,23 @@ def atlasdb_init( path, db, peer_seeds, peer_blacklist, validate=False, zonefile
if os.path.exists( path ):
log.debug("Atlas DB exists at %s" % path)
atlasdb_last_block = atlasdb_get_lastblock( path=path )
log.debug("Synchronize zonefiles from %s to %s" % (atlasdb_last_block, db.last_block) )
# TODO: sync up to lastblock
# TODO: load peers into peer_table
atlasdb_zonefile_block_reset( con, db, atlasdb_last_block )
atlasdb_queue_zonefiles( con, db, atlasdb_last_block, validate=validate, zonefile_dir=zonefile_dir )
log.debug("Refreshing seed peers")
for peer in peer_seeds:
# forcibly add seed peers
atlasdb_add_peer( peer, con=con, peer_table=peer_table, ping_on_evict=False )
# load up peer table from the db
log.debug("Loading peer table")
peer_table = atlasdb_load_peer_table( con )
con.close()
else:
@@ -850,51 +1020,28 @@ def atlasdb_init( path, db, peer_seeds, peer_blacklist, validate=False, zonefile
con.close()
# add initial peer info
for peer_url in peer_seeds + peer_blacklist:
# whitelist and blacklist
for peer_url in peer_seeds:
host, port = url_to_host_port( peer_url )
peer_hostport = "%s:%s" % (host, port)
if peer_hostport not in peer_table.keys():
atlasdb_add_peer( peer_hostport, path=path, peer_table=peer_table )
peer_table[peer_hostport]['blacklisted'] = (peer_url in peer_blacklist)
peer_table[peer_hostport]['whitelisted'] = True
for peer_url in peer_blacklist:
host, port = url_to_host_port( peer_url )
peer_hostport = "%s:%s" % (host, port)
if peer_hostport not in peer_table.keys():
atlasdb_add_peer( peer_hostport, path=path, peer_table=peer_table )
peer_table[peer_hostport]['blacklisted'] = True
return peer_table
def atlasdb_zonefile_info_list( start, end, con=None, path=None ):
    """
    Get a listing of zonefile information for a given blockchain
    range [start, end] (inclusive on both ends).

    @start: first block height of the range
    @end: last block height of the range
    @con: optional open connection to the atlas db
    @path: path to the atlas db; used only when con is None

    Return a list of row dicts from the zonefiles table.
    """
    if path is None:
        path = atlasdb_path()

    opened_here = (con is None)
    if opened_here:
        con = atlasdb_open( path )
        assert con is not None

    cursor = con.cursor()
    rows = atlasdb_query_execute( cursor,
                                  "SELECT * FROM zonefiles WHERE block_height >= ? AND block_height <= ?;",
                                  (start, end) )
    con.commit()

    # materialize each row into a plain dict
    listing = [dict(row) for row in rows]

    if opened_here:
        con.close()

    return listing
def atlasdb_zonefile_inv_list( bit_offset, bit_length, con=None, path=None ):
"""
Get an inventory listing.
@@ -1037,31 +1184,6 @@ def atlas_make_zonefile_inventory( bit_offset, bit_length, con=None, path=None )
return inv
def atlas_inventory_find_missing( bit_offset, bit_count, zonefile_inv=None ):
    """
    Find the missing zonefile bit indexes.
    Use the global zonefile inventory vector by default,
    or optionally the given zonefile_inv.

    @bit_offset: first bit index to examine
    @bit_count: number of bits to examine
    @zonefile_inv: optional inventory byte string (big-endian bit order:
    bit 0 is the high bit of byte 0)

    Return the list of bit indexes in [bit_offset, bit_offset+bit_count)
    that are NOT set (or fall beyond the end of the inventory).
    """
    if zonefile_inv is None:
        zonefile_inv = atlas_get_zonefile_inventory()

    bits = []
    for i in range(bit_offset, bit_offset + bit_count):
        byte_offset = i // 8
        bit_index = 7 - (i % 8)

        if byte_offset >= len(zonefile_inv):
            # beyond the length of this inv
            bits.append(i)
        else:
            # BUG FIX: was zonefile_inv[byte_index] — 'byte_index' was
            # never defined (NameError); the variable is 'byte_offset'.
            is_set = ord(zonefile_inv[byte_offset]) & (1 << bit_index)
            if is_set == 0:
                # not set
                bits.append(i)

    return bits
def atlas_get_zonefile_inventory():
"""
Get the in-RAM zonefile inventory vector.
@@ -1070,48 +1192,18 @@ def atlas_get_zonefile_inventory():
return ZONEFILE_INV
def atlas_init_peer_info( peer_table, peer_hostport, blacklisted=False, whitelisted=False ):
    """
    Initialize peer info table entry.

    @peer_table: peer table (dict keyed by "host:port") to mutate
    @peer_hostport: "host:port" key for the new entry
    @blacklisted: whether this peer is blacklisted
    @whitelisted: whether this peer is whitelisted

    Side effect: overwrites any existing entry for peer_hostport.
    """
    # NOTE: reconstructed from diff residue — the old "blacklist" field
    # line was interleaved with the new "blacklisted"/"whitelisted" lines.
    peer_table[peer_hostport] = {
        "time": [],
        "zonefile_inv": "",
        "blacklisted": blacklisted,
        "whitelisted": whitelisted
    }
def url_to_host_port( url, port=RPC_SERVER_PORT ):
    """
    Given a URL, turn it into (host, port).
    Return (None, None) on invalid URL.

    @url: URL or bare "host[:port]" string
    @port: default port to use when the URL does not specify one
    """
    # BUG FIX: this check used 'or', which is always True (no string
    # starts with both prefixes), so "http://" was prepended even to
    # URLs that already carried a scheme, corrupting the netloc.
    if not url.startswith("http://") and not url.startswith("https://"):
        url = "http://" + url

    urlinfo = urllib2.urlparse.urlparse(url)
    hostport = urlinfo.netloc

    # strip optional userinfo ("user@host")
    parts = hostport.split("@")
    if len(parts) > 2:
        return (None, None)
    if len(parts) == 2:
        hostport = parts[1]

    parts = hostport.split(":")
    if len(parts) > 2:
        return (None, None)

    if len(parts) == 2:
        try:
            port = int(parts[1])
        except:
            return (None, None)

    return parts[0], port
def atlas_peer_ping( peer_hostport, timeout=3, peer_table=None ):
"""
Ping a host
@@ -1148,17 +1240,6 @@ def atlas_peer_ping( peer_hostport, timeout=3, peer_table=None ):
return ret
def atlas_peer_is_live( peer_hostport, peer_table, min_health=MIN_PEER_HEALTH ):
"""
Have we heard from this node recently?
"""
if not peer_table.has_key(peer_hostport):
return False
health_score = atlas_peer_get_health( peer_hostport, peer_table=peer_table )
return health_score > min_health and atlas_peer_get_request_count( peer_hostport, peer_table=peer_table ) > 0
def atlas_inventory_count_missing( inv1, inv2 ):
"""
Find out how many bits are set in inv2
@@ -1208,6 +1289,7 @@ def atlas_get_live_neighbors( remote_peer_hostport, peer_table=None, min_health=
if locked:
atlas_peer_table_unlock()
peer_table = None
random.shuffle(alive_peers)
return alive_peers
@@ -1217,13 +1299,13 @@ def atlas_remove_peers( dead_peers, peer_table ):
"""
Remove all peer information for the given dead peers from the given health info,
as well as from the db.
Only preserve unconditionally if we've blacklisted them
Only preserve unconditionally if we've blacklisted or whitelisted them them
explicitly.
"""
for peer_hostport in dead_peers:
if peer_table.has_key(peer_hostport):
if peer_table[peer_hostport].get("blacklisted", False):
if atlas_peer_is_whitelisted( peer_hostport, peer_table=peer_table ) or atlas_peer_is_blacklisted( peer_hostport, peer_table=peer_table ):
continue
log.debug("Forget peer '%s'" % dead_peers)
@@ -1274,6 +1356,7 @@ def atlas_peer_get_health( peer_hostport, peer_table=None ):
if locked:
atlas_peer_table_unlock()
peer_table = None
return availability_score
@@ -1290,6 +1373,7 @@ def atlas_peer_get_request_count( peer_hostport, peer_table=None ):
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return 0
@@ -1300,6 +1384,7 @@ def atlas_peer_get_request_count( peer_hostport, peer_table=None ):
if locked:
atlas_peer_table_unlock()
peer_table = None
return count
@@ -1317,6 +1402,61 @@ def atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=None ):
if locked:
atlas_peer_table_unlock()
peer_table = None
return inv
def atlas_peer_set_zonefile_inventory( peer_hostport, peer_inv, peer_table=None ):
    """
    Set this peer's zonefile inventory.

    @peer_hostport: "host:port" key of the peer (must exist in the table)
    @peer_inv: inventory byte string to store
    @peer_table: optional peer table; if None, the global table is
    locked for the duration of the update.

    Return the inventory that was stored.
    """
    locked = False
    if peer_table is None:
        locked = True
        peer_table = atlas_peer_table_lock()

    # BUG FIX: previously assigned and returned 'inv', an undefined
    # name (NameError); the parameter is 'peer_inv'.
    peer_table[peer_hostport]['zonefile_inv'] = peer_inv

    if locked:
        atlas_peer_table_unlock()
        peer_table = None

    return peer_inv
def atlas_peer_is_blacklisted( peer_hostport, peer_table=None ):
    """
    Is a peer blacklisted?

    @peer_hostport: "host:port" key of the peer (must exist in the table)
    @peer_table: optional peer table; if None, the global table is
    locked for the duration of the lookup.

    Return True if blacklisted, False otherwise.
    """
    locked = False
    if peer_table is None:
        locked = True
        peer_table = atlas_peer_table_lock()

    ret = peer_table[peer_hostport].get("blacklisted", False)

    if locked:
        atlas_peer_table_unlock()
        peer_table = None

    # BUG FIX: previously returned 'inv', an undefined name (NameError).
    return ret
def atlas_peer_is_whitelisted( peer_hostport, peer_table=None ):
    """
    Is a peer whitelisted?

    @peer_hostport: "host:port" key of the peer (must exist in the table)
    @peer_table: optional peer table; if None, the global table is
    locked for the duration of the lookup.

    Return True if whitelisted, False otherwise.
    """
    locked = False
    if peer_table is None:
        locked = True
        peer_table = atlas_peer_table_lock()

    ret = peer_table[peer_hostport].get("whitelisted", False)

    if locked:
        atlas_peer_table_unlock()
        peer_table = None

    # BUG FIX: previously returned 'inv', an undefined name (NameError).
    return ret
@@ -1343,10 +1483,11 @@ def atlas_peer_update_health( peer_hostport, received_response, peer_table=None
return False
# if blacklisted, then we don't care
if peer_table[peer_hostport].get("blacklisted", False):
# if blacklisted or whitelisted, then we don't care
if atlas_peer_is_whitelisted( peer_hostport, peer_table=peer_table ) or atlas_peer_is_blacklisted( peer_hostport, peer_table=peer_table ):
if locked:
atlas_peer_table_unlock()
peer_table = None
return True
@@ -1366,6 +1507,7 @@ def atlas_peer_update_health( peer_hostport, received_response, peer_table=None
if locked:
atlas_peer_table_unlock()
peer_table = None
return True
@@ -1450,7 +1592,7 @@ def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, time
locked = True
peer_table = atlas_peer_table_lock()
peer_inv = peer_table[peer_hostport]['zonefile_inv']
peer_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
bit_offset = (len(peer_inv) - 1) * 8 # i.e. re-obtain the last byte
if bit_offset < 0:
@@ -1461,9 +1603,14 @@ def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, time
if locked:
atlas_peer_table_unlock()
peer_table = None
if bit_offset >= maxlen:
# synced already
if locked:
atlas_peer_table_unlock()
peer_table = None
return peer_inv
for offset in xrange( bit_offset, maxlen, interval):
@@ -1481,10 +1628,11 @@ def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, time
if locked:
peer_table = atlas_peer_table_lock()
peer_table[peer_hostport]['zonefile_inv'] = peer_inv # NOTE: may have trailing 0's for padding
atlas_peer_set_zonefile_inventory( peer_hostport, peer_inv, peer_table=peer_table ) # NOTE: may have trailing 0's for padding
if locked:
atlas_peer_table_unlock()
peer_table = None
return peer_inv
@@ -1518,8 +1666,8 @@ def atlas_peer_refresh_zonefile_inventory( my_hostport, peer_hostport, byte_offs
peer_table = atlas_peer_table_lock()
# reset the peer's zonefile inventory, back to offset
cur_inv = peer_table[peer_hostport]['zonefile_inv']
peer_table[peer_hostport]['zonefile_inv'] = cur_inv[:byte_offset]
cur_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
atlas_peer_set_zonefile_inventory( peer_hostport, cur_inv[:byte_offset], peer_table=peer_table )
if locked:
atlas_peer_table_unlock()
@@ -1536,6 +1684,7 @@ def atlas_peer_refresh_zonefile_inventory( my_hostport, peer_hostport, byte_offs
if locked:
atlas_peer_table_unlock()
peer_table = None
log.debug("%s: inventory of %s is now %s" % (my_hostport, peer_hostport, atlas_inventory_to_string(inv)))
@@ -1565,7 +1714,8 @@ def atlas_peer_has_fresh_zonefile_inventory( peer_hostport, local_inv=None, peer
fresh = False
now = time_now()
if len(peer_table[peer_hostport]['zonefile_inv']) >= expected_length and \
peer_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
if len(peer_inv) >= expected_length and \
peer_table[peer_hostport].has_key('zonefile_inventory_last_refresh') and \
peer_table[peer_hostport]['zonefile_inventory_last_refresh'] + atlas_peer_ping_interval() > now:
@@ -1573,6 +1723,7 @@ def atlas_peer_has_fresh_zonefile_inventory( peer_hostport, local_inv=None, peer
if locked:
atlas_peer_table_unlock()
peer_table = None
return fresh
@@ -1595,12 +1746,13 @@ def atlas_peer_set_zonefile_status( peer_hostport, zonefile_hash, present, zonef
peer_table = atlas_peer_table_lock()
if peer_table.has_key(peer_hostport):
peer_inv = peer_table[peer_hostport]['zonefile_inv']
peer_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
peer_inv = atlas_inventory_flip_zonefile_bits( peer_inv, zonefile_bits, present )
peer_table[peer_hostport]['zonefile_inv'] = peer_inv
atlas_peer_set_zonefile_inventory( peer_hostport, peer_inv, peer_table=peer_table )
if locked:
atlas_peer_table_unlock()
peer_table = None
return
@@ -1668,11 +1820,12 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
}
for peer_hostport in peer_table.keys():
if len(peer_table[peer_hostport]['zonefile_inv']) <= byte_index:
peer_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
if len(peer_inv) <= byte_index:
# too new for this peer
continue
if (ord(peer_table[peer_hostport]['zonefile_inv'][byte_index]) & (1 << bit_index)) == 0:
if (ord(peer_inv[byte_index]) & (1 << bit_index)) == 0:
# this peer doesn't have it
continue
@@ -1686,6 +1839,7 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
if locked:
atlas_peer_table_unlock()
peer_table = None
return ret
@@ -1714,7 +1868,7 @@ def atlas_peer_has_zonefile( peer_hostport, zonefile_hash, zonefile_bits=None, c
locked = True
peer_table = atlas_peer_table_lock()
zonefile_inv = peer_table[peer_hostport]['zonefile_inv']
zonefile_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
if locked:
atlas_peer_table_unlock()
@@ -1744,7 +1898,7 @@ def atlas_peer_get_neighbors( my_hostport, peer_hostport, timeout=10, peer_table
rpc = BlockstackRPCClient( host, port, timeout=timeout, src=my_hostport )
try:
peer_list = rpc.get_atlas_peers( my_hostport )
peer_list = rpc.get_atlas_peers()
assert type(peer_list) in [dict], "Not a peer list response"
@@ -1873,6 +2027,7 @@ def atlas_rank_peers_by_health( peer_list=None, peer_table=None, with_zero_reque
if locked:
atlas_peer_table_unlock()
peer_table = None
# sort on health
peer_health_ranking.sort()
@@ -1909,15 +2064,19 @@ def atlas_rank_peers_by_data_availability( peer_list=None, peer_table=None, loca
peer_availability_ranking = [] # (health score, peer hostport)
for peer_hostport in peer_list:
peer_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
# ignore peers that we don't have an inventory for
if len(peer_table[peer_hostport]['zonefile_inv']) == 0:
if len(peer_inv) == 0:
continue
availability_score = atlas_peer_get_availability( local_inv, peer_table[peer_hostport]['zonefile_inv'] )
availability_score = atlas_peer_get_availability( local_inv, peer_inv )
peer_availability_ranking.append( (availability_score, peer_hostport) )
if locked:
atlas_peer_table_unlock()
peer_table = None
# sort on availability
peer_availability_ranking.sort()
@@ -1973,6 +2132,7 @@ def atlas_peer_get_last_response_time( peer_hostport, peer_table=None ):
if locked:
atlas_peer_table_unlock()
peer_table = None
return last_time
@@ -2053,7 +2213,7 @@ def atlas_zonefile_find_push_peers( zonefile_hash, peer_table=None, zonefile_bit
push_peers = []
for peer_hostport in peer_table.keys():
zonefile_inv = peer_table[peer_hostport]['zonefile_inv']
zonefile_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
res = atlas_inventory_test_zonefile_bits( zonefile_inv, zonefile_bits )
if res:
push_peers.append( peer_hostport )
@@ -2100,6 +2260,10 @@ def atlas_zonefile_push_enqueue( zonefile_hash, zonefile_data, peer_table=None,
if zonefile_queue_locked:
atlas_zonefile_queue_unlock()
if table_locked:
atlas_peer_table_unlock()
peer_table = None
return res
@@ -2160,7 +2324,7 @@ class AtlasPeerCrawler( threading.Thread ):
and we restart from a randomly-chosen peer in our peer DB.
"""
def __init__(self, my_hostname, my_portnum):
def __init__(self, my_hostname, my_portnum, path=None ):
threading.Thread.__init__(self)
self.running = False
self.last_clean_time = 0
@@ -2174,6 +2338,7 @@ class AtlasPeerCrawler( threading.Thread ):
self.new_peers = []
self.max_neighbors = None
self.atlasdb_path = path
def get_neighbors( self, peer_hostport, con=None, path=None, peer_table=None ):
@@ -2509,7 +2674,7 @@ class AtlasPeerCrawler( threading.Thread ):
def run(self):
self.running = True
while self.running:
self.step()
self.step( path=self.atlasdb_path )
def ask_join(self):
@@ -2531,7 +2696,8 @@ class AtlasHealthChecker( threading.Thread ):
self.last_clean_time = 0
if path is None:
path = atlasdb_path()
self.atlasdb_path = path
def step(self, con=None, path=None, peer_table=None, local_inv=None):
"""
@@ -2582,7 +2748,7 @@ class AtlasHealthChecker( threading.Thread ):
"""
while self.running:
local_inv = atlas_get_local_inventory()
self.step( peer_table=peer_table, local_inv=local_inv )
self.step( peer_table=peer_table, local_inv=local_inv, path=self.atlasdb_path )
def ask_join(self):
@@ -2756,19 +2922,27 @@ class AtlasZonefileCrawler( threading.Thread ):
time_sleep(self.hostport, self.__class__.__name__, 1.0)
def ask_join(self):
self.running = False
class AtlasZonefilePusher(object):
"""
Continuously drain the queue of zonefiles
we can push, by sending them off to
known peers who need them.
"""
def __init__(self, host, port ):
def __init__(self, host, port, path=None ):
self.host = host
self.port = port
self.hostport = "%s:%s" % (host, port)
self.path = path
if self.path is None:
self.path = atlasdb_path()
def step( self, peer_table=None, zonefile_queue=None ):
def step( self, peer_table=None, zonefile_queue=None, path=None ):
"""
Run one step of this algorithm.
Push the zonefile to all the peers that need it.
@@ -2816,8 +2990,42 @@ class AtlasZonefilePusher(object):
def run(self):
self.running = True
while self.running:
num_pushed = self.step()
num_pushed = self.step( path=self.path )
if num_pushed == 0:
time_sleep(self.hostport, self.__class__.__name__, 1.0)
def ask_join(self):
self.running = False
def atlas_node_start( my_hostname, my_portnum, atlasdb_path=None, zonefile_dir=None, zonefile_storage_drivers=None ):
    """
    Start up the atlas node.

    @my_hostname: this node's hostname
    @my_portnum: this node's port
    @atlasdb_path: optional override path to the atlas db
    @zonefile_dir: optional directory of cached zonefiles
    @zonefile_storage_drivers: optional list of storage driver names

    Return a bundle of atlas state (dict of running threads).
    """
    # BUG FIX: avoid a mutable default argument ([] is shared across calls)
    if zonefile_storage_drivers is None:
        zonefile_storage_drivers = []

    atlas_state = {}
    # CONSISTENCY FIX: the peer crawler accepts path= like the other
    # components, but the atlasdb_path override was not being passed,
    # so it silently fell back to the default db path.
    atlas_state['peer_crawler'] = AtlasPeerCrawler( my_hostname, my_portnum, path=atlasdb_path )
    atlas_state['health_checker'] = AtlasHealthChecker( my_hostname, my_portnum, path=atlasdb_path )
    atlas_state['zonefile_crawler'] = AtlasZonefileCrawler( my_hostname, my_portnum, zonefile_storage_drivers=zonefile_storage_drivers, path=atlasdb_path, zonefile_dir=zonefile_dir )
    atlas_state['zonefile_pusher'] = AtlasZonefilePusher( my_hostname, my_portnum, path=atlasdb_path )

    # start them all up
    for component in atlas_state.keys():
        log.debug("Starting Atlas component '%s'" % component)
        atlas_state[component].start()

    return atlas_state
def atlas_node_stop( atlas_state ):
    """
    Stop the atlas node threads.

    @atlas_state: the bundle of running threads returned by
    atlas_node_start.

    Return True.
    """
    for name, component in atlas_state.items():
        log.debug("Stopping Atlas component '%s'" % name)
        component.ask_join()
        component.join()

    return True