mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-24 03:45:38 +08:00
Remove the global atlasdb lock and rely instead on exponential backoff when there is lock contention. Let sqlite3 handle locking.
This commit is contained in:
@@ -35,7 +35,7 @@ import socket
|
||||
import gc
|
||||
|
||||
import virtualchain
|
||||
from nameset.virtualchain_hooks import get_last_block, get_snapshots
|
||||
from nameset.virtualchain_hooks import get_last_block, get_snapshots, get_valid_transaction_window
|
||||
|
||||
from .util import url_to_host_port, atlas_inventory_to_string
|
||||
from .storage.auth import get_zonefile_data_hash
|
||||
@@ -547,29 +547,34 @@ def atlasdb_format_query( query, values ):
|
||||
return "".join( ["%s %s" % (frag, "'%s'" % val if type(val) in [str, unicode] else val) for (frag, val) in zip(query.split("?"), values + ("",))] )
|
||||
|
||||
|
||||
|
||||
def atlasdb_query_execute( cur, query, values ):
|
||||
def atlasdb_query_execute(cur, query, values):
|
||||
"""
|
||||
Execute a query. If it fails, exit.
|
||||
|
||||
DO NOT CALL THIS DIRECTLY.
|
||||
Execute a query.
|
||||
Handle db timeouts.
|
||||
Abort on failure.
|
||||
"""
|
||||
timeout = 1.0
|
||||
while True:
|
||||
try:
|
||||
ret = cur.execute(query, values)
|
||||
return ret
|
||||
except sqlite3.OperationalError as oe:
|
||||
if oe.message == "database is locked":
|
||||
timeout = timeout * 2 + timeout * random.random()
|
||||
log.error("Query timed out due to lock; retrying in %s: %s" % (timeout, atlasdb_format_query(query, values)))
|
||||
time.sleep(timeout)
|
||||
|
||||
else:
|
||||
log.exception(oe)
|
||||
log.error("FATAL: failed to execute query (%s, %s)" % (query, values))
|
||||
log.error("\n".join(traceback.format_stack()))
|
||||
os.abort()
|
||||
|
||||
# under heavy contention, this can cause timeouts (which is unacceptable)
|
||||
# serialize access to the db just to be safe
|
||||
|
||||
global DB_LOCK
|
||||
|
||||
try:
|
||||
DB_LOCK.acquire()
|
||||
ret = cur.execute( query, values )
|
||||
DB_LOCK.release()
|
||||
return ret
|
||||
except Exception, e:
|
||||
log.exception(e)
|
||||
log.error("FATAL: failed to execute query (%s, %s)" % (query, values))
|
||||
log.error("\n" + "\n".join(traceback.format_stack()))
|
||||
os.abort()
|
||||
except Exception, e:
|
||||
log.exception(e)
|
||||
log.error("FATAL: failed to execute query (%s, %s)" % (query, values))
|
||||
log.error("\n".join(traceback.format_stack()))
|
||||
os.abort()
|
||||
|
||||
|
||||
def atlasdb_open( path ):
|
||||
@@ -2557,10 +2562,11 @@ class AtlasPeerCrawler( threading.Thread ):
|
||||
|
||||
self.neighbors_timeout = None
|
||||
self.ping_timeout = None
|
||||
|
||||
self.consensus_hashes = {}
|
||||
self.working_dir = working_dir
|
||||
|
||||
last_block = get_last_block(self.working_dir)
|
||||
self.consensus_hashes = get_snapshots(self.working_dir, start_block=last_block-get_valid_transaction_window(), end_block=last_block)
|
||||
|
||||
|
||||
def canonical_peer( self, peer ):
|
||||
"""
|
||||
@@ -2654,7 +2660,8 @@ class AtlasPeerCrawler( threading.Thread ):
|
||||
filtered.append(peer)
|
||||
log.debug("%s is too old to be an atlas node (version %s)" % (peer, res['server_version']))
|
||||
continue
|
||||
|
||||
|
||||
# advance our consensus hashes
|
||||
our_last_block = get_last_block(self.working_dir)
|
||||
if not self.consensus_hashes.has_key(our_last_block):
|
||||
cur_last_block = max(self.consensus_hashes.keys())
|
||||
|
||||
Reference in New Issue
Block a user