diff --git a/blockstack/blockstackd.py b/blockstack/blockstackd.py index dbd8bf8ee..2d9c4bd5b 100644 --- a/blockstack/blockstackd.py +++ b/blockstack/blockstackd.py @@ -54,11 +54,12 @@ from virtualchain.lib.hashing import * log = virtualchain.get_logger("blockstack-core") -import blockstack_client - -from lib import nameset as blockstack_state_engine from lib import get_db_state -from lib.config import REINDEX_FREQUENCY +from lib.client import BlockstackRPCClient +from lib.client import ping as blockstack_ping +from lib.client import OP_HEX_PATTERN, OP_CONSENSUS_HASH_PATTERN, OP_ADDRESS_PATTERN, OP_BASE64_EMPTY_PATTERN +from lib.config import REINDEX_FREQUENCY, BLOCKSTACK_TEST, default_bitcoind_opts +from lib.util import url_to_host_port, atlas_inventory_to_string from lib import * from lib.storage import * from lib.atlas import * @@ -73,10 +74,6 @@ bitcoind = None rpc_server = None storage_pusher = None gc_thread = None -has_indexer = True - -from blockstack_client.utils import url_to_host_port, atlas_inventory_to_string -from blockstack_client import queue_findone, queue_findall, queue_removeall, queue_append GC_EVENT_THRESHOLD = 15 @@ -121,12 +118,11 @@ def get_bitcoind( new_bitcoind_opts=None, reset=False, new=False ): return None -def get_pidfile_path(): +def get_pidfile_path(working_dir): """ Get the PID file path. """ - working_dir = virtualchain.get_working_dir() - pid_filename = blockstack_state_engine.get_virtual_chain_name() + ".pid" + pid_filename = virtualchain_hooks.get_virtual_chain_name() + ".pid" return os.path.join( working_dir, pid_filename ) @@ -141,34 +137,15 @@ def put_pidfile( pidfile_path, pid ): return -def get_logfile_path(): +def get_logfile_path(working_dir): """ Get the logfile path for our service endpoint. 
""" - working_dir = virtualchain.get_working_dir() - logfile_filename = blockstack_state_engine.get_virtual_chain_name() + ".log" + logfile_filename = virtualchain_hooks.get_virtual_chain_name() + ".log" return os.path.join( working_dir, logfile_filename ) -def get_lastblock(): - """ - Get the last block processed. - """ - lastblock_filename = virtualchain.get_lastblock_filename() - if not os.path.exists( lastblock_filename ): - return None - - try: - with open(lastblock_filename, "r") as f: - lastblock_txt = f.read() - - lastblock = int(lastblock_txt.strip()) - return lastblock - except: - return None - - -def get_index_range(): +def get_index_range(working_dir): """ Get the bitcoin block index range. Mask connection failures with timeouts. @@ -187,7 +164,7 @@ def get_index_range(): wait = 1.0 while last_block is None and is_running(): - first_block, last_block = virtualchain.get_index_range( bitcoind_session ) + first_block, last_block = virtualchain.get_index_range('bitcoin', bitcoind_session, virtualchain_hooks, working_dir) if last_block is None: @@ -203,14 +180,6 @@ def get_index_range(): return first_block, last_block - NUM_CONFIRMATIONS -def is_indexer(): - """ - Is this node indexing? 
- """ - global has_indexer - return has_indexer - - def rpc_traceback(): exception_data = traceback.format_exc().splitlines() return { @@ -394,17 +363,20 @@ class BlockstackdRPCHandler(SimpleXMLRPCRequestHandler): else: if os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION", None) == "1": - log.debug("Inbound RPC begin %s(%s)" % ("rpc_" + str(method), params)) + log.debug("Inbound RPC begin %s(%s) from %s" % ("rpc_" + str(method), params, self.client_address[0])) else: - log.debug("RPC %s(%s)" % ("rpc_" + str(method), params)) + log.debug("RPC %s(%s) from %s" % ("rpc_" + str(method), params, self.client_address[0])) res = self.server.funcs["rpc_" + str(method)](*params, **con_info) + if 'deprecated' in res and res['deprecated']: + log.warn("DEPRECATED method call {} from {}".format(method, self.client_address[0])) + # lol jsonrpc within xmlrpc ret = json.dumps(res) if os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION", None) == "1": - log.debug("Inbound RPC end %s(%s)" % ("rpc_" + str(method), params)) + log.debug("Inbound RPC end %s(%s) from %s" % ("rpc_" + str(method), params, self.client_address[0])) return ret except Exception, e: @@ -421,9 +393,12 @@ class BlockstackdRPC( SimpleXMLRPCServer): as RPC methods. """ - def __init__(self, host='0.0.0.0', port=config.RPC_SERVER_PORT, handler=BlockstackdRPCHandler ): + def __init__(self, working_dir, host='0.0.0.0', port=config.RPC_SERVER_PORT, handler=BlockstackdRPCHandler ): + log.info("Serving database state from {}".format(working_dir)) log.info("Listening on %s:%s" % (host, port)) SimpleXMLRPCServer.__init__( self, (host, port), handler, allow_none=True ) + + self.working_dir = working_dir # register methods for attr in dir(self): @@ -433,18 +408,19 @@ class BlockstackdRPC( SimpleXMLRPCServer): self.register_function( method ) - def success_response(self, method_resp ): + def success_response(self, method_resp, **kw): """ Make a standard "success" response, which contains some ancilliary data. 
""" resp = { 'status': True, - 'indexing': config.is_indexing(), - 'lastblock': config.fast_getlastblock(), + 'indexing': config.is_indexing(self.working_dir), + 'lastblock': virtualchain_hooks.get_last_block(self.working_dir), } - resp.update( method_resp ) + resp.update(kw) + resp.update(method_resp) return resp @@ -481,7 +457,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): if type(block_id) not in [int, long]: return False - if os.environ.get("BLOCKSTACK_TEST") == "1": + if BLOCKSTACK_TEST: if block_id <= 0: return False @@ -552,7 +528,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): """ verify that a string is an address """ - return self.check_string(address, min_length=26, max_length=35, pattern=blockstack_client.schemas.OP_ADDRESS_PATTERN) + return self.check_string(address, min_length=26, max_length=35, pattern=OP_ADDRESS_PATTERN) def rpc_ping(self, **con_info): @@ -568,13 +544,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_name(name): return {'error': 'invalid name'} - db = get_db_state() + db = get_db_state(self.working_dir) try: name = str(name) @@ -590,6 +563,9 @@ class BlockstackdRPC( SimpleXMLRPCServer): else: + assert 'opcode' in name_record, 'BUG: missing opcode' + name_record = op_canonicalize(name_record['opcode'], name_record) + namespace_id = get_namespace_from_name(name) namespace_record = db.get_namespace(namespace_id) if namespace_record is None: @@ -625,13 +601,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': True, 'history_blocks': [...]} on success Return {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_name(name): return {'error': 'invalid name'} - db = get_db_state() + db = get_db_state(self.working_dir) history_blocks = db.get_name_history_blocks( name ) db.close() return self.success_response( {'history_blocks': history_blocks} ) @@ 
-640,18 +613,16 @@ class BlockstackdRPC( SimpleXMLRPCServer): def rpc_get_name_at( self, name, block_height, **con_info ): """ Get all the states the name was in at a particular block height. + Does NOT work on expired names. Return {'status': true, 'record': ...} """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_name(name): return {'error': 'invalid name'} if not self.check_block(block_height): - return {'status': True, 'record': None} + return self.success_response({'record': None}) - db = get_db_state() + db = get_db_state(self.working_dir) name_at = db.get_name_at( name, block_height ) db.close() @@ -661,126 +632,73 @@ class BlockstackdRPC( SimpleXMLRPCServer): def rpc_get_historic_name_at( self, name, block_height, **con_info ): """ Get all the states the name was in at a particular block height. + Works on expired and unexpired names. Return {'status': true, 'record': ...} """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_name(name): return {'error': 'invalid name'} if not self.check_block(block_height): - return {'status': True, 'record': None} + return self.success_response({'record': None}) - db = get_db_state() + db = get_db_state(self.working_dir) name_at = db.get_name_at( name, block_height, include_expired=True ) db.close() return self.success_response( {'records': name_at} ) - - def rpc_get_op_history_rows( self, history_id, offset, count, **con_info ): + + def rpc_get_num_nameops_at(self, block_id, **con_info): """ - Get a page of history rows for a name or namespace - Return {'status': True, 'history_rows': [history rows]} on success - Return {'error': ...} on error - """ - if not is_indexer(): - return {'error': 'Method not supported'} - - if not self.check_name(history_id) and not self.check_namespace(history_id): - return {'error': 'Invalid name or namespace'} - - if not self.check_offset(offset): - return {'error': 'invalid offset'} - - if not self.check_count(count, 
10): - return {'error': 'invalid count'} - - db = get_db_state() - history_rows = db.get_op_history_rows( history_id, offset, count ) - db.close() - - return self.success_response( {'history_rows': history_rows} ) - - - def rpc_get_num_op_history_rows( self, history_id, **con_info ): - """ - Get the total number of history rows - Return {'status': True, 'count': count} on success - Return {'error': ...} on error - """ - if not is_indexer(): - return {'error': 'Method not supported'} - - if not self.check_name(history_id) and not self.check_namespace(history_id): - return {'error': 'Invalid name or namespace'} - - db = get_db_state() - num_history_rows = db.get_num_op_history_rows( history_id ) - db.close() - - return self.success_response( {'count': num_history_rows} ) - - - def rpc_get_nameops_affected_at( self, block_id, offset, count, **con_info ): - """ - Get the sequence of name and namespace records affected at the given block. - The records returned will be in their *current* forms. The caller - should use get_op_history_rows() to fetch the history delta that - can be used to restore the records to their *historic* forms i.e. - at the given block height. - - Returns the list of name operations to be fed into virtualchain, as - {'status': True, 'nameops': [nameops]} - - Returns {'error': ...} on failure - - Used by SNV clients. 
- """ - if not is_indexer(): - return {'error': 'Method not supported'} - - if not self.check_block(block_id): - return {'error': 'Invalid block height'} - - if not self.check_offset(offset): - return {'error': 'invalid offset'} - - if not self.check_count(count, 10): - return {'error': 'invalid count'} - - # do NOT restore history information, since we're paging - db = get_db_state() - prior_records = db.get_all_ops_at( block_id, offset=offset, count=count, include_history=False, restore_history=False ) - db.close() - log.debug("%s name operations at block %s, offset %s, count %s" % (len(prior_records), block_id, offset, count)) - for rec in prior_records: - if 'buckets' in rec and (isinstance(rec['buckets'], str) or - isinstance(rec['buckets'], unicode)): - rec['buckets'] = json.loads(rec['buckets']) - - return self.success_response( {'nameops': prior_records} ) - - - def rpc_get_num_nameops_affected_at( self, block_id, **con_info ): - """ - Get the number of name and namespace operations at the given block. - Returns {'status': True, 'count': ...} on success + Get the number of Blockstack transactions that occured at the given block. + Returns {'count': ..} on success Returns {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_block(block_id): return {'error': 'Invalid block height'} - db = get_db_state() + db = get_db_state(self.working_dir) count = db.get_num_ops_at( block_id ) db.close() - log.debug("%s name operations at %s" % (count, block_id)) - return self.success_response( {'count': count} ) + log.debug("{} operations at {}".format(count, block_id)) + return self.success_response({'count': count}) + + + def rpc_get_nameops_at(self, block_id, offset, count, **con_info): + """ + Get the name operations that occured in the given block. + + Returns {'nameops': [...]} on success. 
+ Returns {'error': ...} on error + """ + if not self.check_block(block_id): + return {'error': 'Invalid block height'} + + if not self.check_offset(offset): + return {'error': 'Invalid offset'} + + if not self.check_count(count, 10): + return {'error': 'Invalid count'} + + db = get_db_state(self.working_dir) + nameops = db.get_all_ops_at(block_id, offset=offset, count=count) + db.close() + + log.debug("{} name operations at block {}, offset {}, count {}".format(len(nameops), block_id, offset, count)) + ret = [] + + for nameop in nameops: + """ + # TODO: replace with an op-specific 'canonicalize()' method + if 'buckets' in nameop and (isinstance(nameop['buckets'], str) or + isinstance(nameop['buckets'], unicode)): + nameop['buckets'] = json.loads(nameop['buckets']) + """ + assert 'opcode' in nameop, 'BUG: missing opcode' + ret.append(op_canonicalize(nameop['opcode'], nameop)) + + return self.success_response({'nameops': ret}) def rpc_get_nameops_hash_at( self, block_id, **con_info ): @@ -791,13 +709,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Returns {'status': True, 'ops_hash': ops_hash} on success Returns {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_block(block_id): return {'error': 'Invalid block height'} - db = get_db_state() + db = get_db_state(self.working_dir) ops_hash = db.get_block_ops_hash( block_id ) db.close() @@ -814,10 +729,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): * server_alive: True * [optional] zonefile_count: the number of zonefiles known """ - if not is_indexer(): - return {'error': 'Method not supported'} - - bitcoind_opts = blockstack_client.default_bitcoind_opts( virtualchain.get_config_filename(), prefix=True ) + bitcoind_opts = default_bitcoind_opts( virtualchain.get_config_filename(virtualchain_hooks, self.working_dir), prefix=True ) bitcoind = get_bitcoind( new_bitcoind_opts=bitcoind_opts, new=True ) if bitcoind is None: @@ -828,12 +740,12 @@ class 
BlockstackdRPC( SimpleXMLRPCServer): reply = {} reply['last_block_seen'] = info['blocks'] - db = get_db_state() + db = get_db_state(self.working_dir) reply['consensus'] = db.get_current_consensus() reply['server_version'] = "%s" % VERSION reply['last_block_processed'] = db.get_current_block() reply['server_alive'] = True - reply['indexing'] = config.is_indexing() + reply['indexing'] = config.is_indexing(self.working_dir) db.close() @@ -850,13 +762,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': True, 'names': ...} on success Return {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_address(address): return {'error': 'Invalid address'} - db = get_db_state() + db = get_db_state(self.working_dir) names = db.get_names_owned_by_address( address ) db.close() @@ -872,9 +781,6 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': True, 'names': [{'name': ..., 'block_id': ..., 'vtxindex': ...}]} on success Return {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_address(address): return {'error': 'Invalid address'} @@ -884,7 +790,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): if not self.check_count(count, 10): return {'error': 'invalid count'} - db = get_db_state() + db = get_db_state(self.working_dir) names = db.get_historic_names_by_address(address, offset, count) db.close() @@ -900,14 +806,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': True, 'count': ...} on success Return {'error': ...} on failure """ - - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_address(address): return {'error': 'Invalid address'} - db = get_db_state() + db = get_db_state(self.working_dir) ret = db.get_num_historic_names_by_address(address) db.close() @@ -922,14 +824,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return the cost of a given name, including fees Return value is in 
satoshis (as 'satoshis') """ - - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_name(name): return {'error': 'Invalid name or namespace'} - db = get_db_state() + db = get_db_state(self.working_dir) ret = get_name_cost( db, name ) db.close() @@ -944,14 +842,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return the cost of a given namespace, including fees. Return value is in satoshis """ - - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_namespace(namespace_id): return {'error': 'Invalid name or namespace'} - db = get_db_state() + db = get_db_state(self.working_dir) cost, ns = get_namespace_cost( db, namespace_id ) db.close() @@ -971,14 +865,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': True, 'record': ...} on success Return {'error': ...} on error """ - - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_namespace(namespace_id): return {'error': 'Invalid name or namespace'} - db = get_db_state() + db = get_db_state(self.working_dir) ns = db.get_namespace( namespace_id ) if ns is None: # maybe revealed? 
@@ -988,11 +878,18 @@ class BlockstackdRPC( SimpleXMLRPCServer): if ns is None: return {"error": "No such namespace"} + assert 'opcode' in ns, 'BUG: missing opcode' + ns = op_canonicalize(ns['opcode'], ns) + ns['ready'] = False return self.success_response( {'record': ns} ) else: db.close() + + assert 'opcode' in ns, 'BUG: missing opcode' + ns = op_canonicalize(ns['opcode'], ns) + ns['ready'] = True return self.success_response( {'record': ns} ) @@ -1003,11 +900,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': True, 'count': count} on success Return {'error': ...} on error """ - - if not is_indexer(): - return {'error': 'Method not supported'} - - db = get_db_state() + db = get_db_state(self.working_dir) num_names = db.get_num_names() db.close() @@ -1020,11 +913,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': True, 'count': count} on success Return {'error': ...} on error """ - - if not is_indexer(): - return {'error': 'Method not supported'} - - db = get_db_state() + db = get_db_state(self.working_dir) num_names = db.get_num_names(include_expired=True) db.close() @@ -1037,16 +926,13 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': true, 'names': [...]} on success Return {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_offset(offset): return {'error': 'invalid offset'} if not self.check_count(count, 100): return {'error': 'invalid count'} - db = get_db_state() + db = get_db_state(self.working_dir) all_names = db.get_all_names( offset=offset, count=count ) db.close() @@ -1059,16 +945,13 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': true, 'names': [...]} on success Return {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_offset(offset): return {'error': 'invalid offset'} if not self.check_count(count, 100): return {'error': 'invalid count'} - db = get_db_state() + db = 
get_db_state(self.working_dir) all_names = db.get_all_names( offset=offset, count=count, include_expired=True ) db.close() @@ -1081,11 +964,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': true, 'namespaces': [...]} on success Return {'error': ...} on error """ - - if not is_indexer(): - return {'error': 'Method not supported'} - - db = get_db_state() + db = get_db_state(self.working_dir) all_namespaces = db.get_all_namespace_ids() db.close() @@ -1098,14 +977,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': true, 'count': count} on success Return {'error': ...} on error """ - - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_namespace(namespace_id): return {'error': 'Invalid name or namespace'} - db = get_db_state() + db = get_db_state(self.working_dir) num_names = db.get_num_names_in_namespace( namespace_id ) db.close() @@ -1118,9 +993,6 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': true, 'names': [...]} on success Return {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_namespace(namespace_id): return {'error': 'Invalid name or namespace'} @@ -1133,8 +1005,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): if not is_namespace_valid( namespace_id ): return {'error': 'invalid namespace ID'} - - db = get_db_state() + db = get_db_state(self.working_dir) res = db.get_names_in_namespace( namespace_id, offset=offset, count=count ) db.close() @@ -1147,13 +1018,10 @@ class BlockstackdRPC( SimpleXMLRPCServer): Return {'status': True, 'consensus': ...} on success Return {'error': ...} on error """ - if not is_indexer(): - return {'error': 'Method not supported'} - if not self.check_block(block_id): return {'error': 'Invalid block height'} - db = get_db_state() + db = get_db_state(self.working_dir) consensus = db.get_consensus_at( block_id ) db.close() return self.success_response( {'consensus': consensus} ) @@ -1167,9 +1035,6 
@@ class BlockstackdRPC( SimpleXMLRPCServer): Returns {'status': True, 'consensus_hashes': dict} on success Returns {'error': ...} on success """ - if not is_indexer(): - return {'error': 'Method not supported'} - if type(block_id_list) != list: return {'error': 'Invalid block heights'} @@ -1180,7 +1045,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): if not self.check_block(bid): return {'error': 'Invalid block height'} - db = get_db_state() + db = get_db_state(self.working_dir) ret = {} for block_id in block_id_list: ret[block_id] = db.get_consensus_at(block_id) @@ -1190,123 +1055,41 @@ class BlockstackdRPC( SimpleXMLRPCServer): return self.success_response( {'consensus_hashes': ret} ) - def rpc_get_mutable_data( self, blockchain_id, fq_data_id, **con_info ): - """ - Get a mutable data record written by a given user. - """ - if type(fq_data_id) not in [str, unicode]: - return {'error': 'Invalid data ID'} - - if len(fq_data_id) > 4096: - # no way this is valid - return {'error': 'Invalid data ID'} - - if not self.check_name(blockchain_id): - return {'error': 'Invalid blockchain ID'} - - conf = get_blockstack_opts() - if not conf['serve_data']: - return {'error': 'No data'} - - drivers = conf.get('data_storage_drivers', None) - if drivers is not None: - drivers = drivers.split(',') - - res = load_mutable_data_from_storage( blockchain_id, fq_data_id, drivers=drivers ) - if res is None: - log.debug("Failed to get {}".format(fq_data_id)) - return {'error': 'Failed to get data'} - - return self.success_response({'data': res}) - - - def rpc_get_immutable_data( self, blockchain_id, data_hash, **con_info ): - """ - Get immutable data record written by a given user. 
- """ - conf = get_blockstack_opts() - if not conf['serve_data']: - return {'error': 'No data'} - - if not self.check_name(blockchain_id): - return {'error': 'Invalid blockchain ID'} - - if not self.check_string(data_hash, min_length=32, max_length=128, pattern=blockstack_client.schemas.OP_HEX_PATTERN): - return {'error': 'Invalid address'} - - client = get_blockstack_client_session() - return client.get_immutable( str(blockchain_id), str(data_hash) ) - - def rpc_get_block_from_consensus( self, consensus_hash, **con_info ): """ Given the consensus hash, find the block number (or None) """ - if not is_indexer(): - return {'error': 'Method not supported'} - - if not self.check_string(consensus_hash, min_length=LENGTHS['consensus_hash']*2, max_length=LENGTHS['consensus_hash']*2, pattern=blockstack_client.schemas.OP_CONSENSUS_HASH_PATTERN): + if not self.check_string(consensus_hash, min_length=LENGTHS['consensus_hash']*2, max_length=LENGTHS['consensus_hash']*2, pattern=OP_CONSENSUS_HASH_PATTERN): return {'error': 'Not a valid consensus hash'} - db = get_db_state() + db = get_db_state(self.working_dir) block_id = db.get_block_from_consensus( consensus_hash ) db.close() return self.success_response( {'block_id': block_id} ) - def get_zonefile_data( self, config, zonefile_hash, name=None ): + def get_zonefile_data( self, zonefile_hash, zonefile_dir ): """ Get a zonefile by hash Return the serialized zonefile on success Return None on error """ - # check cache - cached_zonefile_data = get_cached_zonefile_data( zonefile_hash, zonefile_dir=config.get('zonefiles', None)) - if cached_zonefile_data is not None: + atlas_zonefile_data = get_atlas_zonefile_data( zonefile_hash, zonefile_dir ) + if atlas_zonefile_data is not None: # check hash - zfh = blockstack_client.get_zonefile_data_hash( cached_zonefile_data ) + zfh = get_zonefile_data_hash( atlas_zonefile_data ) if zfh != zonefile_hash: - log.debug("Invalid cached zonefile %s" % zonefile_hash ) - remove_cached_zonefile_data( 
zonefile_hash, zonefile_dir=config.get('zonefiles', None)) + log.debug("Invalid local zonefile %s" % zonefile_hash ) + remove_atlas_zonefile_data( zonefile_hash, zonefile_dir ) else: - log.debug("Zonefile %s is cached" % zonefile_hash) - return cached_zonefile_data + log.debug("Zonefile %s is local" % zonefile_hash) + return atlas_zonefile_data return None - def get_zonefile_data_by_name( self, conf, name, name_rec=None ): - """ - Get a zonefile by name - Return the serialized zonefile on success - Return None one error - """ - - if name_rec is None: - if not is_indexer(): - return None - - db = get_db_state() - name_rec = db.get_name( name ) - db.close() - - if name_rec is None: - return None - - zonefile_hash = name_rec.get('value_hash', None) - if zonefile_hash is None: - return None - - # find zonefile - zonefile_data = self.get_zonefile_data( conf, zonefile_hash, name=name ) - if zonefile_data is None: - return None - - return zonefile_data - - def rpc_get_zonefiles( self, zonefile_hashes, **con_info ): """ Get zonefiles from the local cache, @@ -1320,6 +1103,9 @@ class BlockstackdRPC( SimpleXMLRPCServer): conf = get_blockstack_opts() if not conf['serve_zonefiles']: return {'error': 'No data'} + + if 'zonefiles' not in conf: + return {'error': 'No zonefiles directory (likely a configuration bug)'} if type(zonefile_hashes) != list: log.error("Not a zonefile hash list") @@ -1330,12 +1116,12 @@ class BlockstackdRPC( SimpleXMLRPCServer): return {'error': 'Too many requests (no more than 100 allowed)'} for zfh in zonefile_hashes: - if not self.check_string(zfh, min_length=LENGTHS['value_hash']*2, max_length=LENGTHS['value_hash']*2, pattern=blockstack_client.schemas.OP_HEX_PATTERN): + if not self.check_string(zfh, min_length=LENGTHS['value_hash']*2, max_length=LENGTHS['value_hash']*2, pattern=OP_HEX_PATTERN): return {'error': 'Invalid zone file hash'} ret = {} for zonefile_hash in zonefile_hashes: - zonefile_data = self.get_zonefile_data( conf, zonefile_hash ) + 
zonefile_data = self.get_zonefile_data( zonefile_hash, conf['zonefiles'] ) if zonefile_data is None: continue @@ -1358,9 +1144,9 @@ class BlockstackdRPC( SimpleXMLRPCServer): conf = get_blockstack_opts() if not conf['serve_zonefiles']: return {'error': 'No data'} - - if not is_indexer(): - return {'error': 'Method not supported'} + + if 'zonefiles' not in conf: + return {'error': 'No zonefiles directory (likely a configuration error)'} if type(zonefile_datas) != list: return {'error': 'Invalid data'} @@ -1369,12 +1155,12 @@ class BlockstackdRPC( SimpleXMLRPCServer): return {'error': 'Too many zonefiles'} for zfd in zonefile_datas: - if not self.check_string(zfd, max_length=((4 * RPC_MAX_ZONEFILE_LEN) / 3) + 3, pattern=blockstack_client.schemas.OP_BASE64_PATTERN): + if not self.check_string(zfd, max_length=((4 * RPC_MAX_ZONEFILE_LEN) / 3) + 3, pattern=OP_BASE64_EMPTY_PATTERN): return {'error': 'Invalid zone file payload (exceeds {} bytes)'.format(RPC_MAX_ZONEFILE_LEN)} zonefile_dir = conf.get("zonefiles", None) saved = [] - db = get_db_state() + db = get_db_state(self.working_dir) for zonefile_data in zonefile_datas: @@ -1391,376 +1177,33 @@ class BlockstackdRPC( SimpleXMLRPCServer): saved.append(0) continue - zonefile_hash = blockstack_client.get_zonefile_data_hash( str(zonefile_data) ) + zonefile_hash = get_zonefile_data_hash(str(zonefile_data)) # does it correspond to a valid zonefile? 
- names_with_hash = db.get_names_with_value_hash( zonefile_hash ) - if names_with_hash is None or len(names_with_hash) == 0: - log.debug("Unknown zonefile hash %s" % zonefile_hash) + zonefile_txids = db.get_value_hash_txids(zonefile_hash) + if len(zonefile_txids) == 0: + # nope + log.debug("Unknown zonefile hash {}".format(zonefile_hash)) saved.append(0) continue - - rc = store_cached_zonefile_data( str(zonefile_data), zonefile_dir=zonefile_dir ) + + # keep this around + rc = store_atlas_zonefile_data( str(zonefile_data), zonefile_dir ) if not rc: - log.error("Failed to cache {}".format(zonefile_hash)) + log.error("Failed to store zone file {}".format(zonefile_hash)) saved.append(0) continue - # maybe a proper zonefile? if so, get the name out - name = None - txid = None - try: - zonefile = blockstack_zones.parse_zone_file( str(zonefile_data) ) - name = str(zonefile['$origin']) - txid = db.get_name_value_hash_txid( name, zonefile_hash ) - except Exception, e: - log.debug("Not a well-formed zonefile: %s" % zonefile_hash) - - # queue for replication - rc = storage_enqueue_zonefile( txid, str(zonefile_hash), str(zonefile_data) ) - if not rc: - log.error("Failed to store zonefile {}".format(zonefile_hash)) - saved.append(0) - continue - - log.debug("Enqueued {}".format(zonefile_hash)) + log.debug("Stored {}".format(zonefile_hash)) saved.append(1) db.close() - log.debug("Saved %s zonefile(s)\n", sum(saved)) + log.debug("Saved {} zonefile(s)".format(sum(saved))) log.debug("Reply: {}".format({'saved': saved})) return self.success_response( {'saved': saved} ) - def get_name_rec(self, name): - """ - Get a name record, even if we're not an indexer node. 
- Return the name rec on success - Return {'error': ...} on failure - """ - name_rec = None - - if is_indexer(): - # fetch from db directly - db = get_db_state() - name_rec = db.get_name(name) - db.close() - - if name_rec is None: - return {'error': 'No such name'} - - else: - # fetch from upstream - name_rec = blockstack_client.proxy.get_name_blockchain_record(name) - if 'error' in name_rec: - return name_rec - - return name_rec - - - def rpc_get_profile(self, name, **con_info): - """ - Get a profile for a particular name - Return {'profile': profile text} on success - Return {'error': ...} on error - """ - conf = get_blockstack_opts() - if not conf['serve_profiles']: - return {'error': 'No data'} - - if not self.check_name(name): - return {'error': 'Invalid name'} - - zonefile_storage_drivers = conf['zonefile_storage_drivers'].split(",") - profile_storage_drivers = conf['profile_storage_drivers'].split(",") - - name_rec = self.get_name_rec(name) - if 'error' in name_rec: - return name_rec - - # find zonefile - zonefile_data = self.get_zonefile_data_by_name( conf, name, name_rec=name_rec ) - if zonefile_data is None: - return {'error': 'No zonefile'} - - # deserialize - try: - zonefile_dict = blockstack_zones.parse_zone_file( zonefile_data ) - except: - return {'error': 'Nonstandard zonefile'} - - # find the profile - try: - # NOTE: since we did not generate this zonefile (i.e. it's untrusted input, and we may be using different storage drivers), - # don't trust its URLs. Auto-generate them using our designated drivers instead. - # Also, do not attempt to decode the profile. 
The client will do this instead (avoid any decode-related attack vectors) - res = blockstack_client.get_profile(name, profile_storage_drivers=profile_storage_drivers, - zonefile_storage_drivers=zonefile_storage_drivers, - user_zonefile=zonefile_dict, name_record=name_rec, - use_zonefile_urls=False, decode_profile=False) - if 'error' in res: - log.error("Failed to load profile '{}'".format(name)) - return res - profile = res['profile'] - zonefile = res['zonefile'] - except Exception, e: - log.exception(e) - log.error("Failed to load profile for '{}'".fomrat(name)) - return {'error': 'Failed to load profile'} - - if 'error' in zonefile: - return zonefile - - else: - return self.success_response( {'profile': profile} ) - - - def verify_data_timestamp( self, datum ): - """ - Verify that the mutable timestamp is fresh, - and that the datum has a valid timestamp. - Return {'status': True} on success - Return {'error': ...} on error - """ - - # needs a timestamp - if 'timestamp' not in datum.keys(): - log.debug("Datum has no timestamp") - return {'error': 'Datum has no timestamp'} - - if type(datum['timestamp']) not in [int, long, float]: - log.debug("Datum has invalid timestamp type") - return {'error': 'Invalid timestamp type'} - - timestamp = datum['timestamp'] - - # timestamp needs to be fresh - now = time.time() - if abs(now - timestamp) > 30: - log.debug("Out-of-sync timestamp: |%s - %s| == %s" % (now, timestamp, abs(now, timestamp))) - return {'error': 'Invalid timestamp'} - - else: - log.debug("Client and server differ by %s seconds" % abs(now - timestamp)) - return {'status': True} - - - def load_mutable_data( self, name, data_txt, max_len=RPC_MAX_PROFILE_LEN, storage_drivers=None ): - """ - Parse and authenticate user-given data - Return {'status': True, 'data': data dict, 'data_pubkey': public key, 'owner': True|False} on success - Return {'error': ...} on failure - """ - - if type(name) not in [str, unicode]: - return {'error': 'Invalid name'} - - if not 
is_name_valid(name): - return {'error': 'Invalid name'} - - if type(data_txt) not in [str, unicode]: - return {'error': 'Data must be a serialized JWT'} - - if len(data_txt) > RPC_MAX_PROFILE_LEN: - return {'error': 'Serialized data is too big'} - - conf = get_blockstack_opts() - if conf['redirect_data']: - # redirect! - servers = filter(lambda x: len(x) > 0, conf['data_servers'].split(',')) - return {'error': 'redirect', 'servers': servers} - - zonefile_storage_drivers = conf['zonefile_storage_drivers'].split(",") - zonefile_dict = None - - # find name record - name_rec = self.get_name_rec(name) - if 'error' in name_rec: - return name_rec - - if name_rec is None: - log.debug("No name for '%s'" % name) - return {'error': 'No such name'} - - # find zonefile - zonefile_data = self.get_zonefile_data_by_name( conf, name, name_rec=name_rec ) - if zonefile_data is None: - log.debug("No zonefile for '%s'" % name) - return {'error': 'No zonefile'} - - # must be standard - try: - zonefile_dict = blockstack_zones.parse_zone_file( zonefile_data ) - except: - log.debug("Non-standard zonefile for %s" % name) - return {'error': 'Nonstandard zonefile'} - - # first, try to verify with zonefile public key (if one is given) - user_data_pubkey = blockstack_client.user_zonefile_data_pubkey( zonefile_dict ) - user_data = None - - if user_data_pubkey is not None: - try: - user_data = blockstack_client.parse_signed_data( data_txt, user_data_pubkey ) - if os.environ.get("BLOCKSTACK_TEST") == "1": - log.debug("Loaded {}".format(user_data)) - - except Exception, e: - log.exception(e) - log.debug("Failed to authenticate data") - return {'error': 'Failed to authenticate data'} - - else: - log.warn("Falling back to verifying with owner address") - owner_addr = name_rec.get('address', None) - if owner_addr is None: - log.debug("No owner address") - return {'error': 'No owner address'} - - try: - user_data = blockstack_client.parse_signed_data( data_txt, None, public_key_hash=owner_addr ) - 
except Exception, e: - log.exception(e) - log.debug("Failed to authenticate data") - return {'error': 'Failed to authenticate data'} - - # profiles and v1 data will be parsed out to a mutable data dict. - # v2 data will be parsed out to a serialized mutable data blob - if isinstance(user_data, (str, unicode)): - try: - user_data = json.loads(user_data) - except: - log.debug("Failed to parse mutable data blob") - return {'error': 'Failed to parse mutable data blob'} - - # must be a dict with a 'timestamp' field (either a profile or a mutable data blob) - user_data_schema = { - 'type': 'object', - 'properties': { - 'timestamp': { - 'type': 'number', - 'minimum': 0, - }, - }, - } - - try: - jsonschema.validate(user_data, user_data_schema) - except Exception as e: - log.debug("Failed to validate user data") - if os.environ.get("BLOCKSTACK_TEST") == "1": - log.exception(e) - - return {'error': 'Invalid mutable data blob'} - - # authentic! try to verify via timestamp - res = self.verify_data_timestamp( user_data ) - if 'error' in res: - log.debug("Failed to verify with timestamp.") - return {'error': 'Invalid timestamp', 'reason': 'timestamp', 'zonefile': zonefile_dict} - - return {'status': True, 'data': user_data} - - - def rpc_put_profile(self, name, profile_txt, prev_profile_hash_or_ignored, sigb64_or_ignored, **con_info ): - """ - Store a profile for a particular name - @profile_txt must be a serialized JWT signed by the key in the user's zonefile. 
- @prev_profile_hash_or_ignored, if given, must be the hex string representation of the hash of the previous profile - (this argument is obsolete in 0.14.1) - @sigb64_or_ignored, if given, must cover prev_profile_hash+profile_txt - (this argument is obsolete in 0.14.1) - """ - conf = get_blockstack_opts() - if not conf['serve_profiles']: - return {'error': 'No data'} - - if not self.check_name(name): - return {'error': 'Invalid name'} - - data_info = self.load_mutable_data(name, profile_txt, max_len=RPC_MAX_PROFILE_LEN) - if 'error' in data_info: - return data_info - - res = storage_enqueue_profile( name, str(profile_txt) ) - if not res: - log.error('Failed to queue {}-byte profile for {}'.format(len(profile_txt), name)) - return {'error': 'Failed to queue profile'} - - log.debug("Queued {}-byte profile for {}".format(len(profile_txt), name)) - return self.success_response( {'num_replicas': 1, 'num_failures': 0} ) - - - def rpc_put_mutable_data(self, blockchain_id, data_txt, **con_info ): - """ - Store mutable data - @data_txt is the data to store - - Only works if the mutable data payload has an associated blockchain ID that matches @blockchain_id - - This method does NOT need access to the database. - However, it only works if the caller has a registered name. 
- """ - - conf = get_blockstack_opts() - if not conf['serve_data']: - return {'error': 'No data'} - - if not self.check_name(blockchain_id): - return {'error': 'Invalid name'} - - if type(data_txt) not in [str, unicode]: - return {'error': 'Data must be a serialized JWT'} - - # must be v2 or later - if not data_txt.startswith('bsk2.'): - return {'error': 'Obsolete data format'} - - data_info = self.load_mutable_data(blockchain_id, data_txt, max_len=RPC_MAX_DATA_LEN) - if 'error' in data_info: - log.debug("Failed to parse mutable data: {}".format(data_info['error'])) - return data_info - - user_data = data_info['data'] - - # must be mutable data - try: - jsonschema.validate(user_data, blockstack_client.schemas.DATA_BLOB_SCHEMA) - except ValidationError as ve: - log.debug("User data is not a mutable data blob") - return {'error': 'Not a mutable data blob'} - - # must match name - if not user_data.has_key('blockchain_id') or blockchain_id != user_data['blockchain_id']: - log.debug("Data has no blockchain_id, or does not match {} (got {})".format(blockchain_id, user_data.get('blockchain_id', "None"))) - return {'error': 'Failed to validate data: invalid or missing blockchain ID'} - - fq_data_id = user_data['fq_data_id'] - - res = storage_enqueue_data( blockchain_id, fq_data_id, str(data_txt) ) - if not res: - log.error('Failed to queue {}-byte datum for {}'.format(len(data_txt), blockchain_id)) - return {'error': 'Failed to queue datum'} - - log.debug("Queued {}-byte datum from {}".format(len(data_txt), blockchain_id)) - return self.success_response( {'num_replicas': 1, 'num_failures': 0} ) - - - def rpc_get_data_servers( self, **con_info ): - """ - Get the list of data servers - Return {'status': True, 'servers': ...} on success - Return {'error': ...} on error - """ - conf = get_blockstack_opts() - if not conf.get('redirect_data', False): - return {'error': 'No data servers'} - - servers = filter(lambda x: len(x) > 0, conf['data_servers'].split(',')) - return 
{'status': True, 'servers': servers} - - def rpc_get_zonefiles_by_block( self, from_block, to_block, offset, count, **con_info ): """ Get information about zonefiles announced in blocks [@from_block, @to_block] @@ -1843,7 +1286,7 @@ class BlockstackdRPC( SimpleXMLRPCServer): zonefile_inv = atlas_get_zonefile_inventory( offset=offset, length=length ) - if os.environ.get("BLOCKSTACK_TEST", None) == "1": + if BLOCKSTACK_TEST: log.debug("Zonefile inventory is '%s'" % (atlas_inventory_to_string(zonefile_inv))) return self.success_response( {'inv': base64.b64encode(zonefile_inv) } ) @@ -1866,17 +1309,17 @@ class BlockstackdRPCServer( threading.Thread, object ): """ RPC server thread """ - def __init__(self, port ): + def __init__(self, working_dir, port ): super( BlockstackdRPCServer, self ).__init__() self.rpc_server = None self.port = port - + self.working_dir = working_dir def run(self): """ Serve until asked to stop """ - self.rpc_server = BlockstackdRPC( port=self.port ) + self.rpc_server = BlockstackdRPC( self.working_dir, port=self.port ) self.rpc_server.serve_forever() @@ -1921,281 +1364,7 @@ class GCThread( threading.Thread ): self.event_count += 1 -class BlockstackStoragePusher( threading.Thread ): - """ - worker thread to push data into storage providers, - so we don't block the RPC server. 
- """ - def __init__(self, conf, queue_path): - threading.Thread.__init__(self) - self.running = False - self.accepting = True - self.queue_path = queue_path - self.config = conf - - self.zonefile_dir = conf.get('zonefile_dir', None) - self.zonefile_storage_drivers = conf['zonefile_storage_drivers'].split(",") - self.zonefile_storage_drivers_write = conf['zonefile_storage_drivers_write'].split(",") - self.profile_storage_drivers = conf['profile_storage_drivers'].split(",") - self.profile_storage_drivers_write = conf['profile_storage_drivers_write'].split(",") - self.data_storage_drivers = conf['data_storage_drivers'].split(',') - self.data_storage_drivers_write = conf['data_storage_drivers_write'].split(',') - self.atlasdb_path = conf.get('atlasdb_path', None) - - self.zonefile_queue_id = "push-zonefile" - self.profile_queue_id = "push-profile" - self.data_queue_id = "push-data" - - # do not store data to ourselves - if 'blockstack_server' in self.zonefile_storage_drivers: - log.warn("Removing 'blockstack_server' from zone file storage drivers") - self.zonefile_storage_drivers.remove('blockstack_server') - - if 'blockstack_server' in self.zonefile_storage_drivers_write: - log.warn("Removing 'blockstack_server' from zone file storage write drivers") - self.zonefile_storage_drivers_write.remove('blockstack_server') - - if 'blockstack_server' in self.profile_storage_drivers: - log.warn("Removing 'blockstack_server' from profile storage drivers") - self.profile_storage_drivers.remove('blockstack_server') - - if 'blockstack_server' in self.profile_storage_drivers_write: - log.warn("Removing 'blockstack_server' from profile storage write drivers") - self.profile_storage_drivers_write.remove('blockstack_server') - - if 'blockstack_server' in self.data_storage_drivers: - log.warn("Removing 'blockstack_server' from data storage drivers") - self.data_storage_drivers.remove('blockstack_server') - - if 'blockstack_server' in self.data_storage_drivers_write: - 
log.warn("Removing 'blockstack_server' from data storage write drivers") - self.data_storage_drivers_write.remove('blockstack_server') - - - def enqueue_zonefile( self, txid, zonefile_hash, zonefile_data ): - """ - Enqueue a zonefile for replication - """ - if type(zonefile_data) not in [str, unicode]: - log.debug("Invalid zonefile data type") - return False - - if txid is not None and type(txid) not in [str, unicode]: - log.debug("Invalid txid type") - return False - - if type(zonefile_hash) not in [str, unicode]: - log.debug("Invalid zonefile hash type") - return False - - txid = str(txid) - zonefile_hash = str(zonefile_hash) - zonefile_data = str(zonefile_data) - - try: - # NOTE: we don't use or rely on the name here, but use the zonefile hash instead - existing = queue_findone( self.zonefile_queue_id, zonefile_hash, path=self.queue_path ) - if len(existing) > 0: - log.error("Already queued {}".format(zonefile_hash)) - return False - - log.debug("Queue {}-byte zonefile".format(len(zonefile_data))) - - # NOTE: we don't use or rely on the name here, but use the zonefile hash instead - res = queue_append( self.zonefile_queue_id, zonefile_hash, txid, block_height=0, zonefile_hash=zonefile_hash, zonefile_data=zonefile_data, path=self.queue_path ) - assert res - return True - except Exception as e: - log.exception(e) - return False - - - def enqueue_profile_or_data( self, blockchain_id, queue_id, data ): - """ - Enqueue a profile or mutable data for replication - """ - if type(blockchain_id) not in [str, unicode]: - log.debug("Invalid name type") - return False - - if type(data) not in [str, unicode]: - log.debug("Invalid profile or data type") - return False - - blockchain_id = str(blockchain_id) - data = str(data) - - try: - existing = queue_findone( queue_id, blockchain_id, path=self.queue_path ) - if len(existing) > 0: - log.error("Already queued something for {}".format(blockchain_id)) - return False - - log.debug("Queue {}-byte datum for {}".format(len(data), 
blockchain_id)) - res = queue_append( queue_id, blockchain_id, "00" * 32, block_height=0, profile=data, path=self.queue_path ) - assert res - return True - except Exception as e: - log.exception(e) - return False - - - def enqueue_profile( self, blockchain_id, profile_data ): - """ - Enqueue a profile for replication - """ - return self.enqueue_profile_or_data(blockchain_id, self.profile_queue_id, profile_data) - - - def enqueue_data( self, blockchain_id, fq_data_id, data_txt ): - """ - Enqueue a mutable datum for replication - """ - data_payload = { - 'data_txt': data_txt, - 'fq_data_id': fq_data_id - } - - return self.enqueue_profile_or_data(blockchain_id, self.data_queue_id, json.dumps(data_payload)) - - - def store_one_zonefile(self): - """ - Find and store one zonefile - """ - # find a zonefile - entries = queue_findall( self.zonefile_queue_id, limit=1, path=self.queue_path ) - if entries is None or len(entries) == 0: - # empty - return False - - entry = entries[0] - res = store_zonefile_data_to_storage( str(entry['zonefile']), entry['tx_hash'], required=self.zonefile_storage_drivers_write, - skip=['blockstack_server','blockstack_resolver','dht'], cache=False, zonefile_dir=self.zonefile_dir, tx_required=False ) - - if not res: - log.error("Failed to store zonefile {} ({} bytes)".format(entry['zonefile_hash'], len(entry['zonefile']))) - return False - - log.debug("Replicated zonefile {} ({} bytes)".format(entry['zonefile_hash'], len(entry['zonefile']))) - - if self.atlasdb_path is not None: - # mark present in the atlas subsystem - atlasdb_set_zonefile_present( str(entry['zonefile_hash']), True, path=self.atlasdb_path ) - - queue_removeall( entries, path=self.queue_path ) - return res - - - def store_one_profile_or_datum(self, queue_id, storage_drivers): - """ - Find and store one profile or datum - """ - entries = queue_findall( queue_id, limit=1, path=self.queue_path ) - if entries is None or len(entries) == 0: - # empty - return False - - entry = entries[0] 
- - blockchain_id = str(entry['fqu']) - fq_data_id = None - data_txt = None - profile = False - - try: - # mutable data? - payload = json.loads(entry['profile']) - - assert isinstance(payload, dict) - assert payload.has_key('fq_data_id') - assert payload.has_key('data_txt') - - fq_data_id = str(payload['fq_data_id']) - data_txt = str(payload['data_txt']) - - if os.environ.get("BLOCKSTACK_TEST") == "1": - log.debug("mutable datum: {}".format(entry['profile'])) - log.debug("mutable datum txt: {}".format(data_txt)) - - except AssertionError: - - # profile - fq_data_id = blockchain_id - data_txt = str(entry['profile']) - profile = True - - except Exception as e: - log.exception(e) - log.debug("entry = {}".format(entry)) - log.debug("Abandoning data from {}".format(blockchain_id)) - queue_removeall( entries, path=self.queue_path ) - return False - - success = store_mutable_data_to_storage( blockchain_id, fq_data_id, data_txt, profile=profile, required=storage_drivers, skip=['blockstack_server','blockstack_resolver']) - if not success: - log.error("Failed to store data for {} ({} bytes) (rc = {})".format(blockchain_id, len(data_txt), success)) - queue_removeall( entries, path=self.queue_path ) - return False - - log.debug("Replicated data for {} ({} bytes)".format(blockchain_id, len(data_txt))) - queue_removeall( entries, path=self.queue_path ) - return True - - - def store_one_profile(self): - """ - Find and store one profile - """ - return self.store_one_profile_or_datum(self.profile_queue_id, self.profile_storage_drivers) - - - def store_one_datum(self): - """ - Find and store one mutable datum - """ - return self.store_one_profile_or_datum(self.data_queue_id, self.data_storage_drivers) - - - def run(self): - """ - Push zonefiles and profiles - """ - global gc_thread - - self.running = True - while self.running: - - res_zonefile = self.store_one_zonefile() - res_profile = self.store_one_profile() - res_data = self.store_one_datum() - - if not res_zonefile and not 
res_profile and not res_data: - time.sleep(1.0) - gc_thread.gc_event() - continue - - else: - gc_thread.gc_event() - - log.debug("StoragePusher thread exit") - self.running = False - - - def signal_stop(self): - self.running = False - log.debug("StoragePusher signal stop") - - - def drain(self): - """ - Stop taking requests and wait for the queue to drain - """ - self.accepting = False - return - - -def rpc_start( port ): +def rpc_start( working_dir, port ): """ Start the global RPC server thread """ @@ -2204,7 +1373,7 @@ def rpc_start( port ): # let everyone in this thread know the PID os.environ["BLOCKSTACK_RPC_PID"] = str(os.getpid()) - rpc_server = BlockstackdRPCServer( port ) + rpc_server = BlockstackdRPCServer( working_dir, port ) log.debug("Starting RPC") rpc_server.start() @@ -2225,15 +1394,6 @@ def rpc_stop(): log.debug("RPC already joined") -def get_storage_queue_path(): - """ - Path to the on-disk storage queue - """ - working_dir = virtualchain.get_working_dir() - db_filename = blockstack_state_engine.get_virtual_chain_name() + ".queue" - return os.path.join( working_dir, db_filename ) - - def gc_start(): """ Start a thread to garbage-collect every 30 seconds. 
@@ -2257,59 +1417,6 @@ def gc_stop(): log.debug("GC thread joined") -def storage_start( blockstack_opts ): - """ - Start the global data-pusher thread - """ - global storage_pusher - - storage_queue = get_storage_queue_path() - storage_pusher = BlockstackStoragePusher( blockstack_opts, storage_queue ) - log.debug("Starting storage pusher") - storage_pusher.start() - - -def storage_stop(): - """ - Stop the global data-pusher thread - """ - global storage_pusher - - # if we're testing, then drain the storage queue completely - if os.environ.get("BLOCKSTACK_TEST") == "1": - log.debug("Draining storage pusher queue") - storage_pusher.drain() - - log.debug("Shutting down storage pusher") - storage_pusher.signal_stop() - storage_pusher.join() - log.debug("Storage pusher joined") - - -def storage_enqueue_zonefile( txid, zonefile_hash, zonefile_data ): - """ - Queue a zonefile for replication - """ - global storage_pusher - return storage_pusher.enqueue_zonefile( txid, zonefile_hash, zonefile_data ) - - -def storage_enqueue_profile( name, profile_data ): - """ - Queue a profile for replication - """ - global storage_pusher - return storage_pusher.enqueue_profile( name, profile_data ) - - -def storage_enqueue_data( blockchain_id, fq_data_id, datum ): - """ - Queue mutable data for replication - """ - global storage_pusher - return storage_pusher.enqueue_data( blockchain_id, fq_data_id, datum ) - - def atlas_start( blockstack_opts, db, port ): """ Start up atlas functionality @@ -2317,19 +1424,26 @@ def atlas_start( blockstack_opts, db, port ): # start atlas node atlas_state = None if blockstack_opts['atlas']: + if 'zonefiles' not in blockstack_opts: + log.error("No zonefiles directory set. Atlas will not initialize.") + return None + + if 'atlasdb_path' not in blockstack_opts: + log.error('No atlasdb path set. 
Atlas will not initialize.') + return None atlas_seed_peers = filter( lambda x: len(x) > 0, blockstack_opts['atlas_seeds'].split(",")) atlas_blacklist = filter( lambda x: len(x) > 0, blockstack_opts['atlas_blacklist'].split(",")) - zonefile_dir = blockstack_opts.get('zonefiles', None) + zonefile_dir = blockstack_opts['zonefiles'] zonefile_storage_drivers = filter( lambda x: len(x) > 0, blockstack_opts['zonefile_storage_drivers'].split(",")) zonefile_storage_drivers_write = filter( lambda x: len(x) > 0, blockstack_opts['zonefile_storage_drivers_write'].split(",")) my_hostname = blockstack_opts['atlas_hostname'] - initial_peer_table = atlasdb_init( blockstack_opts['atlasdb_path'], db, atlas_seed_peers, atlas_blacklist, validate=True, zonefile_dir=zonefile_dir ) + initial_peer_table = atlasdb_init( blockstack_opts['atlasdb_path'], zonefile_dir, db, atlas_seed_peers, atlas_blacklist, validate=True ) atlas_peer_table_init( initial_peer_table ) - atlas_state = atlas_node_start( my_hostname, port, atlasdb_path=blockstack_opts['atlasdb_path'], - zonefile_storage_drivers=zonefile_storage_drivers, zonefile_storage_drivers_write=zonefile_storage_drivers_write, zonefile_dir=zonefile_dir ) + atlas_state = atlas_node_start( my_hostname, port, blockstack_opts['atlasdb_path'], zonefile_dir, db.working_dir, + zonefile_storage_drivers=zonefile_storage_drivers, zonefile_storage_drivers_write=zonefile_storage_drivers_write) return atlas_state @@ -2378,7 +1492,7 @@ def check_server_running(pid): raise -def stop_server( clean=False, kill=False ): +def stop_server( working_dir, clean=False, kill=False ): """ Stop the blockstackd server. """ @@ -2388,7 +1502,7 @@ def stop_server( clean=False, kill=False ): for i in xrange(0, 5): # try to kill the main supervisor - pid_file = get_pidfile_path() + pid_file = get_pidfile_path(working_dir) if not os.path.exists(pid_file): dead = True break @@ -2418,9 +1532,9 @@ def stop_server( clean=False, kill=False ): # is it actually dead? 
blockstack_opts = get_blockstack_opts() - srv = blockstack_client.proxy.BlockstackRPCClient('localhost', blockstack_opts['rpc_port'], timeout=5, protocol = 'http') + srv = BlockstackRPCClient('localhost', blockstack_opts['rpc_port'], timeout=5, protocol = 'http') try: - res = blockstack_client.ping(proxy=srv) + res = blockstack_ping(proxy=srv) except socket.error as se: # dead? if se.errno == errno.ECONNREFUSED: @@ -2476,7 +1590,7 @@ def blockstack_tx_filter( tx ): return False -def index_blockchain( expected_snapshots=GENESIS_SNAPSHOT ): +def index_blockchain( working_dir, expected_snapshots=GENESIS_SNAPSHOT ): """ Index the blockchain: * find the range of blocks @@ -2486,15 +1600,10 @@ def index_blockchain( expected_snapshots=GENESIS_SNAPSHOT ): Return False if not Aborts on error """ - - if not is_indexer(): - # nothing to do - return True - bt_opts = get_bitcoin_opts() - start_block, current_block = get_index_range() + start_block, current_block = get_index_range(working_dir) - db = get_db_state() + db = get_db_state(working_dir) old_lastblock = db.lastblock if start_block is None and current_block is None: @@ -2505,7 +1614,7 @@ def index_blockchain( expected_snapshots=GENESIS_SNAPSHOT ): # bring the db up to the chain tip. 
log.debug("Begin indexing (up to %s)" % current_block) set_indexing( True ) - rc = virtualchain_hooks.sync_blockchain( bt_opts, current_block, expected_snapshots=expected_snapshots, tx_filter=blockstack_tx_filter ) + rc = virtualchain_hooks.sync_blockchain( working_dir, bt_opts, current_block, expected_snapshots=expected_snapshots, tx_filter=blockstack_tx_filter ) set_indexing( False ) db.close() @@ -2521,13 +1630,16 @@ def index_blockchain( expected_snapshots=GENESIS_SNAPSHOT ): # TODO: this is racy--we also do this in virtualchain-hooks blockstack_opts = get_blockstack_opts() if blockstack_opts.get('atlas', False): - db = get_db_state() - if old_lastblock < db.lastblock: - log.debug("Synchronize Atlas DB from %s to %s" % (old_lastblock+1, db.lastblock+1)) - zonefile_dir = blockstack_opts.get('zonefiles', get_zonefile_dir()) - atlasdb_sync_zonefiles( db, old_lastblock+1, zonefile_dir=zonefile_dir ) + if 'zonefiles' not in blockstack_opts: + log.error("No zonefiles directory set. Atlas node will not run.") + else: + db = get_db_state(working_dir) + if old_lastblock < db.lastblock: + log.debug("Synchronize Atlas DB from %s to %s" % (old_lastblock+1, db.lastblock+1)) + zonefile_dir = blockstack_opts['zonefiles'] + atlasdb_sync_zonefiles( db, old_lastblock+1, zonefile_dir ) - db.close() + db.close() log.debug("End indexing (up to %s)" % current_block) return rc @@ -2550,15 +1662,15 @@ def blockstack_signal_handler( sig, frame ): set_running(False) -def run_server( foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None ): +def run_server( working_dir, foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None ): """ Run the blockstackd RPC server, optionally in the foreground. 
""" bt_opts = get_bitcoin_opts() blockstack_opts = get_blockstack_opts() - indexer_log_file = get_logfile_path() - pid_file = get_pidfile_path() - working_dir = virtualchain.get_working_dir() + indexer_log_file = get_logfile_path(working_dir) + pid_file = get_pidfile_path(working_dir) + db_path = virutalchain.get_db_path() if port is None: port = blockstack_opts['rpc_port'] @@ -2622,10 +1734,10 @@ def run_server( foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None set_indexing( False ) # make sure client is initialized - get_blockstack_client_session() + # get_blockstack_client_session() # get db state - db = get_db_state() + db = get_db_state(working_dir) # start atlas node atlas_state = atlas_start( blockstack_opts, db, port ) @@ -2637,7 +1749,7 @@ def run_server( foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None storage_start( blockstack_opts ) # start API server - rpc_start(port) + rpc_start(working_dir, port) set_running( True ) # clear any stale indexing state @@ -2648,7 +1760,7 @@ def run_server( foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None while is_running(): try: - running = index_blockchain(expected_snapshots=expected_snapshots) + running = index_blockchain(working_dir, expected_snapshots=expected_snapshots) except Exception, e: log.exception(e) log.error("FATAL: caught exception while indexing") @@ -2699,7 +1811,7 @@ def run_server( foreground=False, expected_snapshots=GENESIS_SNAPSHOT, port=None return 0 -def setup( working_dir=None, return_parser=False ): +def setup( working_dir, return_parser=False ): """ Do one-time initialization. Call this to set up global state and set signal handlers. 
@@ -2712,30 +1824,26 @@ def setup( working_dir=None, return_parser=False ): """ # set up our implementation - virtualchain.setup_virtualchain( impl=blockstack_state_engine ) - working_dir = virtualchain.get_working_dir() - log.debug("Working dir: {}".format(working_dir)) - if not os.path.exists( working_dir ): os.makedirs( working_dir, 0700 ) # acquire configuration, and store it globally - opts = configure( interactive=True ) + opts = configure( working_dir, interactive=True ) blockstack_opts = opts['blockstack'] bitcoin_opts = opts['bitcoind'] # config file version check config_server_version = blockstack_opts.get('server_version', None) if (config_server_version is None or config.versions_need_upgrade(config_server_version, VERSION)): - print >> sys.stderr, "Obsolete or unrecognizable config file ({}): '{}' != '{}'".format(virtualchain.get_config_filename(), config_server_version, VERSION) + print >> sys.stderr, "Obsolete or unrecognizable config file ({}): '{}' != '{}'".format(virtualchain.get_config_filename(virtualchain_hooks, working_dir), config_server_version, VERSION) print >> sys.stderr, 'Please see the release notes for version {} for instructions to upgrade (in the release-notes/ folder).'.format(VERSION) return None log.debug("config:\n%s" % json.dumps(opts, sort_keys=True, indent=4)) # merge in command-line bitcoind options - config_file = virtualchain.get_config_filename() + config_file = virtualchain.get_config_filename(virtualchain_hooks, working_dir) arg_bitcoin_opts = None argparser = None @@ -2760,56 +1868,25 @@ def setup( working_dir=None, return_parser=False ): return None -def reconfigure(): +def reconfigure(working_dir): """ Reconfigure blockstackd. """ - configure( force=True ) + configure( working_dir, force=True ) print "Blockstack successfully reconfigured." 
sys.exit(0) -def clean( confirm=True ): +def verify_database(trusted_consensus_hash, consensus_block_height, untrusted_working_dir, trusted_working_dir, start_block=None, expected_snapshots={}): """ - Remove blockstack's db, lastblock, and snapshot files. - Prompt for confirmation + Verify that a database is consistent with a + known-good consensus hash. + Return True if valid. + Return False if not """ - - delete = False - exit_status = 0 - - if confirm: - warning = "WARNING: THIS WILL DELETE YOUR BLOCKSTACK DATABASE!\n" - warning+= "Database: '%s'\n" % blockstack_state_engine.working_dir - warning+= "Are you sure you want to proceed?\n" - warning+= "Type 'YES' if so: " - value = raw_input( warning ) - - if value != "YES": - sys.exit(exit_status) - - else: - delete = True - - else: - delete = True - - - if delete: - print "Deleting..." - - db_filename = virtualchain.get_db_filename() - lastblock_filename = virtualchain.get_lastblock_filename() - snapshots_filename = virtualchain.get_snapshots_filename() - - for path in [db_filename, lastblock_filename, snapshots_filename]: - try: - os.unlink( path ) - except: - log.warning("Unable to delete '%s'" % path) - exit_status = 1 - - sys.exit(exit_status) + db = BlockstackDB.get_readwrite_instance(trusted_working_dir) + consensus_impl = virtualchain_hooks + return virtualchain.state_engine_verify(trusted_consensus_hash, consensus_block_height, consensus_impl, untrusted_working_dir, db, start_block=start_block, expected_snapshots=expected_snapshots) def check_and_set_envars( argv ): @@ -2823,35 +1900,43 @@ def check_and_set_envars( argv ): argv should be like sys.argv: argv[0] is the binary Does not return on re-exec. - Return True if there was no need to re-exec + Returns {args} on success Returns False on error. 
""" special_flags = { - '--working-dir': { - 'arg': True, - 'envar': 'VIRTUALCHAIN_WORKING_DIR', - }, '--debug': { 'arg': False, 'envar': 'BLOCKSTACK_DEBUG', + 'exec': True, }, '--verbose': { 'arg': False, 'envar': 'BLOCKSTACK_DEBUG', + 'exec': True, }, '--testnet': { 'arg': False, - 'envar': 'BLOCKSTACK_TESTNET' + 'envar': 'BLOCKSTACK_TESTNET', + 'exec': True, }, '--testnet3': { 'arg': False, - 'envar': 'BLOCKSTACK_TESTNET3' + 'envar': 'BLOCKSTACK_TESTNET3', + 'exec': True, + }, + '--working_dir': { + 'arg': True, + 'argname': 'working_dir', + 'exec': False, }, } cli_envs = {} + cli_args = {} new_argv = [argv[0]] + do_exec = False + for i in xrange(1, len(argv)): arg = argv[i] @@ -2859,7 +1944,7 @@ def check_and_set_envars( argv ): for special_flag in special_flags.keys(): - if not arg.startswith( special_flag ): + if not arg.startswith(special_flag): continue if special_flags[special_flag]['arg']: @@ -2887,21 +1972,31 @@ def check_and_set_envars( argv ): break if value is not None: - # recognized - cli_envs[ special_flags[special_flag]['envar'] ] = value + if 'envar' in special_flags[special_flag]: + # recognized + cli_envs[ special_flags[special_flag]['envar'] ] = value + + if 'argname' in special_flags[special_flag]: + # recognized as special argument + cli_args[ special_flags[special_flag]['argname'] ] = value + new_argv.append(arg) + new_argv.append(value) + + if special_flags[special_flag]['exec']: + do_exec = True else: # not recognized new_argv.append(arg) - if len(cli_envs.keys()) > 0: + if do_exec: # re-exec for cli_env, cli_env_value in cli_envs.items(): os.environ[cli_env] = cli_env_value - os.execv( new_argv[0], new_argv ) + os.execv(new_argv[0], new_argv) - return True + return cli_args def load_expected_snapshots( snapshots_path ): @@ -2936,21 +2031,22 @@ def run_blockstackd(): """ run blockstackd """ - - check_and_set_envars( sys.argv ) - argparser = setup( return_parser=True ) + special_args = check_and_set_envars( sys.argv ) + working_dir = 
special_args.get('working_dir')
+    if working_dir is None:
+        working_dir = os.path.expanduser('~/{}'.format(virtualchain_hooks.get_virtual_chain_name()))
+
+    argparser = setup( working_dir, return_parser=True )
     if argparser is None:
         # fatal error
         os.abort()
 
     # need sqlite3
-    sqlite3_tool = sqlite3_find_tool()
+    sqlite3_tool = virtualchain.sqlite3_find_tool()
     if sqlite3_tool is None:
         print 'Failed to find sqlite3 tool in your PATH.  Cannot continue'
         sys.exit(1)
 
-    working_dir = virtualchain.get_working_dir()
-
     # get RPC server options
     subparsers = argparser.add_subparsers(
         dest='action', help='the action to be taken')
@@ -2967,9 +2063,6 @@ def run_blockstackd():
     parser.add_argument(
         '--port', action='store',
         help='port to bind on')
-    parser.add_argument(
-        '--no-indexer', action='store_true',
-        help='do not index the blockchain')
 
     parser = subparsers.add_parser(
         'stop',
@@ -2993,45 +2086,22 @@ def run_blockstackd():
         'block_number', nargs='?',
         help="The block number to restore from (if not given, the last backup will be used)")
 
-    parser = subparsers.add_parser(
-        'rebuilddb',
-        help='Reconstruct the current database from particular block number by replaying all prior name operations')
-    parser.add_argument(
-        'db_path',
-        help='the path to the database')
-    parser.add_argument(
-        'start_block_id',
-        help='the block ID from which to start rebuilding')
-    parser.add_argument(
-        'end_block_id',
-        help='the block ID at which to stop rebuilding')
-    parser.add_argument(
-        '--resume-dir', nargs='?',
-        help='the temporary directory to store the database state as it is being rebuilt. 
Blockstackd will resume working from this directory if it is interrupted.') - parser = subparsers.add_parser( 'verifydb', help='verify an untrusted database against a known-good consensus hash') parser.add_argument( - 'block_id', - help='the block ID of the known-good consensus hash') + 'block_height', + help='the block height of the known-good consensus hash') parser.add_argument( 'consensus_hash', help='the known-good consensus hash') parser.add_argument( - 'db_path', - help='the path to the database') + 'chainstate_dir', + help='the path to the database directory to verify') parser.add_argument( '--expected-snapshots', action='store', help='path to a .snapshots file with the expected consensus hashes') - parser = subparsers.add_parser( - 'importdb', - help='import an existing trusted database') - parser.add_argument( - 'db_path', - help='the path to the database') - parser = subparsers.add_parser( 'version', help='Print version and exit') @@ -3059,7 +2129,7 @@ def run_blockstackd(): 'path', help='the path to the resulting snapshot') parser.add_argument( - 'block_id', nargs='?', + 'block_height', nargs='?', help='the block ID of the backup to use to make a fast-sync snapshot') parser = subparsers.add_parser( @@ -3079,11 +2149,9 @@ def run_blockstackd(): sys.exit(0) if args.action == 'start': - global has_indexer - has_indexer = (not args.no_indexer) expected_snapshots = {} - pid = read_pid_file(get_pidfile_path()) + pid = read_pid_file(get_pidfile_path(working_dir)) still_running = False if pid is not None: @@ -3097,7 +2165,7 @@ def run_blockstackd(): log.error("Blockstackd appears to be running already. If not, please run '{} stop'".format(sys.argv[0])) sys.exit(1) - if is_indexer() and pid is not None: + if pid is not None: # The server didn't shut down properly. # restore from back-up before running log.warning("Server did not shut down properly. 
Restoring state from last known-good backup.") @@ -3120,7 +2188,7 @@ def run_blockstackd(): blockstack_backup_restore( working_dir, None ) # make sure we "stop" - config.set_indexing(False) + set_indexing(False) # use snapshots? if args.expected_snapshots is not None: @@ -3132,7 +2200,7 @@ def run_blockstackd(): # we're definitely not running, so make sure this path is clear try: - os.unlink(get_pidfile_path()) + os.unlink(get_pidfile_path(working_dir)) except: pass @@ -3147,47 +2215,32 @@ def run_blockstackd(): else: args.port = None - exit_status = run_server( foreground=args.foreground, expected_snapshots=expected_snapshots, port=args.port ) + exit_status = run_server( working_dir, foreground=args.foreground, expected_snapshots=expected_snapshots, port=args.port ) if args.foreground: log.info("Service endpoint exited with status code %s" % exit_status ) elif args.action == 'stop': - stop_server(kill=True) + stop_server(working_dir, kill=True) elif args.action == 'configure': - reconfigure() + reconfigure(working_dir) elif args.action == 'restore': block_number = args.block_number if block_number is not None: block_number = int(block_number) - blockstack_backup_restore( working_dir, args.block_number ) - - elif args.action == 'clean': - clean( confirm=(not args.force) ) - - elif args.action == 'rebuilddb': - - resume_dir = None - if hasattr(args, 'resume_dir') and args.resume_dir is not None: - resume_dir = args.resume_dir - - final_consensus_hash = rebuild_database( int(args.end_block_id), args.db_path, start_block=int(args.start_block_id), resume_dir=resume_dir ) - print "Rebuilt database in '%s'" % working_dir - print "The final consensus hash is '%s'" % final_consensus_hash + blockstack_backup_restore(working_dir, args.block_number) elif args.action == 'verifydb': - db_path = virtualchain.get_db_filename() - working_db_path = os.path.join( working_dir, os.path.basename( db_path ) ) expected_snapshots = None - if args.expected_snapshots is not None: 
expected_snapshots = load_expected_snapshots( args.expected_snapshots ) if expected_snapshots is None: sys.exit(1) - - rc = verify_database( args.consensus_hash, int(args.block_id), args.db_path, working_db_path=working_db_path, expected_snapshots=expected_snapshots ) + + tmpdir = tempfile.mkdtemp('blockstack-verify-chainstate-XXXXXX') + rc = verify_database(args.consensus_hash, int(args.block_height), args.chainstate_dir, tmpdir, expected_snapshots=expected_snapshots) if rc: # success! print "Database is consistent with %s" % args.consensus_hash @@ -3197,28 +2250,6 @@ def run_blockstackd(): # failure! print "Database is NOT CONSISTENT" - elif args.action == 'importdb': - # re-target working dir so we move the database state to the correct location - old_working_dir = virtualchain.get_working_dir() - virtualchain.setup_virtualchain( blockstack_state_engine ) - - db_path = virtualchain.get_db_filename() - old_snapshots_path = os.path.join( old_working_dir, os.path.basename( virtualchain.get_snapshots_filename() ) ) - old_lastblock_path = os.path.join( old_working_dir, os.path.basename( virtualchain.get_lastblock_filename() ) ) - - if os.path.exists( db_path ): - print "Backing up existing database to %s.bak" % db_path - shutil.move( db_path, db_path + ".bak" ) - - print "Importing database from %s to %s" % (args.db_path, db_path) - shutil.copy( args.db_path, db_path ) - - print "Importing snapshots from %s to %s" % (old_snapshots_path, virtualchain.get_snapshots_filename() ) - shutil.copy( old_snapshots_path, virtualchain.get_snapshots_filename() ) - - print "Importing lastblock from %s to %s" % (old_lastblock_path, virtualchain.get_lastblock_filename() ) - shutil.copy( old_lastblock_path, virtualchain.get_lastblock_filename() ) - elif args.action == 'fast_sync_snapshot': # create a fast-sync snapshot from the last backup dest_path = str(args.path) @@ -3229,11 +2260,11 @@ def run_blockstackd(): print "Invalid private key" sys.exit(1) - block_id = None - if 
args.block_id is not None: - block_id = int(args.block_id) + block_height = None + if args.block_height is not None: + block_height = int(args.block_height) - rc = fast_sync_snapshot( dest_path, private_key, block_id ) + rc = fast_sync_snapshot(working_dir, dest_path, private_key, block_height) if not rc: print "Failed to create snapshot" sys.exit(1) @@ -3286,6 +2317,3 @@ def run_blockstackd(): print "Start your node with `blockstack-core start`" print "Pass `--debug` for extra output." -if __name__ == '__main__': - - run_blockstackd()