mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-24 03:45:38 +08:00
when waiting, check every second for thread shutdown request
This commit is contained in:
@@ -461,13 +461,21 @@ def atlasdb_add_zonefile_info( name, zonefile_hash, txid, present, block_height,
|
||||
con = atlasdb_open( path )
|
||||
assert con is not None
|
||||
|
||||
sql = "INSERT INTO zonefiles (name, zonefile_hash, txid, present, tried_storage, block_height) VALUES (?,?,?,?,?,?);"
|
||||
args = (name, zonefile_hash, txid, present, 0, block_height)
|
||||
sql = "UPDATE zonefiles SET name = ?, zonefile_hash = ?, txid = ?, present = ?, tried_storage = ?, block_height = ? WHERE txid = ?;"
|
||||
args = (name, zonefile_hash, txid, present, 0, block_height, txid )
|
||||
|
||||
cur = con.cursor()
|
||||
atlasdb_query_execute( cur, sql, args )
|
||||
update_res = atlasdb_query_execute( cur, sql, args )
|
||||
con.commit()
|
||||
|
||||
if update_res.rowcount == 0:
|
||||
sql = "INSERT OR IGNORE INTO zonefiles (name, zonefile_hash, txid, present, tried_storage, block_height) VALUES (?,?,?,?,?,?);"
|
||||
args = (name, zonefile_hash, txid, present, 0, block_height)
|
||||
|
||||
cur = con.cursor()
|
||||
atlasdb_query_execute( cur, sql, args )
|
||||
con.commit()
|
||||
|
||||
# keep in-RAM zonefile inv coherent
|
||||
zfbits = atlasdb_get_zonefile_bits( zonefile_hash, con=con, path=path )
|
||||
|
||||
@@ -3251,7 +3259,12 @@ class AtlasPeerCrawler( threading.Thread ):
|
||||
|
||||
if num_added == 0 and num_removed == 0 and t2 - t1 < PEER_CRAWL_NEIGHBOR_WORK_INTERVAL:
|
||||
# take a break
|
||||
time_sleep( self.my_hostport, self.__class__.__name__, PEER_CRAWL_NEIGHBOR_WORK_INTERVAL - (t2 - t1) )
|
||||
deadline = time_now() + PEER_CRAWL_NEIGHBOR_WORK_INTERVAL - (t2 - t1)
|
||||
while time_now() < deadline and self.running:
|
||||
time_sleep( self.my_hostport, self.__class__.__name__, 1.0 )
|
||||
|
||||
if not self.running:
|
||||
break
|
||||
|
||||
|
||||
def ask_join(self):
|
||||
@@ -3340,7 +3353,12 @@ class AtlasHealthChecker( threading.Thread ):
|
||||
|
||||
# don't go too fast
|
||||
if t2 - t1 < PEER_HEALTH_NEIGHBOR_WORK_INTERVAL:
|
||||
time_sleep( self.hostport, self.__class__.__name__, PEER_HEALTH_NEIGHBOR_WORK_INTERVAL - (t2 - t1) )
|
||||
deadline = time_now() + PEER_HEALTH_NEIGHBOR_WORK_INTERVAL - (t2 - t1)
|
||||
while time_now() < deadline and self.running:
|
||||
time_sleep( self.hostport, self.__class__.__name__, 1.0 )
|
||||
|
||||
if not self.running:
|
||||
break
|
||||
|
||||
|
||||
def ask_join(self):
|
||||
@@ -3650,7 +3668,12 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
t2 = time.time()
|
||||
|
||||
if num_fetched == 0 and t2 - t1 < PEER_CRAWL_ZONEFILE_WORK_INTERVAL:
|
||||
time_sleep( self.hostport, self.__class__.__name__, PEER_CRAWL_ZONEFILE_WORK_INTERVAL - (t2 - t1) )
|
||||
deadline = time_now() + PEER_CRAWL_ZONEFILE_WORK_INTERVAL - (t2 - t1)
|
||||
while time_now() < deadline and self.running:
|
||||
time_sleep( self.hostport, self.__class__.__name__, 1.0 )
|
||||
|
||||
if not self.running:
|
||||
break
|
||||
|
||||
# re-try storage periodically for missing zonefiles
|
||||
if self.last_storage_reset + PEER_CRAWL_ZONEFILE_STORAGE_RETRY_INTERVAL < time_now():
|
||||
@@ -3749,9 +3772,14 @@ class AtlasZonefilePusher(threading.Thread):
|
||||
num_pushed = self.step( path=self.path )
|
||||
t2 = time_now()
|
||||
if num_pushed == 0 and t2 - t1 < PEER_PUSH_ZONEFILE_WORK_INTERVAL:
|
||||
time_sleep(self.hostport, self.__class__.__name__, PEER_PUSH_ZONEFILE_WORK_INTERVAL - (t2 - t1))
|
||||
pass
|
||||
|
||||
|
||||
deadline = time_now() + PEER_PUSH_ZONEFILE_WORK_INTERVAL - (t2 - t1)
|
||||
while time_now() < deadline and self.running:
|
||||
time_sleep( self.my_hostport, self.__class__.__name__, 1.0 )
|
||||
|
||||
if not self.running:
|
||||
break
|
||||
|
||||
|
||||
def ask_join(self):
|
||||
self.running = False
|
||||
@@ -3767,7 +3795,7 @@ def atlas_node_start( my_hostname, my_portnum, atlasdb_path=None, zonefile_dir=N
|
||||
atlas_state['peer_crawler'] = AtlasPeerCrawler( my_hostname, my_portnum )
|
||||
atlas_state['health_checker'] = AtlasHealthChecker( my_hostname, my_portnum, path=atlasdb_path )
|
||||
atlas_state['zonefile_crawler'] = AtlasZonefileCrawler( my_hostname, my_portnum, zonefile_storage_drivers=zonefile_storage_drivers, path=atlasdb_path, zonefile_dir=zonefile_dir )
|
||||
atlas_state['zonefile_pusher'] = AtlasZonefilePusher( my_hostname, my_portnum, path=atlasdb_path )
|
||||
# atlas_state['zonefile_pusher'] = AtlasZonefilePusher( my_hostname, my_portnum, path=atlasdb_path )
|
||||
|
||||
# start them all up
|
||||
for component in atlas_state.keys():
|
||||
|
||||
Reference in New Issue
Block a user