add a bounded threading mixin to the RPC server that allows it to have up to a maximum number of outstanding threads handling requests. serialize all RPC access so we don't introduce database contention or corruption. make the maximum number of RPC threads overridable in the environment.

This commit is contained in:
Jude Nelson
2018-07-05 17:24:30 -04:00
parent ddbfa1cd1b
commit 8358c1d783
2 changed files with 106 additions and 3 deletions

View File

@@ -45,6 +45,7 @@ from jsonschema import ValidationError
import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
import SocketServer
# stop common XML attacks
from defusedxml import xmlrpc
@@ -388,7 +389,16 @@ class BlockstackdRPCHandler(SimpleXMLRPCRequestHandler):
else:
log.debug("RPC %s(%s) begin from %s" % ("rpc_" + str(method), params_fmt, self.client_address[0]))
res = self.server.funcs["rpc_" + str(method)](*params, **con_info)
res = None
with self.server.rpc_guard:
# RPC calls should be sequential to ensure database integrity
if BLOCKSTACK_TEST:
log.debug('RPC thread enter {}'.format(threading.current_thread().ident))
res = self.server.funcs["rpc_" + str(method)](*params, **con_info)
if BLOCKSTACK_TEST:
log.debug('RPC thread exit {}'.format(threading.current_thread().ident))
if 'deprecated' in res and res['deprecated']:
log.warn("DEPRECATED method call {} from {}".format(method, self.client_address[0]))
@@ -407,7 +417,93 @@ class BlockstackdRPCHandler(SimpleXMLRPCRequestHandler):
return json.dumps(rpc_traceback())
class BlockstackdRPC(SimpleXMLRPCServer):
class BoundedThreadingMixIn:
"""
Bounded threading mix-in, based on the original SocketServer.ThreadingMixIn
(from https://github.com/python/cpython/blob/master/Lib/socketserver.py).
Only difference between this and the original is that this will reject
requests after a certain number of threads exist.
"""
_threads = None
_thread_guard = threading.Lock()
_close = False
def process_request_thread(self, request, client_address):
"""
Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
global gc_thread
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)
shutdown_thread = False
with self._thread_guard:
if threading.current_thread().ident in self._threads:
del self._threads[threading.current_thread().ident]
shutdown_thread = True
if BLOCKSTACK_TEST:
log.debug('{} active threads (removed {})'.format(len(self._threads), threading.current_thread().ident))
if shutdown_thread:
# count this towards our preemptive garbage collection
gc_thread.gc_event()
def process_request(self, request, client_address):
"""
Start a new thread to process the request.
"""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = False
with self._thread_guard:
if self._close:
# server is done. do not make more threads
self.shutdown_request(request)
return
if self._threads is None:
self._threads = {}
if len(self._threads) + 1 > MAX_RPC_THREADS:
# overloaded
log.warning("Too many outstanding requests ({})".format(len(self._threads)))
self.shutdown_request(request)
return
t.start()
self._threads[t.ident] = t
if BLOCKSTACK_TEST:
log.debug('{} active threads (added {})'.format(len(self._threads), t.ident))
def server_close(self):
super().server_close()
with self._thread_guard:
threads = self._threads
self._threads = None
self._close = True
if threads:
for thread_id in threads.keys():
threads[thread_id].join()
class BlockstackdRPC(BoundedThreadingMixIn, SimpleXMLRPCServer):
"""
Blockstackd RPC server, used for querying
the name database and the blockchain peer.
@@ -439,7 +535,9 @@ class BlockstackdRPC(SimpleXMLRPCServer):
# subdomain indexer handle
self.subdomain_index = subdomain_index
self.rpc_guard = threading.Lock()
def cache_flush(self):
"""
Clear all cached state
@@ -1726,7 +1824,6 @@ class BlockstackdRPCServer( threading.Thread, object ):
self.subdomain_index = subdomain_index
self.rpc_server = BlockstackdRPC( self.working_dir, port=self.port, subdomain_index=self.subdomain_index )
def run(self):
"""
Serve until asked to stop

View File

@@ -164,6 +164,12 @@ if os.environ.get("BLOCKSTACK_TEST_MAX_RPC_LEN"):
MAX_RPC_LEN = int(os.environ.get("BLOCKSTACK_TEST_MAX_RPC_LEN"))
print("Overriding MAX_RPC_LEN to {}".format(MAX_RPC_LEN))
MAX_RPC_THREADS = 1024
if os.environ.get('BLOCKSTACK_RPC_MAX_THREADS'):
MAX_RPC_THREADS = int(os.environ.get('BLOCKSTACK_RPC_MAX_THREADS'))
print('Overriding MAX_RPC_THREADS to {}'.format(MAX_RPC_THREADS))
if BLOCKSTACK_TEST:
RPC_MAX_INDEXING_DELAY = 5