From ed54c905de52500b39732fc2dbf1384d4d68ecbd Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Fri, 12 Jan 2018 18:28:06 -0500 Subject: [PATCH] update virtualchain_hooks (the virtualchain implementation) to adhere to the new StateEngine standards in virtualchain 0.18. In particular: * add method for which blockchain we're on * add method for determining how long a consensus hash is valid * add method for getting all opcodes * add method for getting all operations' serialization fields * accept transactions in the virtualchain-standardized format, not the bitcoind RPC format * stronger and more explicit adherence to acquiring read/only versus read/write BlockstackDB handles --- blockstack/lib/nameset/virtualchain_hooks.py | 435 ++++++++++--------- 1 file changed, 219 insertions(+), 216 deletions(-) diff --git a/blockstack/lib/nameset/virtualchain_hooks.py b/blockstack/lib/nameset/virtualchain_hooks.py index 6265170b3..c6bae3c45 100644 --- a/blockstack/lib/nameset/virtualchain_hooks.py +++ b/blockstack/lib/nameset/virtualchain_hooks.py @@ -25,6 +25,7 @@ import os import gc +import copy from .namedb import * @@ -35,184 +36,167 @@ import virtualchain log = virtualchain.get_logger("blockstack-log") def get_virtual_chain_name(): - """ - (required by virtualchain state engine) + """ + (required by virtualchain state engine) - Get the name of the virtual chain we're building. - """ - return "blockstack-server" + Get the name of the virtual chain we're building. + """ + return "blockstack-server" def get_virtual_chain_version(): - """ - (required by virtualchain state engine) + """ + (required by virtualchain state engine) - Get the version string for this virtual chain. - """ - return VERSION + Get the version string for this virtual chain. + """ + return VERSION def get_opcodes(): - """ - (required by virtualchain state engine) + """ + (required by virtualchain state engine) - Get the list of opcodes we're looking for. 
- """ - return OPCODES + Get the list of opcodes we're looking for. + """ + return OPCODES -def get_op_processing_order(): - """ - (required by virtualchain state engine) - - Give a hint as to the order in which we process operations - """ - return OPCODES +def get_opfields(): + """ + (required by virtualchain state engine) + Get a dict that maps each opcode to the list of transaction fields to serialize + """ + return BlockstackDB.make_opfields() def get_magic_bytes(): - """ - (required by virtualchain state engine) + """ + (required by virtualchain state engine) - Get the magic byte sequence for our OP_RETURNs - """ - return MAGIC_BYTES + Get the magic byte sequence for our OP_RETURNs + """ + return MAGIC_BYTES def get_first_block_id(): - """ - (required by virtualchain state engine) + """ + (required by virtualchain state engine) - Get the id of the first block to start indexing. - """ - start_block = FIRST_BLOCK_MAINNET - return start_block + Get the id of the first block to start indexing. + """ + start_block = FIRST_BLOCK_MAINNET + return start_block -def get_last_block(): +def get_blockchain(): + """ + (required by virtualchain state engine) + + Which blockchain do we index? + """ + return "bitcoin" + + +def get_valid_transaction_window(): + """ + (required by virtualchain state engine) + + How many blocks is a transaction good for? + """ + return 24 + + +def get_initial_snapshots(): + """ + (required by virtualchain state engine) + + What are the initial consensus hashes? 
+ """ + return GENESIS_SNAPSHOT + + +def get_last_block(working_dir): """ Get the last block processed Return the integer on success Return None on error """ - + # make this usable even if we haven't explicitly configured virtualchain - impl = virtualchain.get_implementation() - if impl is None: - impl = sys.modules[__name__] - - lastblock_filename = virtualchain.get_lastblock_filename(impl=impl) - if os.path.exists( lastblock_filename ): - try: - with open(lastblock_filename, "r") as f: - lastblock = int( f.read().strip() ) - return lastblock - - except Exception, e: - # this can't ever happen - log.exception(e) - return None - - return None + impl = sys.modules[__name__] + return BlockstackDB.get_lastblock(impl, working_dir) -def get_snapshots(): +def get_snapshots(working_dir, start_block=None, end_block=None): """ Read the virtualchain snapshots Returns the dict of {snapshots: {$block_height: $consensus_hash}} on success Returns None on error """ + # make this usable even if we haven't explicitly configured virtualchain - impl = virtualchain.get_implementation() - if impl is None: - impl = sys.modules[__name__] - - snapshots_filename = virtualchain.get_snapshots_filename(impl=impl) - if os.path.exists(snapshots_filename): - try: - with open(snapshots_filename, 'r') as f: - snapshots_bin = f.read() - snapshots = json.loads(snapshots_bin) - return snapshots - - except Exception as e: - log.exception(e) - return None - - return None + impl = sys.modules[__name__] + return BlockstackDB.get_consensus_hashes(impl, working_dir, start_block_height=start_block, end_block_height=end_block) -def get_db_state( disposition=DISPOSITION_RO ): +def get_db_state(working_dir): + """ + Callback to the virtual chain state engine. + Get a *read-only* handle to our state engine implementation + (i.e. our name database). + + Note that in this implementation, the database + handle returned will only support read-only operations by default. 
+ Attempts to save state with the handle will lead to program abort. + + Returns the handle on success + Raises on error + """ + + # make this usable even if we haven't explicitly configured virtualchain + impl = sys.modules[__name__] + db_inst = BlockstackDB.get_readonly_instance(working_dir) + assert db_inst, 'Failed to instantiate database handle' + return db_inst + + +def db_parse( block_id, txid, vtxindex, op, data, senders, inputs, outputs, fee, db_state=None, **virtualchain_hints ): """ (required by virtualchain state engine) - Callback to the virtual chain state engine. - Get a handle to our state engine implementation - (i.e. our name database). + Parse a blockstack operation from a transaction. The transaction fields are as follows: + * `block_id` is the blockchain height at which this transaction occurs + * `txid` is the transaction ID + * `data` is the scratch area of the transaction that contains the actual virtualchain operation (e.g. "id[opcode][payload]") + * `senders` is a list in 1-to-1 correspondence with `inputs` that contains information about what funded the inputs + * `inputs` are the list of inputs to the transaction. Some blockchains (like Bitcoin) support multiple inputs, whereas others (like Ethereum) support only 1. + * `outputs` are the list of outputs of the transaction. Some blockchains (like Bitcoin) support multiple outputs, whereas others (like Ethereum) support only 1. + * `fee` is the transaction fee. - Note that in this implementation, the database - handle returned will only support read-only operations by default. - NO COMMITS WILL BE ALLOWED. 
- """ - - # make this usable even if we haven't explicitly configured virtualchain - impl = virtualchain.get_implementation() - if impl is None: - impl = sys.modules[__name__] - - db_filename = virtualchain.get_db_filename(impl=impl) - lastblock_filename = virtualchain.get_lastblock_filename(impl=impl) - lastblock = None - firstcheck = True - - for path in [db_filename, lastblock_filename]: - if os.path.exists( path ): - # have already created the db - firstcheck = False - - if not firstcheck and not os.path.exists( lastblock_filename ): - # this can't ever happen - log.error("FATAL: no such file or directory: %s" % lastblock_filename ) - os.abort() - - # verify that it is well-formed, if it exists - elif os.path.exists( lastblock_filename ): - try: - with open(lastblock_filename, "r") as f: - lastblock = int( f.read().strip() ) - - except Exception, e: - # this can't ever happen - log.error("FATAL: failed to parse: %s" % lastblock_filename) - log.exception(e) - os.abort() - - db_inst = BlockstackDB( db_filename, disposition ) - - return db_inst - - -def db_parse( block_id, txid, vtxindex, op, data, senders, inputs, outputs, fee, db_state=None ): - """ - (required by virtualchain state engine) - - Parse a blockstack operation from a transaction's nulldata (data) and a list of outputs, as well as - optionally the list of transaction's senders and the total fee paid. Use the operation-specific - extract_${OPCODE}() method to get the data, and make sure the operation-defined fields are all set. + `db_state` is the StateEngine-derived class. This is a BlockstackDB instance. + `**virtualchain_hints` is a dict with extra virtualchain hints that may be relevant. We require: + * `raw_tx`: the hex-encoded string containing the raw transaction. + Returns a dict with the parsed operation on success. Return None on error - - NOTE: the transactions that our tools put have a single sender, and a single output address. - This is assumed by this code. 
""" - # basic sanity checks if len(senders) == 0: raise Exception("No senders given") - + + # this virtualchain instance must give the 'raw_tx' hint + assert 'raw_tx' in virtualchain_hints, 'BUG: incompatible virtualchain: requires raw_tx support' + + # internal sanity check + raw_tx = virtualchain_hints['raw_tx'] + btc_tx_data = virtualchain.btc_tx_deserialize(raw_tx) + test_btc_tx = virtualchain.btc_tx_serialize({'ins': inputs, 'outs': outputs, 'locktime': btc_tx_data['locktime'], 'version': btc_tx_data['version']}) + assert raw_tx == test_btc_tx, 'TX mismatch: {} != {}'.format(raw_tx, test_btc_tx) + # make sure each op has all the right fields defined try: - opcode = op_get_opcode_name( op ) + opcode = op_get_opcode_name(op) assert opcode is not None, "Unrecognized opcode '%s'" % op except Exception, e: log.exception(e) @@ -222,28 +206,39 @@ def db_parse( block_id, txid, vtxindex, op, data, senders, inputs, outputs, fee, log.debug("PARSE %s at (%s, %s): %s" % (opcode, block_id, vtxindex, data.encode('hex'))) # get the data - op = None + op_data = None try: - op = op_extract( opcode, data, senders, inputs, outputs, block_id, vtxindex, txid ) + op_data = op_extract( opcode, data, senders, inputs, outputs, block_id, vtxindex, txid ) except Exception, e: log.exception(e) - op = None + op_data = None - if op is not None: + if op_data is not None: + try: + assert 'op' in op_data, 'BUG: missing op' + except Exception as e: + log.exception(e) + log.error("BUG: missing op") + os.abort() + + original_op_data = copy.deepcopy(op_data) # propagate tx data - op['vtxindex'] = int(vtxindex) - op['txid'] = str(txid) + op_data['vtxindex'] = int(vtxindex) + op_data['txid'] = str(txid) + op_data['__original_op_data__'] = original_op_data else: log.error("Unparseable op '%s'" % opcode) - return op + return op_data def check_mutate_fields( op, op_data ): """ Verify that all mutate fields are present. + Return True if so. + Raise an exception (AssertionError) if not. 
""" mutate_fields = op_get_mutate_fields( op ) @@ -266,6 +261,9 @@ def db_scan_block( block_id, op_list, db_state=None ): do block-level preprocessing: * find the state-creation operations we will accept * make sure there are no collisions. + + This modifies op_list, but returns nothing. + This aborts on runtime error. """ try: @@ -275,6 +273,7 @@ def db_scan_block( block_id, op_list, db_state=None ): log.error("FATAL: no state given") os.abort() + log.debug("SCAN BEGIN: {} ops at block {}".format(len(op_list), block_id)) checked_ops = [] for op_data in op_list: @@ -300,7 +299,7 @@ def db_scan_block( block_id, op_list, db_state=None ): # reject all operations that will collide db_state.put_collisions( block_id, collisions ) - + log.debug("SCAN END: {} ops at block {} ({} collisions)".format(len(op_list), block_id, len(collisions))) def db_check( block_id, new_ops, op, op_data, txid, vtxindex, checked_ops, db_state=None ): @@ -317,7 +316,8 @@ def db_check( block_id, new_ops, op, op_data, txid, vtxindex, checked_ops, db_st affected more than once, then the opcode priority rules take effect, and the lower priority opcodes are rejected. - Return True if it's valid; False if not. + Return True if it's valid + Return False if not. """ accept = True @@ -374,96 +374,104 @@ def db_commit( block_id, op, op_data, txid, vtxindex, db_state=None ): be fed into virtualchain to translate into a string to be used to generate this block's consensus hash. 
""" - - if db_state is not None: - if op_data is not None: - - try: - assert 'txid' in op_data, "BUG: No txid given" - assert 'vtxindex' in op_data, "BUG: No vtxindex given" - assert op_data['txid'] == txid, "BUG: txid mismatch" - assert op_data['vtxindex'] == vtxindex, "BUG: vtxindex mismatch" - # opcode = op_get_opcode_name( op_data['op'] ) - opcode = op_data.get('opcode', None) - assert opcode in OPCODE_PREORDER_OPS + OPCODE_CREATION_OPS + OPCODE_TRANSITION_OPS + OPCODE_STATELESS_OPS, \ - "BUG: uncategorized opcode '%s'" % opcode - - except Exception, e: - log.exception(e) - log.error("FATAL: failed to commit operation") - os.abort() - - if opcode in OPCODE_STATELESS_OPS: - # state-less operation - return [] - - else: - op_seq = db_state.commit_operation( op_data, block_id ) - return op_seq - - else: - # final commit for this block - try: - db_state.commit_finished( block_id ) - except Exception, e: - log.exception(e) - log.error("FATAL: failed to commit at block %s" % block_id ) - os.abort() - - return None - - else: - log.error("FATAL: no state engine given") + try: + assert db_state is not None + except: + log.error("FATAL: no state given") os.abort() + if op_data is not None: - -def db_save( block_id, consensus_hash, pending_ops, filename, db_state=None ): - """ - (required by virtualchain state engine) - - Save all persistent state to stable storage. - Called once per block. - - Return True on success - Return False on failure. 
- """ - - from ..atlas import atlasdb_sync_zonefiles - - if db_state is not None: - + # sanity checks try: - # pre-calculate the ops hash for SNV - ops_hash = BlockstackDB.calculate_block_ops_hash( db_state, block_id ) - db_state.store_block_ops_hash( block_id, ops_hash ) + assert '__original_op_data__' in op_data, 'BUG: no __original_op_data__' + assert 'txid' in op_data, "BUG: No txid given" + assert 'vtxindex' in op_data, "BUG: No vtxindex given" + assert op_data['txid'] == txid, "BUG: txid mismatch" + assert op_data['vtxindex'] == vtxindex, "BUG: vtxindex mismatch" + + opcode = op_data.get('opcode', None) + assert opcode in OPCODE_PREORDER_OPS + OPCODE_CREATION_OPS + OPCODE_TRANSITION_OPS + OPCODE_STATELESS_OPS, \ + "BUG: uncategorized opcode '%s'" % opcode + except Exception, e: log.exception(e) - log.error("FATAL: failed to calculate ops hash at block %s" % block_id ) + log.error("FATAL: failed to commit operation") os.abort() + # from db_parse + original_op_data = op_data['__original_op_data__'] + del op_data['__original_op_data__'] + + # save, and get the sequence of committed operations + op_seq = None + if opcode in OPCODE_STATELESS_OPS: + # state-less operation + op_seq = [] + + else: + op_seq = db_state.commit_operation(original_op_data, op_data, block_id) + + return op_seq + + else: + # final commit for this block try: - # flush the database db_state.commit_finished( block_id ) except Exception, e: log.exception(e) log.error("FATAL: failed to commit at block %s" % block_id ) os.abort() + return None + + +def db_save( block_height, consensus_hash, ops_hash, accepted_ops, virtualchain_ops_hints, db_state=None ): + """ + (required by virtualchain state engine) + + Save all persistent state to stable storage. + Called once per block. + + In Blockstack's case, we save transactions as we process them. + The only thing to do here is to synchronize the Atlas DB and clean up the + BlockstackDB instance in preparation for receiving the next blocks' transactions. 
+ + Return True on success + Return False on failure. + """ + from ..atlas import atlasdb_sync_zonefiles + + if db_state is not None: + + try: + # flush the database + db_state.commit_finished( block_height ) + except Exception, e: + log.exception(e) + log.error("FATAL: failed to commit at block %s" % block_height ) + os.abort() + try: # sync block data to atlas, if enabled blockstack_opts = get_blockstack_opts() if blockstack_opts.get('atlas', False): - log.debug("Synchronize Atlas DB for %s" % (block_id-1)) - zonefile_dir = blockstack_opts.get('zonefiles', get_zonefile_dir()) + if 'zonefiles' not in blockstack_opts: + log.error("No zonefiles directory set in config file. Atlas will be disabled") + elif 'atlasdb_path' not in blockstack_opts: + log.error('No atlasdb_path set in config file. Atlas will be disabled') + else: + log.debug("Synchronize Atlas DB for {}".format(block_height)) + zonefile_dir = blockstack_opts['zonefiles'] + atlasdb_path = blockstack_opts['atlasdb_path'] - gc.collect() - atlasdb_sync_zonefiles( db_state, block_id-1, zonefile_dir=zonefile_dir ) - gc.collect() + gc.collect() + atlasdb_sync_zonefiles(db_state, block_height, zonefile_dir, path=atlasdb_path) + gc.collect() except Exception, e: log.exception(e) - log.error("FATAL: failed to update Atlas db at %s" % block_id ) + log.error("FATAL: failed to update Atlas db at %s" % block_height ) os.abort() return True @@ -490,27 +498,22 @@ def db_continue( block_id, consensus_hash ): return is_running() or os.environ.get("BLOCKSTACK_TEST") == "1" -def sync_blockchain( bt_opts, last_block, expected_snapshots={}, **virtualchain_args ): +def sync_blockchain( working_dir, bt_opts, last_block, expected_snapshots={}, **virtualchain_args ): """ synchronize state with the blockchain. 
Return True on success Return False if we're supposed to stop indexing Abort on error """ - + # make this usable even if we haven't explicitly configured virtualchain impl = sys.modules[__name__] - if virtualchain.get_implementation() is not None: - impl = None - - log.info("Synchronizing database up to block %s" % last_block) - - db_filename = virtualchain.get_db_filename(impl=impl) + log.info("Synchronizing database {} up to block {}".format(working_dir, last_block)) # NOTE: this is the only place where a read-write handle should be created, # since this is the only place where the db should be modified. - new_db = BlockstackDB.borrow_readwrite_instance( db_filename, last_block, expected_snapshots=expected_snapshots ) - rc = virtualchain.sync_virtualchain( bt_opts, last_block, new_db, expected_snapshots=expected_snapshots, **virtualchain_args ) - BlockstackDB.release_readwrite_instance( new_db, last_block ) + new_db = BlockstackDB.borrow_readwrite_instance(working_dir, last_block, expected_snapshots=expected_snapshots) + rc = virtualchain.sync_virtualchain(bt_opts, last_block, new_db, expected_snapshots=expected_snapshots, **virtualchain_args) + BlockstackDB.release_readwrite_instance(new_db, last_block) return rc