fix a few bugs in MHRWDA, found during testing

This commit is contained in:
Jude Nelson
2016-08-23 23:13:05 -04:00
parent 8039310238
commit 848cfff660

View File

@@ -187,6 +187,14 @@ def atlas_zonefile_queue_unlock():
ZONEFILE_QUEUE_LOCK.release()
def atlas_max_new_peers( max_neighbors ):
"""
Maximum size of the new peers list
"""
max_new_peers = min(max_neighbors * 10, PEER_MAX_DB)
return max_new_peers
def atlas_inventory_to_string( inv ):
"""
Inventory to string (bitwise big-endian)
@@ -1998,7 +2006,7 @@ def atlas_peer_enqueue( peer_hostport, peer_table=None, peer_queue=None, max_nei
if max_neighbors is None:
max_neighbors = atlas_max_neighbors()
if len(peer_queue) < max_neighbors:
if len(peer_queue) < atlas_max_new_peers(max_neighbors):
peer_queue.append( peer_hostport )
if peer_lock:
@@ -2099,6 +2107,7 @@ def atlas_zonefile_push_enqueue( zonefile_hash, zonefile_data, peer_table=None,
def atlas_zonefile_push_dequeue( zonefile_queue=None ):
"""
Dequeue a zonefile to replicate
Return None if there are none queued
"""
zonefile_queue_locked = False
if zonefile_queue is None:
@@ -2161,6 +2170,7 @@ class AtlasPeerCrawler( threading.Thread ):
self.current_peer_neighbors = []
self.prev_peer = None
self.prev_peer_degree = 0
self.new_peers = []
self.max_neighbors = None
@@ -2176,6 +2186,10 @@ class AtlasPeerCrawler( threading.Thread ):
else:
neighbors = atlas_peer_get_neighbors( self.my_hostport, peer_hostport, timeout=10, peer_table=peer_table, path=path, con=con )
if neighbors is not None:
log.debug("%s: neighbors of %s are (%s): %s" % (self.my_hostport, peer_hostport, len(neighbors), ",".join(neighbors)))
else:
log.error("%s: failed to ask %s for neighbors" % (self.my_hostport, peer_hostport))
return neighbors
@@ -2246,7 +2260,7 @@ class AtlasPeerCrawler( threading.Thread ):
return removed
def walk_graph( self, prev_peer, current_peer, current_peer_neighbors, con=None, path=None, peer_table=None ):
def random_walk_graph( self, prev_peer, prev_peer_degree, current_peer, current_peer_neighbors, con=None, path=None, peer_table=None ):
"""
Take one step from current_peer to a neighbor in current_peer_neighbors,
based on Metropolis-Hastings Random Walk with Delayed Acceptance (MHRWDA).
@@ -2265,35 +2279,41 @@ class AtlasPeerCrawler( threading.Thread ):
error_ret = (None, None)
current_peer_degree = len(current_peer_neighbors)
if current_peer_degree == 0:
# nowhere to go
log.debug("%s: current peer degree is 0" % (self.my_hostport))
return error_ret
next_peer = current_peer_neighbors[ random.randint(0, len(current_peer_neighbors)-1) ]
next_peer_neighbors = self.get_neighbors( next_peer, con=con, path=path, peer_table=peer_table )
if next_peer_neighbors is None or len(next_peer_neighbors) == 0:
# walk failed, or nowhere to go
# restart the walk
log.debug("%s: failed to get neighbors of %s" % (self.my_hostport, next_peer))
return error_ret
next_peer_degree = len(next_peer_neighbors)
p = random.random()
if p <= min(1.0, float(current_peer_degree) / float(next_peer_degree)):
if prev_peer == current_peer and current_peer_degree > 1:
if prev_peer == next_peer and current_peer_degree > 1:
# find a different peer
search = current_peer_neighbors[:]
if current_peer in search:
search.remove(current_peer)
if next_peer in search:
search.remove(next_peer)
alt_peer = search[ random.randint(0, len(search)-1) ]
alt_peer_neighbors = self.get_neighbors( alt_peer, con=con, path=path, peer_table=peer_table )
if alt_peer_neighbors is None or len(alt_peer_neighbors) == 0:
# walk failed, or nowhere to go
# restart the walk
log.debug("%s: failed to get neighbors of %s" % (self.my_hostport, alt_peer))
return error_ret
alt_peer_degree = len(alt_peer_neighbors)
q = random.random()
if q <= min( 1.0, min( 1.0, (float(current_peer_degree) / float(alt_peer_degree))**2 ), max( 1.0, (float(alt_peer_degree) / float(current_peer_degree))**2 ) ):
if q <= min( 1.0, min( 1.0, (float(current_peer_degree) / float(alt_peer_degree))**2 ), max( 1.0, (float(prev_peer_degree) / float(current_peer_degree))**2 ) ):
# go to the alt peer instead
ret_current_peer = alt_peer
ret_current_peer_neighbors = alt_peer_neighbors
@@ -2312,7 +2332,8 @@ class AtlasPeerCrawler( threading.Thread ):
ret_current_peer = current_peer
ret_current_peer_neighbors = self.get_neighbors( current_peer, con=con, path=path, peer_table=peer_table )
if ret_current_peer_neighbors is None or len(ret_current_peer_neighbors) == 0:
# nowhere to go
# nowhere to go
log.debug("%s: failed to refresh %s" % (self.my_hostport, current_peer))
return error_ret
return (ret_current_peer, ret_current_peer_neighbors)
@@ -2365,8 +2386,9 @@ class AtlasPeerCrawler( threading.Thread ):
new_peers.remove(peer)
# DDoS prevention: don't let this get too big
if len(new_peers) > self.max_neighbors * 2:
new_peers = new_peers[:(self.max_neighbors * 2)]
max_new_peers = atlas_max_new_peers( self.max_neighbors )
if len(new_peers) > max_new_peers:
new_peers = new_peers[:max_new_peers]
self.new_peers = new_peers
return True
@@ -2464,12 +2486,13 @@ class AtlasPeerCrawler( threading.Thread ):
# can we walk now?
if self.current_peer is not None:
next_peer, next_peer_neighbors = self.walk_graph( self.prev_peer, self.current_peer, self.current_peer_neighbors, con=con, path=path, peer_table=peer_table )
next_peer, next_peer_neighbors = self.random_walk_graph( self.prev_peer, self.prev_peer_degree, self.current_peer, self.current_peer_neighbors, con=con, path=path, peer_table=peer_table )
if next_peer is not None and next_peer_neighbors is not None:
# success!
self.prev_peer = self.current_peer
self.prev_peer_degree = len(self.current_peer_neighbors)
self.current_peer = next_peer
self.current_peer_neighbors = self.current_peer_neighbors
self.current_peer_neighbors = next_peer_neighbors
# crawl new peers
self.new_peers = list(set(self.new_peers + self.current_peer_neighbors))
@@ -2752,15 +2775,17 @@ class AtlasZonefilePusher(object):
Return the number of peers we sent to
"""
zfinfo = atlas_zonefile_push_dequeue( zonefile_queue=zonefile_queue )
if zfinfo is None:
return 0
zfhash = zfinfo.keys()[0]
zfdata = zfinfo[zfhash]
zfbits = atlasdb_get_zonefile_bits( zonefile_hash, con=con, path=path )
if len(zfbits) == 0:
# nope
return 0
zfhash = zfinfo.keys()[0]
zfdata = zfinfo[zfhash]
# see if we can send this somewhere
table_locked = False
if peer_table is None: