mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-09 22:37:47 +08:00
953 lines
29 KiB
Python
Executable File
953 lines
29 KiB
Python
Executable File
#!/usr/bin/env python2
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Blockstack-client
|
||
~~~~~
|
||
copyright: (c) 2014-2015 by Halfmoon Labs, Inc.
|
||
copyright: (c) 2016-2017 by Blockstack.org
|
||
|
||
This file is part of Blockstack-client.
|
||
|
||
Blockstack-client is free software: you can redistribute it and/or modify
|
||
it under the terms of the GNU General Public License as published by
|
||
the Free Software Foundation, either version 3 of the License, or
|
||
(at your option) any later version.
|
||
|
||
Blockstack-client is distributed in the hope that it will be useful,
|
||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
GNU General Public License for more details.
|
||
You should have received a copy of the GNU General Public License
|
||
along with Blockstack-client. If not, see <http://www.gnu.org/licenses/>.
|
||
"""
|
||
import os
|
||
import sys
|
||
import atexit
|
||
|
||
if sys.version_info.major != 2:
|
||
raise Exception("Python 3 is not supported")
|
||
|
||
if sys.version_info.minor < 7:
|
||
raise Exception("Python 2.7 or greater is required")
|
||
|
||
import traceback
|
||
import json
|
||
import argparse
|
||
|
||
# Hack around absolute paths
|
||
current_dir = os.path.abspath(os.path.dirname(__file__))
|
||
parent_dir = os.path.abspath(current_dir + "/../")
|
||
|
||
import time
|
||
import thread
|
||
import errno
|
||
import signal
|
||
import socket
|
||
import posixpath
|
||
import urllib
|
||
import urllib2
|
||
import SocketServer
|
||
from SimpleHTTPServer import SimpleHTTPRequestHandler
|
||
import re
|
||
import jsonschema
|
||
from jsonschema import ValidationError
|
||
import requests
|
||
import random
|
||
from ConfigParser import SafeConfigParser
|
||
import keylib
|
||
import shutil
|
||
import blockstack_client
|
||
import blockstack
|
||
from blockstack import virtualchain_hooks
|
||
import virtualchain
|
||
import blockstack_client.schemas as schemas
|
||
|
||
SNAPSHOTS_CONFIG_PATH = os.path.expanduser("~/.blockstack-snapshots/blockstack-snapshots.ini")
|
||
|
||
SNAPSHOT_CHECK_INTERVAL = int(os.environ.get("BLOCKSTACK_SNAPSHOT_CHECK_INTERVAL", 60))
|
||
running = True
|
||
|
||
log = blockstack_client.get_logger("blockstack-snapshots")
|
||
|
||
class BlockstackSnapshotHandler(SimpleHTTPRequestHandler):
|
||
"""
|
||
Snapshot server request handler.
|
||
|
||
Endpoints are:
|
||
GET /v1/ping Is this server alive?
|
||
POST /v1/snapshot Sign a snapshot digest if it is signed by a trusted key
|
||
|
||
server variables:
|
||
self.server.trusted_public_key
|
||
self.server.private_key
|
||
"""
|
||
|
||
JSONRPC_MAX_SIZE = 1024 * 1024
|
||
|
||
def _send_headers(self, status_code=200, content_type='application/json'):
|
||
"""
|
||
Generate and reply headers
|
||
"""
|
||
self.send_response(status_code)
|
||
self.send_header('content-type', content_type)
|
||
self.send_header('Access-Control-Allow-Origin', '*') # CORS
|
||
self.end_headers()
|
||
|
||
|
||
def _reply_json(self, json_payload, status_code=200):
|
||
"""
|
||
Return a JSON-serializable data structure
|
||
"""
|
||
self._send_headers(status_code=status_code)
|
||
json_str = json.dumps(json_payload)
|
||
self.wfile.write(json_str)
|
||
|
||
|
||
def _read_payload(self, maxlen=None):
|
||
"""
|
||
Read raw uploaded data.
|
||
Return the data on success
|
||
Return None on I/O error, or if maxlen is not None and the number of bytes read is too big
|
||
"""
|
||
|
||
client_address_str = "{}:{}".format(self.client_address[0], self.client_address[1])
|
||
|
||
# check length
|
||
read_len = self.headers.get('content-length', None)
|
||
if read_len is None:
|
||
log.error("No content-length given from {}".format(client_address_str))
|
||
return None
|
||
|
||
try:
|
||
read_len = int(read_len)
|
||
except:
|
||
log.error("Invalid content-length")
|
||
return None
|
||
|
||
if maxlen is not None and read_len >= maxlen:
|
||
log.error("Request from {} is too long ({} >= {})".format(client_address_str, read_len, maxlen))
|
||
return None
|
||
|
||
# get the payload
|
||
request_str = self.rfile.read(read_len)
|
||
return request_str
|
||
|
||
|
||
def _read_json(self, schema=None):
|
||
"""
|
||
Read a JSON payload from the requester
|
||
Return the parsed payload on success
|
||
Return None on error
|
||
"""
|
||
# JSON post?
|
||
request_type = self.headers.get('content-type', None)
|
||
client_address_str = "{}:{}".format(self.client_address[0], self.client_address[1])
|
||
|
||
if request_type != 'application/json':
|
||
log.error("Invalid request of type {} from {}".format(request_type, client_address_str))
|
||
return None
|
||
|
||
request_str = self._read_payload(maxlen=self.JSONRPC_MAX_SIZE)
|
||
if request_str is None:
|
||
log.error("Failed to read request")
|
||
return None
|
||
|
||
# parse the payload
|
||
request = None
|
||
try:
|
||
request = json.loads( request_str )
|
||
if schema is not None:
|
||
jsonschema.validate( request, schema )
|
||
|
||
except (TypeError, ValueError, ValidationError) as ve:
|
||
if BLOCKSTACK_DEBUG:
|
||
log.exception(ve)
|
||
|
||
return None
|
||
|
||
return request
|
||
|
||
|
||
def parse_qs(self, qs):
|
||
"""
|
||
Parse query string, but enforce one instance of each variable.
|
||
Return a dict with the variables on success
|
||
Return None on parse error
|
||
"""
|
||
qs_state = urllib2.urlparse.parse_qs(qs)
|
||
ret = {}
|
||
for qs_var, qs_value_list in qs_state.items():
|
||
if len(qs_value_list) > 1:
|
||
return None
|
||
|
||
ret[qs_var] = qs_value_list[0]
|
||
|
||
return ret
|
||
|
||
|
||
def get_path_and_qs(self):
|
||
"""
|
||
Parse and obtain the path and query values.
|
||
We don't care about fragments.
|
||
|
||
Return {'path': ..., 'qs_values': ...} on success
|
||
Return {'error': ...} on error
|
||
"""
|
||
path_parts = self.path.split("?", 1)
|
||
|
||
if len(path_parts) > 1:
|
||
qs = path_parts[1].split("#", 1)[0]
|
||
else:
|
||
qs = ""
|
||
|
||
path = path_parts[0].split("#", 1)[0]
|
||
path = posixpath.normpath(urllib.unquote(path))
|
||
|
||
qs_values = self.parse_qs( qs )
|
||
if qs_values is None:
|
||
return {'error': 'Failed to parse query string'}
|
||
|
||
parts = path.strip('/').split('/')
|
||
|
||
return {'path': path, 'qs_values': qs_values, 'parts': parts}
|
||
|
||
|
||
def sign_snapshot( self ):
|
||
"""
|
||
Read a snapshot digest, sign it, and return the signature.
|
||
Return 200 with {'sigb64': sigb64}DDDD on success
|
||
Return 401 on invalid request structure
|
||
Return 403 on invalid signature
|
||
|
||
TODO: if this server is colocated with a blockstack server, then
|
||
it should verify that independently calculated the same snapshot.
|
||
Use `block_height` for that.
|
||
"""
|
||
snapshot_schema = {
|
||
'type': 'object',
|
||
'properties': {
|
||
'block_height': {
|
||
'type': 'integer',
|
||
},
|
||
'file_hash': {
|
||
'type': 'string',
|
||
'pattern': schemas.OP_HEX_PATTERN,
|
||
},
|
||
'sigb64': {
|
||
'type': 'string',
|
||
'pattern': schemas.OP_BASE64_PATTERN,
|
||
},
|
||
},
|
||
'required': [
|
||
'file_hash',
|
||
'sigb64',
|
||
],
|
||
'additionalProperties': False,
|
||
}
|
||
|
||
request = self._read_json(schema=snapshot_schema)
|
||
if request is None:
|
||
return self._reply_json({'error': 'Invalid snapshot request'}, status_code=401)
|
||
|
||
block_height = request['block_height']
|
||
file_hash = request['file_hash']
|
||
sigb64 = request['sigb64']
|
||
|
||
snapshot_info = get_snapshot_hash(self.server.snapshots_dir, block_height)
|
||
if 'error' in snapshot_info:
|
||
# nope
|
||
log.debug("Failed to get snapshot hash for {}: {}".format(block_height, snapshot_info['error']))
|
||
self._reply_json({'error': 'Failed to query local snapshot hash'}, status_code=404)
|
||
|
||
# did any of our trusted public keys sign it?
|
||
valid = False
|
||
for trusted_public_key in self.server.trusted_public_keys:
|
||
valid = blockstack_client.verify_digest( file_hash, trusted_public_key, sigb64 )
|
||
if valid:
|
||
break
|
||
|
||
if not valid:
|
||
return self._reply_json({'error': 'Invalid signature'}, status_code=403)
|
||
|
||
# can sign
|
||
sigb64 = blockstack_client.sign_digest( file_hash, self.server.private_key )
|
||
assert sigb64, "Failed to sign digest"
|
||
|
||
return self._reply_json({'sigb64': sigb64})
|
||
|
||
|
||
def do_POST(self):
|
||
"""
|
||
Handle POST request
|
||
"""
|
||
path_info = self.get_path_and_qs()
|
||
if 'error' in path_info:
|
||
return self._reply_json({'error': 'Invalid request'}, status_code=401)
|
||
|
||
path = path_info['path']
|
||
if path == '/v1/snapshots':
|
||
return self.sign_snapshot()
|
||
else:
|
||
return self._reply_json({'error': 'No such method'}, status_code=404)
|
||
|
||
|
||
def do_GET(self):
|
||
"""
|
||
Handle GET request
|
||
"""
|
||
path_info = self.get_path_and_qs()
|
||
if 'error' in path_info:
|
||
return self._reply_json({'error': 'Invalid request'}, status_code=401)
|
||
|
||
path = path_info['path']
|
||
if path == '/v1/ping':
|
||
return self._reply_json({'status': 'alive'})
|
||
else:
|
||
return self._reply_json({'error': 'No such method'}, status_code=404)
|
||
|
||
|
||
class BlockstackSnapshotServer(SocketServer.TCPServer):
|
||
"""
|
||
Snapshot server implementation
|
||
"""
|
||
def __init__(self, port, trusted_public_keys, private_key, snapshots_dir ):
|
||
"""
|
||
Set up a snapshot server
|
||
"""
|
||
SocketServer.TCPServer.__init__(self, ('0.0.0.0', port), BlockstackSnapshotHandler, bind_and_activate=False)
|
||
|
||
self.trusted_public_keys = [keylib.ECPublicKey(pk).to_hex() for pk in trusted_public_keys]
|
||
self.private_key = keylib.ECPrivateKey(private_key).to_hex()
|
||
self.snapshots_dir = snapshots_dir
|
||
|
||
log.debug("Set SO_REUSADDR")
|
||
self.socket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
|
||
|
||
self.server_bind()
|
||
self.server_activate()
|
||
|
||
|
||
def make_snapshot(snapshots_dir, private_key, block_number):
|
||
"""
|
||
Make a snapshot from the given block height at {snapshots_dir}/snapshot.bsk.{block_number}
|
||
Symlink it to {snapshots_dir}/snapshot.bsk
|
||
|
||
Return {'status': True} on success
|
||
Return {'error': ...} on failure
|
||
"""
|
||
|
||
tmp_snapshot_path = os.path.join(snapshots_dir, '.snapshot.bsk.{}'.format(block_number))
|
||
log.debug("Make snapshot for {} in {}".format(block_number, tmp_snapshot_path))
|
||
|
||
try:
|
||
res = blockstack.fast_sync_snapshot( tmp_snapshot_path, private_key, block_number )
|
||
assert res
|
||
except Exception as e:
|
||
log.exception(e)
|
||
try:
|
||
os.unlink(tmp_snapshot_path)
|
||
except:
|
||
pass
|
||
|
||
return {'error': 'Failed to create snapshot'}
|
||
|
||
# move into position
|
||
snapshot_path = os.path.join(snapshots_dir, 'snapshot.bsk.{}'.format(block_number))
|
||
shutil.move( tmp_snapshot_path, snapshot_path )
|
||
|
||
# symlink 'snapshot.bsk' to it
|
||
old_dir = os.getcwd()
|
||
try:
|
||
os.chdir(snapshots_dir)
|
||
if os.path.exists('snapshot.bsk'):
|
||
os.unlink('snapshot.bsk')
|
||
|
||
# symlink it in
|
||
os.symlink('snapshot.bsk.{}'.format(block_number), 'snapshot.bsk')
|
||
except Exception as e:
|
||
log.exception(e)
|
||
return {'error': 'Failed to symlink {} to {}'.format('snapshot.bsk.{}'.format(block_number), 'snapshot.bsk')}
|
||
finally:
|
||
os.chdir(old_dir)
|
||
|
||
log.debug("Snapshot at {}".format(snapshot_path))
|
||
|
||
return {'status': True}
|
||
|
||
|
||
def get_snapshot_hash(snapshots_dir, block_number):
|
||
"""
|
||
Get the expected hash of a snapshot payload
|
||
Return {'status': True, 'hash': hex hash} on success
|
||
Return {'error': ...} on failure
|
||
"""
|
||
|
||
snapshot_path = os.path.join(snapshots_dir, 'snapshot.bsk.{}'.format(block_number))
|
||
if not os.path.exists(snapshot_path):
|
||
return {'error': 'No such snapshot'}
|
||
|
||
snapshot_info = blockstack.fast_sync_inspect_snapshot(snapshot_path)
|
||
if 'error' in snapshot_info:
|
||
return {'error': 'Failed to query snapshot'}
|
||
|
||
snapshot_hash = snapshot_info['hash']
|
||
return {'status': True, 'hash': snapshot_hash}
|
||
|
||
|
||
def snapshot_exists(snapshots_dir, block_number):
|
||
"""
|
||
Do we have a snapshot for this block height?
|
||
"""
|
||
snapshot_path = os.path.join( snapshots_dir, "snapshot.bsk.{}".format(block_number) )
|
||
return os.path.exists(snapshot_path)
|
||
|
||
|
||
def find_backup_blocks(working_dir):
|
||
"""
|
||
Find the set of block numbers for which we have backups
|
||
"""
|
||
backup_blocks = virtualchain.indexer.StateEngine.get_backup_blocks( virtualchain_hooks, working_dir=working_dir )
|
||
return list(set(backup_blocks))
|
||
|
||
|
||
def find_snapshot_blocks(snapshots_dir):
|
||
"""
|
||
Find the set of block numbers for which we have snapshots
|
||
"""
|
||
snapshot_blocks = []
|
||
names = os.listdir(snapshots_dir)
|
||
for name in names:
|
||
if re.match("^snapshot.bsk.[0-9]+$", name):
|
||
# what's the block?
|
||
_, block_num_str = name.rsplit('.', 1)
|
||
block_num = int(block_num_str)
|
||
snapshot_blocks.append( block_num )
|
||
|
||
return list(set(snapshot_blocks))
|
||
|
||
|
||
def snapshotter_thread_main(working_dir, snapshots_dir, private_key, check_running):
|
||
"""
|
||
Continuously watch the snapshots directory.
|
||
Periodically check that we're not running with
|
||
check_running callable. Meant to be run in a thread.
|
||
"""
|
||
|
||
def _wait(timeout):
|
||
deadline = time.time() + timeout
|
||
while time.time() < deadline:
|
||
if not check_running():
|
||
return False
|
||
|
||
time.sleep(0.5)
|
||
|
||
return True
|
||
|
||
if not os.path.exists(snapshots_dir):
|
||
os.makedirs(snapshots_dir)
|
||
|
||
block_numbers = find_snapshot_blocks(snapshots_dir)
|
||
|
||
log.debug("Snapshotter start")
|
||
while check_running():
|
||
# find out which block numbers are represented in the backups directory
|
||
|
||
cur_block_numbers = find_backup_blocks(working_dir)
|
||
|
||
log.debug("snapshot blocks: {}".format(",".join(['{}'.format(b) for b in block_numbers])))
|
||
log.debug("backup blocks: {}".format(",".join(['{}'.format(b) for b in cur_block_numbers])))
|
||
|
||
new_block_numbers = []
|
||
for block_number in cur_block_numbers:
|
||
if block_number not in block_numbers:
|
||
new_block_numbers.append(block_number)
|
||
|
||
if len(new_block_numbers) == 0:
|
||
# no new blocks
|
||
res = _wait(SNAPSHOT_CHECK_INTERVAL)
|
||
if not res:
|
||
# died
|
||
break
|
||
|
||
continue
|
||
|
||
# have new snapshots to make.
|
||
# start with the most recent
|
||
for new_block in reversed(sorted(new_block_numbers)):
|
||
res = make_snapshot( snapshots_dir, private_key, new_block )
|
||
if 'error' in res:
|
||
log.error("Failed to make snapshot for {}: {}".format(new_block, res['error']))
|
||
|
||
else:
|
||
block_numbers.append(new_block)
|
||
|
||
log.debug("Snapshotter exit")
|
||
return
|
||
|
||
|
||
def read_config(config_path=SNAPSHOTS_CONFIG_PATH):
|
||
"""
|
||
Read our config file
|
||
Return the config dict on success
|
||
Return {'error': ...} on failure
|
||
"""
|
||
|
||
parser = SafeConfigParser()
|
||
parser.read( config_path )
|
||
|
||
defaults = {
|
||
'blockstack-snapshots': {
|
||
'private_key': os.path.join( os.path.dirname(config_path), 'public_keys' ),
|
||
'public_keys': os.path.join( os.path.dirname(config_path), 'private_key' ),
|
||
'snapshots_dir': os.path.join( os.path.dirname(config_path), 'snapshots' ),
|
||
'log_file': os.path.join( os.path.dirname(config_path), 'blockstack-snapshots.log' ),
|
||
'port': 31128,
|
||
},
|
||
}
|
||
|
||
private_key_path = None
|
||
public_keys_path = None
|
||
snapshots_dir = None
|
||
log_file = None
|
||
port = None
|
||
|
||
if parser.has_section("blockstack-snapshots"):
|
||
if parser.has_option('blockstack-snapshots', 'private_key'):
|
||
private_key_path = parser.get('blockstack-snapshots', 'private_key')
|
||
|
||
if parser.has_option('blockstack-snapshots', 'public_keys'):
|
||
public_keys_path = parser.get('blockstack-snapshots', 'public_keys')
|
||
|
||
if parser.has_option('blockstack-snapshots', 'snapshots_dir'):
|
||
snapshots_dir = parser.get('blockstack-snapshots', 'snapshots_dir')
|
||
|
||
if parser.has_option('blockstack-snapshots', 'log_file'):
|
||
log_file = parser.get('blockstack-snapshots', 'log_file')
|
||
|
||
if parser.has_option('blockstack-snapshots', 'port'):
|
||
try:
|
||
port = int(parser.get('blockstack-snapshots', 'port'))
|
||
except:
|
||
print >> sys.stderr, "Failed to parse `port` in config file"
|
||
sys.exit(1)
|
||
|
||
ret = {
|
||
'blockstack-snapshots': {
|
||
'private_key': private_key_path,
|
||
'public_keys': public_keys_path,
|
||
'snapshots_dir': snapshots_dir,
|
||
'log_file': log_file,
|
||
'port': port
|
||
},
|
||
}
|
||
|
||
for sec in ret.keys():
|
||
for field in ret[sec].keys():
|
||
if ret[sec][field] is None:
|
||
if defaults[sec][field] is not None:
|
||
ret[sec][field] = defaults[sec][field]
|
||
else:
|
||
del ret[sec][field]
|
||
|
||
return ret
|
||
|
||
|
||
def merge_config_args(conf, args):
|
||
"""
|
||
Merge CLI arguments into our config dict
|
||
"""
|
||
fields = {
|
||
'private_key': 'blockstack-snapshots.private_key',
|
||
'snapshots': 'blockstack-snapshots.snapshots_dir',
|
||
'public_keys': 'blockstack-snapshots.public_keys',
|
||
'log_file': 'blockstack-snapshots.log_file',
|
||
'port': 'blockstack-snapshots.port'
|
||
}
|
||
|
||
required = fields.keys()
|
||
|
||
for attr in fields.keys():
|
||
attrval = getattr(args, attr, None)
|
||
if attrval is None:
|
||
continue
|
||
|
||
parts = fields[attr].split('.')
|
||
conf_sec = parts[0]
|
||
conf_attr = parts[1]
|
||
|
||
conf[conf_sec][conf_attr] = attrval
|
||
|
||
return conf
|
||
|
||
|
||
def server_wait(snapshots_conf):
|
||
"""
|
||
Wait for the server to come up
|
||
"""
|
||
delay = 1.0
|
||
up = False
|
||
for i in xrange(0, 5):
|
||
try:
|
||
url = 'http://localhost:{}/v1/ping'.format(snapshots_conf['port'])
|
||
req = requests.get(url)
|
||
assert req.status_code == 200
|
||
|
||
up = True
|
||
break
|
||
|
||
except Exception, e:
|
||
log.exception(e)
|
||
log.debug("Failed to ping {}. Try again in {}".format(url, delay))
|
||
time.sleep(delay)
|
||
delay = delay * 2 + random.random() * delay
|
||
|
||
return up
|
||
|
||
|
||
def is_running():
|
||
global running
|
||
return running
|
||
|
||
|
||
def get_pid_file_path(config_path=SNAPSHOTS_CONFIG_PATH):
|
||
"""
|
||
Get the PID file path
|
||
"""
|
||
pid_path = os.path.join( os.path.dirname(config_path), 'blockstack-snapshots.pid' )
|
||
return pid_path
|
||
|
||
|
||
def put_pid_file(config_path=SNAPSHOTS_CONFIG_PATH):
|
||
"""
|
||
Put the PID file
|
||
Return True on success
|
||
Return False if it already exists
|
||
"""
|
||
pid_path = get_pid_file_path(config_path=config_path)
|
||
if os.path.exists(pid_path):
|
||
log.debug("PID path already exists: {}".format(pid_path))
|
||
return False
|
||
|
||
with open(pid_path, "w") as f:
|
||
f.write( str(os.getpid()) )
|
||
|
||
return True
|
||
|
||
|
||
def get_pid_from_file(config_path=SNAPSHOTS_CONFIG_PATH):
|
||
"""
|
||
Get the PID in the PID file
|
||
Return None if there is no such file
|
||
"""
|
||
pid_path = get_pid_file_path(config_path=config_path)
|
||
if not os.path.exists(pid_path):
|
||
return None
|
||
|
||
with open(pid_path, 'r') as f:
|
||
pid = int(f.read().strip())
|
||
|
||
return pid
|
||
|
||
|
||
def remove_pid_file(config_path=SNAPSHOTS_CONFIG_PATH):
|
||
"""
|
||
Remove the PID file
|
||
Idempotent; succeeds even if it isn't there
|
||
"""
|
||
pid_path = get_pid_file_path(config_path=config_path)
|
||
if os.path.exists(pid_path):
|
||
os.unlink(pid_path)
|
||
|
||
return True
|
||
|
||
|
||
def atexit_cleanup(config_path=SNAPSHOTS_CONFIG_PATH):
|
||
"""
|
||
Clean up at exit
|
||
"""
|
||
log.debug("Atexit called for {}".format(os.getpid()))
|
||
remove_pid_file(config_path)
|
||
|
||
|
||
def signal_exit(ignored_1, ignored_2):
|
||
"""
|
||
Signal handler cleanup
|
||
"""
|
||
sys.exit(0)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
|
||
virtualchain.setup_virtualchain( virtualchain_hooks )
|
||
working_dir = virtualchain.get_working_dir( impl=virtualchain_hooks )
|
||
config_path = SNAPSHOTS_CONFIG_PATH
|
||
|
||
argparser = argparse.ArgumentParser(description="blockstack-snapshots version {}".format(blockstack.VERSION))
|
||
subparsers = argparser.add_subparsers(
|
||
dest='action', help='the action to be taken')
|
||
|
||
# ---------------------------
|
||
parser = subparsers.add_parser(
|
||
'start',
|
||
help='start the snapshot daemon')
|
||
|
||
parser.add_argument(
|
||
'--debug', action='store_true',
|
||
help='activate debug output')
|
||
|
||
parser.add_argument(
|
||
'--config_file', action='store',
|
||
help='path to the config file (Default is {})'.format(config_path))
|
||
|
||
parser.add_argument(
|
||
'--snapshots', action='store',
|
||
help='the path to the snapshots output directory')
|
||
|
||
parser.add_argument(
|
||
'--private_key', action='store',
|
||
help='the path to the private key to sign snapshots')
|
||
|
||
parser.add_argument(
|
||
'--public_keys', action='store',
|
||
help='the path to a file with the list of public keys to verify snapshots')
|
||
|
||
parser.add_argument(
|
||
'--log_file', action='store',
|
||
help='path to the log file')
|
||
|
||
parser.add_argument(
|
||
'--port', action='store', type=int,
|
||
help='the port to listen on')
|
||
|
||
parser.add_argument(
|
||
'--foreground', action='store_true',
|
||
help='Run in the foreground. Do not daemonize')
|
||
|
||
# ---------------------------
|
||
parser = subparsers.add_parser(
|
||
'stop',
|
||
help='stop the snapshot daemon')
|
||
|
||
parser.add_argument(
|
||
'--config_file', action='store',
|
||
help='path to the config file (Default is {})'.format(config_path))
|
||
|
||
parser.add_argument(
|
||
'--debug', action='store_true',
|
||
help='activate debug output')
|
||
|
||
# ---------------------------
|
||
parser = subparsers.add_parser(
|
||
'snapshot',
|
||
help='make a snapshot for each backup')
|
||
|
||
parser.add_argument(
|
||
'--config_file', action='store',
|
||
help='path to the config file (Default is {})'.format(config_path))
|
||
|
||
parser.add_argument(
|
||
'--snapshots', action='store',
|
||
help='the path to the snapshots output directory')
|
||
|
||
parser.add_argument(
|
||
'--private_key', action='store',
|
||
help='the path to the private key to sign snapshots')
|
||
|
||
parser.add_argument(
|
||
'--debug', action='store_true',
|
||
help='activate debug output')
|
||
|
||
args, _ = argparser.parse_known_args()
|
||
|
||
if hasattr(args, 'debug') and args.debug:
|
||
# re-exec without --debug, but set debug
|
||
os.environ['BLOCKSTACK_DEBUG'] = "1"
|
||
new_argv = []
|
||
for a in sys.argv:
|
||
if a != '--debug':
|
||
new_argv.append(a)
|
||
|
||
os.execv(sys.argv[0], new_argv)
|
||
|
||
config_path = getattr(args, 'config_path', config_path)
|
||
|
||
conf = read_config(config_path=config_path)
|
||
conf = merge_config_args(conf, args)
|
||
|
||
snapshots_conf = conf['blockstack-snapshots']
|
||
|
||
if args.action == 'start':
|
||
# start up
|
||
snapshots_path = snapshots_conf.get('snapshots_dir', None)
|
||
private_key_path = snapshots_conf.get('private_key', None)
|
||
public_keys_path = snapshots_conf.get('public_keys', None)
|
||
log_file = snapshots_conf.get('log_file', None)
|
||
port = snapshots_conf.get('port', None)
|
||
|
||
if snapshots_path is None or private_key_path is None or public_keys_path is None or log_file is None or port is None:
|
||
argparser.print_help()
|
||
print >> sys.stderr, "\nMissing configuration file information or arguments:\n {}\n".format(
|
||
', '.join( filter( lambda x: snapshots_conf.get(x, None) is None, ['snapshots_dir', 'private_key', 'public_keys', 'log_file', 'port'] ) )
|
||
)
|
||
|
||
sys.exit(1)
|
||
|
||
private_key = None
|
||
trusted_public_keys = []
|
||
|
||
if not os.path.exists(snapshots_path) or not os.path.isdir(snapshots_path):
|
||
print >> sys.stderr, "{} does not exist or is not a directory".format(snapshots_path)
|
||
sys.exit(1)
|
||
|
||
if not os.path.exists(private_key_path):
|
||
print >> sys.stderr, 'Failed to read {}: no such file or directory'.format(private_key_path)
|
||
sys.exit(1)
|
||
|
||
if not os.path.exists(public_keys_path):
|
||
print >> sys.stderr, 'Failed to read {}: no such file or directory'.format(public_keys_path)
|
||
sys.exit(1)
|
||
|
||
with open(private_key_path) as f:
|
||
private_key = f.read().strip()
|
||
|
||
with open(public_keys_path) as f:
|
||
trusted_public_key_list = f.read().strip()
|
||
trusted_public_keys = trusted_public_key_list.split()
|
||
|
||
try:
|
||
private_key = keylib.ECPrivateKey(private_key).to_hex()
|
||
|
||
pks = []
|
||
for pk_str in trusted_public_keys:
|
||
pk = keylib.ECPublicKey(pk_str).to_hex()
|
||
pks.append(pk)
|
||
|
||
trusted_public_keys = pks
|
||
|
||
except:
|
||
print >> sys.stderr, "Invalid key data"
|
||
sys.exit(1)
|
||
|
||
if not args.foreground:
|
||
# is another daemon running?
|
||
existing_pid = get_pid_from_file(config_path=config_path)
|
||
if existing_pid is not None:
|
||
# is it actually running?
|
||
try:
|
||
os.kill(existing_pid, 0)
|
||
print >> sys.stderr, "Another daemon is already running ({})".format(existing_pid)
|
||
sys.exit(1)
|
||
|
||
except OSError as oe:
|
||
if oe.errno == errno.ESRCH:
|
||
print >> sys.stderr, "Removing stale PID file"
|
||
remove_pid_file(config_path=config_path)
|
||
|
||
else:
|
||
log.exception(oe)
|
||
sys.exit(1)
|
||
|
||
except Exception as e:
|
||
log.exception(e)
|
||
sys.exit(1)
|
||
|
||
# daemonize
|
||
res = blockstack_client.daemonize( log_file, child_wait=lambda: server_wait(snapshots_conf) )
|
||
if res < 0:
|
||
print >> sys.stderr, "Failed to start server"
|
||
sys.exit(1)
|
||
|
||
if res > 0:
|
||
log.debug("Parent {} forked intermediate child {}".format(os.getpid(), res))
|
||
sys.exit(0)
|
||
|
||
# daemon child
|
||
# put PID file
|
||
res = put_pid_file(config_path=config_path)
|
||
if not res:
|
||
print >> sys.stderr, "Failed to write PID file"
|
||
sys.exit(1)
|
||
|
||
|
||
atexit.register( atexit_cleanup, config_path )
|
||
signal.signal(signal.SIGINT, signal_exit )
|
||
signal.signal(signal.SIGQUIT, signal_exit )
|
||
signal.signal(signal.SIGTERM, signal_exit )
|
||
|
||
# daemon child continues here
|
||
# start snapshotter thread
|
||
thr = thread.start_new_thread( snapshotter_thread_main, (working_dir, snapshots_path, private_key, is_running) )
|
||
|
||
# start HTTP server
|
||
srv = BlockstackSnapshotServer( port, trusted_public_keys, private_key, snapshots_path )
|
||
|
||
try:
|
||
srv.serve_forever()
|
||
except Exception as e:
|
||
log.exception(e)
|
||
running = False
|
||
|
||
thr.join()
|
||
sys.exit(0)
|
||
|
||
elif args.action == 'stop':
|
||
# stop running snapshot server
|
||
existing_pid = get_pid_from_file(config_path=config_path)
|
||
if existing_pid is not None:
|
||
# is it running?
|
||
try:
|
||
log.debug("Send SIGTERM to {}".format(existing_pid))
|
||
os.kill(existing_pid, signal.SIGTERM)
|
||
except OSError as oe:
|
||
if oe.errno == errno.ESRCH:
|
||
print >> sys.stderr, "No such process"
|
||
remove_pid_file(config_path=config_path)
|
||
sys.exit(0)
|
||
|
||
else:
|
||
log.exception(oe)
|
||
sys.exit(1)
|
||
|
||
except Exception as e:
|
||
log.exception(e)
|
||
sys.exit(1)
|
||
|
||
elif args.action == 'snapshot':
|
||
# make all snapshots
|
||
snapshots_path = snapshots_conf.get('snapshots_dir', None)
|
||
private_key_path = snapshots_conf.get('private_key', None)
|
||
|
||
if snapshots_path is None or private_key_path is None:
|
||
argparser.print_help()
|
||
print >> sys.stderr, "\nMissing configuration file information or arguments\n"
|
||
sys.exit(1)
|
||
|
||
if not os.path.exists(snapshots_path) or not os.path.isdir(snapshots_path):
|
||
print >> sys.stderr, "{} does not exist or is not a directory"
|
||
sys.exit(1)
|
||
|
||
if not os.path.exists(private_key_path):
|
||
print >> sys.stderr, 'Failed to read {}: no such file or directory'.format(private_key_path)
|
||
sys.exit(1)
|
||
|
||
def _snapshot_running():
|
||
global running
|
||
if running:
|
||
running = False
|
||
return True
|
||
|
||
else:
|
||
return running
|
||
|
||
with open(private_key_path, "r") as f:
|
||
private_key = f.read().strip()
|
||
|
||
try:
|
||
private_key = keylib.ECPrivateKey(private_key).to_hex()
|
||
except Exception as e:
|
||
print >> sys.stderr, "Invalid key data"
|
||
sys.exit(1)
|
||
|
||
snapshotter_thread_main(working_dir, snapshots_path, private_key, _snapshot_running)
|
||
sys.exit(0)
|