Files
stacks-puppet-node/integration_tests/bin/blockstack-testbox
2017-11-16 13:06:24 -05:00

1391 lines
48 KiB
Python
Executable File

#!/usr/bin/python2
import os
import sys
import subprocess
import virtualenv
import posixpath
import pwd
import tarfile
import shutil
import argparse
import threading
import json
import logging
import time
import gzip
import signal
import SimpleHTTPServer
import re
import tempfile
import random
from SimpleHTTPServer import BaseHTTPServer, SimpleHTTPRequestHandler
# When True, get_logger() emits DEBUG-level records with a verbose
# timestamp/module/pid prefix; when False only CRITICAL records are shown.
DEBUG = True

# NOTE(review): presumably the default port of the test-results HTTP server;
# it is not referenced anywhere in the visible part of this file -- confirm.
DEFAULT_TEST_RUNNER_PORT = 8086

# Terminal escape sequences keyed by color name, used to decorate the
# SUCCESS/FAILURE lines printed by the test runner.
COLORS = {
    "default": "",
    "blue": "\x1b[0;34m",
    "green": "\x1b[0;32m",
    "red": "\x1b[0;31m",
    "yellow": "\x1b[0;93m",
    "reset": "\x1b[0m",
}
#following from Python cookbook, #475186
def has_colors(stream):
    """
    Does the given file-like object support colors?

    Return True only if @stream is a TTY and curses reports more than two colors for it.
    Return False otherwise (not a TTY, no isatty(), curses missing or failing).
    """
    if not hasattr(stream, "isatty") or not stream.isatty():
        # auto color only on TTYs
        return False

    try:
        import curses
        curses.setupterm()
        return curses.tigetnum("colors") > 2
    except Exception:
        # guess false in case of error.
        # Deliberately not a bare `except:` so that KeyboardInterrupt and
        # SystemExit still propagate.
        return False
# stdout cannot render colors: blank out every escape sequence so that
# colorized messages degrade to plain text
if not has_colors(sys.stdout):
    COLORS.update(dict.fromkeys(COLORS, ""))
# Default list of dependencies, in the format consumed by install_dependencies().
# Every entry carries:
#   'name'   -- directory name (under the source dir) the repository is cloned into
#   'git'    -- repository URL, or None if there is nothing to clone
#   'branch' -- branch to check out (falls back to 'master' when empty)
#   'type'   -- 'python' (setup.py build/install), 'node.js' (npm install) or
#               'shell' (run 'command' verbatim)
# Optional keys:
#   'subpackages'        -- extra sub-directories installed the same way as the top level
#   'command'            -- the shell command for 'shell' entries
#   'npm_build_commands' -- `npm run <command>` steps for 'node.js' entries
#   'npm_deps'           -- extra packages installed with `npm install -g`
DEFAULT_DEPS = [
    {
        'name': 'virtualchain',
        'git': 'https://github.com/blockstack/virtualchain',
        'branch': 'hotfix/faster-cryptography',
        'type': 'python',
    },
    {
        'name': 'blockstack-core',
        'git': 'https://github.com/blockstack/blockstack-core',
        'branch': 'hotfix/ipfs-support',
        'type': 'python',
        'subpackages': ['integration_tests'],
    },
    {
        # reinstalls scrypt, then copies a system-wide _scrypt.so (if one is
        # found in either of the two probed locations) into the virtualenv
        'name': 'clean_scrypt',
        'git': None,
        'branch': None,
        'type': 'shell',
        'command': 'pip uninstall -y scrypt; pip install scrypt; if [ -f /usr/local/lib/python2.7/dist-packages/_scrypt.so ]; then cp -a /usr/local/lib/python2.7/dist-packages/_scrypt.so "$VIRTUAL_ENV"/lib/python2.7/site-packages; elif [ -f /usr/lib/python2.7/site-packages/_scrypt.so ]; then cp -a /usr/lib/python2.7/site-packages/_scrypt.so "$VIRTUAL_ENV"/lib/python2.7/site-packages; fi'
    },
    {
        'name': 'blockstack-storage.js',
        'git': 'https://github.com/blockstack/blockstack-storage-js',
        'branch': 'master',
        'type': 'node.js',
        'npm_build_commands': ['dev-build'],
        'npm_deps': ['bigi', 'jsonify', 'promise'], # TODO: add these to package.json
    },
    {
        'name': 'blockstack.js',
        'git': 'https://github.com/blockstack/blockstack.js',
        'branch': 'master',
        'type': 'node.js',
    },
]

# Default shell command line that bootstrap_host() runs on the remote host over ssh.
# It installs system packages with apt (adding the bitcoin PPA and the nodesource
# repository first) and then a few global npm packages.
# NOTE: bootstrap_host() wraps this string in single quotes, so it must not
# contain a single quote itself.
DEFAULT_BOOTSTRAP_SCRIPT = "sudo apt-get -y install curl software-properties-common && sudo apt-add-repository -y ppa:bitcoin/bitcoin && echo \"deb https://deb.nodesource.com/node_6.x xenial main\" > /tmp/nodesource.list && sudo mv /tmp/nodesource.list /etc/apt/sources.list.d/ && curl -s https://deb.nodesource.com/gpgkey/nodesource.gpg.key | sudo apt-key add - && sudo apt-get update && sudo apt-get -y install python python-virtualenv python-pip libssl-dev libffi-dev build-essential git bitcoind nodejs lsof sqlite3; sudo npm install -g babel babel-cli browserify"
def get_logger(name=None):
    """
    Build (or rebuild) a console logger.

    @name is the logger name; it defaults to "blockstack-testbox".

    With DEBUG set, records are emitted at DEBUG level with a
    timestamp/level/module/pid.thread prefix; otherwise only CRITICAL
    records are emitted, as the bare message.

    Return the configured logger.
    """
    if DEBUG:
        logging.disable(logging.NOTSET)
        level = logging.DEBUG
        log_format = '[%(asctime)s] [%(levelname)s] [%(module)s:%(lineno)d] (' + str(os.getpid()) + '.%(thread)d) %(message)s'
    else:
        level = logging.CRITICAL
        log_format = '%(message)s'

    logger = logging.getLogger(name="blockstack-testbox" if name is None else name)
    logger.setLevel(level)
    logger.propagate = False

    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(log_format))

    # drop handlers left over from an earlier call, so each record is printed once
    del logger.handlers[:]
    logger.addHandler(handler)
    return logger


log = get_logger()
def setup_venv(venv_dir):
    """
    Create a virtual environment in @venv_dir.
    The environment is created with clear=True.

    Always return True; failures surface as exceptions from virtualenv.
    """
    log.debug("Setting up virtualenv in {}".format(venv_dir))
    virtualenv.create_environment(venv_dir, clear=True)
    return True
