mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-21 09:55:29 +08:00
add a bounded threading mixin to the RPC server that allows it to have up to a maximum number of outstanding threads handling requests. serialize all RPC access so we don't introduce database contention or corruption. make the maximum number of RPC threads overridable in the environment.
This commit is contained in:
@@ -45,6 +45,7 @@ from jsonschema import ValidationError
|
||||
|
||||
import xmlrpclib
|
||||
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
|
||||
import SocketServer
|
||||
|
||||
# stop common XML attacks
|
||||
from defusedxml import xmlrpc
|
||||
@@ -394,7 +395,16 @@ class BlockstackdRPCHandler(SimpleXMLRPCRequestHandler):
|
||||
else:
|
||||
log.debug("RPC %s(%s) begin from %s" % ("rpc_" + str(method), params_fmt, self.client_address[0]))
|
||||
|
||||
res = self.server.funcs["rpc_" + str(method)](*params, **con_info)
|
||||
res = None
|
||||
with self.server.rpc_guard:
|
||||
# RPC calls should be sequential to ensure database integrity
|
||||
if BLOCKSTACK_TEST:
|
||||
log.debug('RPC thread enter {}'.format(threading.current_thread().ident))
|
||||
|
||||
res = self.server.funcs["rpc_" + str(method)](*params, **con_info)
|
||||
|
||||
if BLOCKSTACK_TEST:
|
||||
log.debug('RPC thread exit {}'.format(threading.current_thread().ident))
|
||||
|
||||
if 'deprecated' in res and res['deprecated']:
|
||||
log.warn("DEPRECATED method call {} from {}".format(method, self.client_address[0]))
|
||||
@@ -413,7 +423,93 @@ class BlockstackdRPCHandler(SimpleXMLRPCRequestHandler):
|
||||
return json.dumps(rpc_traceback())
|
||||
|
||||
|
||||
class BlockstackdRPC(SimpleXMLRPCServer):
|
||||
class BoundedThreadingMixIn:
|
||||
"""
|
||||
Bounded threading mix-in, based on the original SocketServer.ThreadingMixIn
|
||||
(from https://github.com/python/cpython/blob/master/Lib/socketserver.py).
|
||||
|
||||
Only difference between this and the original is that this will reject
|
||||
requests after a certain number of threads exist.
|
||||
"""
|
||||
|
||||
_threads = None
|
||||
_thread_guard = threading.Lock()
|
||||
_close = False
|
||||
|
||||
def process_request_thread(self, request, client_address):
|
||||
"""
|
||||
Same as in BaseServer but as a thread.
|
||||
In addition, exception handling is done here.
|
||||
"""
|
||||
global gc_thread
|
||||
|
||||
try:
|
||||
self.finish_request(request, client_address)
|
||||
except Exception:
|
||||
self.handle_error(request, client_address)
|
||||
finally:
|
||||
self.shutdown_request(request)
|
||||
|
||||
shutdown_thread = False
|
||||
with self._thread_guard:
|
||||
if threading.current_thread().ident in self._threads:
|
||||
del self._threads[threading.current_thread().ident]
|
||||
shutdown_thread = True
|
||||
|
||||
if BLOCKSTACK_TEST:
|
||||
log.debug('{} active threads (removed {})'.format(len(self._threads), threading.current_thread().ident))
|
||||
|
||||
if shutdown_thread:
|
||||
# count this towards our preemptive garbage collection
|
||||
gc_thread.gc_event()
|
||||
|
||||
|
||||
def process_request(self, request, client_address):
|
||||
"""
|
||||
Start a new thread to process the request.
|
||||
"""
|
||||
t = threading.Thread(target = self.process_request_thread,
|
||||
args = (request, client_address))
|
||||
|
||||
t.daemon = False
|
||||
|
||||
with self._thread_guard:
|
||||
if self._close:
|
||||
# server is done. do not make more threads
|
||||
self.shutdown_request(request)
|
||||
return
|
||||
|
||||
if self._threads is None:
|
||||
self._threads = {}
|
||||
|
||||
if len(self._threads) + 1 > MAX_RPC_THREADS:
|
||||
# overloaded
|
||||
log.warning("Too many outstanding requests ({})".format(len(self._threads)))
|
||||
self.shutdown_request(request)
|
||||
return
|
||||
|
||||
t.start()
|
||||
|
||||
self._threads[t.ident] = t
|
||||
|
||||
if BLOCKSTACK_TEST:
|
||||
log.debug('{} active threads (added {})'.format(len(self._threads), t.ident))
|
||||
|
||||
|
||||
def server_close(self):
|
||||
super().server_close()
|
||||
|
||||
with self._thread_guard:
|
||||
threads = self._threads
|
||||
self._threads = None
|
||||
self._close = True
|
||||
|
||||
if threads:
|
||||
for thread_id in threads.keys():
|
||||
threads[thread_id].join()
|
||||
|
||||
|
||||
class BlockstackdRPC(BoundedThreadingMixIn, SimpleXMLRPCServer):
|
||||
"""
|
||||
Blockstackd RPC server, used for querying
|
||||
the name database and the blockchain peer.
|
||||
@@ -445,7 +541,9 @@ class BlockstackdRPC(SimpleXMLRPCServer):
|
||||
# subdomain indexer handle
|
||||
self.subdomain_index = subdomain_index
|
||||
|
||||
self.rpc_guard = threading.Lock()
|
||||
|
||||
|
||||
def cache_flush(self):
|
||||
"""
|
||||
Clear all cached state
|
||||
@@ -1919,7 +2017,6 @@ class BlockstackdRPCServer( threading.Thread, object ):
|
||||
self.subdomain_index = subdomain_index
|
||||
self.rpc_server = BlockstackdRPC( self.working_dir, port=self.port, subdomain_index=self.subdomain_index )
|
||||
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Serve until asked to stop
|
||||
|
||||
@@ -193,6 +193,12 @@ if os.environ.get("BLOCKSTACK_TEST_MAX_RPC_LEN"):
|
||||
MAX_RPC_LEN = int(os.environ.get("BLOCKSTACK_TEST_MAX_RPC_LEN"))
|
||||
print("Overriding MAX_RPC_LEN to {}".format(MAX_RPC_LEN))
|
||||
|
||||
MAX_RPC_THREADS = 1024
|
||||
if os.environ.get('BLOCKSTACK_RPC_MAX_THREADS'):
|
||||
MAX_RPC_THREADS = int(os.environ.get('BLOCKSTACK_RPC_MAX_THREADS'))
|
||||
print('Overriding MAX_RPC_THREADS to {}'.format(MAX_RPC_THREADS))
|
||||
|
||||
|
||||
if BLOCKSTACK_TEST:
|
||||
RPC_MAX_INDEXING_DELAY = 5
|
||||
|
||||
|
||||
Reference in New Issue
Block a user