all methods: expect either an atlas database connection or an atlas database path. All atlas components: require a virtualchain working dir and a path to the atlas db

This commit is contained in:
Jude Nelson
2018-01-12 18:22:06 -05:00
parent 1dfdb5ade6
commit b8e6425f47

View File

@@ -37,10 +37,11 @@ import gc
import virtualchain
from nameset.virtualchain_hooks import get_last_block, get_snapshots
from blockstack_client.config import semver_newer
from blockstack_client.utils import url_to_host_port, atlas_inventory_to_string
from .util import url_to_host_port, atlas_inventory_to_string
from .storage.auth import get_zonefile_data_hash
from blockstack_client.proxy import \
from .client import \
BlockstackRPCClient, \
ping as blockstack_ping, \
getinfo as blockstack_getinfo, \
get_zonefile_inventory as blockstack_get_zonefile_inventory, \
@@ -94,22 +95,18 @@ if os.environ.get("BLOCKSTACK_ATLAS_MIN_PEER_HEALTH") is not None:
if os.environ.get("BLOCKSTACK_ATLAS_NUM_NEIGHBORS") is not None:
NUM_NEIGHBORS = int(os.environ.get("BLOCKSTACK_ATLAS_NUM_NEIGHBORS"))
if os.environ.get("BLOCKSTACK_TEST", None) == "1":
if BLOCKSTACK_TEST:
PEER_CRAWL_NEIGHBOR_WORK_INTERVAL = 1
PEER_HEALTH_NEIGHBOR_WORK_INTERVAL = 1
PEER_CRAWL_ZONEFILE_WORK_INTERVAL = 1
PEER_PUSH_ZONEFILE_WORK_INTERVAL = 1
ATLAS_TEST = False
if os.environ.get("BLOCKSTACK_TEST", None) == "1" and os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION", None) == "1" and os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION_PEER", None) == "1":
if BLOCKSTACK_TEST and os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION", None) == "1" and os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION_PEER", None) == "1":
# subordinate atlas peer in the simulator.
# use test client
ATLAS_TEST = True
else:
# production
from blockstack_client import BlockstackRPCClient
def time_now():
global ATLAS_TEST
if ATLAS_TEST:
@@ -325,9 +322,7 @@ class AtlasDBOpen(object):
context manager for opening the atlas database
"""
def __init__(self, con=None, path=None):
if not path:
path = atlasdb_path()
assert con or path, 'Either a path or db connection is required'
self.con = con
self.path = path
self.opened = False
@@ -544,14 +539,6 @@ def atlasdb_row_factory( cursor, row ):
return d
def atlasdb_path( impl=None ):
"""
Get the path to the atlas DB
"""
working_dir = virtualchain.get_working_dir(impl=impl)
return os.path.join(working_dir, "atlas.db")
def atlasdb_format_query( query, values ):
"""
Turn a query into a string for printing.
@@ -608,9 +595,6 @@ def atlasdb_add_zonefile_info( name, zonefile_hash, txid, present, tried_storage
"""
global ZONEFILE_INV, NUM_ZONEFILES
if path is None:
path = atlasdb_path()
with AtlasDBOpen( con=con, path=path ) as dbcon:
if present:
present = 1
@@ -658,9 +642,6 @@ def atlasdb_get_lastblock( con=None, path=None ):
"""
Get the highest block height in the atlas db
"""
if path is None:
path = atlasdb_path()
row = None
with AtlasDBOpen(con=con, path=path) as dbcon:
@@ -685,9 +666,6 @@ def atlasdb_get_zonefile( zonefile_hash, con=None, path=None ):
Look up all information on this zonefile.
Returns {'zonefile_hash': ..., 'indexes': [...], etc}
"""
if path is None:
path = atlasdb_path()
ret = None
with AtlasDBOpen(con=con, path=path) as dbcon:
@@ -720,9 +698,6 @@ def atlasdb_get_zonefiles_by_block( from_block, to_block, offset, count, con=Non
Look up all information on this zonefile.
Returns {'zonefile_hash': ..., 'indexes': [...], etc}
"""
if path is None:
path = atlasdb_path()
ret = None
if count > 100:
@@ -758,9 +733,6 @@ def atlasdb_find_zonefile_by_txid( txid, con=None, path=None ):
Returns {'zonefile_hash': ..., 'name': ..., etc.}
Returns None if not found
"""
if path is None:
path = atlasdb_path()
ret = None
with AtlasDBOpen(con=con, path=path) as dbcon:
@@ -787,9 +759,6 @@ def atlasdb_set_zonefile_present( zonefile_hash, present, con=None, path=None ):
"""
global ZONEFILE_INV
if path is None:
path = atlasdb_path()
was_present = None
with AtlasDBOpen(con=con, path=path) as dbcon:
if present:
@@ -827,9 +796,6 @@ def atlasdb_set_zonefile_tried_storage( zonefile_hash, tried_storage, con=None,
"""
global ZONEFILE_INV
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
if tried_storage:
tried_storage = 1
@@ -851,9 +817,6 @@ def atlasdb_reset_zonefile_tried_storage( con=None, path=None ):
For zonefiles that we don't have, re-attempt to fetch them from storage.
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = "UPDATE zonefiles SET tried_storage = ? WHERE present = ?;"
@@ -885,9 +848,6 @@ def atlasdb_get_zonefile_bits( zonefile_hash, con=None, path=None ):
What bit(s) in a zonefile inventory does a zonefile hash correspond to?
Return their indexes in the bit field.
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = "SELECT inv_index FROM zonefiles WHERE zonefile_hash = ?;"
@@ -905,14 +865,14 @@ def atlasdb_get_zonefile_bits( zonefile_hash, con=None, path=None ):
return ret
def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir=None, validate=True ):
def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir, validate=True ):
"""
Queue all zonefile hashes in the BlockstackDB
to the zonefile queue
"""
# populate zonefile queue
total = 0
for block_height in xrange(start_block, db.lastblock+1, 1):
for block_height in range(start_block, db.lastblock+1, 1):
zonefile_info = db.get_atlas_zonefile_info_at( block_height )
for name_txid_zfhash in zonefile_info:
@@ -921,7 +881,7 @@ def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir=None, validate=T
txid = str(name_txid_zfhash['txid'])
tried_storage = 0
present = is_zonefile_cached( zfhash, zonefile_dir=zonefile_dir, validate=validate )
present = is_zonefile_cached( zfhash, zonefile_dir, validate=validate )
zfinfo = atlasdb_get_zonefile( zfhash, con=con )
if zfinfo is not None:
tried_storage = zfinfo['tried_storage']
@@ -934,15 +894,12 @@ def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir=None, validate=T
return True
def atlasdb_sync_zonefiles( db, start_block, zonefile_dir=None, validate=True, path=None, con=None ):
def atlasdb_sync_zonefiles( db, start_block, zonefile_dir, validate=True, path=None, con=None ):
"""
Synchronize atlas DB with name db
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
atlasdb_queue_zonefiles( dbcon, db, start_block, zonefile_dir=zonefile_dir, validate=validate )
atlasdb_queue_zonefiles( dbcon, db, start_block, zonefile_dir, validate=validate )
atlasdb_cache_zonefile_info( con=dbcon )
return True
@@ -970,9 +927,6 @@ def atlasdb_add_peer( peer_hostport, discovery_time=None, peer_table=None, con=N
peer_slot = int( hashlib.sha256("%s%s" % (sk, peer_host)).hexdigest(), 16 ) % PEER_MAX_DB
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
if discovery_time is None:
@@ -1037,10 +991,6 @@ def atlasdb_remove_peer( peer_hostport, con=None, path=None, peer_table=None ):
"""
Remove a peer from the peer db and (if given) peer table.
"""
if path is None:
path = atlasdb_path()
# remove from db
with AtlasDBOpen(con=con, path=path) as dbcon:
@@ -1067,9 +1017,6 @@ def atlasdb_num_peers( con=None, path=None ):
"""
How many peers are there in the db?
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = "SELECT MAX(peer_index) FROM peers;"
@@ -1107,9 +1054,6 @@ def atlasdb_get_random_peer( con=None, path=None ):
Select a peer from the db at random
Return None if the table is empty
"""
if path is None:
path = atlasdb_path()
ret = {}
with AtlasDBOpen(con=con, path=path) as dbcon:
@@ -1142,9 +1086,6 @@ def atlasdb_get_old_peers( now, con=None, path=None ):
"""
Get peers older than now - PEER_LIFETIME
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
if now is None:
@@ -1171,9 +1112,6 @@ def atlasdb_renew_peer( peer_hostport, now, con=None, path=None ):
"""
Renew a peer's discovery time
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
if now is None:
now = time.time()
@@ -1194,9 +1132,6 @@ def atlasdb_load_peer_table( con=None, path=None ):
"""
peer_table = {}
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = "SELECT * FROM peers;"
@@ -1218,7 +1153,7 @@ def atlasdb_load_peer_table( con=None, path=None ):
return peer_table
def atlasdb_init( path, db, peer_seeds, peer_blacklist, validate=False, zonefile_dir=None ):
def atlasdb_init( path, zonefile_dir, db, peer_seeds, peer_blacklist, validate=False):
"""
Set up the atlas node:
* create the db if it doesn't exist
@@ -1246,7 +1181,7 @@ def atlasdb_init( path, db, peer_seeds, peer_blacklist, validate=False, zonefile
log.debug("Synchronize zonefiles from %s to %s" % (atlasdb_last_block, db.lastblock) )
atlasdb_queue_zonefiles( con, db, atlasdb_last_block, validate=validate, zonefile_dir=zonefile_dir )
atlasdb_queue_zonefiles( con, db, atlasdb_last_block, zonefile_dir, validate=validate)
log.debug("Refreshing seed peers")
for peer in peer_seeds:
@@ -1278,7 +1213,7 @@ def atlasdb_init( path, db, peer_seeds, peer_blacklist, validate=False, zonefile
# populate from db
log.debug("Queuing all zonefiles")
atlasdb_queue_zonefiles( con, db, FIRST_BLOCK_MAINNET, validate=validate, zonefile_dir=zonefile_dir )
atlasdb_queue_zonefiles( con, db, FIRST_BLOCK_MAINNET, zonefile_dir, validate=validate)
log.debug("Adding seed peers")
for peer in peer_seeds:
@@ -1288,6 +1223,7 @@ def atlasdb_init( path, db, peer_seeds, peer_blacklist, validate=False, zonefile
con.close()
log.debug("peer_table: {}".format(peer_table.keys()))
# whitelist and blacklist
for peer_url in peer_seeds:
host, port = url_to_host_port( peer_url )
@@ -1329,9 +1265,6 @@ def atlasdb_zonefile_inv_list( bit_offset, bit_length, con=None, path=None ):
Return the list of zonefile information.
The list may be less than length elements.
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = "SELECT * FROM zonefiles LIMIT ? OFFSET ?;"
@@ -1354,9 +1287,6 @@ def atlasdb_zonefile_inv_length( con=None, path=None ):
"""
Find out how long our zonefile inventory vector is (in bits)
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = "SELECT MAX(inv_index) FROM zonefiles;"
@@ -1395,9 +1325,6 @@ def atlasdb_zonefile_find_missing( bit_offset, bit_count, con=None, path=None ):
offset and count are *bit* indexes
Return a list of zonefile rows, where present == 0.
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = "SELECT * FROM zonefiles WHERE present = 0 LIMIT ? OFFSET ?;"
@@ -1422,9 +1349,6 @@ def atlasdb_zonefile_find_present( bit_offset, bit_count, con=None, path=None ):
offset and count are *bit* indexes
Return a list of zonefile rows, where present == 0.
"""
if path is None:
path = atlasdb_path()
with AtlasDBOpen(con=con, path=path) as dbcon:
sql = "SELECT * FROM zonefiles WHERE present = 0 LIMIT ? OFFSET ?;"
@@ -2347,21 +2271,6 @@ def atlas_get_zonefiles( my_hostport, peer_hostport, zonefile_hashes, timeout=No
return zonefile_datas
def atlas_get_zonefile_data_from_storage( name, zonefile_hash, storage_drivers ):
"""
Go get a zonefile from storage drivers
"""
try:
res = get_zonefile_data_from_storage( name, zonefile_hash, drivers=storage_drivers )
return {'status': True, 'zonefile_data': res}
except Exception, e:
if os.environ.get("BLOCKSTACK_TEST", None) == "1":
log.exception(e)
# if this fails, but zonefile data was retrieved, it's probably because they were legacy zonefiles
return {'error': 'Failed to get zonefile %s from storage' % zonefile_hash}
def atlas_rank_peers_by_health( peer_list=None, peer_table=None, with_zero_requests=False, with_rank=False ):
"""
Get a ranking of peers to contact for a zonefile.
@@ -2556,7 +2465,7 @@ def atlas_zonefile_push( my_hostport, peer_hostport, zonefile_data, timeout=None
if timeout is None:
timeout = atlas_push_zonefiles_timeout()
zonefile_hash = blockstack_client.get_zonefile_data_hash(zonefile_data)
zonefile_hash = get_zonefile_data_hash(zonefile_data)
zonefile_data_b64 = base64.b64encode( zonefile_data )
host, port = url_to_host_port( peer_hostport )
@@ -2627,7 +2536,7 @@ class AtlasPeerCrawler( threading.Thread ):
and we restart from a randomly-chosen peer in our peer DB.
"""
def __init__(self, my_hostname, my_portnum, path=None ):
def __init__(self, my_hostname, my_portnum, path, working_dir):
threading.Thread.__init__(self)
self.running = False
self.last_clean_time = 0
@@ -2651,6 +2560,7 @@ class AtlasPeerCrawler( threading.Thread ):
self.ping_timeout = None
self.consensus_hashes = {}
self.working_dir = working_dir
def canonical_peer( self, peer ):
@@ -2670,6 +2580,8 @@ class AtlasPeerCrawler( threading.Thread ):
Get neighbors of this peer
NOTE: don't lock peer table in production
"""
if path is None:
path = self.atlasdb_path
if self.neighbors_timeout is None:
self.neighbors_timeout = atlas_neighbors_timeout()
@@ -2702,6 +2614,9 @@ class AtlasPeerCrawler( threading.Thread ):
if self.ping_timeout is None:
self.ping_timeout = atlas_ping_timeout()
if path is None:
path = self.atlasdb_path
# only handle a few peers for now
cnt = 0
i = 0
@@ -2741,11 +2656,11 @@ class AtlasPeerCrawler( threading.Thread ):
log.debug("%s is too old to be an atlas node (version %s)" % (peer, res['server_version']))
continue
our_last_block = get_last_block()
our_last_block = get_last_block(self.working_dir)
if not self.consensus_hashes.has_key(our_last_block):
consensus_hashes = get_snapshots()
if consensus_hashes:
self.consensus_hashes = consensus_hashes
cur_last_block = max(self.consensus_hashes.keys())
new_consensus_hashes = get_snapshots(self.working_dir, start_block=cur_last_block, end_block=our_last_block+1)
self.consensus_hashes.update(new_consensus_hashes)
if self.consensus_hashes.has_key(our_last_block):
@@ -2771,6 +2686,9 @@ class AtlasPeerCrawler( threading.Thread ):
Return the list of peers we removed
"""
if path is None:
path = self.atlasdb_path
removed = []
rank_peer_list = atlas_rank_peers_by_health( peer_table=peer_table, with_rank=True )
for rank, peer in rank_peer_list:
@@ -2801,6 +2719,9 @@ class AtlasPeerCrawler( threading.Thread ):
Return the next peer.
"""
if path is None:
path = self.atlasdb_path
# the "next" current peer
ret_current_peer = None
ret_current_peer_neighbors = None
@@ -2916,6 +2837,9 @@ class AtlasPeerCrawler( threading.Thread ):
Return the number of peers processed
"""
if path is None:
path = self.atlasdb_path
# add newly-discovered peers, but only after we ping them
# to make sure they're actually alive.
peer_queue = atlas_peer_dequeue_all( peer_queue=peer_queue )
@@ -2951,6 +2875,9 @@ class AtlasPeerCrawler( threading.Thread ):
Return the number of peers removed
"""
if path is None:
path = self.atlasdb_path
# remove peers that are too old
if self.last_clean_time + atlas_peer_clean_interval() < time_now():
# remove stale peers
@@ -2991,6 +2918,9 @@ class AtlasPeerCrawler( threading.Thread ):
# if os.environ.get("BLOCKSTACK_TEST", None) == "1":
# log.debug("%s: %s step" % (self.my_hostport, self.__class__.__name__))
if path is None:
path = self.atlasdb_path
if self.max_neighbors is None:
self.max_neighbors = atlas_max_neighbors()
log.debug("%s: max neighbors is %s" % (self.my_hostport, self.max_neighbors))
@@ -3080,15 +3010,12 @@ class AtlasHealthChecker( threading.Thread ):
Also finds unhealthy or old peers and removes them
from the peer table and peer db.
"""
def __init__(self, my_host, my_port, path=None):
def __init__(self, my_host, my_port, path):
threading.Thread.__init__(self)
self.running = False
self.path = path
self.atlasdb_path = path
self.hostport = "%s:%s" % (my_host, my_port)
self.last_clean_time = 0
if path is None:
path = atlasdb_path()
self.atlasdb_path = path
@@ -3104,7 +3031,7 @@ class AtlasHealthChecker( threading.Thread ):
# log.debug("%s: %s step" % (self.hostport, self.__class__.__name__))
if path is None:
path = self.path
path = self.atlasdb_path
peer_hostports = []
stale_peers = []
@@ -3168,17 +3095,15 @@ class AtlasZonefileCrawler( threading.Thread ):
zonefiles that we don't have.
"""
def __init__(self, my_host, my_port, zonefile_storage_drivers=[], zonefile_storage_drivers_write=[], path=None, zonefile_dir=None):
def __init__(self, my_host, my_port, path, zonefile_dir, zonefile_storage_drivers=[], zonefile_storage_drivers_write=[]):
threading.Thread.__init__(self)
self.running = False
self.hostport = "%s:%s" % (my_host, my_port)
self.path = path
self.zonefile_storage_drivers = zonefile_storage_drivers
self.zonefile_storage_drivers_write = zonefile_storage_drivers_write
self.zonefile_dir = zonefile_dir
self.last_storage_reset = time_now()
if self.path is None:
self.path = atlasdb_path()
self.atlasdb_path = path
def store_zonefile_data( self, fetched_zfhash, txid, zonefile_data, peer_hostport, con, path ):
@@ -3188,7 +3113,8 @@ class AtlasZonefileCrawler( threading.Thread ):
Return True on success
Return False on error
"""
rc = store_zonefile_data_to_storage( zonefile_data, txid, required=self.zonefile_storage_drivers_write, cache=True, zonefile_dir=self.zonefile_dir, tx_required=False )
# rc = store_zonefile_data_to_storage( zonefile_data, txid, required=self.zonefile_storage_drivers_write, cache=True, zonefile_dir=self.zonefile_dir, tx_required=False )
rc = add_atlas_zonefile_data( zonefile_data, zonefile_dir )
if not rc:
log.error("%s: Failed to store zonefile %s" % (self.hostport, fetched_zfhash))
@@ -3245,37 +3171,6 @@ class AtlasZonefileCrawler( threading.Thread ):
return ret
def try_crawl_storage( self, name, zfhash, txid, path, con=None ):
"""
Try to get a zonefile from storage
Record in the DB that we tried.
Return True on success
Return False if not
"""
rc = None
with AtlasDBOpen(con=con, path=path) as dbcon:
# is this zonefile available via storage?
log.debug("Try loading %s from storage" % zfhash)
zonefile_info = atlas_get_zonefile_data_from_storage( name, zfhash, self.zonefile_storage_drivers )
# tried loading from storage
atlasdb_set_zonefile_tried_storage( zfhash, True, con=dbcon, path=path )
if 'error' in zonefile_info:
log.error("%s: Failed to get zonefile '%s' from storage" % (self.hostport, zfhash))
rc = False
else:
# got it! remember it
log.debug("%s: got %s from storage" % (self.hostport, zfhash))
rc = self.store_zonefile_data( zfhash, txid, zonefile_info['zonefile_data'], "storage", dbcon, path )
return rc
def find_zonefile_origins( self, missing_zfinfo, peer_hostports ):
"""
Find out which peers can serve which zonefiles
@@ -3313,7 +3208,7 @@ class AtlasZonefileCrawler( threading.Thread ):
# log.debug("%s: %s step" % (self.hostport, self.__class__.__name__))
if path is None:
path = self.path
path = self.atlasdb_path
num_fetched = 0
missing_zinfo = None
@@ -3335,13 +3230,13 @@ class AtlasZonefileCrawler( threading.Thread ):
for i in xrange(0, len(zonefile_hashes)):
# is this zonefile already cached?
zfhash = zonefile_hashes[i]
present = is_zonefile_cached( zfhash, zonefile_dir=self.zonefile_dir, validate=True )
present = is_zonefile_cached( zfhash, self.zonefile_dir, validate=True )
if present:
log.debug("%s: zonefile %s already cached. Marking present" % (self.hostport, zfhash))
zonefile_hashes[i] = None
# mark it as present
res = atlasdb_set_zonefile_present( zfhash, True, path=self.path )
res = atlasdb_set_zonefile_present( zfhash, True, path=path )
zonefile_hashes = filter( lambda zfh: zfh is not None, zonefile_hashes )
@@ -3362,21 +3257,6 @@ class AtlasZonefileCrawler( threading.Thread ):
log.warn("%s: unknown zonefile %s" % (self.hostport, zfhash))
zfname = zfinfo['name']
# is this zonefile available via storage?
if not missing_zfinfo[zfhash]['tried_storage']:
# this can be somewhat memory-intensive, so
# invoke the gc immediately afterwards
rc = self.try_crawl_storage( zfname, zfhash, zftxid, path )
gc.collect(2)
if rc:
# don't ask for it again
zonefile_hashes.pop(0)
num_fetched += 1
continue
if len(peers) == 0:
# unavailable
if not missing_zfinfo[zfhash]['tried_storage']:
@@ -3455,7 +3335,7 @@ class AtlasZonefileCrawler( threading.Thread ):
while self.running:
t1 = time.time()
num_fetched = self.step( path=self.path )
num_fetched = self.step( path=self.atlasdb_path )
t2 = time.time()
if num_fetched == 0 and t2 - t1 < PEER_CRAWL_ZONEFILE_WORK_INTERVAL:
@@ -3486,17 +3366,14 @@ class AtlasZonefilePusher(threading.Thread):
CURRENTLY DEACTIVATED
"""
def __init__(self, host, port, zonefile_storage_drivers=None, zonefile_dir=None, path=None ):
def __init__(self, host, port, path, zonefile_dir, zonefile_storage_drivers=None,):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.zonefile_storage_drivers = zonefile_storage_drivers
self.zonefile_dir = zonefile_dir
self.hostport = "%s:%s" % (host, port)
self.path = path
if self.path is None:
self.path = atlasdb_path()
self.atlasdb_path = path
self.push_timeout = None
@@ -3506,8 +3383,10 @@ class AtlasZonefilePusher(threading.Thread):
Push the zonefile to all the peers that need it.
Return the number of peers we sent to
"""
if os.environ.get("BLOCKSTACK_TEST", None) == "1":
if path is None:
path = self.atlasdb_path
if BLOCKSTACK_TEST:
log.debug("%s: %s step" % (self.hostport, self.__class__.__name__))
if self.push_timeout is None:
@@ -3527,8 +3406,9 @@ class AtlasZonefilePusher(threading.Thread):
# not recognized
return 0
# it's a valid zonefile. cache and store it.
rc = store_zonefile_data_to_storage( str(zfdata_txt), txid, required=self.zonefile_storage_drivers, cache=True, zonefile_dir=self.zonefile_dir, tx_required=False )
# it's a valid zonefile. store it.
# rc = store_zonefile_data_to_storage( str(zfdata_txt), txid, required=self.zonefile_storage_drivers, cache=True, zonefile_dir=self.zonefile_dir, tx_required=False )
rc = add_atlas_zonefile_data( str(zfdata_txt), self.zonefile_dir )
if not rc:
log.error("Failed to replicate zonefile %s to external storage" % zonefile_hash)
@@ -3557,7 +3437,7 @@ class AtlasZonefilePusher(threading.Thread):
self.running = True
while self.running:
t1 = time_now()
num_pushed = self.step( path=self.path )
num_pushed = self.step( path=self.atlasdb_path )
t2 = time_now()
if num_pushed == 0 and t2 - t1 < PEER_PUSH_ZONEFILE_WORK_INTERVAL:
@@ -3574,17 +3454,17 @@ class AtlasZonefilePusher(threading.Thread):
def atlas_node_start( my_hostname, my_portnum, atlasdb_path=None, zonefile_dir=None, zonefile_storage_drivers=[], zonefile_storage_drivers_write=[] ):
def atlas_node_start( my_hostname, my_portnum, atlasdb_path, zonefile_dir, working_dir, zonefile_storage_drivers=[], zonefile_storage_drivers_write=[] ):
"""
Start up the atlas node.
Return a bundle of atlas state
"""
atlas_state = {}
atlas_state['peer_crawler'] = AtlasPeerCrawler( my_hostname, my_portnum )
atlas_state['health_checker'] = AtlasHealthChecker( my_hostname, my_portnum, path=atlasdb_path )
atlas_state['zonefile_crawler'] = AtlasZonefileCrawler( my_hostname, my_portnum, zonefile_storage_drivers=zonefile_storage_drivers,
zonefile_storage_drivers_write=zonefile_storage_drivers_write, path=atlasdb_path, zonefile_dir=zonefile_dir )
# atlas_state['zonefile_pusher'] = AtlasZonefilePusher( my_hostname, my_portnum, path=atlasdb_path, zonefile_storage_drivers=zonefile_storage_drivers, zonefile_dir=zonefile_dir )
atlas_state['peer_crawler'] = AtlasPeerCrawler( my_hostname, my_portnum, atlasdb_path, working_dir )
atlas_state['health_checker'] = AtlasHealthChecker( my_hostname, my_portnum, atlasdb_path )
atlas_state['zonefile_crawler'] = AtlasZonefileCrawler( my_hostname, my_portnum, atlasdb_path, zonefile_dir, zonefile_storage_drivers=zonefile_storage_drivers,
zonefile_storage_drivers_write=zonefile_storage_drivers_write )
# atlas_state['zonefile_pusher'] = AtlasZonefilePusher( my_hostname, my_portnum, atlasdb_path, zonefile_dir, zonefile_storage_drivers=zonefile_storage_drivers)
# start them all up
for component in atlas_state.keys():