modify the BoundedThreadingMixIn so it will eagerly close client sockets when it has too many outstanding requests. Do some housekeeping so that we can use this in both the backplane RPC server and the API server, and update the relevant integration test to test for this behavior

This commit is contained in:
Jude Nelson
2018-09-18 17:15:31 -04:00
parent 6fb37efb7a
commit 94bcf41cf6
5 changed files with 214 additions and 121 deletions

View File

@@ -42,6 +42,7 @@ import gc
import argparse
import jsonschema
from jsonschema import ValidationError
import BaseHTTPServer
import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
@@ -58,7 +59,7 @@ from lib.client import BlockstackRPCClient
from lib.client import ping as blockstack_ping
from lib.client import OP_HEX_PATTERN, OP_CONSENSUS_HASH_PATTERN, OP_ADDRESS_PATTERN, OP_BASE64_EMPTY_PATTERN
from lib.config import REINDEX_FREQUENCY, BLOCKSTACK_TEST, default_bitcoind_opts, is_subdomains_enabled
from lib.util import url_to_host_port, atlas_inventory_to_string, daemonize, make_DID, parse_DID
from lib.util import url_to_host_port, atlas_inventory_to_string, daemonize, make_DID, parse_DID, BoundedThreadingMixIn, GCThread
from lib import *
from lib.storage import *
from lib.atlas import *
@@ -82,7 +83,6 @@ rpc_server = None
api_server = None
gc_thread = None
GC_EVENT_THRESHOLD = 15
def get_bitcoind(new_bitcoind_opts=None, reset=False, new=False):
"""
@@ -421,91 +421,6 @@ class BlockstackdRPCHandler(SimpleXMLRPCRequestHandler):
return json.dumps(rpc_traceback())
class BoundedThreadingMixIn(object):
"""
Bounded threading mix-in, based on the original SocketServer.ThreadingMixIn
(from https://github.com/python/cpython/blob/master/Lib/socketserver.py).
Only difference between this and the original is that this will reject
requests after a certain number of threads exist.
"""
_threads = None
_thread_guard = threading.Lock()
_close = False
def process_request_thread(self, request, client_address):
"""
Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
global gc_thread
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)
shutdown_thread = False
with self._thread_guard:
if threading.current_thread().ident in self._threads:
del self._threads[threading.current_thread().ident]
shutdown_thread = True
if BLOCKSTACK_TEST:
log.debug('{} active threads (removed {})'.format(len(self._threads), threading.current_thread().ident))
if shutdown_thread:
# count this towards our preemptive garbage collection
gc_thread.gc_event()
def process_request(self, request, client_address):
"""
Start a new thread to process the request.
"""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = False
with self._thread_guard:
if self._close:
# server is done. do not make more threads
self.shutdown_request(request)
return
if self._threads is None:
self._threads = {}
if len(self._threads) + 1 > MAX_RPC_THREADS:
# overloaded
log.warning("Too many outstanding requests ({})".format(len(self._threads)))
self.shutdown_request(request)
return
t.start()
self._threads[t.ident] = t
if BLOCKSTACK_TEST:
log.debug('{} active threads (added {})'.format(len(self._threads), t.ident))
def server_close(self):
super(BoundedThreadingMixIn, self).server_close()
with self._thread_guard:
threads = self._threads
self._threads = None
self._close = True
if threads:
for thread_id in threads.keys():
threads[thread_id].join()
class BlockstackdRPC(BoundedThreadingMixIn, SimpleXMLRPCServer):
"""
@@ -563,6 +478,25 @@ class BlockstackdRPC(BoundedThreadingMixIn, SimpleXMLRPCServer):
"""
return self.last_indexing_time + RPC_MAX_INDEXING_DELAY < time.time()
def overloaded(self, client_address):
"""
Got too many requests.
Send back a (precompiled) XMLRPC response saying as much
"""
body = {
'status': False,
'indexing': False,
'lastblock': -1,
'error': 'overloaded',
'http_status': 429
}
body_str = json.dumps(body)
resp = 'HTTP/1.0 200 OK\r\nServer: BaseHTTP/0.3 Python/2.7.14+\r\nContent-type: text/xml\r\nContent-length: {}\r\n\r\n'.format(len(body_str))
resp += '<methodResponse><params><param><value><string>{}</string></value></param></params></methodResponse>'.format(body_str)
return resp
def success_response(self, method_resp, **kw):
"""
@@ -1898,35 +1832,7 @@ class BlockstackdAPIServer( threading.Thread, object ):
log.warning("Failed to shut down API server socket")
self.api_server.shutdown()
class GCThread( threading.Thread ):
"""
Optimistic GC thread
"""
def __init__(self, event_threshold=GC_EVENT_THRESHOLD):
threading.Thread.__init__(self)
self.running = True
self.event_count = 0
self.event_threshold = event_threshold
def run(self):
deadline = time.time() + 60
while self.running:
time.sleep(1.0)
if time.time() > deadline or self.event_count > self.event_threshold:
gc.collect()
deadline = time.time() + 60
self.event_count = 0
def signal_stop(self):
self.running = False
def gc_event(self):
self.event_count += 1
def rpc_start( working_dir, port, subdomain_index=None, thread=True ):
"""
@@ -1998,6 +1904,14 @@ def gc_stop():
log.info("GC thread already joined")
def get_gc_thread():
"""
Get the global GC thread
"""
global gc_thread
return gc_thread
def api_start(working_dir, host, port, thread=True):
"""
Start the global API server
@@ -2743,6 +2657,8 @@ def run_blockstackd():
parser.add_argument(
'--port', action='store',
help='peer network port to bind on')
parser.add_argument(
'--api-port', action='store')
parser.add_argument(
'--api_port', action='store',
help='RESTful API port to bind on')
@@ -2753,7 +2669,9 @@ def run_blockstackd():
'--no-indexer', action='store_true',
help='Do not start the indexer component')
parser.add_argument(
'--indexer_url', action='store',
'--indexer_url', action='store'),
parser.add_argument(
'--indexer-url', action='store',
help='URL to the indexer-enabled blockstackd instance to use')
parser.add_argument(
'--no-api', action='store_true',

View File

@@ -170,10 +170,13 @@ if os.environ.get('BLOCKSTACK_RPC_MAX_THREADS'):
MAX_RPC_THREADS = int(os.environ.get('BLOCKSTACK_RPC_MAX_THREADS'))
print('Overriding MAX_RPC_THREADS to {}'.format(MAX_RPC_THREADS))
if BLOCKSTACK_TEST:
RPC_MAX_INDEXING_DELAY = 5
# threshold for garbage-collection
GC_EVENT_THRESHOLD = 15
""" block indexing configs
"""
REINDEX_FREQUENCY = 300 # seconds

View File

@@ -38,6 +38,7 @@ import urlparse
from jsonschema import ValidationError
import signal
import json
import BaseHTTPServer
from decimal import Decimal
import client as blockstackd_client
@@ -46,6 +47,8 @@ import scripts as blockstackd_scripts
from scripts import check_name, check_namespace, check_subdomain, check_block, check_offset, \
check_count, check_string, check_address
from util import BoundedThreadingMixIn
import storage
from config import BLOCKSTACK_TEST, BLOCKSTACK_DEBUG, get_bitcoin_opts, get_blockstack_opts, get_blockstack_api_opts, LENGTHS, VERSION, RPC_MAX_ZONEFILE_LEN, FIRST_BLOCK_MAINNET
@@ -112,6 +115,7 @@ def get_unspents(address, bitcoind):
return format_unspents(unspents)
class BlockstackAPIEndpointHandler(SimpleHTTPRequestHandler):
'''
Blockstack RESTful API endpoint.
@@ -524,7 +528,7 @@ class BlockstackAPIEndpointHandler(SimpleHTTPRequestHandler):
page = qs_values.get('page', None)
if page is None:
return self._reply_json({'error': 'Missing page argument'}, status_code=400)
page = "0" # compatibility
try:
assert len(page) < 10
@@ -1453,7 +1457,7 @@ class BlockstackAPIEndpointHandler(SimpleHTTPRequestHandler):
return self._dispatch("PATCH")
class BlockstackAPIEndpoint(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
class BlockstackAPIEndpoint(BoundedThreadingMixIn, SocketServer.TCPServer):
"""
Lightweight API endpoint to Blockstack server:
exposes all of the client methods via a RESTful interface,
@@ -1496,3 +1500,12 @@ class BlockstackAPIEndpoint(SocketServer.ThreadingMixIn, SocketServer.TCPServer)
self.server_activate()
def overloaded(self, client_addr):
"""
Deflect if we have too many inbound requests
"""
overloaded_txt = 'HTTP/1.0 429 Too Many Requests\r\nServer: BaseHTTP/0.3 Python/2.7.14+\r\nContent-type: text/plain\r\nContent-length: 17\r\n\r\nToo many requests'
if BLOCKSTACK_TEST:
log.warn('Too many requests; deflecting {}'.format(client_addr))
return overloaded_txt

