mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-06-02 14:18:32 +08:00
remove zonefile storage drivers
This commit is contained in:
@@ -47,7 +47,8 @@ from .client import \
|
||||
get_zonefile_inventory as blockstack_get_zonefile_inventory, \
|
||||
get_atlas_peers as blockstack_get_atlas_peers, \
|
||||
get_zonefiles as blockstack_get_zonefiles, \
|
||||
put_zonefiles as blockstack_put_zonefiles
|
||||
put_zonefiles as blockstack_put_zonefiles, \
|
||||
json_is_error
|
||||
|
||||
|
||||
log = virtualchain.get_logger("blockstack-server")
|
||||
@@ -1532,8 +1533,8 @@ def atlas_peer_ping( peer_hostport, timeout=None, peer_table=None ):
|
||||
def atlas_peer_getinfo( peer_hostport, timeout=None, peer_table=None ):
|
||||
"""
|
||||
Get host info
|
||||
Return True if alive
|
||||
Return False if not
|
||||
Return the rpc_getinfo() response on success
|
||||
Return None on error
|
||||
"""
|
||||
|
||||
if timeout is None:
|
||||
@@ -1564,6 +1565,15 @@ def atlas_peer_getinfo( peer_hostport, timeout=None, peer_table=None ):
|
||||
except Exception, e:
|
||||
log.exception(e)
|
||||
log.error("Failed to get response from %s" % peer_hostport)
|
||||
|
||||
if json_is_error(res):
|
||||
log.error("Failed to contact {}: replied error '{}'".format(peer_hostport, res['error']))
|
||||
res = None
|
||||
|
||||
if 'stale' in res and res['stale']:
|
||||
# peer is behind the chain tip
|
||||
log.warning("Peer {} reports that it is too far behind the chain tip. Ignoring for now.".format(peer_hostport))
|
||||
res = None
|
||||
|
||||
# update health
|
||||
with AtlasPeerTableLocked(peer_table) as ptbl:
|
||||
@@ -2651,19 +2661,21 @@ class AtlasPeerCrawler( threading.Thread ):
|
||||
# test the peer before adding
|
||||
res = atlas_peer_getinfo( peer, timeout=self.ping_timeout, peer_table=peer_table )
|
||||
if res is None:
|
||||
# didn't respond
|
||||
# didn't respond successfully
|
||||
filtered.append(peer)
|
||||
continue
|
||||
|
||||
if not res.has_key('server_version'):
|
||||
# too old
|
||||
# too old. TODO: ban them
|
||||
filtered.append(peer)
|
||||
continue
|
||||
|
||||
if semver_newer( res['server_version'], MIN_ATLAS_VERSION ):
|
||||
# too old to be a valid atlas node
|
||||
if semver_newer(res['server_version'], MIN_ATLAS_VERSION):
|
||||
# too old to be a valid atlas node for this network version
|
||||
filtered.append(peer)
|
||||
log.debug("%s is too old to be an atlas node (version %s)" % (peer, res['server_version']))
|
||||
|
||||
# TODO: ban them
|
||||
continue
|
||||
|
||||
# advance our consensus hashes
|
||||
@@ -2679,6 +2691,8 @@ class AtlasPeerCrawler( threading.Thread ):
|
||||
if their_last_block <= our_last_block and res['consensus'] not in self.consensus_hashes.values():
|
||||
# on different consensus rules than us
|
||||
log.debug("Peer {} has ({},{}), but we have ({},{}). Ignoring.".format(peer, their_last_block, res['consensus'], our_last_block, self.consensus_hashes[our_last_block]))
|
||||
|
||||
# TODO: drop them from the peer table, and don't reconnect with them for a while.
|
||||
continue
|
||||
|
||||
if res:
|
||||
@@ -2926,9 +2940,6 @@ class AtlasPeerCrawler( threading.Thread ):
|
||||
* Remove at most 10 old, unresponsive peers from the peer DB.
|
||||
"""
|
||||
|
||||
# if os.environ.get("BLOCKSTACK_TEST", None) == "1":
|
||||
# log.debug("%s: %s step" % (self.my_hostport, self.__class__.__name__))
|
||||
|
||||
if path is None:
|
||||
path = self.atlasdb_path
|
||||
|
||||
@@ -3037,9 +3048,6 @@ class AtlasHealthChecker( threading.Thread ):
|
||||
Return True on success
|
||||
Return False on error
|
||||
"""
|
||||
# if os.environ.get("BLOCKSTACK_TEST", None) == "1":
|
||||
# log.debug("%s: %s step" % (self.hostport, self.__class__.__name__))
|
||||
|
||||
if path is None:
|
||||
path = self.atlasdb_path
|
||||
|
||||
@@ -3105,12 +3113,10 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
zonefiles that we don't have.
|
||||
"""
|
||||
|
||||
def __init__(self, my_host, my_port, path, zonefile_dir, zonefile_storage_drivers=[], zonefile_storage_drivers_write=[]):
|
||||
def __init__(self, my_host, my_port, path, zonefile_dir):
|
||||
threading.Thread.__init__(self)
|
||||
self.running = False
|
||||
self.hostport = "%s:%s" % (my_host, my_port)
|
||||
self.zonefile_storage_drivers = zonefile_storage_drivers
|
||||
self.zonefile_storage_drivers_write = zonefile_storage_drivers_write
|
||||
self.zonefile_dir = zonefile_dir
|
||||
self.last_storage_reset = time_now()
|
||||
self.atlasdb_path = path
|
||||
@@ -3321,6 +3327,7 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
with AtlasPeerTableLocked() as ptbl:
|
||||
# if the node didn't actually have these zonefiles, then
|
||||
# update their inventories so we don't ask for them again.
|
||||
# TODO: ban nodes that repeatedly lie to us
|
||||
for zfh in peer_zonefile_hashes:
|
||||
log.debug("%s: %s did not have %s" % (self.hostport, peer_hostport, zfh))
|
||||
atlas_peer_set_zonefile_status( peer_hostport, zfh, False, zonefile_bits=missing_zfinfo[zfh]['indexes'], peer_table=ptbl )
|
||||
@@ -3375,11 +3382,10 @@ class AtlasZonefilePusher(threading.Thread):
|
||||
|
||||
CURRENTLY DEACTIVATED
|
||||
"""
|
||||
def __init__(self, host, port, path, zonefile_dir, zonefile_storage_drivers=None,):
|
||||
def __init__(self, host, port, path, zonefile_dir):
|
||||
threading.Thread.__init__(self)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.zonefile_storage_drivers = zonefile_storage_drivers
|
||||
self.zonefile_dir = zonefile_dir
|
||||
self.hostport = "%s:%s" % (host, port)
|
||||
self.atlasdb_path = path
|
||||
@@ -3416,7 +3422,6 @@ class AtlasZonefilePusher(threading.Thread):
|
||||
return 0
|
||||
|
||||
# it's a valid zonefile. store it.
|
||||
# rc = store_zonefile_data_to_storage( str(zfdata_txt), txid, required=self.zonefile_storage_drivers, cache=True, zonefile_dir=self.zonefile_dir, tx_required=False )
|
||||
rc = add_atlas_zonefile_data( str(zfdata_txt), self.zonefile_dir )
|
||||
if not rc:
|
||||
log.error("Failed to replicate zonefile %s to external storage" % zonefile_hash)
|
||||
@@ -3463,17 +3468,16 @@ class AtlasZonefilePusher(threading.Thread):
|
||||
|
||||
|
||||
|
||||
def atlas_node_start( my_hostname, my_portnum, atlasdb_path, zonefile_dir, working_dir, zonefile_storage_drivers=[], zonefile_storage_drivers_write=[] ):
|
||||
def atlas_node_start(my_hostname, my_portnum, atlasdb_path, zonefile_dir, working_dir):
|
||||
"""
|
||||
Start up the atlas node.
|
||||
Return a bundle of atlas state
|
||||
"""
|
||||
atlas_state = {}
|
||||
atlas_state['peer_crawler'] = AtlasPeerCrawler( my_hostname, my_portnum, atlasdb_path, working_dir )
|
||||
atlas_state['health_checker'] = AtlasHealthChecker( my_hostname, my_portnum, atlasdb_path )
|
||||
atlas_state['zonefile_crawler'] = AtlasZonefileCrawler( my_hostname, my_portnum, atlasdb_path, zonefile_dir, zonefile_storage_drivers=zonefile_storage_drivers,
|
||||
zonefile_storage_drivers_write=zonefile_storage_drivers_write )
|
||||
# atlas_state['zonefile_pusher'] = AtlasZonefilePusher( my_hostname, my_portnum, atlasdb_path, zonefile_dir, zonefile_storage_drivers=zonefile_storage_drivers)
|
||||
atlas_state['peer_crawler'] = AtlasPeerCrawler(my_hostname, my_portnum, atlasdb_path, working_dir)
|
||||
atlas_state['health_checker'] = AtlasHealthChecker(my_hostname, my_portnum, atlasdb_path)
|
||||
atlas_state['zonefile_crawler'] = AtlasZonefileCrawler(my_hostname, my_portnum, atlasdb_path, zonefile_dir)
|
||||
# atlas_state['zonefile_pusher'] = AtlasZonefilePusher(my_hostname, my_portnum, atlasdb_path, zonefile_dir)
|
||||
|
||||
# start them all up
|
||||
for component in atlas_state.keys():
|
||||
|
||||
Reference in New Issue
Block a user