more bugfixes from mesh testing

This commit is contained in:
Jude Nelson
2016-08-30 00:37:02 -04:00
parent 940d29e14e
commit 7edcafa753

View File

@@ -41,7 +41,7 @@ import hashlib
import blockstack_zones
import virtualchain
from blockstack_client.config import url_to_host_port, semver_match
from blockstack_client.config import url_to_host_port, semver_match, semver_newer
log = virtualchain.get_logger("blockstack-server")
@@ -163,6 +163,7 @@ PEER_TABLE_LOCK = threading.Lock()
PEER_QUEUE_LOCK = threading.Lock()
PEER_TABLE_LOCK_HOLDER = None
ZONEFILE_QUEUE_LOCK = threading.Lock()
DB_LOCK = threading.Lock()
def atlas_peer_table_lock():
"""
@@ -360,8 +361,15 @@ def atlasdb_query_execute( cur, query, values ):
DO NOT CALL THIS DIRECTLY.
"""
# under heavy contention, this can cause timeouts (which is unacceptable)
# serialize access to the db just to be safe
global DB_LOCK
try:
DB_LOCK.acquire()
ret = cur.execute( query, values )
DB_LOCK.release()
return ret
except Exception, e:
log.exception(e)
@@ -543,7 +551,7 @@ def atlasdb_cache_zonefile_info( con=None, path=None ):
global ZONEFILE_INV, NUM_ZONEFILES
inv_len = atlasdb_zonefile_inv_length( con=con, path=path )
inv = atlasdb_make_zonefile_inv( 0, inv_len )
inv = atlas_make_zonefile_inventory( 0, inv_len, con=con, path=path )
ZONEFILE_INV = inv
NUM_ZONEFILES = inv_len
@@ -580,7 +588,7 @@ def atlasdb_get_zonefile_bits( zonefile_hash, con=None, path=None ):
return ret
def atlasdb_zonefile_block_reset( con, db, drop_block ):
def atlasdb_zonefile_block_reset( con, db, drop_block, path=None ):
"""
Drop all zonefile rows for a particular block.
Used for queueing zonefiles at a particular start block
@@ -616,7 +624,7 @@ def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir=None, validate=T
"""
# populate zonefile queue
total = 0
for block_height in xrange(start_block, db.lastblock+1, 1 ):
for block_height in xrange(start_block, db.lastblock+1, 1):
zonefile_hashes = db.get_value_hashes_at( block_height )
for zfhash in zonefile_hashes:
@@ -977,7 +985,7 @@ def atlasdb_load_peer_table( peer_hostport, con=None, path=None ):
args = ()
cur = con.cursor()
res = atlasdb_query_commit( cur, sql, args )
res = atlasdb_query_execute( cur, sql, args )
con.commit()
# build it up
@@ -1017,9 +1025,13 @@ def atlasdb_init( path, db, peer_seeds, peer_blacklist, validate=False, zonefile
log.debug("Atlas DB exists at %s" % path)
atlasdb_last_block = atlasdb_get_lastblock( path=path )
log.debug("Synchronize zonefiles from %s to %s" % (atlasdb_last_block, db.last_block) )
if atlasdb_last_block is None:
atlasdb_last_block = 0
atlasdb_zonefile_block_reset( con, db, atlasdb_last_block )
log.debug("Synchronize zonefiles from %s to %s" % (atlasdb_last_block, db.lastblock) )
con = sqlite3.connect( path, isolation_level=None )
atlasdb_zonefile_block_reset( con, db, atlasdb_last_block, path=path )
atlasdb_queue_zonefiles( con, db, atlasdb_last_block, validate=validate, zonefile_dir=zonefile_dir )
log.debug("Refreshing seed peers")
@@ -1146,6 +1158,14 @@ def atlasdb_zonefile_inv_length( con=None, path=None ):
ret = []
for row in res:
try:
if row[0] is None:
ret.append( {'MAX(inv_index)': 0} )
break
except:
pass
tmp = {}
tmp.update(row)
ret.append(tmp)
@@ -1324,10 +1344,15 @@ def atlas_peer_getinfo( peer_hostport, timeout=None, peer_table=None ):
res = rpc.getinfo()
assert type(res) in [dict], 'Did not receive a dict'
for req_field in ['consensus', 'server_version', 'last_block_processed']:
assert req_field in res.keys(), "Missing field '%s'" % req_field
assert type(res[req_field]) in [str, unicode], "Not a string: '%s'" % req_field
res[req_field] = str(res[req_field])
if 'error' not in res:
for req_field in ['consensus', 'server_version', 'last_block_processed']:
assert req_field in res.keys(), "Missing field '%s'" % req_field
assert type(res[req_field]) in [str, unicode, int], "Not a string or int: '%s'" % req_field
res[req_field] = str(res[req_field])
else:
log.error("Failed to getinfo on %s: %s" % (peer_hostport, res['error']))
res = None
except AssertionError, ae:
log.exception(ae)
@@ -1350,7 +1375,7 @@ def atlas_peer_getinfo( peer_hostport, timeout=None, peer_table=None ):
atlas_peer_table_unlock()
peer_table = None
return ret
return res
@@ -1523,12 +1548,20 @@ def atlas_peer_get_request_count( peer_hostport, peer_table=None ):
def atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=None ):
"""
What's the zonefile inventory vector for this peer?
Return None if not defined
"""
locked = False
if peer_table is None:
locked = True
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return None
inv = peer_table[peer_hostport]['zonefile_inv']
if locked:
@@ -1547,6 +1580,13 @@ def atlas_peer_set_zonefile_inventory( peer_hostport, peer_inv, peer_table=None
locked = True
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return None
peer_table[peer_hostport]['zonefile_inv'] = peer_inv
if locked:
@@ -1565,13 +1605,20 @@ def atlas_peer_is_blacklisted( peer_hostport, peer_table=None ):
locked = True
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return None
ret = peer_table[peer_hostport].get("blacklisted", False)
if locked:
atlas_peer_table_unlock()
peer_table = None
return inv
return ret
def atlas_peer_is_whitelisted( peer_hostport, peer_table=None ):
@@ -1583,6 +1630,13 @@ def atlas_peer_is_whitelisted( peer_hostport, peer_table=None ):
locked = True
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return None
ret = peer_table[peer_hostport].get("whitelisted", False)
if locked:
@@ -1614,14 +1668,6 @@ def atlas_peer_update_health( peer_hostport, received_response, peer_table=None
return False
# if blacklisted or whitelisted, then we don't care
if atlas_peer_is_whitelisted( peer_hostport, peer_table=peer_table ) or atlas_peer_is_blacklisted( peer_hostport, peer_table=peer_table ):
if locked:
atlas_peer_table_unlock()
peer_table = None
return True
# record that we contacted this peer, and whether or not we got useful info from it
now = time_now()
@@ -1764,6 +1810,13 @@ def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, time
locked = True
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return None
peer_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
bit_offset = (len(peer_inv) - 1) * 8 # i.e. re-obtain the last byte
@@ -1782,6 +1835,13 @@ def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, time
if locked:
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return None
atlas_peer_set_zonefile_inventory( peer_hostport, peer_inv, peer_table=peer_table ) # NOTE: may have trailing 0's for padding
if locked:
@@ -1822,6 +1882,13 @@ def atlas_peer_refresh_zonefile_inventory( my_hostport, peer_hostport, byte_offs
locked = True
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return False
# reset the peer's zonefile inventory, back to offset
cur_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
atlas_peer_set_zonefile_inventory( peer_hostport, cur_inv[:byte_offset], peer_table=peer_table )
@@ -1837,6 +1904,13 @@ def atlas_peer_refresh_zonefile_inventory( my_hostport, peer_hostport, byte_offs
if locked:
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return False
peer_table[peer_hostport]['zonefile_inventory_last_refresh'] = time_now()
if locked:
@@ -1867,6 +1941,13 @@ def atlas_peer_has_fresh_zonefile_inventory( peer_hostport, local_inv=None, peer
locked = True
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return False
expected_length = len(local_inv)
fresh = False
@@ -2008,7 +2089,7 @@ def atlas_peer_has_zonefile( peer_hostport, zonefile_hash, zonefile_bits=None, c
Return True if present
Return False if not present
Return None if we don't know about the zonefile ourselves
Return None if we don't know about the zonefile ourselves, or if we don't know about the peer
"""
bits = None
@@ -2025,6 +2106,13 @@ def atlas_peer_has_zonefile( peer_hostport, zonefile_hash, zonefile_bits=None, c
locked = True
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
if locked:
atlas_peer_table_unlock()
peer_table = None
return False
zonefile_inv = atlas_peer_get_zonefile_inventory( peer_hostport, peer_table=peer_table )
if locked:
@@ -2071,7 +2159,7 @@ def atlas_peer_get_neighbors( my_hostport, peer_hostport, timeout=None, peer_tab
# sane limits
max_neighbors = atlas_max_neighbors()
assert len(peer_list['peers']) <= max_neighbors, "Invalid response with too many peers"
assert len(peer_list['peers']) <= max_neighbors, "Invalid response with too many peers (%s, expected <= %s)" % (len(peer_list['peers']), max_neighbors)
else:
assert type(peer_list['error']) in [str, unicode], "Invalid error message"
@@ -2429,9 +2517,8 @@ def atlas_zonefile_push( my_hostport, peer_hostport, zonefile_dict, timeout=None
assert type(push_info) in [dict], "Invalid push response"
if 'status' in push_info and push_info['status']:
assert 'saved' in push_info, "Missing saved vector"
assert type(push_info) == list, "Invalid saved vector"
assert type(push_info['saved']) == list, "Invalid saved vector"
assert len(push_info['saved']) == 1, "Invalid saved vector"
assert type(push_info['saved']) in [int, long], "Invalid saved vector"
assert push_info['saved'][0] in [0, 1], "Invalid saved vector value"
if push_info['saved'] == 1:
@@ -2539,11 +2626,11 @@ class AtlasPeerCrawler( threading.Thread ):
def add_new_peers( self, count, new_peers, current_peers, con=None, path=None, peer_table=None ):
"""
Ping up to @count new peers from new_peers
Ping up to @count new peers from @new_peers
that aren't already known to us. If they
respond, then add them to the peer set.
Return the list of peers added
Return the list of peers added and the list of peers already known
"""
if self.ping_timeout is None:
@@ -2553,6 +2640,7 @@ class AtlasPeerCrawler( threading.Thread ):
cnt = 0
i = 0
added = []
present = []
while i < len(new_peers) and cnt < min(count, len(new_peers)):
peer = new_peers[i]
i += 1
@@ -2561,6 +2649,8 @@ class AtlasPeerCrawler( threading.Thread ):
continue
if peer in current_peers:
log.debug("%s is already known" % peer)
present.append(peer)
continue
cnt += 1
@@ -2573,7 +2663,11 @@ class AtlasPeerCrawler( threading.Thread ):
# didn't respond
continue
if semver_newer( MIN_ATLAS_VERSION, res['version'] ):
if not res.has_key('server_version'):
# too old
continue
if semver_newer( MIN_ATLAS_VERSION, res['server_version'] ):
# too old to be an atlas node
log.debug("%s is too old to be an atlas node (version %s)" % (peer, res['version']))
continue
@@ -2581,12 +2675,13 @@ class AtlasPeerCrawler( threading.Thread ):
# TODO: check consensus hash as well
if res:
log.debug("Add newly-discovered peer %s" % peer)
atlasdb_add_peer( peer, con=con, path=path, peer_table=peer_table )
added.append(peer)
return added
return added, present
def remove_unhealthy_peers( self, count, con=None, path=None, peer_table=None, min_request_count=10, min_health=MIN_PEER_HEALTH ):
@@ -2603,7 +2698,7 @@ class AtlasPeerCrawler( threading.Thread ):
removed = []
rank_peer_list = atlas_rank_peers_by_health( peer_table=peer_table, with_rank=True )
for rank, peer in rank_peer_list:
reqcount = atlas_peer_get_request_count( peer, peer_hostport )
reqcount = atlas_peer_get_request_count( peer, peer_table=peer_table )
if reqcount >= min_request_count and rank < min_health:
removed.append( peer )
if len(removed) >= count:
@@ -2740,11 +2835,15 @@ class AtlasPeerCrawler( threading.Thread ):
new_peers.remove(self.my_hostport)
# only handle a few peers for now
added = self.add_new_peers( num_new_peers, new_peers, current_peers, con=con, path=path, peer_table=peer_table )
for peer in added:
log.debug("Add at most %s new peers out of %s options" % (num_new_peers, len(new_peers)))
added, present = self.add_new_peers( num_new_peers, new_peers, current_peers, con=con, path=path, peer_table=peer_table )
for peer in added + present:
if peer in new_peers:
new_peers.remove(peer)
if peer in self.new_peers:
self.new_peers.remove(peer)
# DDoS prevention: don't let this get too big
max_new_peers = atlas_max_new_peers( self.max_neighbors )
if len(new_peers) > max_new_peers:
@@ -3190,9 +3289,17 @@ class AtlasZonefilePusher(threading.Thread):
return 0
zfhash = zfinfo.keys()[0]
zfdata = zfinfo[zfhash]
zfdata_txt = zfinfo[zfhash]
zfdata = None
zfbits = atlasdb_get_zonefile_bits( zonefile_hash, con=con, path=path )
try:
zfdata = blockstack_zones.parse_zone_file( zfdata_txt )
except Exception, e:
log.exception(e)
log.error("Failed to parse zonefile %s" % zfhash)
return 0
zfbits = atlasdb_get_zonefile_bits( zfhash, path=path )
if len(zfbits) == 0:
# nope
return 0
@@ -3218,7 +3325,7 @@ class AtlasZonefilePusher(threading.Thread):
ret = 0
for peer in peers:
log.debug("%s: Push to %s" % (self.hostport, peer))
atlas_zonefile_push( peer, zfdata, timeout=self.push_timeout )
atlas_zonefile_push( self.hostport, peer, zfdata, timeout=self.push_timeout )
ret += 1
return ret