View File

@@ -40,11 +40,168 @@ import keylib
import virtualchain
import virtualchain.lib.blockchain.bitcoin_blockchain as bitcoin_blockchain
from .config import RPC_SERVER_PORT, BLOCKSTACK_TEST, SUBDOMAIN_ADDRESS_VERSION_BYTE, SUBDOMAIN_ADDRESS_MULTISIG_VERSION_BYTE
from .config import RPC_SERVER_PORT, BLOCKSTACK_TEST, SUBDOMAIN_ADDRESS_VERSION_BYTE, SUBDOMAIN_ADDRESS_MULTISIG_VERSION_BYTE, MAX_RPC_THREADS, GC_EVENT_THRESHOLD
from .schemas import *
log = virtualchain.get_logger()
class GCThread( threading.Thread ):
"""
Optimistic GC thread
"""
def __init__(self, event_threshold=GC_EVENT_THRESHOLD):
threading.Thread.__init__(self)
self.running = True
self.event_count = 0
self.event_threshold = event_threshold
def run(self):
deadline = time.time() + 60
while self.running:
time.sleep(1.0)
if time.time() > deadline or self.event_count > self.event_threshold:
gc.collect()
deadline = time.time() + 60
self.event_count = 0
def signal_stop(self):
self.running = False
def gc_event(self):
self.event_count += 1
class BoundedThreadingMixIn(object):
"""
Bounded threading mix-in, based on the original SocketServer.ThreadingMixIn
(from https://github.com/python/cpython/blob/master/Lib/socketserver.py).
Only differences between this and the original are that:
* this will reject requests after a certain number of threads exist.
* this will reply with a "server is overloaded" message after a certain number of connections exist
"""
_threads = None
_thread_guard = threading.Lock()
_close = False
def process_request_thread(self, request, client_address):
"""
Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
from ..blockstackd import get_gc_thread
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)
shutdown_thread = False
with self._thread_guard:
if threading.current_thread().ident in self._threads:
del self._threads[threading.current_thread().ident]
shutdown_thread = True
if BLOCKSTACK_TEST:
log.debug('{} active threads (removed {})'.format(len(self._threads), threading.current_thread().ident))
if shutdown_thread:
gc_thread = get_gc_thread()
if gc_thread:
# count this towards our preemptive garbage collection
gc_thread.gc_event()
def overloaded(self, request, client_addr):
# subclass must override
raise NotImplemented
def get_request(self):
"""
Accept a request, up to the given number of allowed threads.
Defer to self.overloaded if there are already too many pending requests.
"""
request, client_addr = super(BoundedThreadingMixIn, self).get_request()
overload = False
with self._thread_guard:
if self._threads is not None and len(self._threads) + 1 > MAX_RPC_THREADS:
overload = True
if overload:
res = self.overloaded(client_addr)
request.sendall(res)
sys.stderr.write('{} - - [{}] "Overloaded"\n'.format(client_addr[0], time_str(time.time())))
self.shutdown_request(request)
return None, None
return request, client_addr
def process_request(self, request, client_address):
"""
Start a new thread to process the request.
"""
if request is None or client_address is None:
# request was never initialized, i.e. due to overload
return
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = False
with self._thread_guard:
if self._close:
# server is done. do not make more threads
self.shutdown_request(request)
return
if self._threads is None:
self._threads = {}
if len(self._threads) + 1 > MAX_RPC_THREADS:
# overloaded
log.warning("Too many outstanding requests ({})".format(len(self._threads)))
self.shutdown_request(request)
return
t.start()
self._threads[t.ident] = t
if BLOCKSTACK_TEST:
log.debug('{} active threads (added {})'.format(len(self._threads), t.ident))
def server_close(self):
super(BoundedThreadingMixIn, self).server_close()
with self._thread_guard:
threads = self._threads
self._threads = None
self._close = True
if threads:
for thread_id in threads.keys():
threads[thread_id].join()
def time_str(ts):
year, month, day, hh, mm, ss, x, y, z = time.localtime(ts)
monthname = [None,
'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
s = "%02d/%3s/%04d %02d:%02d:%02d" % (day, monthname[month], year, hh, mm, ss)
return s
def url_to_host_port(url, port=None):
"""
Given a URL, turn it into (host, port) for a blockstack server.

View File

@@ -41,6 +41,7 @@ wallets = [
consensus = "17ac43c1d8549c3181b200f1bf97eb7d"
def slow_client(timeout):
# NOTE: these will 400, but that's okay -- the point is to fill up the number of concurrent client slots
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
s.connect(('localhost', 16264))
@@ -80,6 +81,7 @@ def scenario( wallets, **kw ):
# should fail---no threads are free
res = rpcclient.getinfo()
assert 'error' in res, 'Expected error, got {}'.format(res)
assert res['error'] == 'overloaded', res
except:
pass