mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-05-24 08:09:52 +08:00
automatic gc collector
This commit is contained in:
@@ -46,6 +46,7 @@ import blockstack_zones
|
||||
import keylib
|
||||
import base64
|
||||
import urllib2
|
||||
import gc
|
||||
import jsonschema
|
||||
from jsonschema import ValidationError
|
||||
|
||||
@@ -82,8 +83,11 @@ from blockstack_client.constants import BLOCKSTACK_TEST
|
||||
bitcoind = None
|
||||
rpc_server = None
|
||||
storage_pusher = None
|
||||
gc_thread = None
|
||||
has_indexer = True
|
||||
|
||||
GC_EVENT_THRESHOLD = 20
|
||||
|
||||
def get_bitcoind( new_bitcoind_opts=None, reset=False, new=False ):
|
||||
"""
|
||||
Get or instantiate our bitcoind client.
|
||||
@@ -270,6 +274,9 @@ class BlockstackdRPCHandler(SimpleXMLRPCRequestHandler):
|
||||
proper deserialization.
|
||||
"""
|
||||
def _dispatch(self, method, params):
|
||||
global gc_thread
|
||||
gc_thread.gc_event()
|
||||
|
||||
try:
|
||||
con_info = {
|
||||
"client_host": self.client_address[0],
|
||||
@@ -1657,6 +1664,34 @@ class BlockstackdRPCServer( threading.Thread, object ):
|
||||
self.rpc_server.shutdown()
|
||||
|
||||
|
||||
class GCThread( threading.Thread ):
|
||||
"""
|
||||
Optimistic GC thread
|
||||
"""
|
||||
def __init__(self, event_threshold=GC_EVENT_THRESHOLD):
|
||||
threading.Thread.__init__(self)
|
||||
self.running = True
|
||||
self.event_count = 0
|
||||
self.event_threshold = event_threshold
|
||||
|
||||
def run(self):
|
||||
deadline = time.time() + 60
|
||||
while self.running:
|
||||
time.sleep(1.0)
|
||||
if time.time() > deadline or self.event_count > self.event_threshold:
|
||||
gc.collect()
|
||||
deadline = time.time() + 60
|
||||
self.event_count = 0
|
||||
|
||||
|
||||
def signal_stop(self):
|
||||
self.running = False
|
||||
|
||||
|
||||
def gc_event(self):
|
||||
self.event_count += 1
|
||||
|
||||
|
||||
class BlockstackStoragePusher( threading.Thread ):
|
||||
"""
|
||||
worker thread to push data into storage providers,
|
||||
@@ -1875,6 +1910,8 @@ class BlockstackStoragePusher( threading.Thread ):
|
||||
"""
|
||||
Push zonefiles and profiles
|
||||
"""
|
||||
global gc_thread
|
||||
|
||||
self.running = True
|
||||
while self.running:
|
||||
|
||||
@@ -1886,6 +1923,9 @@ class BlockstackStoragePusher( threading.Thread ):
|
||||
time.sleep(1.0)
|
||||
continue
|
||||
|
||||
else:
|
||||
gc_thread.gc_event()
|
||||
|
||||
log.debug("StoragePusher thread exit")
|
||||
self.running = False
|
||||
|
||||
@@ -1939,6 +1979,29 @@ def get_storage_queue_path():
|
||||
return os.path.join( working_dir, db_filename )
|
||||
|
||||
|
||||
def gc_start():
|
||||
"""
|
||||
Start a thread to garbage-collect every 30 seconds.
|
||||
"""
|
||||
global gc_thread
|
||||
|
||||
gc_thread = GCThread()
|
||||
log.debug("Optimistic GC thread start")
|
||||
gc_thread.start()
|
||||
|
||||
|
||||
def gc_stop():
|
||||
"""
|
||||
Stop a the optimistic GC thread
|
||||
"""
|
||||
global gc_thread
|
||||
|
||||
log.debug("Shutting down GC thread")
|
||||
gc_thread.signal_stop()
|
||||
gc_thread.join()
|
||||
log.debug("GC thread joined")
|
||||
|
||||
|
||||
def storage_start( blockstack_opts ):
|
||||
"""
|
||||
Start the global data-pusher thread
|
||||
@@ -2286,6 +2349,9 @@ def run_server( foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None
|
||||
# start storage
|
||||
storage_start( blockstack_opts )
|
||||
|
||||
# start GC
|
||||
gc_start()
|
||||
|
||||
# start API server
|
||||
rpc_start(port)
|
||||
set_running( True )
|
||||
@@ -2332,6 +2398,10 @@ def run_server( foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None
|
||||
log.debug("Stopping storage pusher")
|
||||
storage_stop()
|
||||
|
||||
# stopping GC
|
||||
log.debug("Stopping GC worker")
|
||||
gc_stop()
|
||||
|
||||
# close logfile
|
||||
if logfile is not None:
|
||||
logfile.flush()
|
||||
|
||||
Reference in New Issue
Block a user