def enter_venv(venv_dir):
    """
    Activate the virtualenv at @venv_dir by rewriting this process's environment.

    * VIRTUAL_ENV and PATH are pointed at the virtualenv; the previous PATH is
      saved in _OLD_VIRTUAL_PATH.
    * NODE_PATH, NPM_CONFIG_PREFIX and npm_config_prefix are pointed at
      $venv_dir/nodejs, so npm installs land inside the virtualenv.  Previous
      values (if any) are saved under _OLD_* keys so exit_venv() can restore them.
    * If $venv_dir/http.port exists, its contents (an integer) are exported as
      VIRTUAL_ENV_HTTP_PORT.

    Return True.
    Raises ValueError if http.port does not contain an integer.
    """
    log.debug("Entering virtualenv {}".format(venv_dir))

    os.environ['VIRTUAL_ENV'] = venv_dir
    os.environ['_OLD_VIRTUAL_PATH'] = os.environ['PATH']
    os.environ['PATH'] = '{}/bin:{}'.format(venv_dir, os.environ['PATH'])

    # also turn on node.js envars
    if os.environ.get('NODE_PATH') is not None:
        os.environ['_OLD_VIRTUAL_NODE_PATH'] = os.environ['NODE_PATH']

    virtual_node_path = os.path.join(venv_dir, 'nodejs/lib/node_modules')
    virtual_npm_config_prefix = os.path.join(venv_dir, 'nodejs')

    if not os.path.exists(virtual_node_path):
        os.makedirs(virtual_node_path)

    os.environ['NODE_PATH'] = virtual_node_path

    if os.environ.get('NPM_CONFIG_PREFIX') is not None:
        os.environ['_OLD_VIRTUAL_NPM_CONFIG_PREFIX'] = os.environ['NPM_CONFIG_PREFIX']
        # exit_venv() restores NPM_CONFIG_PREFIX from this key; without it the
        # caller's original prefix would be lost on exit
        os.environ['_OLD_NPM_CONFIG_PREFIX'] = os.environ['NPM_CONFIG_PREFIX']

    if os.environ.get('npm_config_prefix') is not None:
        os.environ['_OLD_npm_config_prefix'] = os.environ['npm_config_prefix']

    os.environ['NPM_CONFIG_PREFIX'] = virtual_npm_config_prefix
    os.environ['npm_config_prefix'] = virtual_npm_config_prefix

    # the bootstrap step records the HTTP server port next to the virtualenv
    http_port_path = '{}/http.port'.format(venv_dir)
    if os.path.exists(http_port_path):
        with open(http_port_path, 'r') as f:
            http_port = int(f.read())

        os.environ['VIRTUAL_ENV_HTTP_PORT'] = str(http_port)

    return True
def in_venv(venv_dir=None):
    """
    Are we in a virtualenv?  Are we in the specific virtualenv?

    "In a virtualenv" means both VIRTUAL_ENV and _OLD_VIRTUAL_PATH are set in
    the environment.  If @venv_dir is given, VIRTUAL_ENV must also name the same
    directory (compared after path normalization).

    Return True if so
    Return False if not
    """
    # `in` instead of the deprecated dict.has_key()
    if 'VIRTUAL_ENV' not in os.environ or '_OLD_VIRTUAL_PATH' not in os.environ:
        return False

    if venv_dir is not None and posixpath.normpath(venv_dir) != posixpath.normpath(os.environ['VIRTUAL_ENV']):
        return False

    return True
def exit_venv():
    """
    Deactivate the virtualenv: undo the environment changes made by enter_venv().

    PATH is restored from _OLD_VIRTUAL_PATH and VIRTUAL_ENV is unset.
    NODE_PATH, NPM_CONFIG_PREFIX and npm_config_prefix are restored from their
    saved values, or unset if nothing was saved.  The saved values are consumed,
    so a later enter/exit cycle cannot restore stale settings.

    Return True.
    Raises AssertionError if we are not in a virtualenv.
    """
    log.debug("Exiting virtualenv")
    assert in_venv(), 'Not in a virtualenv'

    os.environ['PATH'] = os.environ['_OLD_VIRTUAL_PATH']
    del os.environ['VIRTUAL_ENV']

    def restore(var, saved_keys):
        # Restore @var from the first non-empty saved value, removing every
        # saved key on the way; unset @var if nothing was saved.
        saved = None
        for key in saved_keys:
            value = os.environ.pop(key, None)
            if not saved and value:
                saved = value

        if saved:
            os.environ[var] = saved
        elif os.environ.get(var):
            del os.environ[var]

    restore('NODE_PATH', ['_OLD_VIRTUAL_NODE_PATH'])

    # enter_venv() saves the old prefix as _OLD_VIRTUAL_NPM_CONFIG_PREFIX;
    # accept both spellings so the value is actually restored
    restore('NPM_CONFIG_PREFIX', ['_OLD_NPM_CONFIG_PREFIX', '_OLD_VIRTUAL_NPM_CONFIG_PREFIX'])
    restore('npm_config_prefix', ['_OLD_npm_config_prefix'])

    return True
