more bugfixes found during testing; remove dead code; fix deadlocks

This commit is contained in:
Jude Nelson
2016-08-26 18:45:52 -04:00
parent d999f1beb7
commit 28f01ac54d

View File

@@ -41,7 +41,7 @@ import hashlib
import blockstack_zones
import virtualchain
from blockstack_client.config import url_to_host_port
from blockstack_client.config import url_to_host_port, semver_match
log = virtualchain.get_logger("blockstack-server")
@@ -51,6 +51,8 @@ from lib import get_db_state
from pybloom_live import BloomFilter, ScalableBloomFilter
MIN_ATLAS_VERSION = "0.14.0"
PEER_LIFETIME_INTERVAL = 3600 # 1 hour
PEER_PING_INTERVAL = 60 # 1 minute
PEER_MAX_AGE = 2678400 # 1 month
@@ -58,9 +60,16 @@ PEER_CLEAN_INTERVAL = 3600 # 1 hour
PEER_MAX_DB = 65536 # maximum number of peers in the peer db
MIN_PEER_HEALTH = 0.5 # minimum peer health before we forget about it
PEER_PING_TIMEOUT = 3 # number of seconds for a ping to take
PEER_INV_TIMEOUT = 10 # number of seconds for an inv to take
PEER_NEIGHBORS_TIMEOUT = 10 # number of seconds for a neighbors query to take
PEER_ZONEFILES_TIMEOUT = 30 # number of seconds for a zonefile query to take
PEER_PUSH_ZONEFILES_TIMEOUT = 10
NUM_NEIGHBORS = 80 # number of neighbors a peer can report
ZONEFILE_INV = "" # this atlas peer's current zonefile inventory
NUM_ZONEFILES = 0 # cache-coherent count of the number of zonefiles present
MAX_QUEUED_ZONEFILES = 1000 # maximum number of queued zonefiles
@@ -82,7 +91,10 @@ if os.environ.get("BLOCKSTACK_ATLAS_MAX_NEIGHBORS") is not None:
if os.environ.get("BLOCKSTACK_TEST", None) == "1" and os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION", None) == "1":
# use test client
from blockstack_integration_tests import AtlasRPCTestClient as BlockstackRPCClient
from blockstack_integration_tests import time_now, time_sleep, atlas_max_neighbors, atlas_peer_lifetime_interval, atlas_peer_ping_interval, atlas_peer_max_age, atlas_peer_clean_interval
from blockstack_integration_tests import time_now, time_sleep, atlas_max_neighbors, \
atlas_peer_lifetime_interval, atlas_peer_ping_interval, atlas_peer_max_age, \
atlas_peer_clean_interval, atlas_ping_timeout, atlas_inv_timeout, atlas_neighbors_timeout, \
atlas_zonefiles_timeout, atlas_push_zonefiles_timeout
else:
# production
@@ -109,6 +121,21 @@ else:
def atlas_peer_clean_interval():
    """
    Production accessor for the peer clean interval.
    Return the module-level PEER_CLEAN_INTERVAL constant.
    """
    return PEER_CLEAN_INTERVAL
def atlas_ping_timeout():
    """
    Production accessor for the ping timeout.
    Return the module-level PEER_PING_TIMEOUT constant.
    """
    return PEER_PING_TIMEOUT
def atlas_inv_timeout():
    """
    Production accessor for the inventory-query timeout.
    Return the module-level PEER_INV_TIMEOUT constant.
    """
    return PEER_INV_TIMEOUT
def atlas_neighbors_timeout():
    """
    Production accessor for the neighbors-query timeout.
    Return the module-level PEER_NEIGHBORS_TIMEOUT constant.
    """
    return PEER_NEIGHBORS_TIMEOUT
def atlas_zonefiles_timeout():
    """
    Production accessor for the zonefile-query timeout.
    Return the module-level PEER_ZONEFILES_TIMEOUT constant.
    """
    return PEER_ZONEFILES_TIMEOUT
def atlas_push_zonefiles_timeout():
    """
    Production accessor for the zonefile-push timeout.
    Return the module-level PEER_PUSH_ZONEFILES_TIMEOUT constant.
    """
    return PEER_PUSH_ZONEFILES_TIMEOUT
