Files
stacks-puppet-node/integration_tests/bin/blockstack-test-scenario
2018-02-15 23:26:50 -05:00

1453 lines
48 KiB
Python
Executable File

#!/usr/bin/env python
"""
Blockstack integration-test scenario runner (Python 2).

Boots bitcoind in regtest mode plus a Blockstack server/client, loads a
scenario module, runs it, and verifies the resulting database state.
"""
import os

# enable all tests.  These environment variables MUST be set before the
# blockstack/virtualchain modules are imported, since they are read at
# import time.
os.environ['BLOCKSTACK_DEBUG'] = '1'
os.environ['BLOCKSTACK_TEST'] = '1'
os.environ['BLOCKSTACK_CLIENT_TEST_ALTERNATIVE_CONFIG'] = '1'
os.environ['BLOCKSTACK_TESTNET'] = '1'
os.environ['BLOCKSTACK_ATLAS_NETWORK_SIMULATION'] = '1'

# how many blocks we have to generate to start regtest
# (see bitcoin_regtest_reset(): 250 coinbases + 432 segwit-activation blocks)
TEST_FIRST_BLOCK_HEIGHT = 432 + 250
os.environ['BLOCKSTACK_TEST_FIRST_BLOCK'] = str(TEST_FIRST_BLOCK_HEIGHT + 6)

import sys
import subprocess
import signal
import shutil
import time
import atexit
import errno
import importlib
import traceback
import socket
import simplejson
import threading
import argparse
import BaseHTTPServer
import cgi
import random
from datetime import datetime
from influxdb import InfluxDBClient

# module handles; bound at runtime (after the environment variables above
# have taken effect)
blockstack = None
blockstackd = None
testlib = None

# True when running with --interactive / --interactive-web
g_interactive = False

import virtualchain
log = virtualchain.get_logger("blockstack-test-scenario")
from virtualchain.lib.blockchain.bitcoin_blockchain import JSONRPCException

# maximum test lifetime, in seconds (enforced by TestTimeout).
# NOTE(review): 75 * 60 * 60 is 75 HOURS; the original comment said
# "max test run is 75 minutes" (which would be 75 * 60) -- confirm intent.
MAX_TEST_LIFETIME = 75 * 60 * 60

# blockstack server RPC port used by the test configs below
TEST_RPC_PORT = 16264
TEST_CLIENT_RPC_PORT = int(os.environ.get("BLOCKSTACK_TEST_CLIENT_RPC_PORT", 16268))
TEST_CLIENT_BIND = os.environ.get("BLOCKSTACK_TEST_CLIENT_BIND", "localhost")

# if set, written into bitcoin.conf as rpcallowip=
BITCOIND_RPC_ALLOW_IP = os.environ.get("BLOCKSTACK_TEST_BITCOIND_ALLOWIP", False)

# storage drivers the client will use (overridable via the environment)
BLOCKSTACK_STORAGE_DRIVERS = "disk"
if os.environ.get("BLOCKSTACK_STORAGE_DRIVERS", None) is not None:
    BLOCKSTACK_STORAGE_DRIVERS = os.environ.get("BLOCKSTACK_STORAGE_DRIVERS")

BITCOIN_DIR = None # set at runtime
SCENARIO_PID = os.getpid()  # PID of the main test process; children must not run cleanup
NET_LOG_PATH = "/tmp/blockstack-test-netlog.log"
# Template for the blockstack server's config file.  The @...@ placeholders
# are filled in by generate_config_file(); %s is the test RPC port.
DEFAULT_SERVER_INI_TEMPLATE = """
[bitcoind]
passwd = blockstacksystem
server = localhost
port = 18332
p2p_port = 18444
use_https = False
user = blockstack
regtest = True
spv_path = @CLIENT_BLOCKCHAIN_HEADERS@
[blockstack]
server_version = 0.18.0
rpc_port = %s
backup_frequency = 1
backup_max_age = 30
serve_zonefiles = True
zonefiles = @ZONEFILES@
atlas = True
atlas_seeds =
atlas_blacklist =
atlas_hostname = localhost
""" % TEST_RPC_PORT

# Template for the blockstack client's config file.
# NOTE(review): the [subdomain-resolution] section appears twice in this
# template -- presumably the parser merges/overrides duplicates; confirm
# this is intentional.
DEFAULT_CLIENT_INI_TEMPLATE = """
[blockstack-client]
client_version = 0.17.0.9
server = localhost
port = %s
protocol = http
metadata = @CLIENT_METADATA@
storage_drivers = @CLIENT_STORAGE_DRIVERS@
storage_drivers_required_write = @CLIENT_STORAGE_DRIVERS_REQUIRED_WRITE@
advanced_mode = true
api_endpoint_port = %s
api_password = blockstack_integration_test_api_password
poll_interval = 1
queue_path = @CLIENT_QUEUE_PATH@
rpc_detach = True
blockchain_reader = bitcoind_utxo
blockchain_writer = bitcoind_utxo
anonymous_statistics = False
api_endpoint_bind = %s
api_endpoint_host = %s
[blockchain-reader]
utxo_provider = bitcoind_utxo
rpc_username = blockstack
rpc_password = blockstacksystem
server = localhost
port = 18332
use_https = False
[blockchain-writer]
utxo_provider = bitcoind_utxo
rpc_username = blockstack
rpc_password = blockstacksystem
server = localhost
port = 18332
use_https = False
[subdomain-resolution]
resolving_subdomains = True
subdomains_db = @CLIENT_SUBDOMAINS_DB@
[bitcoind]
passwd = blockstacksystem
server = localhost
port = 18332
use_https = False
user = blockstack
regtest = True
spv_path = @CLIENT_BLOCKCHAIN_HEADERS@
[subdomain-resolution]
resolving_subdomains = True
""" % (TEST_RPC_PORT, TEST_CLIENT_RPC_PORT, TEST_CLIENT_BIND, TEST_CLIENT_BIND)
class Pinger(threading.Thread):
    """
    Background thread that keeps poking bitcoind -regtest.

    For reasons unknown, regtest bitcoind won't reply blocks on its p2p
    interface unless it is pinged continuously (possibly it buffers up
    block data).  This is a stop-gap until we know more.
    """
    def __init__(self, working_dir):
        threading.Thread.__init__(self)
        self.running = False
        self.working_dir = working_dir

    def run(self):
        """Ping bitcoind four times a second until asked to stop."""
        self.running = True
        conn = bitcoin_regtest_connect( bitcoin_regtest_opts(self.working_dir) )
        while self.running:
            try:
                conn.ping()
                time.sleep(0.25)
            except socket.error:
                # connection died; re-establish it and keep pinging
                conn = bitcoin_regtest_connect( bitcoin_regtest_opts(self.working_dir) )

    def ask_join(self):
        """Signal the ping loop to exit on its next iteration."""
        self.running = False
class TestTimeout(threading.Thread):
    """
    Watchdog thread: if a test deadlocks, force the whole process to
    abort after a set amount of time.
    """
    def __init__(self, timeout):
        threading.Thread.__init__(self)
        self.timeout = timeout
        self.running = False

    def run(self):
        """Poll once a second; hard-abort the process once the deadline passes."""
        self.running = True
        give_up_at = time.time() + self.timeout
        while self.running:
            time.sleep(1)
            if time.time() <= give_up_at:
                continue

            # out of time: flush buffered output and kill the process hard
            log.error("Out of time")
            sys.stdout.flush()
            sys.stderr.flush()
            os.abort()

    def ask_join(self):
        """Signal the watchdog loop to exit on its next iteration."""
        self.running = False
class WebTestServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """
    Request handler for the interactive web control panel: serves a small
    HTML UI (GET), and accepts programmatic directives to generate new
    blocks, fund addresses, and end the test (POST).
    """
    def do_GET(self):
        """Render the control-panel UI with the current chain height and API password."""
        # UI
        bitcoind = bitcoin_regtest_connect( bitcoin_regtest_opts(self.server.working_dir) )
        blockheight = bitcoind.getblockcount()
        # NOTE(review): blockstack_client is not imported in this file's
        # visible header; presumably it is bound at runtime -- confirm.
        conf = blockstack_client.get_config()
        panel = '<html><head></head><body>Integration test control panel<br>'
        panel += '<br>'
        panel += 'Blockchain height: {}<br>'.format(blockheight)
        panel += 'API password: {}<br>'.format(conf['api_password'])
        panel += '<br>'
        panel += '<form action="/nextblock" method="post">'
        panel += ' Number of blocks: <input type="text" name="numblocks" value="1"> <input type="submit" value="Generate blocks"></form><br>'
        panel += '<form action="/sendfunds" method="post">'
        panel += ' Fund address: <input type="text" name="addr"> value (satoshis): <input type="text" name="value" default="0"> <input type="submit" value="Fund address"></form><br>'
        panel += '<form action="/done" method="post">'
        panel += ' <input type="submit" value="Done testing"></form>'
        panel += '</body></html>'
        self.send_response(200)
        self.send_header('content-type', 'text/html')
        self.send_header('content-length', len(panel))
        self.end_headers()
        self.wfile.write(panel)
        return

    def do_POST(self):
        """
        Handle control directives:
          /nextblock -- generate `numblocks` new blocks
          /sendfunds -- send `value` satoshis to `addr`, then confirm with 6 blocks
          /done      -- mark the test as finished (stops the serving loop)
        Responds 302 (redirect to /) on success, 401 on bad input.
        """
        content_type = self.headers.getheader('content-type')
        postvars = {}
        if content_type is not None:
            ctype, pdict = cgi.parse_header(content_type)
            if ctype == 'multipart/form-data':
                postvars = cgi.parse_multipart(self.rfile, pdict)
            elif ctype == 'application/x-www-form-urlencoded':
                length = int(self.headers.getheader('content-length'))
                postvars = cgi.parse_qs(self.rfile.read(length), keep_blank_values=1)

        if self.path == '/nextblock':
            # how many blocks?  (cgi form values arrive as lists)
            numblocks = postvars.get('numblocks', ['1'])
            try:
                numblocks = int(numblocks[0])
            except:
                log.error("Invalid numblocks '{}'".format(numblocks))
                log.error("postvars = {}".format(postvars))
                self.send_response(401, "Invalid number of blocks")
                self.end_headers()
                return

            for i in xrange(0, numblocks):
                self.server.next_block()

            self.send_response(302)
            self.send_header('location', '/')
            self.end_headers()
            return

        elif self.path == '/sendfunds':
            # fund an address
            addr = postvars.get('addr', [None])
            value = postvars.get('value', [None])
            if addr[0] is None or value[0] is None:
                log.error("Missing addr or value")
                self.send_response(401, "Invalid request: missing addr or value")
                self.end_headers()
                return

            try:
                value = int(value[0])
                addr = virtualchain.address_reencode(addr[0])
            except:
                log.error("Failed to read addr and/or value")
                log.error("postvars = {}".format(postvars))
                self.send_response(401, "Invalid addr or value")
                self.end_headers()
                return

            # send funds from the default payment wallet
            res = testlib.send_funds(testlib.get_default_payment_wallet().privkey, value, addr)
            if 'error' in res:
                log.error("Failed to send {} from {} to {}: {}".format(
                    value, testlib.get_default_payment_wallet().privkey, addr, res
                ))
                self.send_response(401, "Failed to send value")
                self.end_headers()
                return

            # confirm it (6 confirmations)
            for i in xrange(0, 6):
                self.server.next_block()

            self.send_response(302)
            self.send_header('location', '/')
            self.end_headers()
            return

        elif self.path == '/done':
            # signal run_scenario()'s serving loop to stop
            self.server.done = True
            self.send_response(200)
            self.end_headers()
            return

        else:
            log.error("Unsupported path {}".format(self.path))
            self.send_response(401, "Only support /nextblock and /done at this time")
            self.end_headers()
            return
class WebTestServer(BaseHTTPServer.HTTPServer):
    """
    HTTP server for the interactive web control panel, bound to
    localhost:port.  `test_env` holds the keyword arguments that
    testlib.next_block() needs to advance the test blockchain.
    """
    def __init__(self, working_dir, port, test_env):
        BaseHTTPServer.HTTPServer.__init__(self, ('localhost', port), WebTestServerRequestHandler)
        self.test_env = test_env
        # set to True by the request handler on POST /done
        self.done = False
        self.working_dir = working_dir

    def next_block(self):
        """Advance the test blockchain by one block."""
        testlib.next_block(**self.test_env)
def parse_test_pragma( line ):
    """
    Parse a test pragma line from a scenario file.

    Pragmas look like "TEST ENV <NAME> <VALUE...>".  Returns a dict with
    'type' ('ENV'), 'name', and 'value' on success.  Raises AssertionError
    or Exception if the line is not a recognized pragma.
    """
    ret = {}
    lineparts = line.split(" ")
    assert len(lineparts) >= 2

    if lineparts[0] != 'TEST':
        raise Exception("Not a test pragma: '%s'" % line)

    if lineparts[1] == 'ENV':
        # environment variable: TEST ENV NAME VALUE...
        assert len(lineparts) > 3
        ret['type'] = 'ENV'
        ret['name'] = lineparts[2]
        # the value is everything after the name, spaces included
        ret['value'] = ' '.join(lineparts[3:])
    else:
        # unknown pragma type.
        # BUG FIX: previously reported lineparts[0] (always "TEST") instead
        # of the unrecognized pragma type itself.
        raise Exception("Unknown test pragma '%s'" % lineparts[1])

    return ret
def get_blockstack_db(working_dir, disposition=None):
    """
    Get database state, read-only or read-write.

    `disposition` must be blockstackd.DISPOSITION_RO or DISPOSITION_RW.
    Returns the handle on success.
    Raises ValueError on an invalid disposition; raises on other errors.
    """
    # NOTE(review): `impl` is assigned but never used here -- possibly a
    # leftover from an older virtualchain API; confirm before removing.
    impl = blockstack.lib.virtualchain_hooks

    # NOTE(review): BlockstackDB is not imported in this file's visible
    # header -- presumably it is bound elsewhere at runtime; confirm.
    db_inst = None
    if disposition == blockstackd.DISPOSITION_RO:
        db_inst = BlockstackDB.get_readonly_instance(working_dir)
    elif disposition == blockstackd.DISPOSITION_RW:
        db_inst = BlockstackDB.get_readwrite_instance(working_dir)
    else:
        raise ValueError("Invalid disposition")

    assert db_inst, 'Failed to instantiate database handle'
    return db_inst