def bootstrap_host(user, host, host_venv_dir, test_root, port, deps_path=None, ssh_id_path=None, bootstrap_script=DEFAULT_BOOTSTRAP_SCRIPT, logfile=None):
    """
    Bootstrap a host so it can run tests.  In order:
    * scp this program to /tmp/blockstack-testbox on the host
    * scp the dependencies JSON file (@deps_path) to /tmp/dependencies.json, if given
    * run @bootstrap_script on the host
    * remove any old @host_venv_dir and @test_root, and run `blockstack-testbox setup`
    * (re)start the test-result HTTP server on @port, serving @test_root, and
      record the port in @host_venv_dir/http.port

    @ssh_id_path is an optional ssh identity file.
    @logfile, if given, receives the stdout and stderr of the failed step;
    otherwise they are logged.

    NOTE: commands are assembled as shell strings; arguments must not contain
    quote characters (@bootstrap_script is wrapped in single quotes).

    Return True on success
    Return False on error (the first failing step aborts the bootstrap)
    """
    def run_proc(cmd, step_name):
        # run one shell command; report and record its output if it fails
        log.debug("$ {}".format(cmd))
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        out, err = p.communicate()
        if p.returncode == 0:
            return True

        log.error("Failed to bootstrap {}@{}: failed step: {}".format(user, host, step_name))
        if logfile:
            with open(logfile, 'w') as f:
                f.write(out)
                f.write(err)
        else:
            log.error("Stdout:\n{}".format(" \n".join(out.strip().split('\n'))))
            log.error("Stderr:\n{}".format(" \n".join(err.strip().split('\n'))))

        return False

    id_opt = ''
    if ssh_id_path is not None:
        id_opt = '-i "{}"'.format(ssh_id_path)

    # copy this program over
    cmd = 'scp {} "{}" "{}@{}:/tmp/blockstack-testbox"'.format(id_opt, sys.argv[0], user, host)
    if not run_proc(cmd, 'scp {} over'.format(sys.argv[0])):
        return False

    # if given, copy deps JSON file over
    deps_opt = ''
    if deps_path:
        cmd = 'scp {} "{}" "{}@{}:/tmp/dependencies.json"'.format(id_opt, deps_path, user, host)
        # name the file that is actually being copied (not the remote venv dir)
        if not run_proc(cmd, 'scp {} over'.format(deps_path)):
            return False

        deps_opt = '--dependencies=/tmp/dependencies.json'

    # run bootstrap script
    cmd = 'ssh {} -t "{}@{}" \'{}\''.format(id_opt, user, host, bootstrap_script)
    if not run_proc(cmd, 'run bootstrap script'):
        return False

    # run setup script, starting from a clean venv and a clean test root
    cmd = 'ssh {} -t "{}@{}" \'test -d "{}" && rm -rf "{}"; test -d "{}" && rm -rf "{}"; python /tmp/blockstack-testbox setup "{}" {}\''.format(id_opt, user, host, host_venv_dir, host_venv_dir, test_root, test_root, host_venv_dir, deps_opt)
    if not run_proc(cmd, 'run setup script'):
        return False

    # start an HTTP server, if there isn't one already. record the port to the host's venv
    cmd = 'ssh {} -t "{}@{}" \'test -d "{}" || mkdir -p "{}"; sudo killall -9 blockstack-testbox; python /tmp/blockstack-testbox httpd-stop {}; echo "{}" > "{}/http.port"; nohup python /tmp/blockstack-testbox httpd "{}" {} >/dev/null 2>&1 & sleep 5\''.format(id_opt, user, host, test_root, test_root, port, port, host_venv_dir, test_root, port)
    if not run_proc(cmd, 'start HTTP server'):
        return False

    return True
def install_dependencies(src_dir, deps, venv_dir=None):
    """
    Install all dependencies from source.

    @src_dir is the directory the git repositories are cloned into
    @deps is [{'name': 'name of dependency', 'git': 'git repository URL' or None, 'branch': ..., 'type': 'python', 'node.js' or 'shell'}]
      Optional keys: 'subpackages' (list of sub-directories to install as well),
      'command' (for 'shell'), 'npm_deps' and 'npm_build_commands' (for 'node.js').
    @venv_dir is the virtualenv directory

    A dependency whose clone directory already exists is assumed to be
    installed and is skipped entirely.

    Return True on success
    Return False on error
    Raises AssertionError if we are not in the virtualenv, or if @deps is malformed.
    """
    assert in_venv(venv_dir), 'Not in a virtual environment'
    assert isinstance(deps, list), 'Malformed dependencies: not a list'
    for i, dep in enumerate(deps):
        assert isinstance(dep, dict), 'Malformed dependencies: item {} is not a dict'.format(i)
        for k in ['name', 'git', 'branch', 'type']:
            # both the item index and the missing key are reported
            assert k in dep, 'Malformed dependency: item {} is missing "{}"'.format(i, k)

    def run(cmd, cwd=None):
        # Run a shell command (optionally inside @cwd); log the failure details.
        # Using Popen's cwd= leaves this process's working directory untouched.
        log.debug('$ {}'.format(cmd))
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd)
        _, err = p.communicate()
        if p.returncode == 0:
            return True

        log.error("`{}` failed: exit {}".format(cmd, p.returncode))
        log.error("Stderr:\n{}".format(' \n'.join(err.strip().split('\n'))))
        log.error("Env:\n{}".format("\n".join(["{}={}".format(k,v) for (k,v) in os.environ.items()])))
        return False

    for dep in deps:
        name = dep['name']
        giturl = dep.get('git', None)
        branch = dep.get('branch', 'master')
        pkgtype = dep['type']

        log.debug("Install `{}` from `{}@{}`".format(name, giturl, branch))

        # git clone, if git is provided
        clonedir = os.path.join(src_dir, name)
        if giturl:
            if os.path.exists(clonedir):
                # already cloned
                log.debug("Appears to be installed already")
                continue

            if not branch:
                branch = 'master'

            cmd = "git clone '{}' '{}' && cd '{}' && git checkout '{}'".format(giturl, clonedir, clonedir, branch)
            if not run(cmd):
                return False

        package_list = ['.'] + list(dep.get('subpackages', []))
        for pak in package_list:
            pakdir = os.path.join(clonedir, pak)
            if pak != '.':
                log.debug("Subpackage {}".format(pak))

            if pkgtype == 'python':
                # setup.py needs to be there
                if not os.path.exists(os.path.join(pakdir, 'setup.py')):
                    log.error("`{}` (at {}) does not have a setup.py".format(name, pakdir))
                    return False

                commands = ['python ./setup.py build', 'python ./setup.py install']
                workdir = pakdir

            elif pkgtype == 'node.js':
                # package.json needs to be there
                if not os.path.exists(os.path.join(pakdir, 'package.json')):
                    log.error('`{}` (at {}) does not have a package.json'.format(name, pakdir))
                    return False

                commands = ['npm install']
                # extra deps not in package.json?
                commands += ["npm install -g '{}'".format(npm_dep) for npm_dep in dep.get('npm_deps', [])]
                # npm-specific commands
                commands += ["npm run '{}'".format(npm_command) for npm_command in dep.get('npm_build_commands', [])]
                # finally, install the package itself
                commands.append('npm install -g')
                workdir = pakdir

            elif pkgtype == 'shell':
                # run a shell command, in the current working directory
                commands = [dep['command']]
                workdir = None

            else:
                log.error("Unrecognized dependency type `{}`".format(pkgtype))
                return False

            for cmd in commands:
                if not run(cmd, cwd=workdir):
                    return False

    return True
