mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-21 18:32:04 +08:00
some refactoring of the zonefile crawler; only open the atlas db when
needed while crawling zonefiles (since we can crawl a lot of them at once)
This commit is contained in:
@@ -2198,7 +2198,7 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
|
||||
missing += zfinfo
|
||||
bit_offset += len(zfinfo)
|
||||
|
||||
log.debug("Missing %s zonefiles" % len(zfinfo))
|
||||
log.debug("Missing %s zonefiles" % len(missing))
|
||||
|
||||
else:
|
||||
missing = missing_zonefile_info
|
||||
@@ -2243,7 +2243,7 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
|
||||
ret[zfinfo['zonefile_hash']]['indexes'].append( zfinfo['inv_index']-1 )
|
||||
ret[zfinfo['zonefile_hash']]['popularity'] += popularity
|
||||
ret[zfinfo['zonefile_hash']]['peers'] += peers
|
||||
ret[zfinfo['zonefile_hash']]['tried_storage'] = (zfinfo['tried_storage'] != 0)
|
||||
ret[zfinfo['zonefile_hash']]['tried_storage'] = zfinfo['tried_storage']
|
||||
|
||||
if locked:
|
||||
atlas_peer_table_unlock()
|
||||
@@ -3320,6 +3320,101 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
return rc
|
||||
|
||||
|
||||
def store_zonefiles( self, zonefiles, peer_zonefile_hashes, path, con=None, peer_hostport=None ):
    """
    Store a list of RPC-fetched zonefiles (but only ones in peer_zonefile_hashes).

    zonefiles: dict mapping zonefile hash -> zonefile text, as fetched over RPC
    peer_zonefile_hashes: list of zonefile hashes we actually solicited
    path: path to the atlas DB
    con: optional open DB connection; if None, one is opened here and closed on exit
    peer_hostport: host:port the zonefiles came from, recorded as the data's
        origin.  Defaults to self.hostport.
        (BUG FIX: the original body referenced an unbound name `peer_hostport`,
        which raised NameError on the first zonefile stored.)

    Return the list of zonefile hashes stored.
    """
    ret = []
    close = False

    if peer_hostport is None:
        peer_hostport = self.hostport

    if con is None:
        close = True
        con = atlasdb_open( path )
        assert con is not None

    for fetched_zfhash, zonefile_txt in zonefiles.items():

        # sanity-check: must be a parseable zonefile
        zonefile = None
        try:
            zonefile = blockstack_zones.parse_zone_file( zonefile_txt )
        except:
            log.error("Unparseable zonefile: %s" % fetched_zfhash)
            continue

        if fetched_zfhash not in peer_zonefile_hashes:
            # unsolicited
            log.warn("%s: Unsolicited zonefile %s" % (self.hostport, fetched_zfhash))
            continue

        rc = self.store_zonefile_data( fetched_zfhash, zonefile_txt, peer_hostport, con, path )
        if rc:
            # don't ask for it again
            ret.append( fetched_zfhash )

    if close:
        con.close()

    return ret
|
||||
|
||||
|
||||
def try_crawl_storage( self, zfhash, path, con=None ):
    """
    Attempt to fetch the zonefile with hash @zfhash directly from our
    configured storage drivers, recording in the DB that the attempt
    was made (whether or not it succeeds).

    Return True on success.
    Return False if not.
    """
    opened_here = (con is None)
    if opened_here:
        con = atlasdb_open( path )
        assert con is not None

    # is this zonefile available via storage?
    log.debug("Try loading %s from storage" % zfhash)
    res = atlas_get_zonefile_data_from_storage( zfhash, self.zonefile_storage_drivers )

    # record that we made the attempt, regardless of outcome
    atlasdb_set_zonefile_tried_storage( zfhash, True, con=con, path=path )

    if 'error' in res:
        log.error("%s: Failed to get zonefile '%s' from storage" % (self.hostport, zfhash))
        success = False
    else:
        # got it! remember it
        log.debug("%s: got %s from storage" % (self.hostport, zfhash))
        success = self.store_zonefile_data( zfhash, res['zonefile_data'], "storage", con, path )

    if opened_here:
        con.close()

    return success
