From 15608bcc06a9b549fef3a5a36f07c4e0e6a55f59 Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Sun, 12 Mar 2017 22:41:12 -0400 Subject: [PATCH] automatic gc collector --- blockstack/blockstackd.py | 70 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/blockstack/blockstackd.py b/blockstack/blockstackd.py index d25731376..76693d1b9 100644 --- a/blockstack/blockstackd.py +++ b/blockstack/blockstackd.py @@ -46,6 +46,7 @@ import blockstack_zones import keylib import base64 import urllib2 +import gc import jsonschema from jsonschema import ValidationError @@ -82,8 +83,11 @@ from blockstack_client.constants import BLOCKSTACK_TEST bitcoind = None rpc_server = None storage_pusher = None +gc_thread = None has_indexer = True +GC_EVENT_THRESHOLD = 20 + def get_bitcoind( new_bitcoind_opts=None, reset=False, new=False ): """ Get or instantiate our bitcoind client. @@ -270,6 +274,9 @@ class BlockstackdRPCHandler(SimpleXMLRPCRequestHandler): proper deserialization. """ def _dispatch(self, method, params): + global gc_thread + gc_thread.gc_event() + try: con_info = { "client_host": self.client_address[0], @@ -1657,6 +1664,34 @@ class BlockstackdRPCServer( threading.Thread, object ): self.rpc_server.shutdown() +class GCThread( threading.Thread ): + """ + Optimistic GC thread + """ + def __init__(self, event_threshold=GC_EVENT_THRESHOLD): + threading.Thread.__init__(self) + self.running = True + self.event_count = 0 + self.event_threshold = event_threshold + + def run(self): + deadline = time.time() + 60 + while self.running: + time.sleep(1.0) + if time.time() > deadline or self.event_count > self.event_threshold: + gc.collect() + deadline = time.time() + 60 + self.event_count = 0 + + + def signal_stop(self): + self.running = False + + + def gc_event(self): + self.event_count += 1 + + class BlockstackStoragePusher( threading.Thread ): """ worker thread to push data into storage providers, @@ -1875,6 +1910,8 @@ class BlockstackStoragePusher( threading.Thread ): """ Push zonefiles and profiles """ + global gc_thread + self.running = True while self.running: @@ -1886,6 +1923,9 @@ class BlockstackStoragePusher( threading.Thread ): time.sleep(1.0) continue + else: + gc_thread.gc_event() + log.debug("StoragePusher thread exit") self.running = False @@ -1939,6 +1979,29 @@ def get_storage_queue_path(): return os.path.join( working_dir, db_filename ) +def gc_start(): + """ + Start a thread to garbage-collect every 30 seconds. + """ + global gc_thread + + gc_thread = GCThread() + log.debug("Optimistic GC thread start") + gc_thread.start() + + +def gc_stop(): + """ + Stop a the optimistic GC thread + """ + global gc_thread + + log.debug("Shutting down GC thread") + gc_thread.signal_stop() + gc_thread.join() + log.debug("GC thread joined") + + def storage_start( blockstack_opts ): """ Start the global data-pusher thread @@ -2286,6 +2349,9 @@ def run_server( foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None # start storage storage_start( blockstack_opts ) + # start GC + gc_start() + # start API server rpc_start(port) set_running( True ) @@ -2332,6 +2398,10 @@ def run_server( foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None log.debug("Stopping storage pusher") storage_stop() + # stopping GC + log.debug("Stopping GC worker") + gc_stop() + # close logfile if logfile is not None: logfile.flush()