def main_test(test_package_name, test_dir, test_user=None):
    """
    Privileged operation.  Run the test in the given environment.
    Run the test, wait for it to finish, and report its exit status.

    @test_dir is bind-mounted over /tmp and /tmp is recursively chmod'ed 0777.
    The test is started as `blockstack-test-scenario` and told to write its
    output to /tmp/$test_package_name.out (i.e. into @test_dir).
    If @test_user is given, privileges are dropped to that user after the
    mount and before the test starts.

    * return the exit code of blockstack-test-scenario (0 if the test succeeded)
    * return 1 if /tmp could not be set up

    Raises AssertionError if not called as root.
    """
    assert os.getuid() == 0, "must be called as a root user"

    test_uid, test_gid = None, None
    if test_user:
        pw_entry = pwd.getpwnam(test_user)
        test_uid, test_gid = pw_entry.pw_uid, pw_entry.pw_gid

    if not os.path.exists(test_dir):
        os.makedirs(test_dir)

    # set up /tmp
    mount_cmd = "mount -o bind '{}' /tmp && chmod 0777 -R /tmp".format(test_dir)
    log.debug('$ {}'.format(mount_cmd))
    mounter = subprocess.Popen(mount_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    _, mount_err = mounter.communicate()
    if mounter.returncode != 0:
        log.error("Failed to mount /tmp (exit {}): {}".format(mounter.returncode, mount_err.strip()))
        return 1

    # become the test user, if given (a uid of 0 means we stay root).
    # the group must be changed first, while we are still privileged.
    if test_uid:
        log.debug("Switching to user `{}`".format(test_user))
        os.setgid(test_gid)
        os.setuid(test_uid)

    # run the test
    os.chdir(test_dir)
    test_cmd = "blockstack-test-scenario --output /tmp/{}.out {}".format(test_package_name, test_package_name)
    log.debug('$ {}'.format(test_cmd))
    runner = subprocess.Popen(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    runner.communicate()
    return runner.returncode
def _test_output_succeeded(test_out_path, tail_bytes=10240):
    """
    Did the test succeed?  Check the end of its output file: scan (at most) the
    last @tail_bytes bytes for a line that starts with 'SUCCESS '.

    Return True if such a line is found
    Return False if not
    """
    with open(test_out_path, 'rb') as f:
        # seek relative to the end, but never before the start of the file:
        # a negative absolute position raises IOError on short outputs
        f.seek(0, os.SEEK_END)
        f.seek(max(0, f.tell() - tail_bytes))
        for line in f:
            if line.startswith(b'SUCCESS '):
                return True

    return False


def main_watchdog(venv_dir, test_package_name, test_root, timeout=(60 * 75), test_user=None):
    """
    Privileged operation.  Start up the test in its own private environment, and collect the output.
    Kill the process after a given timeout (in seconds).

    The test is run through `blockstack-testbox worker`, under `unshare`:
    * Unshare the network namespace
    * Unshare the mount namespace
    * Unshare the PID namespace
    * make our own /tmp (done by the worker, backed by $test_root/$test_package_name)

    Requires VIRTUAL_ENV_HTTP_PORT to be set in the environment (see enter_venv()).

    Return {'status': True, 'logs': ..., 'port': ...} on success
    Return {'status': False, 'logs': ..., 'port': ...} on failure
    'logs' is the path to the compressed test output, or None if the test left none.
    The test output will be in $test_root/$test_package_name.out.gz, if present at all.

    Raises AssertionError if not called as root, or if the HTTP port is unknown.
    """
    class KillThread(threading.Thread):
        """
        SIGKILL @pid once @deadline passes, unless signal_join() is called first.
        """
        def __init__(self, pid, deadline):
            threading.Thread.__init__(self)
            self.pid = pid
            self.deadline = deadline
            self.running = True

        def run(self):
            while time.time() < self.deadline and self.running:
                time.sleep(0.5)

            if self.running:
                # out of time
                log.error("Killing stalled test")
                try:
                    os.kill(self.pid, signal.SIGKILL)
                except OSError as e:
                    # the process may have exited between the check and the kill
                    log.error("Failed to kill stalled test: {}".format(e))

        def signal_join(self):
            self.running = False

    assert os.getuid() == 0, "must be called as a root user"

    http_port = os.environ.get('VIRTUAL_ENV_HTTP_PORT', None)
    assert http_port, 'Test environment is not properly set up: no HTTP port'

    if not os.path.exists(test_root):
        os.makedirs(test_root)

    # start from an empty, world-writable per-test directory
    test_dir = os.path.join(test_root, test_package_name)
    if os.path.exists(test_dir):
        shutil.rmtree(test_dir)

    os.makedirs(test_dir)
    os.chmod(test_dir, 0o777)

    # run the test in an unshared namespace
    worker_cmd = "{} worker {} {} {}".format(sys.argv[0], venv_dir, test_package_name, test_dir)
    if test_user:
        worker_cmd += ' --test_user {}'.format(test_user)

    cmd = 'unshare --net --mount --pid --fork --mount-proc /bin/sh -c \'ifconfig lo up; {}\''.format(worker_cmd)
    log.debug("# {}".format(cmd))
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

    # --timeout is declared without a type, so it may arrive as a string
    watchdog = KillThread(p.pid, time.time() + float(timeout))
    watchdog.start()

    log.debug("Test started; PID={}".format(p.pid))
    try:
        out, err = p.communicate()
    finally:
        # always stop the watchdog, even if we are interrupted while waiting
        watchdog.signal_join()
        watchdog.join()

    retval = p.returncode
    log.debug("Test (pid={}) returned {}".format(p.pid, retval))
    if retval != 0:
        log.debug("Stderr:\n{}".format('\n'.join([' ' + e for e in err.split('\n')])))

    # save what we have
    test_result = False
    test_out_path = os.path.join(test_dir, '{}.out'.format(test_package_name))
    test_out_path_gz = os.path.join(test_root, '{}.out.gz'.format(test_package_name))
    logs_path = None

    if os.path.exists(test_out_path):
        # what was the test result?
        test_result = _test_output_succeeded(test_out_path)

        # compress the output into the directory served over HTTP
        with open(test_out_path, "rb") as f_in, gzip.open(test_out_path_gz, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)

        os.unlink(test_out_path)
        logs_path = test_out_path_gz
    else:
        log.error("No test results in {}".format(test_out_path))

    # let the caller know where and how to get at the test result
    return {'status': test_result, 'logs': logs_path, 'port': http_port}
def main_run_test(user, host, venv_dir, test_package_name, test_root, timeout=(60 * 75), test_user=None, ssh_id_path=None):
    """
    Run a test on the remote host.
    Determine whether it succeeded or failed.

    The test is started by running `blockstack-testbox watchdog` over ssh.  Then
    the host is polled (every one to two minutes) until the compressed test
    output shows up with a SUCCESS or FAILURE line, or until @timeout seconds
    have passed.

    @timeout is the number of seconds the test may run; a false value means
    "use the watchdog's default" (75 minutes).
    @test_user is the remote user to run the test as.
    @ssh_id_path is an optional ssh identity file.

    Return {'status': True} if the test succeeded
    Return {'status': False} if the test failed, timed out, or left corrupt output
    Return {'error': ...} if the watchdog could not be run
    """
    # ssh is exec'ed directly (no shell), so every option must be its own
    # argv element, unquoted
    ssh_base = ['/usr/bin/ssh']
    if ssh_id_path is not None:
        ssh_base += ['-i', ssh_id_path]

    ssh_base.append('{}@{}'.format(user, host))

    def run_ssh(remote_cmd):
        # run one command on the remote host; return ssh's exit code
        cmd = ssh_base + [remote_cmd]
        log.debug("$ {}".format(" ".join(cmd)))
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        p.communicate()
        return p.returncode

    test_user_opt = ''
    if test_user:
        test_user_opt = '--test_user "{}"'.format(test_user)

    timeout_opt = ''
    if timeout:
        timeout_opt = '--timeout "{}"'.format(int(timeout))

    # without an explicit timeout, the remote watchdog falls back to its own default
    deadline = time.time() + (timeout if timeout else 60 * 75)

    retval = run_ssh('sudo /tmp/blockstack-testbox watchdog "{}" "{}" "{}" {} {}'.format(venv_dir, test_package_name, test_root, test_user_opt, timeout_opt))
    if retval != 0 and retval != 255:
        # failed to run
        log.error("Failed to run {} on {}@{}: exit code {}".format(test_package_name, user, host, retval))
        return {'error': 'Failed to run {} on {}@{}: exit code {}'.format(test_package_name, user, host, retval)}

    elif retval == 255:
        # ssh had an error; the test may still be running remotely
        log.error("SSH error running {} on {}@{}".format(test_package_name, user, host))

    # wait for the test to finish running
    # poll every few minutes, but be mindful of the time
    while True:
        retval = run_ssh('test -f "{}/{}.out.gz" || exit 10; zcat "{}/{}.out.gz" | egrep "(SUCCESS {})|(FAILURE {})"'.format(
            test_root, test_package_name, test_root, test_package_name, test_package_name, test_package_name))

        if retval == 0:
            break

        elif retval == 10:
            # not done yet
            log.debug("Test {} is still running on {}@{}".format(test_package_name, user, host))

        elif retval != 255:
            # test output is corrupt (255 is an ssh error; just retry)
            log.error("Test {} output on {}@{} is corrupt, aborting...".format(test_package_name, user, host))
            print(COLORS['yellow'] + "Test {} is corrupt on {}@{}".format(test_package_name, user, host) + COLORS['reset'])
            sys.stdout.flush()
            return {'status': False}

        if time.time() >= deadline:
            print(COLORS['yellow'] + "Test {} timed out on {}@{}".format(test_package_name, user, host) + COLORS['reset'])
            sys.stdout.flush()
            return {'status': False}

        time.sleep(60 + random.random() * 60)       # jittery

    # ssh back in and get the test results
    retval = run_ssh('zcat "{}/{}.out.gz" | tail -n 100 | grep "SUCCESS {}"'.format(test_root, test_package_name, test_package_name))
    if retval != 0:
        # failure
        return {'status': False}

    # success!
    return {'status': True}
def main_http_server(test_root, portnum):
    """
    Run an HTTP server for serving the results of the tests.

    * GET / returns a JSON list of the *.out.gz files in @test_root
    * GET /<name>.out.gz returns that file from @test_root
    * anything else gets a 404

    Listens on all interfaces, on port @portnum.
    Serves forever; does not return.
    """
    class TestHTTPServerHandler(SimpleHTTPRequestHandler):
        def reply_not_found(self):
            """
            Send a complete (empty) 404 reply.
            """
            self.send_response(404)
            self.send_header('content-length', '0')
            # the header block must be terminated, or the client never sees a
            # complete response
            self.end_headers()

        def do_GET(self):
            """
            Serve back the list of completed tests, or a completed test.
            """
            path = posixpath.normpath(self.path)
            rootdir = self.server.rootdir

            if path == '/':
                # give back a list
                test_listing = [filename for filename in os.listdir(rootdir) if filename.endswith('.out.gz')]
                listing = json.dumps(test_listing)

                self.send_response(200)
                self.send_header('content-type', 'application/json')
                self.send_header('content-length', str(len(listing)))
                self.end_headers()
                self.wfile.write(listing)
                return

            # only the last path component is used, so requests cannot
            # reach outside of rootdir
            testname = os.path.basename(path)
            if not testname.endswith('.out.gz'):
                self.reply_not_found()
                return

            testpath = os.path.join(rootdir, testname)
            if not os.path.exists(testpath):
                self.reply_not_found()
                return

            sb = os.stat(testpath)
            self.send_response(200)
            self.send_header('content-type', 'application/gzip')
            self.send_header('content-length', str(int(sb.st_size)))
            self.end_headers()

            # stream the file back in chunks
            with open(testpath, 'rb') as f:
                while True:
                    buf = f.read(32768)
                    if len(buf) == 0:
                        break

                    self.wfile.write(buf)

            return

    class TestHTTPServer(BaseHTTPServer.HTTPServer):
        def __init__(self, host, port, rootdir):
            BaseHTTPServer.HTTPServer.__init__(self, (host, port), TestHTTPServerHandler)
            # directory the handler serves test results from
            self.rootdir = rootdir

    http = TestHTTPServer('0.0.0.0', portnum, test_root)
    http.serve_forever()
def main_http_server_stop(portnum):
    """
    Stop all http servers on this port: SIGKILL every process that `lsof`
    reports as having a TCP socket on *:@portnum.

    Return True on success (even if some processes could not be killed)
    Return False if the process list could not be obtained
    """
    # find PIDs listening on this port.
    # the port must be followed by a space or end-of-line, so that e.g. port 80
    # does not also match *:8086
    cmd = r"lsof -P | grep -E 'TCP \*:%s( |$)' | awk '{print $2}' | sort | uniq" % str(portnum)
    log.debug('$ {}'.format(cmd))
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    out, err = proc.communicate()
    if proc.returncode != 0:
        log.error('Command failed: {}'.format(cmd))
        return False

    pids = [int(tok) for tok in out.split()]
    for pid in pids:
        try:
            log.debug("Kill {}".format(pid))
            os.kill(pid, signal.SIGKILL)
        except OSError as e:
            # best-effort: the process may be gone already, or not ours to kill
            log.error("Failed to SIGKILL {}: {}".format(pid, e))

    return True
def main_test_runner(testname, hostinfo, testinfo, ssh_id_path=None, dependencies=None):
    """
    Run a set of tests across a set of hosts.
    * bootstrap each host with the given dependencies (in parallel)
    * schedule tests across hosts as slots free up
    * print a SUCCESS/FAILURE line for each test as it finishes

    @testname names this test run; it is used to derive the per-host virtualenv and output directories.
    @hostinfo is [{'user': ..., 'host': ...}]
    @testinfo is the list of test modules to run
    @ssh_id_path is an optional ssh identity file
    @dependencies is an optional path to a dependencies JSON file to install on each host

    Calling conventions:
    * hostinfo entries are "test slots."  duplicate entries in hostinfo will cause multiple tests to be run on that host.

    Return True once every test has run
    Return False if there are tests to run but no hosts to run them on
    """
    test_queue = testinfo[:]
    remote_test_port = 8086
    test_timeout = 60 * 75
    bootstrap_logs_dir = tempfile.mkdtemp(prefix='blockstack-test-bootstrap-')

    run_queue = []
    host_slots = {}         # map user@host to number of slots
    bootstrap_results = {}  # map user@host to True/False
    test_results = {}       # map test name to test result

    for hi in hostinfo:
        userhost = '{}@{}'.format(hi['user'], hi['host'])
        host_slots[userhost] = host_slots.get(userhost, 0) + 1
        bootstrap_results[userhost] = False

    if len(test_queue) > 0 and len(host_slots) == 0:
        # nothing could ever be scheduled; the loop below would spin forever
        log.error("No hosts given; cannot run {} test(s)".format(len(test_queue)))
        return False

    def host_config(username, testname):
        """
        Get host-local config parameters
        """
        host_venv_dir = '/home/{}/blockstack-test-venv-{}'.format(username, testname)
        host_testout_dir = '/home/{}/blockstack-test-output-{}'.format(username, testname)
        return {'host_venv_dir': host_venv_dir, 'host_testout_dir': host_testout_dir}

    def host_bootstrap(username, hostname):
        """
        Bootstrap the host (runs in a separate thread)
        """
        cfg = host_config(username, testname)
        try:
            res = bootstrap_host(username, hostname, cfg['host_venv_dir'], cfg['host_testout_dir'], remote_test_port,
                                 deps_path=dependencies, ssh_id_path=ssh_id_path, logfile=os.path.join(bootstrap_logs_dir, '{}@{}.log'.format(username,hostname)))
        except Exception:
            log.exception("Exception while bootstrapping {}@{}".format(username, hostname))
            res = False

        bootstrap_results['{}@{}'.format(username, hostname)] = res
        return res

    def host_run_test(username, hostname, test_module):
        """
        Run the test on the given host (runs in a separate thread).
        Always records a result, so the scheduler never finds one missing.
        """
        cfg = host_config(username, testname)
        try:
            res = main_run_test(username, hostname, cfg['host_venv_dir'], test_module, cfg['host_testout_dir'], timeout=test_timeout, test_user=username, ssh_id_path=ssh_id_path)
        except Exception as e:
            log.exception("Exception while running {} on {}@{}".format(test_module, username, hostname))
            res = {'error': 'exception while running test: {}'.format(e)}

        test_results[test_module] = res
        return res

    def count_running():
        """
        Get a count of how many tests are running, grouped by user@host
        """
        ret = dict((userhost, 0) for userhost in host_slots)
        for res in run_queue:
            ret[res['userhost']] += 1

        return ret

    def test_sched(test):
        """
        Test scheduler.
        Schedule a test to a host: find a host with a free slot and prepare
        (but do not start) the thread that will run the test there.

        Return {'userhost': ..., 'test': {'thread': ..., 'test': ...}} on success
        Return None if all slots are taken
        """
        # find a free host
        run_counts = count_running()
        for node in hostinfo:
            userhost = '{}@{}'.format(node['user'], node['host'])
            if run_counts[userhost] < host_slots[userhost]:
                # can schedule here: allocate this test
                t = threading.Thread(target=host_run_test, args=(node['user'], node['host'], test))
                return {'userhost': userhost, 'test': {'thread': t, 'test': test}}

        # no free hosts
        return None

    # bootstrap all nodes, once per distinct user@host
    bootstrap_threads = []
    for userhost in host_slots:
        user, host = userhost.split('@', 1)
        t = threading.Thread(target=host_bootstrap, args=(user, host))
        t.start()
        bootstrap_threads.append(t)

    for t in bootstrap_threads:
        t.join()

    # ssh -t may leave the terminal in a strange state
    os.system('stty sane')

    failed_hosts = sorted([userhost for userhost in bootstrap_results if not bootstrap_results[userhost]])
    if failed_hosts:
        for userhost in failed_hosts:
            log.error("Failed to bootstrap {} (see {})".format(userhost, os.path.join(bootstrap_logs_dir, '{}.log'.format(userhost))))

        log.error("Some nodes failed to bootstrap; running tests anyway")
    else:
        log.debug("All nodes bootstrapped; running tests")

    # run all tests
    while True:
        # try to start a new test
        if len(test_queue) > 0:
            next_test = test_queue[0]
            res = test_sched(next_test)
            if res is not None:
                # scheduled! start it up
                log.debug("Scheduled {} on {}".format(next_test, res['userhost']))
                res['test']['thread'].start()

                test_queue.pop(0)
                run_queue.append(res)

        # try to join (the short join timeout also paces this loop)
        joined = []
        for (i, res) in enumerate(run_queue):
            userhost = res['userhost']
            state = res['test']

            state['thread'].join(0.1)
            if not state['thread'].is_alive():
                # joined!
                log.debug("Joined {} on {}".format(state['test'], userhost))

                # what was the result
                test_res = test_results.get(state['test'], None)
                assert test_res, 'BUG: no test result for {}'.format(state['test'])

                joined.append((i, test_res, res))

        sys.stdout.flush()
        sys.stderr.flush()

        # report finished tests, and mark their run queue entries for removal
        for (j, test_res, res) in joined:
            run_queue[j] = None

            userhost = res['userhost']
            test = res['test']['test']

            if 'error' in test_res:
                print(COLORS['red'] + 'FAILURE {} test error: {}'.format(test, test_res['error']) + COLORS['reset'])

            elif test_res['status']:
                print(COLORS['green'] + 'SUCCESS {}'.format(test) + COLORS['reset'])

            else:
                print(COLORS['red'] + 'FAILURE {}'.format(test) + COLORS['reset'] + ' ({})'.format('http://{}:{}/{}.out.gz'.format(userhost.split('@',1)[1], remote_test_port, test)))

            sys.stdout.flush()
            sys.stderr.flush()
            os.system('stty sane')

        # clean out run queue
        run_queue = [r for r in run_queue if r is not None]

        # are we done?
        if len(run_queue) == 0 and len(test_queue) == 0:
            break

    return True
def read_host_list(hostlist):
    """
    Read a file of user@host lines.
    blank lines and lines where the first non-whitespace character is # will be ignored.

    Return [{'user': ..., 'host': ...}] on success
    Return None if the file does not exist, or if a line is not of the form
    user@host (a message is written to stderr in both cases)
    """
    if not os.path.exists(hostlist):
        sys.stderr.write('No such file or directory: {}\n'.format(hostlist))
        return None

    with open(hostlist) as f:
        lines = [l.strip() for l in f.read().strip().split('\n')]

    hostinfo = []
    for line in lines:
        if len(line) == 0 or line.startswith('#'):
            continue

        # split on the first '@'; report malformed entries instead of crashing
        user, sep, host = line.partition('@')
        if not sep or not user or not host:
            sys.stderr.write('Malformed host entry (expected user@host): {}\n'.format(line))
            return None

        hostinfo.append({'user': user, 'host': host})

    return hostinfo
def read_test_list(testlist):
    """
    Read a file of test packages, one per line.
    blank lines and lines where the first non-whitespace character is # will be ignored.

    Return ['test'] (surrounding whitespace stripped, file order kept)
    Return None if the file does not exist (a message is written to stderr)
    """
    if not os.path.exists(testlist):
        sys.stderr.write('No such file or directory: {}\n'.format(testlist))
        return None

    with open(testlist) as f:
        contents = f.read()

    tests = []
    for raw_line in contents.strip().split('\n'):
        entry = raw_line.strip()
        if len(entry) > 0 and not entry.startswith('#'):
            tests.append(entry)

    return tests
# default wall-clock budget for a single test, in seconds (75 min)
_DEFAULT_TEST_TIMEOUT = 75 * 60


def _exit_with_error(message, code=1):
    """
    Write message (plus a newline) to stderr and exit the process with code.
    """
    sys.stderr.write(message + '\n')
    sys.exit(code)


def _emit_test_result(res):
    """
    Flush pending output, print the end-of-output sentinel, and then print
    res as pretty-printed JSON on stdout.
    """
    sys.stderr.flush()
    sys.stdout.flush()
    # don't change this line! it demarcates the end of the stdout from the test, and the beginning of the test status
    sys.stdout.write("-----END TEST OUTPUT-----\n")
    sys.stdout.flush()
    sys.stdout.write(json.dumps(res, indent=4, sort_keys=True) + '\n')
    sys.stdout.flush()


def _load_dependency_list(deps_path):
    """
    Return the dependency list to install: DEFAULT_DEPS if deps_path is None,
    otherwise the JSON-decoded contents of deps_path.
    Exit with status 1 if the file is missing or is not valid JSON.
    """
    if deps_path is None:
        return DEFAULT_DEPS

    if not os.path.exists(deps_path):
        _exit_with_error('No such file or directory: {}'.format(deps_path))

    with open(deps_path, 'r') as f:
        data = f.read()

    try:
        return json.loads(data)
    except ValueError:
        # json.loads signals malformed input with ValueError
        _exit_with_error('Failed to parse {} as JSON'.format(deps_path))


def _build_argparser():
    """
    Build the top-level argument parser, with one sub-parser per action.
    The chosen action is stored in the 'action' attribute of the parsed args.
    """
    argparser = argparse.ArgumentParser(description='{}: Blockstack test driver'.format(sys.argv[0]))
    subparsers = argparser.add_subparsers(dest='action', help='the action to perform')

    parser = subparsers.add_parser('watchdog', help='Start up a test watchdog to run a test, and collect its logs')
    parser.add_argument('venv_dir', action='store', help='Virtualenv with all the packages installed')
    parser.add_argument('test_module', action='store', help='Name of the test to run')
    parser.add_argument('tests_root', action='store', help='Directory to store test state')
    parser.add_argument('--test_user', action='store', help='The user to run the test as')
    parser.add_argument('--timeout', action='store', type=int, help='The number of seconds this test is allowed to run for')

    parser = subparsers.add_parser('worker', help='Test-running worker subprocess')
    parser.add_argument('venv_dir', action='store', help='Virtualenv with all the packages installed')
    parser.add_argument('test_module', action='store', help='Name of the test to run')
    parser.add_argument('test_dir', action='store', help='The directory in which to store the test state')
    parser.add_argument('--test_user', action='store', help='The user to become in order to run the test')

    parser = subparsers.add_parser("setup", help='Set up a virtualenv with all of the Blockstack packages')
    parser.add_argument('venv_dir', action='store', help='The virtualenv directory')
    parser.add_argument('--dependencies', action='store', help='Path to a file that encodes the list of packages to install.')

    parser = subparsers.add_parser("httpd", help='Start the test HTTP server for serving back test results')
    parser.add_argument('tests_root', action='store', help='The directory into which test output will be written')
    parser.add_argument('port', action='store', type=int, help='The port to serve test results on')

    parser = subparsers.add_parser("httpd-stop", help='Stop the test HTTP server')
    parser.add_argument('port', action='store', type=int, help='The port that any running HTTP servers would be listening on')

    parser = subparsers.add_parser('run-test', help='Run a test on a remote host')
    parser.add_argument('venv_dir', action='store', help='Virtualenv directory on the remote host')
    parser.add_argument('test_module', action='store', help='Test module to run')
    parser.add_argument('tests_root', action='store', help='Directory on the remote host that stores the test state and output')
    parser.add_argument('login', action='store', help='user@host to SSH in as')
    parser.add_argument('--test_user', action='store', help='The user to run the test as')
    parser.add_argument('--timeout', action='store', type=int, help='Timeout for the test, in seconds')
    parser.add_argument('--ssh_identity', action='store', help='The path to the SSH identity file to use')

    parser = subparsers.add_parser('run', help='Run a suite of tests across a set of hosts')
    parser.add_argument('hostlist', action='store', help='Path to a file of newline-separated `username@host` entries')
    parser.add_argument('testlist', action='store', help='Path to a file of newline-separated test package names')
    parser.add_argument('--test_name', action='store', help='Name of the test run')
    parser.add_argument('--dependencies', action='store', help='Path to a file that encodes the list of packages to install.')
    parser.add_argument('--port', action='store', type=int, help='Port to serve test results on')
    parser.add_argument('--ssh_identity', action='store', help='The path to the SSH identity file to use')

    parser = subparsers.add_parser('bootstrap', help='Bootstrap a remote host for testing')
    parser.add_argument('venv_dir', action='store', help='The virtualenv directory on the remote host')
    parser.add_argument('tests_root', action='store', help='The directory in which test output will be stored')
    parser.add_argument('port', action='store', type=int, help='The port to serve test results on')
    parser.add_argument('--username', action='store', help='The username to use to SSH into the remote host')
    parser.add_argument('--host', action='store', help='The host to SSH into')
    parser.add_argument('--hosts', action='store', help='The path to a file with newline-separated "username@host" lines, listing which hosts to SSH into')
    parser.add_argument('--ssh_identity', action='store', help='The path to the SSH identity file to use')
    parser.add_argument('--logs_dir', action='store', help='The path to a directory in which to log the bootstrapping')
    parser.add_argument('--bootstrap_command', action='store', help='The shell command to use to bootstrap the host')
    parser.add_argument('--dependencies', action='store', help='Path to a file that encodes the list of packages to install.')

    return argparser


def main(argv):
    """
    main method

    argv is the list of command-line arguments WITHOUT the program name
    (i.e. sys.argv[1:]).  The first argument selects the action: one of
    watchdog, worker, setup, httpd, httpd-stop, run-test, run, bootstrap.

    Most actions terminate the process via sys.exit(); 'setup' and 'run'
    return None once they complete.
    """
    argparser = _build_argparser()

    # argv has no program name, so a top-level help request is its only element.
    # `<action> -h` is left to argparse, which prints that action's own help.
    if len(argv) == 1 and argv[0] in ('-h', '--help', 'help'):
        argparser.print_usage()
        sys.exit(1)

    args, _ = argparser.parse_known_args(argv)

    if args.action == 'watchdog':
        venv_dir = args.venv_dir
        test_package = args.test_module
        tests_dir = args.tests_root
        test_user = getattr(args, 'test_user', None)
        test_timeout = getattr(args, 'timeout', None)
        if test_timeout is None:
            test_timeout = _DEFAULT_TEST_TIMEOUT

        if os.getuid() != 0:
            _exit_with_error('You must be root to start a test')

        if not enter_venv(venv_dir):
            _exit_with_error('Failed to enter virtualenv {}'.format(venv_dir))

        res = main_watchdog(venv_dir, test_package, tests_dir, timeout=test_timeout, test_user=test_user)
        _emit_test_result(res)
        sys.exit(0)

    elif args.action == 'worker':
        venv_dir = args.venv_dir
        test_package = args.test_module
        test_dir = args.test_dir
        test_user = getattr(args, 'test_user', None)

        if os.getuid() != 0:
            # exit non-zero explicitly: a bare `return 1` is dropped by the script entry point
            _exit_with_error('You must be root to start a test worker')

        if not enter_venv(venv_dir):
            _exit_with_error('Failed to enter virtualenv {}'.format(venv_dir))

        sys.exit(main_test(test_package, test_dir, test_user=test_user))

    elif args.action == 'httpd':
        main_http_server(args.tests_root, args.port)
        sys.exit(0)

    elif args.action == 'httpd-stop':
        stopped = main_http_server_stop(args.port)
        sys.exit(0 if stopped else 1)

    elif args.action == 'setup':
        venv_dir = args.venv_dir
        deps = _load_dependency_list(getattr(args, 'dependencies', None))

        # set up the test-running environment
        if in_venv():
            _exit_with_error('You are already in a virtualenv. Please deactivate it and try again')

        if not os.path.exists(venv_dir):
            if not setup_venv(venv_dir):
                _exit_with_error('Failed to setup virtualenv {}'.format(venv_dir))

        if not enter_venv(venv_dir):
            _exit_with_error('Failed to enter virtualenv {}'.format(venv_dir))

        src_dir = os.path.join(venv_dir, 'src')
        if not os.path.exists(src_dir):
            os.makedirs(src_dir)

        # install dependencies
        if not install_dependencies(src_dir, deps, venv_dir=venv_dir):
            _exit_with_error('Failed to install some dependencies')

    elif args.action == 'bootstrap':
        venv_dir = args.venv_dir
        tests_dir = args.tests_root
        port = args.port
        host = getattr(args, 'host', None)
        username = getattr(args, 'username', None)
        hostlist = getattr(args, 'hosts', None)
        dependencies = getattr(args, 'dependencies', None)
        ssh_id_path = getattr(args, 'ssh_identity', None)
        logsdir = getattr(args, 'logs_dir', None)

        if username is not None and host is not None:
            # bootstrap single host
            hostinfo = [{'user': username, 'host': host}]
        elif hostlist is not None:
            hostinfo = read_host_list(hostlist)
            if hostinfo is None:
                sys.exit(1)
        else:
            # nothing to bootstrap; fail cleanly instead of reading a None path
            _exit_with_error('Need either --username and --host, or --hosts')

        bootstrap_command = getattr(args, 'bootstrap_command', None)
        if bootstrap_command is None:
            bootstrap_command = DEFAULT_BOOTSTRAP_SCRIPT

        if logsdir and not os.path.exists(logsdir):
            os.makedirs(logsdir)

        failed = []
        for h in hostinfo:
            log_path = None
            if logsdir:
                log_path = os.path.join(logsdir, 'bootstrap-{}@{}.log'.format(h['user'], h['host']))
                log.debug("Bootstrapping {}@{}... (logs in {})".format(h['user'], h['host'], log_path))
            else:
                log.debug("Bootstrapping {}@{}...".format(h['user'], h['host']))

            res = bootstrap_host(h['user'], h['host'], venv_dir, tests_dir, port, deps_path=dependencies, ssh_id_path=ssh_id_path, bootstrap_script=bootstrap_command, logfile=log_path)
            if not res:
                failed.append(h)

        if failed:
            log.error("Failed to bootstrap the following hosts:")
            for f in failed:
                log.error(" {}@{}".format(f['user'], f['host']))
            sys.exit(1)

        sys.exit(0)

    elif args.action == 'run-test':
        venv_dir = args.venv_dir
        test_package = args.test_module
        tests_dir = args.tests_root
        login = args.login
        ssh_id_path = getattr(args, 'ssh_identity', None)
        timeout = getattr(args, 'timeout', None)
        test_user = getattr(args, 'test_user', None)
        if timeout is None:
            timeout = _DEFAULT_TEST_TIMEOUT

        user, sep, host = login.partition('@')
        if not sep:
            _exit_with_error('Invalid login (expected user@host): {}'.format(login))

        res = main_run_test(user, host, venv_dir, test_package, tests_dir, timeout=timeout, test_user=test_user, ssh_id_path=ssh_id_path)
        _emit_test_result(res)
        sys.exit(0)

    elif args.action == 'run':
        hostlist = args.hostlist
        testlist = args.testlist
        test_name = getattr(args, 'test_name', None)
        ssh_id_path = getattr(args, 'ssh_identity', None)
        dependencies = getattr(args, 'dependencies', None)
        port = getattr(args, 'port', None)

        if test_name is None:
            # no name given; make up a random one
            test_name = os.urandom(16).encode('hex')

        if port is None:
            port = DEFAULT_TEST_RUNNER_PORT

        hostinfo = read_host_list(hostlist)
        if hostinfo is None:
            sys.exit(1)

        testinfo = read_test_list(testlist)
        if testinfo is None:
            sys.exit(1)

        # NOTE(review): `port` is resolved above but never forwarded to
        # main_test_runner, and the runner's return value is ignored (so the
        # exit status is 0 regardless).  The runner's signature is not visible
        # from here -- confirm whether both should be wired through.
        main_test_runner(test_name, hostinfo, testinfo, ssh_id_path=ssh_id_path, dependencies=dependencies)

    else:
        _exit_with_error('Unrecognized directive. Try -h for help')
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    # Branches that return None (or that call sys.exit() themselves) are unaffected;
    # a branch that returns a non-zero code no longer exits 0.
    sys.exit(main(sys.argv[1:]))