modify the BoundedThreadingMixIn so it will eagerly close client sockets when it has too many outstanding requests. Do some housekeeping so that we can use this in both the backplane RPC server and the API server, and update the relevant integration test to test for this behavior

This commit is contained in:
Jude Nelson
2018-09-18 17:15:31 -04:00
parent 917af62716
commit f0d589ab29
5 changed files with 214 additions and 121 deletions

View File

@@ -41,11 +41,168 @@ import keylib
import virtualchain
import virtualchain.lib.blockchain.bitcoin_blockchain as bitcoin_blockchain
from .config import RPC_SERVER_PORT, BLOCKSTACK_TEST, SUBDOMAIN_ADDRESS_VERSION_BYTE, SUBDOMAIN_ADDRESS_MULTISIG_VERSION_BYTE
from .config import RPC_SERVER_PORT, BLOCKSTACK_TEST, SUBDOMAIN_ADDRESS_VERSION_BYTE, SUBDOMAIN_ADDRESS_MULTISIG_VERSION_BYTE, MAX_RPC_THREADS, GC_EVENT_THRESHOLD
from .schemas import *
log = virtualchain.get_logger()
class GCThread( threading.Thread ):
"""
Optimistic GC thread
"""
def __init__(self, event_threshold=GC_EVENT_THRESHOLD):
threading.Thread.__init__(self)
self.running = True
self.event_count = 0
self.event_threshold = event_threshold
def run(self):
deadline = time.time() + 60
while self.running:
time.sleep(1.0)
if time.time() > deadline or self.event_count > self.event_threshold:
gc.collect()
deadline = time.time() + 60
self.event_count = 0
def signal_stop(self):
self.running = False
def gc_event(self):
self.event_count += 1
class BoundedThreadingMixIn(object):
"""
Bounded threading mix-in, based on the original SocketServer.ThreadingMixIn
(from https://github.com/python/cpython/blob/master/Lib/socketserver.py).
Only differences between this and the original are that:
* this will reject requests after a certain number of threads exist.
* this will reply with a "server is overloaded" message after a certain number of connections exist
"""
_threads = None
_thread_guard = threading.Lock()
_close = False
def process_request_thread(self, request, client_address):
"""
Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
from ..blockstackd import get_gc_thread
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)
shutdown_thread = False
with self._thread_guard:
if threading.current_thread().ident in self._threads:
del self._threads[threading.current_thread().ident]
shutdown_thread = True
if BLOCKSTACK_TEST:
log.debug('{} active threads (removed {})'.format(len(self._threads), threading.current_thread().ident))
if shutdown_thread:
gc_thread = get_gc_thread()
if gc_thread:
# count this towards our preemptive garbage collection
gc_thread.gc_event()
def overloaded(self, request, client_addr):
# subclass must override
raise NotImplemented
def get_request(self):
"""
Accept a request, up to the given number of allowed threads.
Defer to self.overloaded if there are already too many pending requests.
"""
request, client_addr = super(BoundedThreadingMixIn, self).get_request()
overload = False
with self._thread_guard:
if self._threads is not None and len(self._threads) + 1 > MAX_RPC_THREADS:
overload = True
if overload:
res = self.overloaded(client_addr)
request.sendall(res)
sys.stderr.write('{} - - [{}] "Overloaded"\n'.format(client_addr[0], time_str(time.time())))
self.shutdown_request(request)
return None, None
return request, client_addr
def process_request(self, request, client_address):
"""
Start a new thread to process the request.
"""
if request is None or client_address is None:
# request was never initialized, i.e. due to overload
return
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = False
with self._thread_guard:
if self._close:
# server is done. do not make more threads
self.shutdown_request(request)
return
if self._threads is None:
self._threads = {}
if len(self._threads) + 1 > MAX_RPC_THREADS:
# overloaded
log.warning("Too many outstanding requests ({})".format(len(self._threads)))
self.shutdown_request(request)
return
t.start()
self._threads[t.ident] = t
if BLOCKSTACK_TEST:
log.debug('{} active threads (added {})'.format(len(self._threads), t.ident))
def server_close(self):
super(BoundedThreadingMixIn, self).server_close()
with self._thread_guard:
threads = self._threads
self._threads = None
self._close = True
if threads:
for thread_id in threads.keys():
threads[thread_id].join()
def time_str(ts):
year, month, day, hh, mm, ss, x, y, z = time.localtime(ts)
monthname = [None,
'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
s = "%02d/%3s/%04d %02d:%02d:%02d" % (day, monthname[month], year, hh, mm, ss)
return s
def url_to_host_port(url, port=None):
"""
Given a URL, turn it into (host, port) for a blockstack server.