|
||||
|
||||
|
||||
def find_zonefile_origins( self, missing_zfinfo, peer_hostports ):
    """
    Find out which peers can serve which zonefiles.

    missing_zfinfo: dict mapping zonefile hash -> info dict with a 'peers'
        member (list of peer hostports that advertise the zonefile)
    peer_hostports: list of all known peer hostports

    Return a dict mapping each peer hostport to the (possibly empty) list
    of missing zonefile hashes it can serve.
    """
    zonefile_origins = {}   # map peer hostport to list of zonefile hashes

    # which peers can serve each zonefile?
    # NOTE: use `in` rather than dict.has_key(), which is deprecated
    # (and removed in Python 3)
    for zfhash in missing_zfinfo.keys():
        for peer_hostport in peer_hostports:
            if peer_hostport not in zonefile_origins:
                zonefile_origins[peer_hostport] = []

            if peer_hostport in missing_zfinfo[zfhash]['peers']:
                zonefile_origins[peer_hostport].append( zfhash )

    return zonefile_origins
|
||||
|
||||
|
||||
|
||||
def step(self, con=None, path=None, peer_table=None):
|
||||
"""
|
||||
Run one step of this algorithm:
|
||||
@@ -3340,12 +3435,6 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
if path is None:
|
||||
path = self.path
|
||||
|
||||
close = False
|
||||
if con is None:
|
||||
close = True
|
||||
con = atlasdb_open( path )
|
||||
assert con is not None
|
||||
|
||||
num_fetched = 0
|
||||
locked = False
|
||||
|
||||
@@ -3353,7 +3442,7 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
locked = True
|
||||
peer_table = atlas_peer_table_lock()
|
||||
|
||||
missing_zfinfo = atlas_find_missing_zonefile_availability( peer_table=peer_table, con=con, path=path )
|
||||
missing_zfinfo = atlas_find_missing_zonefile_availability( peer_table=peer_table, path=path )
|
||||
peer_hostports = peer_table.keys()[:]
|
||||
|
||||
if locked:
|
||||
@@ -3364,17 +3453,9 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
zonefile_ranking = [ (missing_zfinfo[zfhash]['popularity'], zfhash) for zfhash in missing_zfinfo.keys() ]
|
||||
zonefile_ranking.sort()
|
||||
zonefile_hashes = list(set([zfhash for (_, zfhash) in zonefile_ranking]))
|
||||
zonefile_origins = {} # map peer hostport to list of zonefile hashes
|
||||
|
||||
# which peers can serve each zonefile?
|
||||
for zfhash in missing_zfinfo.keys():
|
||||
for peer_hostport in peer_hostports:
|
||||
if not zonefile_origins.has_key(peer_hostport):
|
||||
zonefile_origins[peer_hostport] = []
|
||||
|
||||
if peer_hostport in missing_zfinfo[zfhash]['peers']:
|
||||
zonefile_origins[peer_hostport].append( zfhash )
|
||||
|
||||
zonefile_origins = self.find_zonefile_origins( missing_zfinfo, peer_hostports )
|
||||
|
||||
log.debug("%s: missing %s zonefiles" % (self.hostport, len(zonefile_hashes)))
|
||||
|
||||
while len(zonefile_hashes) > 0:
|
||||
@@ -3384,27 +3465,15 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
|
||||
# is this zonefile available via storage?
|
||||
if not missing_zfinfo[zfhash]['tried_storage']:
|
||||
log.debug("Try loading %s from storage" % zfhash)
|
||||
|
||||
zonefile_info = atlas_get_zonefile_data_from_storage( zfhash, self.zonefile_storage_drivers )
|
||||
|
||||
# tried loading from storage
|
||||
atlasdb_set_zonefile_tried_storage( zfhash, True, con=con, path=path )
|
||||
|
||||
if 'error' in zonefile_info:
|
||||
log.error("%s: Failed to get zonefile '%s' from storage" % (self.hostport, zfhash))
|
||||
|
||||
else:
|
||||
# got it! remember it
|
||||
log.debug("%s: got %s from storage" % (self.hostport, zfhash))
|
||||
rc = self.store_zonefile_data( zfhash, zonefile_info['zonefile_data'], "storage", con, path )
|
||||
if rc:
|
||||
# don't ask for it again
|
||||
zonefile_hashes.pop(0)
|
||||
num_fetched += 1
|
||||
continue
|
||||
rc = self.try_crawl_storage( zfhash, path )
|
||||
if rc:
|
||||
# don't ask for it again
|
||||
zonefile_hashes.pop(0)
|
||||
num_fetched += 1
|
||||
continue
|
||||
|
||||
if len(peers) == 0:
|
||||
# unavailable
|
||||
if not missing_zfinfo[zfhash]['tried_storage']:
|
||||
log.debug("%s: zonefile %s is unavailable" % (self.hostport, zfhash))
|
||||
|
||||
@@ -3435,25 +3504,13 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
if zonefiles is not None:
|
||||
|
||||
# got zonefiles!
|
||||
for fetched_zfhash, zonefile_txt in zonefiles.items():
|
||||
|
||||
zonefile = None
|
||||
try:
|
||||
zonefile = blockstack_zones.parse_zone_file( zonefile_txt )
|
||||
except:
|
||||
log.error("Unparseable zonefile: %s" % fetched_zfhash)
|
||||
continue
|
||||
|
||||
if fetched_zfhash not in peer_zonefile_hashes:
|
||||
# unsolicited
|
||||
log.warn("%s: Unsolicited zonefile %s" % (self.hostport, fetched_zfhash))
|
||||
continue
|
||||
|
||||
rc = self.store_zonefile_data( fetched_zfhash, zonefile_txt, peer_hostport, con, path )
|
||||
if rc:
|
||||
# don't ask for it again
|
||||
peer_zonefile_hashes.remove(fetched_zfhash)
|
||||
num_fetched += 1
|
||||
stored_zfhashes = self.store_zonefiles( zonefiles, peer_zonefile_hashes, path )
|
||||
|
||||
# don't ask again
|
||||
log.debug("Stored %s zonefiles" % len(stored_zfhashes))
|
||||
for zfh in stored_zfhashes:
|
||||
peer_zonefile_hashes.remove(zfh)
|
||||
num_fetched += 1
|
||||
|
||||
else:
|
||||
log.debug("%s: no data received from %s" % (self.hostport, peer_hostport))
|
||||
@@ -3474,9 +3531,6 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
# done with this zonefile
|
||||
zonefile_hashes.pop(0)
|
||||
|
||||
if close:
|
||||
con.close()
|
||||
|
||||
log.debug("%s: fetched %s zonefiles" % (self.hostport, num_fetched))
|
||||
return num_fetched
|
||||
|
||||
@@ -3484,14 +3538,11 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
def run(self):
|
||||
self.running = True
|
||||
while self.running:
|
||||
con = atlasdb_open( self.path )
|
||||
|
||||
t1 = time.time()
|
||||
num_fetched = self.step( con=con, path=self.path )
|
||||
num_fetched = self.step( path=self.path )
|
||||
t2 = time.time()
|
||||
|
||||
con.close()
|
||||
|
||||
if num_fetched == 0 and t2 - t1 < PEER_CRAWL_ZONEFILE_WORK_INTERVAL:
|
||||
time_sleep( self.hostport, self.__class__.__name__, PEER_CRAWL_ZONEFILE_WORK_INTERVAL - (t2 - t1) )
|
||||
|
||||
|
||||
Reference in New Issue
Block a user