ATLASDB_SQL = """
CREATE TABLE zonefiles( inv_index INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -133,6 +160,7 @@ ZONEFILE_QUEUE = [] # list of {zonefile_hash: zonefile} dicts to push out to
PEER_TABLE_LOCK = threading.Lock()
PEER_QUEUE_LOCK = threading.Lock()
PEER_TABLE_LOCK_HOLDER = None
ZONEFILE_QUEUE_LOCK = threading.Lock()
def atlas_peer_table_lock():
@@ -140,8 +168,14 @@ def atlas_peer_table_lock():
Lock the global health info table.
Return the table.
"""
global PEER_TABLE_LOCK, PEER_TABLE
global PEER_TABLE_LOCK, PEER_TABLE, PEER_TABLE_LOCK_HOLDER
if PEER_TABLE_LOCK_HOLDER is not None:
assert PEER_TABLE_LOCK_HOLDER != threading.current_thread(), "DEADLOCK"
log.warning("\n\nPossible contention: lock from %s (but held by %s)\n\n" % (threading.current_thread(), PEER_TABLE_LOCK_HOLDER))
PEER_TABLE_LOCK.acquire()
PEER_TABLE_LOCK_HOLDER = threading.current_thread()
return PEER_TABLE
@@ -149,7 +183,8 @@ def atlas_peer_table_unlock():
"""
Unlock the global health info table.
"""
global PEER_TABLE_LOCK
global PEER_TABLE_LOCK, PEER_TABLE_LOCK_HOLDER
PEER_TABLE_LOCK_HOLDER = None
PEER_TABLE_LOCK.release()
return
@@ -355,7 +390,7 @@ def atlasdb_add_zonefile_info( zonefile_hash, present, block_height, con=None, p
Mark it as present or absent.
Keep our in-RAM inventory vector up-to-date
"""
global ZONEFILE_INV
global ZONEFILE_INV, NUM_ZONEFILES
if path is None:
path = atlasdb_path()
@@ -377,6 +412,9 @@ def atlasdb_add_zonefile_info( zonefile_hash, present, block_height, con=None, p
zfbits = atlasdb_get_zonefile_bits( zonefile_hash, con=con, path=path )
ZONEFILE_INV = atlas_inventory_set_zonefile_bits( ZONEFILE_INV, zfbits )
# keep in-RAM zonefile count coherent
NUM_ZONEFILES = atlasdb_zonefile_inv_length( con=con, path=path )
if close:
con.close()
@@ -497,6 +535,20 @@ def atlasdb_set_zonefile_present( zonefile_hash, present, con=None, path=None ):
return was_present
def atlasdb_cache_zonefile_info( con=None, path=None ):
    """
    Load up and cache our zonefile inventory.

    Rebuilds the inventory vector and stores it, along with its length,
    in the module-level ZONEFILE_INV and NUM_ZONEFILES.

    @con: optional open atlas db handle; forwarded to the db helpers
    @path: optional atlas db path; forwarded to the db helpers

    Return the newly-cached inventory vector.
    """
    global ZONEFILE_INV, NUM_ZONEFILES

    inv_len = atlasdb_zonefile_inv_length( con=con, path=path )

    # build the vector from the same db handle/path we just measured,
    # so the cached vector and the cached count cannot come from different dbs
    inv = atlas_make_zonefile_inventory( 0, inv_len, con=con, path=path )

    ZONEFILE_INV = inv
    NUM_ZONEFILES = inv_len
    return inv
def atlasdb_get_zonefile_bits( zonefile_hash, con=None, path=None ):
"""
What bit(s) in a zonefile inventory does a zonefile hash correspond to?
@@ -996,6 +1048,8 @@ def atlasdb_init( path, db, peer_seeds, peer_blacklist, validate=False, zonefile
log.debug("Loading peer table")
peer_table = atlasdb_load_peer_table( con )
# cache zonefile inventory and count
atlasdb_cache_zonefile_info( con=con )
con.close()
else:
@@ -1184,12 +1238,28 @@ def atlas_make_zonefile_inventory( bit_offset, bit_length, con=None, path=None )
return inv
def atlas_get_zonefile_inventory( offset=None, length=None ):
    """
    Get the in-RAM zonefile inventory vector, or a slice of it.

    @offset: index into the cached vector at which to start (default: 0)
    @length: number of entries to return (default: everything from
             offset to the end of the vector)

    Return ZONEFILE_INV[offset:offset+length].
    Calling with no arguments returns the whole vector.
    """
    global ZONEFILE_INV

    if offset is None:
        offset = 0

    if length is None:
        length = len(ZONEFILE_INV) - offset

    return ZONEFILE_INV[offset:offset+length]
def atlas_get_num_zonefiles():
    """
    Get the number of zonefiles we know about.
    Return the module-level NUM_ZONEFILES counter.
    """
    global NUM_ZONEFILES
    return NUM_ZONEFILES
def atlas_init_peer_info( peer_table, peer_hostport, blacklisted=False, whitelisted=False ):
@@ -1204,19 +1274,19 @@ def atlas_init_peer_info( peer_table, peer_hostport, blacklisted=False, whitelis
}
def atlas_peer_ping( peer_hostport, timeout=3, peer_table=None ):
def atlas_peer_ping( peer_hostport, timeout=None, peer_table=None ):
"""
Ping a host
Return True if alive
Return False if not
"""
if timeout is None:
timeout = atlas_ping_timeout()
host, port = url_to_host_port( peer_hostport )
rpc = BlockstackRPCClient( host, port, timeout=timeout )
locked = False
if peer_table is None:
locked = True
log.debug("Ping %s" % peer_hostport)
ret = False
try:
@@ -1227,7 +1297,9 @@ def atlas_peer_ping( peer_hostport, timeout=3, peer_table=None ):
pass
# update health
if locked:
locked = False
if peer_table is None:
locked = True
peer_table = atlas_peer_table_lock()
if peer_table.has_key(peer_hostport):
@@ -1240,6 +1312,56 @@ def atlas_peer_ping( peer_hostport, timeout=3, peer_table=None ):
return ret
def atlas_peer_getinfo( peer_hostport, timeout=None, peer_table=None ):
    """
    Get host info from a peer with the getinfo RPC, and record the
    outcome in the peer's health info (if the peer is in the peer table).

    @peer_hostport: "host:port" of the peer to query
    @timeout: RPC timeout; defaults to atlas_ping_timeout()
    @peer_table: if given, the caller already holds the peer table lock.
                 If None, the global table is locked for the health update.

    Return the getinfo reply (a dict whose 'consensus', 'server_version'
    and 'last_block_processed' fields are strings) on success.
    Return None if the peer did not respond, or sent an invalid reply.
    """
    if timeout is None:
        timeout = atlas_ping_timeout()

    host, port = url_to_host_port( peer_hostport )
    rpc = BlockstackRPCClient( host, port, timeout=timeout )

    log.debug("Getinfo %s" % peer_hostport)
    res = None

    try:
        res = rpc.getinfo()
        assert type(res) in [dict], 'Did not receive a dict'

        # NOTE(review): every required field must arrive as a string,
        # including 'last_block_processed' -- confirm against the server's reply format
        for req_field in ['consensus', 'server_version', 'last_block_processed']:
            assert req_field in res.keys(), "Missing field '%s'" % req_field
            assert type(res[req_field]) in [str, unicode], "Not a string: '%s'" % req_field
            res[req_field] = str(res[req_field])

    except AssertionError as ae:
        log.exception(ae)
        log.error("Invalid server reply for getinfo from %s" % peer_hostport)

        # do not hand a malformed reply back to the caller
        res = None

    except Exception as e:
        log.exception(e)
        log.error("Failed to get response from %s" % peer_hostport)
        res = None

    # update health
    locked = False
    if peer_table is None:
        locked = True
        peer_table = atlas_peer_table_lock()

    try:
        if peer_hostport in peer_table:
            atlas_peer_update_health( peer_hostport, (res is not None), peer_table=peer_table )

    finally:
        # always give the lock back, even if the health update raises
        if locked:
            atlas_peer_table_unlock()
            peer_table = None

    return res
def atlas_inventory_count_missing( inv1, inv2 ):
"""
Find out how many bits are set in inv2
@@ -1295,6 +1417,29 @@ def atlas_get_live_neighbors( remote_peer_hostport, peer_table=None, min_health=
return alive_peers
def atlas_get_all_neighbors( peer_table=None ):
    """
    Get *all* neighbor information.
    USED ONLY FOR TESTING

    @peer_table: if given, the caller already holds the peer table lock.
                 If None, the global table is locked while it is copied.

    Return a new dict with the same keys and values as the peer table
    (a shallow copy: the per-peer values are shared, not duplicated).
    Raise if BLOCKSTACK_ATLAS_NETWORK_SIMULATION is not "1" in the environment.
    """
    if os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION") != "1":
        raise Exception("This method is only available when testing with the Atlas network simulator")

    ret = {}
    locked = False
    if peer_table is None:
        locked = True
        peer_table = atlas_peer_table_lock()

    try:
        ret.update(peer_table)

    finally:
        # release the lock even if the copy fails
        if locked:
            atlas_peer_table_unlock()
            peer_table = None

    return ret
def atlas_remove_peers( dead_peers, peer_table ):
"""
Remove all peer information for the given dead peers from the given health info,
@@ -1473,7 +1618,7 @@ def atlas_peer_update_health( peer_hostport, received_response, peer_table=None
locked = False
if peer_table is None:
locked = True
locked = True
peer_table = atlas_peer_table_lock()
if peer_hostport not in peer_table.keys():
@@ -1512,7 +1657,7 @@ def atlas_peer_update_health( peer_hostport, received_response, peer_table=None
return True
def atlas_peer_get_zonefile_inventory_range( my_hostport, peer_hostport, bit_offset, bit_count, timeout=10, peer_table=None ):
def atlas_peer_get_zonefile_inventory_range( my_hostport, peer_hostport, bit_offset, bit_count, timeout=None, peer_table=None ):
"""
Get the zonefile inventory bit vector for a given peer.
The returned range will be [bit_offset, bit_offset+count]
@@ -1525,6 +1670,9 @@ def atlas_peer_get_zonefile_inventory_range( my_hostport, peer_hostport, bit_off
Return None if we couldn't contact the peer.
"""
if timeout is None:
timeout = atlas_inv_timeout()
host, port = url_to_host_port( peer_hostport )
rpc = BlockstackRPCClient( host, port, timeout=timeout, src=my_hostport )
@@ -1548,7 +1696,7 @@ def atlas_peer_get_zonefile_inventory_range( my_hostport, peer_hostport, bit_off
raise AssertionError("Inv is not base64")
# make sure it corresponds to this range
assert len(zf_inv['inv']) <= bit_count, "Zonefile in is too long"
assert len(zf_inv['inv']) <= (bit_count / 8) + (bit_count % 8), "Zonefile inventory in is too long"
# success!
atlas_peer_update_health( peer_hostport, True, peer_table=peer_table )
@@ -1574,7 +1722,43 @@ def atlas_peer_get_zonefile_inventory_range( my_hostport, peer_hostport, bit_off
return zf_inv['inv']
def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, timeout=10, peer_table=None ):
def atlas_peer_download_zonefile_inventory( my_hostport, peer_hostport, maxlen, bit_offset=0, timeout=None, peer_table={} ):
    """
    Get the zonefile inventory from the remote peer,
    starting from the given bit_offset and going up to bit maxlen.
    The inventory is fetched in chunks of 524288 bits (64KB).

    NOTE: this doesn't update the peer table health by default;
    you'll have to explicitly pass in a peer table (i.e. setting
    to {} ensures that nothing happens).

    @my_hostport: our "host:port", passed through to the range query
    @peer_hostport: "host:port" of the peer to download from
    @maxlen: bit index at which to stop
    @bit_offset: bit index at which to start
    @timeout: per-request timeout; defaults to atlas_inv_timeout()
    @peer_table: peer table passed through to the range query.
                 The shared {} default is deliberate (see NOTE) and is
                 only passed through by this function.

    Return the inventory downloaded so far (possibly partial, on failure).
    Return "" if bit_offset >= maxlen (already synced).
    """
    if timeout is None:
        timeout = atlas_inv_timeout()

    interval = 524288       # number of bits in 64KB

    # a chunk comes back as a byte string, so a full chunk is interval/8 bytes long
    full_chunk_len = interval // 8

    if bit_offset >= maxlen:
        # synced already
        return ""

    chunks = []
    for offset in range( bit_offset, maxlen, interval ):
        next_inv = atlas_peer_get_zonefile_inventory_range( my_hostport, peer_hostport, offset, interval, timeout=timeout, peer_table=peer_table )
        if next_inv is None:
            # partial failure
            log.debug("Failed to sync inventory for %s from %s to %s" % (peer_hostport, offset, offset+interval))
            break

        chunks.append( next_inv )
        if len(next_inv) < full_chunk_len:
            # short chunk: the peer has nothing more to give
            break

    return "".join( chunks )
def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, timeout=None, peer_table=None ):
"""
Synchronize our knowledge of a peer's zonefiles up to a given byte length
NOT THREAD SAFE; CALL FROM ONLY ONE THREAD.
@@ -1584,8 +1768,10 @@ def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, time
Return the new inv vector if we synced it (updating the peer table in the process)
Return None if not
"""
if timeout is None:
timeout = atlas_inv_timeout()
peer_inv = ""
interval = 80000 # 10kb
locked = False
if peer_table is None:
@@ -1605,25 +1791,7 @@ def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, time
atlas_peer_table_unlock()
peer_table = None
if bit_offset >= maxlen:
# synced already
if locked:
atlas_peer_table_unlock()
peer_table = None
return peer_inv
for offset in xrange( bit_offset, maxlen, interval):
next_inv = atlas_peer_get_zonefile_inventory_range( my_hostport, peer_hostport, offset, interval, timeout=timeout, peer_table=peer_table )
if next_inv is None:
# partial failure
log.debug("Failed to sync inventory for %s from %s to %s" % (peer_hostport, offset, offset+interval))
break
peer_inv += next_inv
if len(next_inv) < interval:
# end-of-interval
break
peer_inv = atlas_peer_download_zonefile_inventory( my_hostport, peer_hostport, maxlen, bit_offset=bit_offset, timeout=timeout, peer_table=peer_table )
if locked:
peer_table = atlas_peer_table_lock()
@@ -1637,7 +1805,7 @@ def atlas_peer_sync_zonefile_inventory( my_hostport, peer_hostport, maxlen, time
return peer_inv
def atlas_peer_refresh_zonefile_inventory( my_hostport, peer_hostport, byte_offset, timeout=10, peer_table=None, con=None, path=None, local_inv=None ):
def atlas_peer_refresh_zonefile_inventory( my_hostport, peer_hostport, byte_offset, timeout=None, peer_table=None, con=None, path=None, local_inv=None ):
"""
Refresh a peer's zonefile recent inventory vector entries,
by removing every bit after byte_offset and re-synchronizing them.
@@ -1653,6 +1821,9 @@ def atlas_peer_refresh_zonefile_inventory( my_hostport, peer_hostport, byte_offs
Return False if not.
"""
if timeout is None:
timeout = atlas_inv_timeout()
if local_inv is None:
# get local zonefile inv
inv_len = atlasdb_zonefile_inv_length( con=con, path=path )
@@ -1798,7 +1969,7 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
if len(missing) == 0:
# none!
return []
return ret
locked = False
if peer_table is None:
@@ -1878,7 +2049,7 @@ def atlas_peer_has_zonefile( peer_hostport, zonefile_hash, zonefile_bits=None, c
return res
def atlas_peer_get_neighbors( my_hostport, peer_hostport, timeout=10, peer_table=None, con=None, path=None ):
def atlas_peer_get_neighbors( my_hostport, peer_hostport, timeout=None, peer_table=None, con=None, path=None ):
"""
Ask the peer server at the given URL for its neighbors.
@@ -1890,6 +2061,9 @@ def atlas_peer_get_neighbors( my_hostport, peer_hostport, timeout=10, peer_table
Raise on invalid URL
"""
if timeout is None:
timeout = atlas_neighbors_timeout()
host, port = url_to_host_port( peer_hostport )
if host is None or port is None:
log.debug("Invalid host/port %s" % peer_hostport)
@@ -1941,7 +2115,7 @@ def atlas_peer_get_neighbors( my_hostport, peer_hostport, timeout=10, peer_table
return ret
def atlas_get_zonefiles( my_hostport, peer_hostport, zonefile_hashes, timeout=60, peer_table=None ):
def atlas_get_zonefiles( my_hostport, peer_hostport, zonefile_hashes, timeout=None, peer_table=None ):
"""
Given a list of zonefile hashes.
go and get them from the given host.
@@ -1952,6 +2126,9 @@ def atlas_get_zonefiles( my_hostport, peer_hostport, zonefile_hashes, timeout=60
Return None on error.
"""
if timeout is None:
timeout = atlas_zonefiles_timeout()
host, port = url_to_host_port( peer_hostport )
rpc = BlockstackRPCClient( host, port, timeout=timeout, src=my_hostport )
@@ -2071,7 +2248,7 @@ def atlas_rank_peers_by_data_availability( peer_list=None, peer_table=None, loca
if len(peer_inv) == 0:
continue
availability_score = atlas_peer_get_availability( local_inv, peer_inv )
availability_score = atlas_inventory_count_missing( local_inv, peer_inv )
peer_availability_ranking.append( (availability_score, peer_hostport) )
if locked:
@@ -2085,58 +2262,6 @@ def atlas_rank_peers_by_data_availability( peer_list=None, peer_table=None, loca
return [peer_hp for _, peer_hp in peer_availability_ranking]
def atlas_peer_get_availability( local_inv, peer_inv ):
    """
    Count the zonefiles the given peer has that we don't have,
    given the local and peer availability inventory vectors.

    Each vector is a string; only the low 8 bits of each character
    are examined, one bit per zonefile.

    Return the number of bits set in peer_inv that are not set in local_inv.
    Bits in peer_inv beyond the end of local_inv all count as missing locally.
    """
    shared_len = min( len(local_inv), len(peer_inv) )
    missing = 0

    # the part both vectors cover: bits the peer has set and we have clear
    for ours, theirs in zip( local_inv[:shared_len], peer_inv[:shared_len] ):
        only_theirs = ord(theirs) & ~ord(ours) & 0xff
        missing += bin(only_theirs).count("1")

    # anything the peer knows about past the end of our vector (empty if the peer's is shorter)
    for theirs in peer_inv[len(local_inv):]:
        missing += bin(ord(theirs) & 0xff).count("1")

    return missing
def atlas_peer_get_last_response_time( peer_hostport, peer_table=None ):
    """
    Get the last time we got a positive response
    from a peer.

    @peer_hostport: "host:port" of the peer
    @peer_table: if given, the caller already holds the peer table lock.
                 If None, the global table is locked for the lookup.

    Return the time on success (the largest timestamp among the peer's
    (time, responded) records whose responded flag is true)
    Return -1.0 if we never heard from them
    """
    locked = False
    if peer_table is None:
        locked = True
        peer_table = atlas_peer_table_lock()

    last_time = -1.0
    try:
        if peer_hostport in peer_table:
            for (t, r) in peer_table[peer_hostport]['time']:
                if r and t > last_time:
                    # remember this timestamp, not the whole history list
                    last_time = t

    finally:
        # release the lock even if the table entry is malformed
        if locked:
            atlas_peer_table_unlock()
            peer_table = None

    return last_time
def atlas_peer_enqueue( peer_hostport, peer_table=None, peer_queue=None, max_neighbors=None ):
"""
Begin talking to a new peer, if we aren't already.
@@ -2288,6 +2413,60 @@ def atlas_zonefile_push_dequeue( zonefile_queue=None ):
return ret
def atlas_zonefile_push( my_hostport, peer_hostport, zonefile_dict, timeout=None, peer_table=None ):
    """
    Push the given zonefile to the given peer, and record the
    outcome in the peer's health info.

    @my_hostport: our "host:port", given to the RPC client as the source
    @peer_hostport: "host:port" of the peer to push to
    @zonefile_dict: the zonefile to serialize and send
    @timeout: RPC timeout; defaults to atlas_push_zonefiles_timeout()
    @peer_table: if given, the caller already holds the peer table lock.
                 If None, the global table is locked for the health update.

    Return True on success (the peer reported that it saved the zonefile)
    Return False on failure
    """
    if timeout is None:
        timeout = atlas_push_zonefiles_timeout()

    zonefile_data = serialize_zonefile( zonefile_dict )
    zonefile_hash = blockstack_client.hash_zonefile( zonefile_dict )

    host, port = url_to_host_port( peer_hostport )
    rpc = BlockstackRPCClient( host, port, timeout=timeout, src=my_hostport )

    status = False

    try:
        push_info = rpc.put_zonefiles( [zonefile_data] )
        assert type(push_info) in [dict], "Invalid push response"

        if 'status' in push_info and push_info['status']:
            # we sent exactly one zonefile, so expect exactly one 0/1 entry back
            assert 'saved' in push_info, "Missing saved vector"
            assert type(push_info['saved']) == list, "Invalid saved vector"
            assert len(push_info['saved']) == 1, "Invalid saved vector"
            assert type(push_info['saved'][0]) in [int, long], "Invalid saved vector"
            assert push_info['saved'][0] in [0, 1], "Invalid saved vector value"

            if push_info['saved'][0] == 1:
                # woo!
                status = True

    except AssertionError as ae:
        log.exception(ae)
        log.error("Invalid server response from %s" % peer_hostport )

    except Exception as e:
        log.exception(e)
        log.error("Failed to push zonefile %s to %s" % (zonefile_hash, peer_hostport))

    locked = False
    if peer_table is None:
        locked = True
        peer_table = atlas_peer_table_lock()

    try:
        atlas_peer_update_health( peer_hostport, status, peer_table=peer_table )

    finally:
        # always give the lock back, even if the health update raises
        if locked:
            atlas_peer_table_unlock()
            peer_table = None

    return status
class AtlasPeerCrawler( threading.Thread ):
"""
Thread that continuously crawls peers.
@@ -2340,16 +2519,23 @@ class AtlasPeerCrawler( threading.Thread ):
self.max_neighbors = None
self.atlasdb_path = path
self.neighbors_timeout = None
self.ping_timeout = None
def get_neighbors( self, peer_hostport, con=None, path=None, peer_table=None ):
"""
Get neighbors of this peer
"""
if self.neighbors_timeout is None:
self.neighbors_timeout = atlas_neighbors_timeout()
neighbors = None
if peer_hostport == self.my_hostport:
neighbors = atlas_get_live_neighbors( None, peer_table=peer_table )
else:
neighbors = atlas_peer_get_neighbors( self.my_hostport, peer_hostport, timeout=10, peer_table=peer_table, path=path, con=con )
neighbors = atlas_peer_get_neighbors( self.my_hostport, peer_hostport, timeout=self.neighbors_timeout, peer_table=peer_table, path=path, con=con )
if neighbors is not None:
log.debug("%s: neighbors of %s are (%s): %s" % (self.my_hostport, peer_hostport, len(neighbors), ",".join(neighbors)))
@@ -2367,6 +2553,9 @@ class AtlasPeerCrawler( threading.Thread ):
Return the list of peers added
"""
if self.ping_timeout is None:
self.ping_timeout = atlas_ping_timeout()
# only handle a few peers for now
cnt = 0
i = 0
@@ -2386,7 +2575,15 @@ class AtlasPeerCrawler( threading.Thread ):
# test the peer before adding
res = False
if peer != self.my_hostport:
res = atlas_peer_ping(peer, timeout=2)
res = atlas_peer_getinfo( peer, timeout=self.ping_timeout )
if res is None:
# didn't respond
continue
if semver_newer( MIN_ATLAS_VERSION, res['version'] ):
# too old to be an atlas node
log.debug("%s is too old to be an atlas node (version %s)" % (peer, res['version']))
continue
if res:
atlasdb_add_peer( peer, con=con, path=path, peer_table=peer_table )
@@ -2404,10 +2601,11 @@ class AtlasPeerCrawler( threading.Thread ):
locked = False
if peer_table is None:
locked = True
peer_table = atlas_peer_table_lock()
removed = []
rank_peer_list = atlas_rank_peers_by_health( peer_table, with_rank=True )
rank_peer_list = atlas_rank_peers_by_health( peer_table=peer_table, with_rank=True )
for rank, peer in rank_peer_list:
reqcount = atlas_peer_get_request_count( peer, peer_hostport )
if reqcount >= min_request_count and rank < min_health:
@@ -2422,6 +2620,7 @@ class AtlasPeerCrawler( threading.Thread ):
if locked:
atlas_peer_table_unlock()
peer_table = None
return removed
@@ -2512,8 +2711,6 @@ class AtlasPeerCrawler( threading.Thread ):
locked = False
if peer_table is None:
locked = True
if locked:
peer_table = atlas_peer_table_lock()
current_peers = peer_table.keys()[:]
@@ -2531,6 +2728,8 @@ class AtlasPeerCrawler( threading.Thread ):
Ping them first (to see if they're alive), and drop hosts from the pending
queue if it gets too long.
Update our new peer queue, and update the peer table.
Return the number of peers processed
"""
# add newly-discovered peers, but only after we ping them
@@ -2556,7 +2755,7 @@ class AtlasPeerCrawler( threading.Thread ):
new_peers = new_peers[:max_new_peers]
self.new_peers = new_peers
return True
return len(added)
def update_existing_peers( self, num_to_remove, peer_table=None, con=None, path=None ):
@@ -2564,6 +2763,8 @@ class AtlasPeerCrawler( threading.Thread ):
Update the set of existing peers:
* revalidate the existing but old peers
* remove at most $num_to_remove unhealthy peers
Return the number of peers removed
"""
# remove peers that are too old
@@ -2589,7 +2790,7 @@ class AtlasPeerCrawler( threading.Thread ):
if peer in self.new_peers:
self.new_peers.remove(peer)
return True
return len(removed)
def random_walk_reset( self ):
@@ -2612,13 +2813,16 @@ class AtlasPeerCrawler( threading.Thread ):
* Remove at most 10 old, unresponsive peers from the peer DB.
"""
log.debug("%s: %s step" % (self.my_hostport, self.__class__.__name__))
if self.max_neighbors is None:
self.max_neighbors = atlas_max_neighbors()
log.debug("%s: max neighbors is %s" % (self.my_hostport, self.max_neighbors))
current_peers = self.get_current_peers( peer_table=peer_table )
# add some new peers
self.update_new_peers( 10, current_peers, peer_queue=peer_queue, peer_table=peer_table, con=con, path=path )
num_added = self.update_new_peers( 10, current_peers, peer_queue=peer_queue, peer_table=peer_table, con=con, path=path )
# use MHRWDA to walk the peer graph.
# first, begin the walk if we haven't already
@@ -2668,13 +2872,18 @@ class AtlasPeerCrawler( threading.Thread ):
# update the existing peer info
self.update_existing_peers( 10, con=con, path=path, peer_table=peer_table )
num_removed = self.update_existing_peers( 10, con=con, path=path, peer_table=peer_table )
return num_added, num_removed
def run(self):
self.running = True
while self.running:
self.step( path=self.atlasdb_path )
num_added, num_removed = self.step( path=self.atlasdb_path )
if num_added == 0 and num_removed == 0:
# take a break
# time_sleep(self.hostport, self.__class__.__name__, 1.0)
pass
def ask_join(self):
@@ -2707,13 +2916,15 @@ class AtlasHealthChecker( threading.Thread ):
Return True on success
Return False on error
"""
log.debug("%s: %s step" % (self.hostport, self.__class__.__name__))
if path is None:
path = self.path
lock = False
if peer_table is None:
lock = True
peer_table = peer_table_lock()
peer_table = atlas_peer_table_lock()
peer_hostports = []
stale_peers = []
@@ -2729,7 +2940,7 @@ class AtlasHealthChecker( threading.Thread ):
stale_peers.append(peer)
if lock:
peer_table_unlock()
atlas_peer_table_unlock()
peer_table = None
for peer_hostport in stale_peers:
@@ -2746,8 +2957,9 @@ class AtlasHealthChecker( threading.Thread ):
"""
Loop forever, pinging someone every pass.
"""
self.running = True
while self.running:
local_inv = atlas_get_local_inventory()
local_inv = atlas_get_zonefile_inventory()
self.step( peer_table=peer_table, local_inv=local_inv, path=self.atlasdb_path )
@@ -2786,6 +2998,8 @@ class AtlasZonefileCrawler( threading.Thread ):
Return the number of zonefiles fetched
"""
log.debug("%s: %s step" % (self.hostport, self.__class__.__name__))
if path is None:
path = self.path
@@ -2793,6 +3007,7 @@ class AtlasZonefileCrawler( threading.Thread ):
if con is None:
close = True
con = atlasdb_open( path )
assert con is not None
num_fetched = 0
locked = False
@@ -2801,6 +3016,7 @@ class AtlasZonefileCrawler( threading.Thread ):
locked = True
peer_table = atlas_peer_table_lock()
log.debug("%s: find missing zonefile info" % self.hostport)
missing_zfinfo = atlas_find_missing_zonefile_availability( peer_table=peer_table, con=con, path=path )
peer_hostports = peer_table.keys()[:]
@@ -2919,7 +3135,8 @@ class AtlasZonefileCrawler( threading.Thread ):
con.close()
if num_fetched == 0:
time_sleep(self.hostport, self.__class__.__name__, 1.0)
# time_sleep(self.hostport, self.__class__.__name__, 1.0)
pass
def ask_join(self):
@@ -2927,13 +3144,14 @@ class AtlasZonefileCrawler( threading.Thread ):
class AtlasZonefilePusher(object):
class AtlasZonefilePusher(threading.Thread):
"""
Continuously drain the queue of zonefiles
we can push, by sending them off to
known peers who need them.
"""
def __init__(self, host, port, path=None ):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.hostport = "%s:%s" % (host, port)
@@ -2941,6 +3159,8 @@ class AtlasZonefilePusher(object):
if self.path is None:
self.path = atlasdb_path()
self.push_timeout = None
def step( self, peer_table=None, zonefile_queue=None, path=None ):
"""
@@ -2948,6 +3168,12 @@ class AtlasZonefilePusher(object):
Push the zonefile to all the peers that need it.
Return the number of peers we sent to
"""
log.debug("%s: %s step" % (self.hostport, self.__class__.__name__))
if self.push_timeout is None:
self.push_timeout = atlas_push_zonefiles_timeout()
zfinfo = atlas_zonefile_push_dequeue( zonefile_queue=zonefile_queue )
if zfinfo is None:
return 0
@@ -2981,7 +3207,7 @@ class AtlasZonefilePusher(object):
ret = 0
for peer in peers:
log.debug("%s: Push to %s" % (self.hostport, peer))
atlas_zonefile_push( peer, zfhash, zfdata, timeout=10 )
atlas_zonefile_push( peer, zfdata, timeout=self.push_timeout )
ret += 1
return ret
@@ -2992,7 +3218,8 @@ class AtlasZonefilePusher(object):
while self.running:
num_pushed = self.step( path=self.path )
if num_pushed == 0:
time_sleep(self.hostport, self.__class__.__name__, 1.0)
# time_sleep(self.hostport, self.__class__.__name__, 1.0)
pass
def ask_join(self):