def load_scenario( scenario_name ):
    """
    Load up a scenario, and validate it.

    A scenario is a python module with:
    * a global variable 'wallets' that maps private keys to their
      initial values.
    * a global variable 'consensus' that represents the initial
      consensus hash.
    * a callable called 'scenario' that takes the wallets as an
      argument and runs the test.
    * a callable called 'check' that takes the state engine as an
      argument and checks it for correctness.

    Missing 'wallets'/'consensus' attributes get defaults; a missing
    'scenario' or 'check' callable makes this return None.  Raises if
    the module cannot be located or imported.
    """
    log.debug("Load scenario %s" % scenario_name)

    # strip .py from scenario name
    if scenario_name.endswith(".py"):
        scenario_name = scenario_name[:-3]

    # identify the file's path by asking a child python to import the
    # module and print its __file__ (honors the same sys.path as a real
    # import would)
    log.debug("Identifying scenario path")
    p = subprocess.Popen( "python -c 'import %s; print %s.__file__'" % (scenario_name, scenario_name), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
    path_buf, err = p.communicate()
    if p.returncode is not None and p.returncode != 0:
        print >> sys.stderr, err
        raise Exception("Failed to load %s, exit code %s" % (scenario_name, p.returncode))

    # the path is the last line of output; map a compiled .pyc path back
    # to its .py source so we can read the pragmas out of it
    path = path_buf.strip().split("\n")[-1].strip()
    if path.endswith(".pyc"):
        path = path[:-4] + ".py"

    log.debug("Path of %s is %s" % (scenario_name, path))

    # load any test pragmas (like environment variables) and set them now
    with open(path, "r") as f:
        scenario_data = f.readlines()

    for line in scenario_data:
        line = line.strip()
        try:
            test_pragma = parse_test_pragma( line )
        except:
            # not a pragma line; skip it
            continue

        if test_pragma['type'] == 'ENV':
            os.environ[ test_pragma['name'] ] = test_pragma['value']
            log.debug("export %s='%s'" % (test_pragma['name'], os.environ[test_pragma['name']]))

    try:
        scenario = importlib.import_module( scenario_name )
    except ImportError, ie:
        log.exception(ie)
        raise Exception("Failed to import '%s'." % scenario_name )

    # validate
    if not hasattr( scenario, "wallets" ):
        # default empty wallet
        log.warning("Empty wallet for scenario '%s'" % scenario_name )
        scenario.wallets = {}

    if not hasattr( scenario, "consensus" ):
        # default consensus hash
        log.warning("No consensus hash for '%s'" % scenario_name )
        scenario.consensus = "00" * 16

    if not hasattr( scenario, "scenario" ):
        # not a valid test
        log.error("Invalid scenario '%s': no 'scenario' method" % scenario_name )
        return None

    if not hasattr( scenario, "check" ):
        # not a valid test
        log.error("Invalid scenario '%s': no 'check' method" % scenario_name )
        return None

    return scenario
def generate_config_file( working_dir, scenario, path, template, extra_fields):
    """
    Render the config file for this test scenario from `template`,
    filling in the @...@ placeholders from `extra_fields` (with
    sensible defaults), and write it to `path`.

    Returns 0 on success.  Raises if no template is given.
    """
    if template is None:
        raise Exception("No config template")

    # resolve defaulted values first
    zonefiles_dir = extra_fields.get("ZONEFILES", None)
    if zonefiles_dir is None:
        zonefiles_dir = "/tmp/blockstack-test-zonefiles"

    default_subdomains_db = os.path.join(os.path.dirname(path), 'subdomains.db')
    spv_headers_path = os.path.join( working_dir, "spv_headers.dat")

    # placeholder -> value, applied in this order
    substitutions = [
        ("@CLIENT_BLOCKCHAIN_HEADERS@", spv_headers_path),
        ("@CLIENT_METADATA@", extra_fields.get("CLIENT_METADATA", "")),
        ("@CLIENT_ACCOUNTS_PATH@", extra_fields.get("CLIENT_ACCOUNTS_PATH", "")),
        ("@CLIENT_USERS_PATH@", extra_fields.get("CLIENT_USERS_PATH", "")),
        ("@CLIENT_DATASTORES_PATH@", extra_fields.get("CLIENT_DATASTORES_PATH", "")),
        ("@CLIENT_QUEUE_PATH@", extra_fields.get("CLIENT_QUEUE_PATH", "")),
        ("@ZONEFILES@", zonefiles_dir),
        ("@CLIENT_STORAGE_DRIVERS@", extra_fields.get("CLIENT_STORAGE_DRIVERS", BLOCKSTACK_STORAGE_DRIVERS)),
        ("@CLIENT_STORAGE_DRIVERS_REQUIRED_WRITE@", extra_fields.get("CLIENT_STORAGE_DRIVERS_REQUIRED_WRITE", BLOCKSTACK_STORAGE_DRIVERS)),
        ("@SERVE_DATA@", extra_fields.get("SERVE_DATA", "True")),
        ("@REDIRECT_DATA@", extra_fields.get("REDIRECT_DATA", "False")),
        ("@DATA_SERVERS@", extra_fields.get("DATA_SERVERS", "True")),
        ("@CLIENT_SUBDOMAINS_DB@", extra_fields.get("SUBDOMAINS_DB", default_subdomains_db)),
    ]

    config_txt = template[:]
    for placeholder, value in substitutions:
        config_txt = config_txt.replace(placeholder, value)

    with open( path, "w" ) as f:
        f.write( config_txt )
        f.flush()

    return 0
def bitcoin_stop():
    """
    Stop the regtest bitcoind via bitcoin-cli.

    Requires the global BITCOIN_DIR to have been set; asserts that the
    stop command exited 0.
    """
    global BITCOIN_DIR
    assert BITCOIN_DIR

    bitcoin_conf = os.path.join(BITCOIN_DIR, "bitcoin.conf")
    rc = os.system("bitcoin-cli -regtest -conf=%s stop" % bitcoin_conf)
    assert rc == 0, "Failed to stop bitcoind"
def atexit_cleanup( spv_headers_path, client_config_path, stop_bitcoind ):
    """
    Clean up a scenario: remove the SPV headers file, stop the local API
    endpoint (if a client config path is given), and optionally stop
    bitcoind.  Only the main test process may run this.
    """
    global BITCOIN_DIR, SCENARIO_PID
    assert BITCOIN_DIR

    if os.getpid() != SCENARIO_PID:
        # don't let the API daemon call this by mistake
        return

    log.debug("atexit_cleanup")

    # best-effort removal; the file may not exist yet
    try:
        os.unlink(spv_headers_path)
    except:
        pass

    if client_config_path is not None:
        client_config_dir = os.path.dirname(client_config_path)
        blockstack_client.rpc.local_api_stop(client_config_dir)

    if stop_bitcoind:
        bitcoin_conf = os.path.join(BITCOIN_DIR, "bitcoin.conf")
        os.system("bitcoin-cli -regtest -conf=%s stop" % bitcoin_conf)
def sync_virtualchain_upcall(server_state):
    """
    Upcall from the test scenario to synchronize virtualchain.
    Part of advancing to the next block.

    Exits the process (status 1) if indexing fails.
    """
    working_dir = server_state['working_dir']

    # point the test library at a fresh read/write database handle
    db = get_blockstack_db(working_dir, disposition=blockstackd.DISPOSITION_RW)
    testlib.set_state_engine(db)

    rc = blockstackd.index_blockchain(server_state)
    if not rc:
        log.error("Failed to index blockchain")
        sys.exit(1)

    # re-open the handle so testlib sees the post-indexing state
    db.close()
    db = get_blockstack_db(working_dir, disposition=blockstackd.DISPOSITION_RW)
    testlib.set_state_engine(db)
def run_scenario( virtualchain_working_dir, scenario, config_file, client_config_file, interactive=False, blocktime=10, webtest_port=None ):
    """
    Run a test scenario:
    * set up the virtualchain to use our mock UTXO provider and mock bitcoin blockchain
    * seed it with the initial values in the wallet
    * set the initial consensus hash
    * start the API server
    * run the scenario method
    * run the check method

    Returns True if the scenario and all post-checks pass; False otherwise.
    In interactive mode, keeps serving blocks until interrupted (terminal
    mode) or until POST /done (web mode), and skips the history checks.
    """
    spv_headers_path = os.path.join( virtualchain_working_dir, "spv_headers.dat")

    # clean up any leftover state now, and register cleanup for exit
    atexit_cleanup( spv_headers_path, None, False )
    atexit.register( atexit_cleanup, spv_headers_path, client_config_file, True )

    client_config_dir = os.path.dirname(client_config_file)

    # set up blockstack
    server_opts = blockstack.lib.config.configure(virtualchain_working_dir, config_file=config_file, interactive=False)
    blockstack_opts = server_opts['blockstack']
    bitcoin_opts = server_opts['bitcoind']

    # NOTE(review): blockstack_client is not imported in this file's
    # visible header; presumably bound at runtime -- confirm.
    client_opts = blockstack_client.config.configure(config_file=client_config_file, interactive=False)
    utxo_opts = client_opts['blockchain-reader']

    # pass along extra arguments
    utxo_opts['blockchain_server'] = 'localhost'
    utxo_opts['blockchain_port'] = 18332
    utxo_opts['spv_headers_path'] = spv_headers_path
    blockstack_opts['working_dir'] = virtualchain_working_dir

    # NOTE(review): `json` is not imported in this file's visible header
    # (only simplejson is) -- presumably bound elsewhere; confirm.
    print ""
    print "blockstack opts"
    print json.dumps( blockstack_opts, indent=4 )
    print ""
    print "blockchain opts"
    print json.dumps( bitcoin_opts, indent=4 )
    print ""
    print "blockchain service opts"
    print json.dumps( utxo_opts, indent=4 )
    print "blockchain headers: %s" % spv_headers_path

    blockstackd.set_blockstack_opts( blockstack_opts )
    blockstackd.set_bitcoin_opts( bitcoin_opts )

    print ""
    print "bitcoind connection"
    print ""

    bitcoind = bitcoin_regtest_connect( bitcoin_regtest_opts(virtualchain_working_dir) )
    utxo_provider = blockstack_client.backend.utxo.bitcoind_utxo.BitcoindClient("blockstack", "blockstacksystem", port=18332, version_byte=virtualchain.version_byte )

    working_dir = virtualchain_working_dir
    print "working_dir: %s" % working_dir

    # set up test environment
    db = get_blockstack_db(virtualchain_working_dir, disposition=blockstackd.DISPOSITION_RW)
    testlib.set_utxo_opts( utxo_opts )
    testlib.set_bitcoind( bitcoind )
    testlib.set_state_engine( db )

    # Start server
    server_state = blockstackd.server_setup(working_dir)
    testlib.set_server_state(server_state)

    # start pinging bitcoind so it pushes out p2p messages
    pinger = Pinger(virtualchain_working_dir)
    pinger.start()

    # forced timeout, in case the test deadlocks
    timeout_watchdog = TestTimeout(MAX_TEST_LIFETIME)
    timeout_watchdog.start()

    # shutdown procedure
    def shutdown_procedure(*args, **kw):
        # stop bitcoind, the API endpoint, blockstackd, and the helper
        # threads.  Only the main test process may run this.
        if os.getpid() != SCENARIO_PID:
            # don't let the API daemon call this
            log.info("API daemon exiting")
            return

        log.info("Shutting down bitcoin")
        bitcoin_stop()

        log.info("Shutting down API endpoint")
        blockstack_client.rpc.local_api_stop(client_config_dir)

        log.info("Shutting down Blockstackd")
        blockstackd.server_shutdown(server_state)

        # `pinger` is read via closure; it is set to None below once joined
        if pinger:
            log.info("Shutting down pinger")
            pinger.ask_join()
            pinger.join()

        if timeout_watchdog:
            log.info("Shutting down timeout watchdog")
            timeout_watchdog.ask_join()
            timeout_watchdog.join()

    def signal_shutdown_procedure(*args, **kw):
        # signal handler: clean up, then exit with failure
        shutdown_procedure()
        sys.exit(1)

    def signal_reap_children(*args, **kw):
        """
        If we're PID 1, then adopt orphan children
        """
        while True:
            try:
                os.waitpid(-1, os.WNOHANG)
                time.sleep(0.25)
            except OSError, oe:
                if oe.errno == errno.ECHILD:
                    # no more children to reap
                    break
                else:
                    raise

    def running_in_docker():
        # heuristic: inside a container, PID 1's cpuset path starts with /docker
        try:
            with open('/proc/1/cpuset') as cpuset:
                content = [l for l in cpuset][0]
                return content.startswith('/docker')
        except:
            return False

    # override blockstackd signal handlers with ones for the test framework
    signal.signal(signal.SIGINT, signal_shutdown_procedure)
    signal.signal(signal.SIGQUIT, signal_shutdown_procedure)
    signal.signal(signal.SIGTERM, signal_shutdown_procedure)

    if SCENARIO_PID == 1 and not running_in_docker():
        # we're init; reap orphans ourselves
        signal.signal(signal.SIGCHLD, signal_reap_children)

    # make sure we're connected to the local API endpoint
    try:
        rpcclient = testlib.TestAPIProxy()
        res = rpcclient.getinfo()
    except Exception as e:
        log.exception(e)
        sys.exit(1)

    # keyword arguments for testlib.next_block() and the scenario method
    test_env = {
        "sync_virtualchain_upcall": lambda: sync_virtualchain_upcall(server_state),
        "next_block_upcall": lambda: bitcoin_regtest_next_block(working_dir),
        "working_dir": working_dir,
        "bitcoind": bitcoind,
        "bitcoin_opts": bitcoin_opts,
        "spv_headers_path": spv_headers_path,
        "blockstack_opts": blockstack_opts
    }

    '''
    # we're running
    blockstackd.set_running(True)
    '''

    # sync initial utxos
    testlib.next_block( **test_env )

    # load the scenario into the mock blockchain and mock utxo provider
    try:
        rc = scenario.scenario( scenario.wallets, **test_env )
    except Exception, e:
        log.exception(e)
        traceback.print_exc()
        log.error("Failed to run scenario '%s'" % scenario.__name__)
        shutdown_procedure()
        return False

    if rc == False:
        # explicitly erred
        log.error("Scenario exits in error")
        log.error("Failed to run tests '%s'" % scenario.__name__)
        shutdown_procedure()
        return False

    log.info("\n\nTest finished; doing checks\n\n")

    # re-open the database so the checks see the latest state
    db = get_blockstack_db(working_dir, disposition=blockstackd.DISPOSITION_RW)
    testlib.set_state_engine(db)

    # run the checks on the database
    try:
        rc = scenario.check( db )
    except Exception, e:
        log.exception(e)
        traceback.print_exc()
        log.error("Failed to run tests '%s'" % scenario.__name__)
        shutdown_procedure()
        return False

    if not rc:
        shutdown_procedure()
        return rc

    # do any more interactive tests
    if interactive:
        if webtest_port is None:
            # terminal mode: generate a block every `blocktime` seconds
            # until interrupted
            log.info("Keeping test server online for testing purposes.")
            log.info("Blocktime is %s second(s)" % blocktime)
            while True:
                db = get_blockstack_db(working_dir, disposition=blockstackd.DISPOSITION_RW)
                testlib.set_state_engine( db )
                try:
                    time.sleep(blocktime)
                except:
                    # interrupted (e.g. Ctrl-C); stop the loop
                    break

                testlib.next_block( **test_env )
        else:
            # web mode: serve the control panel until POST /done
            log.info("")
            log.info("Keeping Web test server online for testing purposes.")
            log.info("")
            log.info("   Web test server online at http://localhost:{}".format(webtest_port))
            log.info("")
            log.info("   Create new blocks by sending a POST request to http://localhost:{}/nextblock".format(webtest_port))
            log.info("   Example:")
            log.info("      $ curl -X POST http://localhost:{}/nextblock".format(webtest_port))
            log.info("")
            log.info("   Exit the test scenario by sending a POST request to http://localhost:{}/done".format(webtest_port))
            log.info("   Example:")
            log.info("      $ curl -X POST http://localhost:{}/done".format(webtest_port))
            log.info("")

            webtest_server = WebTestServer(virtualchain_working_dir, webtest_port, test_env)
            while not webtest_server.done:
                webtest_server.handle_request()

    # stop the pinger before the (read-only) history checks
    pinger.ask_join()
    pinger.join()
    pinger = None

    blockstackd.server_atlas_shutdown(server_state)

    if interactive:
        # skip the (lengthy) history verification in interactive mode
        shutdown_procedure()
        return True

    log.info("\n\nScenario checks passed; verifying history\n\n")

    # run database integrity check at each block
    db = get_blockstack_db(working_dir, disposition=blockstackd.DISPOSITION_RO)
    rc = False
    try:
        rc = testlib.check_history( db )
        assert rc, "History check failed"
    except Exception, e:
        log.exception(e)
        shutdown_procedure()
        return rc

    log.info("History check passes!")

    # run snv at each name
    db = get_blockstack_db(working_dir, disposition=blockstackd.DISPOSITION_RO)
    rc = False
    try:
        rc = testlib.snv_all_names( db )
        assert rc, "SNV check failed"
    except Exception, e:
        log.exception(e)
        shutdown_procedure()
        return rc

    log.info("SNV check passes!")

    # verify atlas zonefiles
    db = get_blockstack_db(working_dir, disposition=blockstackd.DISPOSITION_RO)
    rc = testlib.check_atlas_zonefiles( db, blockstack_opts['atlasdb_path'] )
    if not rc:
        shutdown_procedure()
        return rc

    log.info("Atlas zonefile checks pass!")

    # verify historic name ownership
    rc = testlib.check_historic_names_by_address(db)
    if not rc:
        shutdown_procedure()
        return rc

    log.info("Historic name test passes!")

    shutdown_procedure()
    return True
def bitcoin_regtest_opts(working_dir):
    """
    Build the connection options for the regtest-mode bitcoind.
    The SPV headers file lives under `working_dir`.
    """
    spv_path = os.path.join(working_dir, "spv_headers.dat")
    opts = {}
    opts["bitcoind_server"] = "localhost"
    opts["bitcoind_port"] = 18332
    opts["bitcoind_p2p_port"] = 18444
    opts["bitcoind_user"] = "blockstack"
    opts["bitcoind_passwd"] = "blockstacksystem"
    opts["bitcoind_use_https"] = False
    opts["bitcoind_timeout"] = 60
    opts["bitcoind_spv_path"] = spv_path
    return opts
def bitcoin_regtest_reset(working_dir):
    """
    Reset bitcoind -regtest to a clean state.

    Stops any running daemon, wipes its data directory, writes a fresh
    bitcoin.conf, restarts the daemon, waits (up to 3 minutes) for its
    RPC interface to come up, and pre-generates TEST_FIRST_BLOCK_HEIGHT-1
    blocks (250 coinbases of 50 BTC each, plus 432 blocks to activate
    segwit).

    Returns True on success, False on failure.
    """
    global BITCOIN_DIR
    assert BITCOIN_DIR

    bitcoin_dir = BITCOIN_DIR[:]
    bitcoin_pidpath = os.path.join(bitcoin_dir, "bitcoind.pid")
    bitcoin_conf = os.path.join(bitcoin_dir, "bitcoin.conf")
    opts = bitcoin_regtest_opts(working_dir)

    if os.path.exists(bitcoin_dir):
        if os.path.exists(bitcoin_pidpath):
            # kill the running daemon, then keep issuing `stop` until it
            # fails -- i.e. the RPC interface has actually gone away.
            # BUG FIX: the polling command used "conf=%s" (missing the
            # leading dash), so it talked to the default datadir instead
            # of ours.
            os.system("bitcoin-cli -regtest -conf=%s stop" % bitcoin_conf)
            while True:
                time.sleep(1.0)
                rc = os.system("bitcoin-cli -regtest -conf=%s stop" % bitcoin_conf)
                if rc != 0:
                    break

        # start fresh
        shutil.rmtree(bitcoin_dir)

    os.makedirs(bitcoin_dir)
    with open(bitcoin_conf, "w") as f:
        f.write("rpcuser=%s\nrpcpassword=%s\nregtest=1\ntxindex=1\nlisten=1\nserver=1\ndatadir=%s\ndebug=1\nrpcserialversion=0" % (opts['bitcoind_user'], opts['bitcoind_passwd'], bitcoin_dir))
        if BITCOIND_RPC_ALLOW_IP:
            f.write("\nrpcallowip=%s\n" % BITCOIND_RPC_ALLOW_IP)

        f.flush()
        os.fsync(f.fileno())

    # start up
    log.debug("Starting up bitcoind in regtest mode")
    rc = os.system("bitcoind -daemon -rpcport=%s -conf=%s" % (18332, bitcoin_conf))
    if rc != 0:
        log.error("Failed to start `bitcoind`: rc = %s" % rc)
        return False

    # wait for a few seconds for it to come up
    log.debug("Waiting for bitcoind to start up")
    deadline = time.time() + 180
    started = False
    while time.time() < deadline:
        time.sleep(1)
        opts = bitcoin_regtest_opts(working_dir)
        try:
            bitcoind = virtualchain.default_connect_bitcoind( opts )
            bitcoind.getinfo()
            started = True
            break
        except socket.error:
            # RPC socket not up yet
            pass
        except JSONRPCException:
            # daemon up but still warming up
            pass

    if not started:
        log.error("Failed to connect to bitcoind")
        return False

    # generate 250 blocks (50 BTC coinbase each), and confirm them
    # also generate 432 blocks to activate segwit
    bitcoind = virtualchain.connect_bitcoind(opts)
    res = bitcoind.generate(TEST_FIRST_BLOCK_HEIGHT-1)
    if len(res) != TEST_FIRST_BLOCK_HEIGHT-1:
        # BUG FIX: this format expression was
        # "... %s blocks" % TEST_FIRST_BLOCK_HEIGHT-1, which binds as
        # (str % int) - 1 and raises TypeError when this path is hit.
        log.error("Did not generate %s blocks" % (TEST_FIRST_BLOCK_HEIGHT-1))
        return False

    log.debug("bitcoind -regtest is ready")
    return True
def fill_wallet( bitcoind, wallet, value ):
    """
    Fill a test wallet on a regtest bitcoind.
    Convert the wallet's keys to testnet format, if needed.
    Return True on success
    Raise on error

    Handles four wallet shapes:
    * single private key (privkey is a str/unicode)
    * single key with segwit (p2sh-p2wpkh)
    * multisig with segwit (p2sh-p2wsh)
    * plain multisig (p2sh)
    """
    if type(wallet.privkey) in [str, unicode]:
        # single private key
        testnet_wif = virtualchain.BitcoinPrivateKey(virtualchain.get_singlesig_privkey(wallet.privkey)).to_wif()
        addr = virtualchain.BitcoinPublicKey(wallet.pubkey_hex).address()

        log.debug("Fill %s (%s) with %s" % (addr, testnet_wif, value ))
        bitcoind.importprivkey(testnet_wif, "")
        bitcoind.sendtoaddress( addr, value )

    elif wallet.segwit and len(wallet.privkey['private_keys']) == 1:
        # single private key, p2sh-p2wpkh
        testnet_wif = virtualchain.BitcoinPrivateKey(virtualchain.get_singlesig_privkey(wallet.privkey)).to_wif()
        addr = virtualchain.BitcoinPublicKey(wallet.pubkey_hex).address()

        bitcoind.importprivkey(testnet_wif, "")

        # have bitcoind track the p2sh-p2wpkh equivalent address, and check
        # that it derives the same address we computed locally
        segwit_wallet = virtualchain.make_segwit_info(testnet_wif)
        p2sh_p2wpkh_addr = bitcoind.addwitnessaddress(addr)
        assert p2sh_p2wpkh_addr == segwit_wallet['address'], 'bitcoind.addwitnessaddress({}) == {}, expected {}'.format(addr, p2sh_p2wpkh_addr, segwit_wallet['address'])

        log.debug("Fill p2sh-p2wpkh %s (%s) with %s" % (segwit_wallet['address'], testnet_wif, value ))
        bitcoind.sendtoaddress( segwit_wallet['address'], value )

    elif wallet.segwit and len(wallet.privkey['private_keys']) > 1:
        # multisig p2sh-p2wsh
        testnet_wifs = []
        testnet_pubks = []
        for pk in wallet.privkey['private_keys']:
            # convert to testnet WIF if not already ('c' prefix)
            if not pk.startswith("c"):
                pk = virtualchain.BitcoinPrivateKey(pk).to_wif()

            testnet_wifs.append(pk)
            testnet_pubks.append( virtualchain.BitcoinPrivateKey(pk).public_key().to_hex() )

        multisig_info = virtualchain.make_multisig_info( wallet.m, testnet_wifs )
        bitcoind.addmultisigaddress( wallet.m, testnet_pubks )
        bitcoind.importaddress( multisig_info['address'] )

        # have bitcoin track the p2sh-p2wsh equivalent address
        segwit_wallet = virtualchain.make_multisig_segwit_info( wallet.m, testnet_wifs )
        p2sh_p2wsh_addr = bitcoind.addwitnessaddress(multisig_info['address'])
        assert p2sh_p2wsh_addr == segwit_wallet['address'], 'bitcoind.addwitnessaddress({}) == {}, expected {}'.format(multisig_info['address'], p2sh_p2wsh_addr, segwit_wallet['address'])

        log.debug("Fill p2sh-p2wsh %s with %s" % (segwit_wallet['address'], value ))
        bitcoind.sendtoaddress( segwit_wallet['address'], value )

    else:
        # multisig address
        testnet_wifs = []
        testnet_pubks = []
        for pk in wallet.privkey['private_keys']:
            # convert to testnet WIF if not already ('c' prefix)
            if not pk.startswith("c"):
                pk = virtualchain.BitcoinPrivateKey(pk).to_wif()

            testnet_wifs.append(pk)
            testnet_pubks.append( virtualchain.BitcoinPrivateKey(pk).public_key().to_hex() )

        multisig_info = virtualchain.make_multisig_info( wallet.m, testnet_wifs )
        bitcoind.addmultisigaddress( wallet.m, testnet_pubks )
        bitcoind.importaddress( multisig_info['address'] )

        log.debug("Fill %s with %s" % (multisig_info['address'], value ))
        bitcoind.sendtoaddress( multisig_info['address'], value )

    return True
def get_wallet_addr( wallet ):
    """
    Return the address for a test wallet.

    Single-key wallets (privkey is a string) derive the address from the
    hex public key; multisig wallets carry their address directly.
    """
    if type(wallet.privkey) not in [str, unicode]:
        # multisig: the wallet stores its address
        return wallet.addr

    # single key: derive from the public key
    return virtualchain.BitcoinPublicKey(wallet.pubkey_hex).address()
def bitcoin_regtest_fill_wallets( working_dir, wallets, default_payment_wallet=None ):
    """
    Given a set of wallets, make sure they each have 50 btc.
    If given, fill the default payment wallet with 250 btc
    (will be used to subsidize other operations).

    Confirms the funding with 6 generated blocks, then prints each
    wallet's resulting balance to stderr.  Returns True.
    """
    opts = bitcoin_regtest_opts(working_dir)
    bitcoind = virtualchain.connect_bitcoind( opts )

    for wallet in wallets:
        # fill each wallet
        fill_wallet( bitcoind, wallet, 50 )

    if default_payment_wallet is not None:
        # fill optional default payment address
        fill_wallet( bitcoind, default_payment_wallet, 250 )

    # confirm the funding transactions
    bitcoind.generate(6)

    print >> sys.stderr, ""
    for wallet in wallets + [default_payment_wallet]:
        if wallet is None:
            continue

        addr = get_wallet_addr( wallet )
        unspents = bitcoind.listunspent( 0, 200000, [addr] )

        # sum up the confirmed balance in satoshis
        SATOSHIS_PER_COIN = 10**8
        value = sum( [ int(round(s["amount"]*SATOSHIS_PER_COIN)) for s in unspents ] )
        print >> sys.stderr, "Address %s loaded with %s satoshis" % (addr, value)

    print >> sys.stderr, ""
    return True
def bitcoin_regtest_next_block(working_dir):
    """
    Generate one new block on the regtest daemon, advancing the chain
    height by 1, and log the new height.
    """
    opts = bitcoin_regtest_opts(working_dir)
    bitcoind = virtualchain.connect_bitcoind( opts )
    bitcoind.generate(1)
    log.debug("next block (now at %s)" % bitcoind.getblockcount())
def bitcoin_regtest_connect( opts, reset=False ):
    """
    Create a connection to bitcoind -regtest using `opts`
    (as returned by bitcoin_regtest_opts()).

    The `reset` flag is currently unused.
    """
    bitcoind = virtualchain.default_connect_bitcoind(opts)
    return bitcoind
class BitcoinRegtestUTXOProvider(object):
    """
    Bitcoin regtest UTXO provider.
    All addresses must be created within the bitcoind wallet.
    """
    def __init__(self, bitcoind):
        # default connection, used when a call does not supply its own
        self.bitcoind = bitcoind

    def get_unspents( self, address, blockchain_client=None ):
        """
        Return the unspent outputs for `address` as a list of dicts with
        transaction_hash, outpoint {hash, index}, value (satoshis),
        out_script, and confirmations.
        (blockchain_client is a BitcoindConnection; defaults to ours.)
        """
        client = blockchain_client if blockchain_client is not None else self.bitcoind
        satoshis_per_coin = 10**8

        unspents = []
        for utxo in client.listunspent( 0, 200000, [address] ):
            unspents.append({
                "transaction_hash": utxo["txid"],
                'outpoint': {
                    'hash': utxo['txid'],
                    'index': utxo['vout'],
                },
                # bitcoind reports BTC; convert to whole satoshis
                "value": int(round(utxo["amount"] * satoshis_per_coin)),
                "out_script": utxo["scriptPubKey"],
                "confirmations": utxo["confirmations"],
            })

        return unspents

    def broadcast_transaction( self, hex_tx, blockchain_client=None ):
        """
        Send a raw transaction to bitcoin regtest.
        Returns {'tx_hash': ..., 'success': True} on success; raises on
        an empty (invalid) response.
        (blockchain_client is a BitcoindConnection; defaults to ours.)
        """
        client = blockchain_client if blockchain_client is not None else self.bitcoind
        txid = client.sendrawtransaction( hex_tx )
        if len(txid) == 0:
            raise Exception("Invalid response: %s" % txid)

        return {'tx_hash': txid, 'success': True}
def parse_args( argv ):
    """
    Parse argv to get block time, scenario path, working dir, etc.

    `argv` is a full argument vector (argv[0] is the program name).
    Unrecognized arguments are ignored.  Returns the parsed namespace.
    """
    parser = argparse.ArgumentParser(description='Run a test scenario')
    parser.add_argument("--interactive", dest='blocktime', type=int,
            help='Run interactively, and generate blocks every `blocktime` seconds', required=False)
    parser.add_argument("--interactive-web", dest='webtest_port', type=int,
            help='Run interactively, but generate blocks only when programmatically instructed', required=False)
    parser.add_argument("--working-dir", dest='working_dir', type=str,
            help="Working directory to use to store database state", required=False)
    parser.add_argument("--influx", dest='influx', action='store_true',
            help="Write test output to monitoring in addition to stdout", required=False)
    parser.add_argument('--force-segwit', dest='force_segwit', action='store_true',
            help="Forcibly convert all wallets to support segwit transactions", required=False)
    parser.add_argument("--output", dest="output_path", action='store',
            help='Redirect all output to a given file', required=False)
    parser.add_argument("scenario_module", type=str, help="Python module to run")

    # BUG FIX: previously called parser.parse_known_args() with no
    # arguments, which parses sys.argv implicitly and silently ignored
    # the caller-supplied `argv`.  Parse the given vector instead.
    # (Also fixed help-text typos: "instructedr", "additon".)
    args, _ = parser.parse_known_args(argv[1:])
    return args
def influx_write( influx_client, test_start, test_name, status ):
    """
    Record one integration-test result in InfluxDB.

    influx_client: an InfluxDBClient connected to the 'testing' database
    test_start: datetime when the test began (used to compute the runtime field)
    test_name: scenario name, written as the test_scenario tag
    status: outcome tag, e.g. "success", "failure", "failure-config"

    Requires GIT_COMMIT, GIT_BRANCH, and NUM_TESTS in the environment.
    Retries the write up to 10 times with jittered exponential back-off
    (capped at 5 minutes); raises the last exception if every attempt fails.
    """
    test_time = datetime.utcnow() - test_start
    git_commit = os.environ["GIT_COMMIT"]
    git_branch = os.environ["GIT_BRANCH"]
    num_tests = os.environ["NUM_TESTS"]
    point = [
        {
            "measurement": "integration_tests",
            "tags": {
                "test_run": "{}-{}".format(git_branch, git_commit),
                "git_branch": git_branch,
                "git_commit": git_commit,
                "status": status,
                "test_scenario": test_name
            },
            "time": datetime.utcnow().isoformat(),
            "fields": {
                "runtime": test_time.seconds,
                "num_tests": num_tests
            }
        }
    ]

    final_exception = None
    for i in xrange(0, 10):
        try:
            influx_client.write_points(point)
            return
        except Exception as e:
            # this can sometimes fail due to network timeouts
            log.exception(e)
            final_exception = e
            if i < 9:
                # do jittery back-off, up to 5 minutes -- but don't
                # sleep after the final attempt, since we're about to
                # give up and raise anyway
                log.debug("Will sleep for a bit and try to contact Influx again")
                time.sleep(min(300, (2**i) * (1 + random.random())))

    raise final_exception
if __name__ == "__main__":
test_start = datetime.utcnow()
test_end = None
if len(sys.argv) < 2:
print >> sys.stderr, "Usage: %s [--interactive [blocktime]] [--interactive-web port] [--influx] [--force-segwit] [--output PATH] [scenario.import.path] [OPTIONAL: working dir]"
sys.exit(1)
args = parse_args(sys.argv)
interactive = False
blocktime = 10
webtest_port = None
working_dir = None
scenario_module = args.scenario_module
influx_client = None
force_segwit = False
output_path = None
output_f = None
output_fd = None
if hasattr(args, 'output_path') and args.output_path:
output_path = args.output_path
if os.path.exists(output_path):
os.unlink(output_path)
# redirect
if output_path != '-':
# just send stderr to stdout
output_f = open(output_path, 'w')
output_fd = output_f.fileno()
stdout_fileno = sys.stdout.fileno()
os.close(sys.stdout.fileno())
os.dup2(output_fd, stdout_fileno)
else:
output_f = sys.stdout
output_fd = sys.stdout.fileno()
stderr_fileno = sys.stderr.fileno()
os.close(stderr_fileno)
os.dup2(output_fd, stderr_fileno)
# make stdout unbufferred
class Unbuffered(object):
def __init__(self, stream):
self.stream = stream
def write(self, data):
self.stream.write(data)
self.stream.flush()
def writelines(self, datas):
self.stream.writelines(datas)
self.stream.flush()
def __getattr__(self, attr):
return getattr(self.stream, attr)
sys.stdout = Unbuffered(sys.stdout)
log.debug("Sending all output to {}".format(output_path))
if hasattr(args, "influx") and args.influx is not False:
# TODO: pull config from ENV
# host = os.environ["INFLUX_HOST"]
influx_host = os.environ["INFLUX_HOST"]
influx_user = os.environ["INFLUX_USER"]
influx_pass = os.environ["INFLUX_PASS"]
influx_ssl = os.environ["INFLUX_SSL"]
if influx_ssl == "True":
influx_ssl = True
influx_client = InfluxDBClient(host=influx_host, port=8086, username=influx_user, password=influx_pass, ssl=influx_ssl, database='testing')
influx_client.create_database("testing")
if hasattr(args, "blocktime") and args.blocktime is not None:
interactive = True
g_interactive = True
blocktime = args.blocktime
log.debug("Interactive session; block time is %s" % blocktime)
if hasattr(args, 'webtest_port') and args.webtest_port is not None:
interactive = True
g_interactive = True
webtest_port = args.webtest_port
log.debug("Interactive Web/programmatic session; port is {}".format(webtest_port))
if hasattr(args, "working_dir") and args.working_dir is not None:
working_dir = args.working_dir
else:
working_dir = "/tmp/blockstack-run-scenario.%s" % scenario_module
# erase prior state
if os.path.exists(working_dir):
log.debug("Remove %s" % working_dir)
shutil.rmtree(working_dir)
if hasattr(args, 'force_segwit') and args.force_segwit:
force_segwit = True
if not os.path.exists(working_dir):
os.makedirs(working_dir)
client_working_dir = os.path.join(working_dir, "client")
client_metadata = os.path.join(client_working_dir, "metadata")
client_accounts_path = os.path.join(client_working_dir, "app_accounts")
client_users_path = os.path.join(client_working_dir, "users")
client_queue_path = os.path.join( client_working_dir, "queues.db" )
config_file = os.path.join( working_dir, "blockstack-server.ini" )
client_config_file = os.path.join( client_working_dir, "client.ini" )
client_datastores_path = os.path.join( client_working_dir, "datastores" )
os.makedirs(client_working_dir)
os.makedirs(client_metadata)
os.makedirs(client_accounts_path)
os.makedirs(client_datastores_path)
# export to test
os.environ["BLOCKSTACK_CLIENT_CONFIG"] = client_config_file
os.environ["BLOCKSTACK_SERVER_CONFIG"] = config_file
os.environ['BLOCKSTACK_SEGWIT_TEST'] = '1'
os.environ['BLOCKSTACK_TEST_NAME'] = scenario_module
BITCOIN_DIR = os.path.join(working_dir, 'bitcoin-regtest')
if not os.path.exists(BITCOIN_DIR):
os.makedirs(BITCOIN_DIR)
if force_segwit:
# test framework flag
os.environ['BLOCKSTACK_TEST_FORCE_SEGWIT'] = '1'
# cli flag
os.environ['BLOCKSTACK_FORCE_SEGWIT'] = '1'
print 'WARN: Forcing all wallets to generate segwit transactions'
# load up the scenario (so it can set its own extra envars)
scenario = load_scenario( scenario_module )
if scenario is None:
print "Failed to load '%s'" % scenario_module
if args.influx:
influx_write(influx_client, test_start, scenario_module.split(".")[2], "failure-load")
sys.exit(1)
# *now* we can import blockstack
import blockstack
import blockstack.blockstackd as blockstackd
from blockstack.lib import *
from blockstack.lib import nameset as blockstack_state_engine
import blockstack_client
import blockstack_integration_tests.scenarios.testlib as testlib
# set up bitcoind
bitcoin_regtest_reset(working_dir)
# set up disk storage
if os.path.exists("/tmp/blockstack-disk"):
shutil.rmtree("/tmp/blockstack-disk")
if os.path.exists("/tmp/blockstack-integration-test-storage"):
shutil.rmtree("/tmp/blockstack-integration-test-storage")
# set up SPV
if os.path.exists("/tmp/blockstack-test-scenario-spv.dat"):
os.unlink("/tmp/blockstack-test-scenario-spv.dat")
# set up default payment wallet
default_payment_wallet = testlib.MultisigWallet( 2, '5JYAj69z2GuFAZHrkhRuBKoCmKh6GcPXgcw9pbH8e8J2pu2RU9z', '5Kfg4xkZ1gGN5ozgDZ37Mn3EH9pXSuWZnQt1pzax4cLax8PetNs', '5JXB7rNxZa8yQtpuKtwy1nWUUTgdDEYTDmaEqQvKKC8HCWs64bL' )
# load wallets
bitcoin_regtest_fill_wallets( working_dir, scenario.wallets, default_payment_wallet=default_payment_wallet )
testlib.set_default_payment_wallet( default_payment_wallet )
testlib.set_wallets( scenario.wallets )
# did the scenario change the storage drivers?
storage_drivers = BLOCKSTACK_STORAGE_DRIVERS
if os.environ.get("CLIENT_STORAGE_DRIVERS", None) is not None:
storage_drivers = os.environ.get("CLIENT_STORAGE_DRIVERS")
storage_drivers_required_write = BLOCKSTACK_STORAGE_DRIVERS
if os.environ.get('CLIENT_STORAGE_DRIVERS_REQUIRED_WRITE', None) is not None:
storage_drivers_required_write = os.environ.get('CLIENT_STORAGE_DRIVERS_REQUIRED_WRITE')
# did the scenario want to start up its own data servers?
serve_data = "True"
redirect_data = "False"
data_servers = ""
if os.environ.get("DATA_SERVERS", None) is not None:
redirect_data = "True"
serve_data = "False"
data_servers = os.environ.get("DATA_SERVERS")
# generate config file
rc = generate_config_file( working_dir, scenario, config_file, DEFAULT_SERVER_INI_TEMPLATE, \
{"ZONEFILES": os.path.join(working_dir, "zonefiles")} )
if rc != 0:
log.error("failed to write config file: exit %s" % rc)
if args.influx:
influx_write(influx_client, test_start, scenario_module.split(".")[2], "failure-config")
sys.exit(1)
# generate config file for the client
rc = generate_config_file( working_dir, scenario, client_config_file, DEFAULT_CLIENT_INI_TEMPLATE, \
{"CLIENT_METADATA": client_metadata, \
"CLIENT_QUEUE_PATH": client_queue_path, \
"CLIENT_STORAGE_DRIVERS": storage_drivers, \
'CLIENT_STORAGE_DRIVERS_REQUIRED_WRITE': storage_drivers_required_write, \
"CLIENT_ACCOUNTS_PATH": client_accounts_path, \
"CLIENT_USERS_PATH": client_users_path, \
"CLIENT_DATASTORES_PATH": client_datastores_path, \
"SERVE_DATA": serve_data, \
"REDIRECT_DATA": redirect_data, \
"DATA_SERVERS": data_servers })
if rc != 0:
if args.influx:
influx_write(influx_client, test_start, scenario_module.split(".")[2], "failure-config")
log.error("failed to write config file: exit %s" % rc)
sys.exit(1)
# run the test
rc = run_scenario( working_dir, scenario, config_file, client_config_file, interactive=interactive, blocktime=blocktime, webtest_port=webtest_port )
if rc:
# time the test
# retrieve git information from the environment
print "SUCCESS %s" % scenario.__name__
# shutil.rmtree( working_dir )
if args.influx:
influx_write(influx_client, test_start, scenario_module.split(".")[2], "success")
sys.exit(0)
else:
if args.influx:
influx_write(influx_client, test_start, scenario_module.split(".")[2], "failure")
print >> sys.stderr, "FAILURE %s" % scenario.__name__
print >> sys.stderr, "Test output in %s" % working_dir
os.abort()