update virtualchain_hooks (the virtualchain implementation) to adhere to the new StateEngine standards in virtualchain 0.18. In particular:

* add method for which blockchain we're on
* add method for determining how long a consensus hash is valid
* add method for getting all opcodes
* add method for getting all operations' serialization fields
* accept transactions in the virtualchain-standardized format, not the bitcoind RPC format
* stronger and more explicit adherence to acquiring read/only versus read/write BlockstackDB handles
This commit is contained in:
Jude Nelson
2018-01-12 18:28:06 -05:00
parent 68c0828a6d
commit ed54c905de

View File

@@ -25,6 +25,7 @@
import os
import gc
import copy
from .namedb import *
@@ -35,184 +36,167 @@ import virtualchain
log = virtualchain.get_logger("blockstack-log")
def get_virtual_chain_name():
"""
(required by virtualchain state engine)
"""
(required by virtualchain state engine)
Get the name of the virtual chain we're building.
"""
return "blockstack-server"
Get the name of the virtual chain we're building.
"""
return "blockstack-server"
def get_virtual_chain_version():
"""
(required by virtualchain state engine)
"""
(required by virtualchain state engine)
Get the version string for this virtual chain.
"""
return VERSION
Get the version string for this virtual chain.
"""
return VERSION
def get_opcodes():
"""
(required by virtualchain state engine)
"""
(required by virtualchain state engine)
Get the list of opcodes we're looking for.
"""
return OPCODES
Get the list of opcodes we're looking for.
"""
return OPCODES
def get_op_processing_order():
"""
(required by virtualchain state engine)
Give a hint as to the order in which we process operations
"""
return OPCODES
def get_opfields():
"""
(required by virtaulchain state engine)
Get a dict that maps each opcode to the list of transaction fields to serialize
"""
return BlockstackDB.make_opfields()
def get_magic_bytes():
"""
(required by virtualchain state engine)
"""
(required by virtualchain state engine)
Get the magic byte sequence for our OP_RETURNs
"""
return MAGIC_BYTES
Get the magic byte sequence for our OP_RETURNs
"""
return MAGIC_BYTES
def get_first_block_id():
"""
(required by virtualchain state engine)
"""
(required by virtualchain state engine)
Get the id of the first block to start indexing.
"""
start_block = FIRST_BLOCK_MAINNET
return start_block
Get the id of the first block to start indexing.
"""
start_block = FIRST_BLOCK_MAINNET
return start_block
def get_last_block():
def get_blockchain():
"""
(required by virtualchain state engine)
Which blockchain do we index?
"""
return "bitcoin"
def get_valid_transaction_window():
"""
(required by virtualchain state engine)
How many blocks is a transaction good for?
"""
return 24
def get_initial_snapshots():
"""
(required by virtualchain state engine)
What are the initial consensus hashes?
"""
return GENESIS_SNAPSHOT
def get_last_block(working_dir):
"""
Get the last block processed
Return the integer on success
Return None on error
"""
# make this usable even if we haven't explicitly configured virtualchain
impl = virtualchain.get_implementation()
if impl is None:
impl = sys.modules[__name__]
lastblock_filename = virtualchain.get_lastblock_filename(impl=impl)
if os.path.exists( lastblock_filename ):
try:
with open(lastblock_filename, "r") as f:
lastblock = int( f.read().strip() )
return lastblock
except Exception, e:
# this can't ever happen
log.exception(e)
return None
return None
impl = sys.modules[__name__]
return BlockstackDB.get_lastblock(impl, working_dir)
def get_snapshots():
def get_snapshots(working_dir, start_block=None, end_block=None):
"""
Read the virtualchain snapshots
Returns the dict of {snapshots: {$block_height: $consensus_hash}} on success
Returns None on error
"""
# make this usable even if we haven't explicitly configured virtualchain
impl = virtualchain.get_implementation()
if impl is None:
impl = sys.modules[__name__]
snapshots_filename = virtualchain.get_snapshots_filename(impl=impl)
if os.path.exists(snapshots_filename):
try:
with open(snapshots_filename, 'r') as f:
snapshots_bin = f.read()
snapshots = json.loads(snapshots_bin)
return snapshots
except Exception as e:
log.exception(e)
return None
return None
impl = sys.modules[__name__]
return BlockstackDB.get_consensus_hashes(impl, working_dir, start_block_height=start_block, end_block_height=end_block)
def get_db_state( disposition=DISPOSITION_RO ):
def get_db_state(working_dir):
"""
Callback to the virtual chain state engine.
Get a *read-only* handle to our state engine implementation
(i.e. our name database).
Note that in this implementation, the database
handle returned will only support read-only operations by default.
Attempts to save state with the handle will lead to program abort.
Returns the handle on success
Raises on error
"""
# make this usable even if we haven't explicitly configured virtualchain
impl = sys.modules[__name__]
db_inst = BlockstackDB.get_readonly_instance(working_dir)
assert db_inst, 'Failed to instantiate database handle'
return db_inst
def db_parse( block_id, txid, vtxindex, op, data, senders, inputs, outputs, fee, db_state=None, **virtualchain_hints ):
"""
(required by virtualchain state engine)
Callback to the virtual chain state engine.
Get a handle to our state engine implementation
(i.e. our name database).
Parse a blockstack operation from a transaction. The transaction fields are as follows:
* `block_id` is the blockchain height at which this transaction occurs
* `txid` is the transaction ID
* `data` is the scratch area of the transaction that contains the actual virtualchain operation (e.g. "id[opcode][payload]")
* `senders` is a list in 1-to-1 correspondence with `inputs` that contains information about what funded the inputs
* `inputs` are the list of inputs to the transaction. Some blockchains (like Bitcoin) support multiple inputs, whereas others (like Ethereum) support only 1.
* `outputs` are the list of outputs of the transaction. Some blockchains (like Bitcoin) support multiple outputs, whereas others (like Ethereum) support only 1.
* `fee` is the transaction fee.
Note that in this implementation, the database
handle returned will only support read-only operations by default.
NO COMMITS WILL BE ALLOWED.
"""
# make this usable even if we haven't explicitly configured virtualchain
impl = virtualchain.get_implementation()
if impl is None:
impl = sys.modules[__name__]
db_filename = virtualchain.get_db_filename(impl=impl)
lastblock_filename = virtualchain.get_lastblock_filename(impl=impl)
lastblock = None
firstcheck = True
for path in [db_filename, lastblock_filename]:
if os.path.exists( path ):
# have already created the db
firstcheck = False
if not firstcheck and not os.path.exists( lastblock_filename ):
# this can't ever happen
log.error("FATAL: no such file or directory: %s" % lastblock_filename )
os.abort()
# verify that it is well-formed, if it exists
elif os.path.exists( lastblock_filename ):
try:
with open(lastblock_filename, "r") as f:
lastblock = int( f.read().strip() )
except Exception, e:
# this can't ever happen
log.error("FATAL: failed to parse: %s" % lastblock_filename)
log.exception(e)
os.abort()
db_inst = BlockstackDB( db_filename, disposition )
return db_inst
def db_parse( block_id, txid, vtxindex, op, data, senders, inputs, outputs, fee, db_state=None ):
"""
(required by virtualchain state engine)
Parse a blockstack operation from a transaction's nulldata (data) and a list of outputs, as well as
optionally the list of transaction's senders and the total fee paid. Use the operation-specific
extract_${OPCODE}() method to get the data, and make sure the operation-defined fields are all set.
`db_state` is the StateEngine-derived class. This is a BlockstackDB instance.
`**virtualchain_hints` is a dict with extra virtualchain hints that may be relevant. We require:
* `raw_tx`: the hex-encoded string containing the raw transaction.
Returns a dict with the parsed operation on success.
Return None on error
NOTE: the transactions that our tools put have a single sender, and a single output address.
This is assumed by this code.
"""
# basic sanity checks
if len(senders) == 0:
raise Exception("No senders given")
# this virtualchain instance must give the 'raw_tx' hint
assert 'raw_tx' in virtualchain_hints, 'BUG: incompatible virtualchain: requires raw_tx support'
# internal sanity check
raw_tx = virtualchain_hints['raw_tx']
btc_tx_data = virtualchain.btc_tx_deserialize(raw_tx)
test_btc_tx = virtualchain.btc_tx_serialize({'ins': inputs, 'outs': outputs, 'locktime': btc_tx_data['locktime'], 'version': btc_tx_data['version']})
assert raw_tx == test_btc_tx, 'TX mismatch: {} != {}'.format(raw_tx, test_btc_tx)
# make sure each op has all the right fields defined
try:
opcode = op_get_opcode_name( op )
opcode = op_get_opcode_name(op)
assert opcode is not None, "Unrecognized opcode '%s'" % op
except Exception, e:
log.exception(e)
@@ -222,28 +206,39 @@ def db_parse( block_id, txid, vtxindex, op, data, senders, inputs, outputs, fee,
log.debug("PARSE %s at (%s, %s): %s" % (opcode, block_id, vtxindex, data.encode('hex')))
# get the data
op = None
op_data = None
try:
op = op_extract( opcode, data, senders, inputs, outputs, block_id, vtxindex, txid )
op_data = op_extract( opcode, data, senders, inputs, outputs, block_id, vtxindex, txid )
except Exception, e:
log.exception(e)
op = None
op_data = None
if op is not None:
if op_data is not None:
try:
assert 'op' in op_data, 'BUG: missing op'
except Exception as e:
log.exception(e)
log.error("BUG: missing op")
os.abort()
original_op_data = copy.deepcopy(op_data)
# propagate tx data
op['vtxindex'] = int(vtxindex)
op['txid'] = str(txid)
op_data['vtxindex'] = int(vtxindex)
op_data['txid'] = str(txid)
op_data['__original_op_data__'] = original_op_data
else:
log.error("Unparseable op '%s'" % opcode)
return op
return op_data
def check_mutate_fields( op, op_data ):
"""
Verify that all mutate fields are present.
Return True if so.
Raise an exception (AssertionError) if not.
"""
mutate_fields = op_get_mutate_fields( op )
@@ -266,6 +261,9 @@ def db_scan_block( block_id, op_list, db_state=None ):
do block-level preprocessing:
* find the state-creation operations we will accept
* make sure there are no collisions.
This modifies op_list, but returns nothing.
This aborts on runtime error.
"""
try:
@@ -275,6 +273,7 @@ def db_scan_block( block_id, op_list, db_state=None ):
log.error("FATAL: no state given")
os.abort()
log.debug("SCAN BEGIN: {} ops at block {}".format(len(op_list), block_id))
checked_ops = []
for op_data in op_list:
@@ -300,7 +299,7 @@ def db_scan_block( block_id, op_list, db_state=None ):
# reject all operations that will collide
db_state.put_collisions( block_id, collisions )
log.debug("SCAN END: {} ops at block {} ({} collisions)".format(len(op_list), block_id, len(collisions)))
def db_check( block_id, new_ops, op, op_data, txid, vtxindex, checked_ops, db_state=None ):
@@ -317,7 +316,8 @@ def db_check( block_id, new_ops, op, op_data, txid, vtxindex, checked_ops, db_st
affected more than once, then the opcode priority rules take effect, and
the lower priority opcodes are rejected.
Return True if it's valid; False if not.
Return True if it's valid
Return False if not.
"""
accept = True
@@ -374,96 +374,104 @@ def db_commit( block_id, op, op_data, txid, vtxindex, db_state=None ):
be fed into virtualchain to translate into a string
to be used to generate this block's consensus hash.
"""
if db_state is not None:
if op_data is not None:
try:
assert 'txid' in op_data, "BUG: No txid given"
assert 'vtxindex' in op_data, "BUG: No vtxindex given"
assert op_data['txid'] == txid, "BUG: txid mismatch"
assert op_data['vtxindex'] == vtxindex, "BUG: vtxindex mismatch"
# opcode = op_get_opcode_name( op_data['op'] )
opcode = op_data.get('opcode', None)
assert opcode in OPCODE_PREORDER_OPS + OPCODE_CREATION_OPS + OPCODE_TRANSITION_OPS + OPCODE_STATELESS_OPS, \
"BUG: uncategorized opcode '%s'" % opcode
except Exception, e:
log.exception(e)
log.error("FATAL: failed to commit operation")
os.abort()
if opcode in OPCODE_STATELESS_OPS:
# state-less operation
return []
else:
op_seq = db_state.commit_operation( op_data, block_id )
return op_seq
else:
# final commit for this block
try:
db_state.commit_finished( block_id )
except Exception, e:
log.exception(e)
log.error("FATAL: failed to commit at block %s" % block_id )
os.abort()
return None
else:
log.error("FATAL: no state engine given")
try:
assert db_state is not None
except:
log.error("FATAL: no state given")
os.abort()
if op_data is not None:
def db_save( block_id, consensus_hash, pending_ops, filename, db_state=None ):
"""
(required by virtualchain state engine)
Save all persistent state to stable storage.
Called once per block.
Return True on success
Return False on failure.
"""
from ..atlas import atlasdb_sync_zonefiles
if db_state is not None:
# sanity checks
try:
# pre-calculate the ops hash for SNV
ops_hash = BlockstackDB.calculate_block_ops_hash( db_state, block_id )
db_state.store_block_ops_hash( block_id, ops_hash )
assert '__original_op_data__' in op_data, 'BUG: no __original_op_data__'
assert 'txid' in op_data, "BUG: No txid given"
assert 'vtxindex' in op_data, "BUG: No vtxindex given"
assert op_data['txid'] == txid, "BUG: txid mismatch"
assert op_data['vtxindex'] == vtxindex, "BUG: vtxindex mismatch"
opcode = op_data.get('opcode', None)
assert opcode in OPCODE_PREORDER_OPS + OPCODE_CREATION_OPS + OPCODE_TRANSITION_OPS + OPCODE_STATELESS_OPS, \
"BUG: uncategorized opcode '%s'" % opcode
except Exception, e:
log.exception(e)
log.error("FATAL: failed to calculate ops hash at block %s" % block_id )
log.error("FATAL: failed to commit operation")
os.abort()
# from db_parse
original_op_data = op_data['__original_op_data__']
del op_data['__original_op_data__']
# save, and get the sequence of committed operations
op_seq = None
if opcode in OPCODE_STATELESS_OPS:
# state-less operation
op_seq = []
else:
op_seq = db_state.commit_operation(original_op_data, op_data, block_id)
return op_seq
else:
# final commit for this block
try:
# flush the database
db_state.commit_finished( block_id )
except Exception, e:
log.exception(e)
log.error("FATAL: failed to commit at block %s" % block_id )
os.abort()
return None
def db_save( block_height, consensus_hash, ops_hash, accepted_ops, virtualchain_ops_hints, db_state=None ):
"""
(required by virtualchain state engine)
Save all persistent state to stable storage.
Called once per block.
In Blockstack's case, we save transactions as we process them.
The only thing to do here is to synchronize the Atlas DB and clean up the
BlockstackDB instance in preparation for receiving the next blocks' transactions.
Return True on success
Return False on failure.
"""
from ..atlas import atlasdb_sync_zonefiles
if db_state is not None:
try:
# flush the database
db_state.commit_finished( block_height )
except Exception, e:
log.exception(e)
log.error("FATAL: failed to commit at block %s" % block_height )
os.abort()
try:
# sync block data to atlas, if enabled
blockstack_opts = get_blockstack_opts()
if blockstack_opts.get('atlas', False):
log.debug("Synchronize Atlas DB for %s" % (block_id-1))
zonefile_dir = blockstack_opts.get('zonefiles', get_zonefile_dir())
if 'zonefiles' not in blockstack_opts:
log.error("No zonefiles directory set in config file. Atlas will be disabled")
elif 'atlasdb_path' not in blockstack_opts:
log.error('No atlasdb_path set in config file. Atlas will be disabled')
else:
log.debug("Synchronize Atlas DB for {}".format(block_height))
zonefile_dir = blockstack_opts['zonefiles']
atlasdb_path = blockstack_opts['atlasdb_path']
gc.collect()
atlasdb_sync_zonefiles( db_state, block_id-1, zonefile_dir=zonefile_dir )
gc.collect()
gc.collect()
atlasdb_sync_zonefiles(db_state, block_height, zonefile_dir, path=atlasdb_path)
gc.collect()
except Exception, e:
log.exception(e)
log.error("FATAL: failed to update Atlas db at %s" % block_id )
log.error("FATAL: failed to update Atlas db at %s" % block_height )
os.abort()
return True
@@ -490,27 +498,22 @@ def db_continue( block_id, consensus_hash ):
return is_running() or os.environ.get("BLOCKSTACK_TEST") == "1"
def sync_blockchain( bt_opts, last_block, expected_snapshots={}, **virtualchain_args ):
def sync_blockchain( working_dir, bt_opts, last_block, expected_snapshots={}, **virtualchain_args ):
"""
synchronize state with the blockchain.
Return True on success
Return False if we're supposed to stop indexing
Abort on error
"""
# make this usable even if we haven't explicitly configured virtualchain
impl = sys.modules[__name__]
if virtualchain.get_implementation() is not None:
impl = None
log.info("Synchronizing database up to block %s" % last_block)
db_filename = virtualchain.get_db_filename(impl=impl)
log.info("Synchronizing database {} up to block {}".format(working_dir, last_block))
# NOTE: this is the only place where a read-write handle should be created,
# since this is the only place where the db should be modified.
new_db = BlockstackDB.borrow_readwrite_instance( db_filename, last_block, expected_snapshots=expected_snapshots )
rc = virtualchain.sync_virtualchain( bt_opts, last_block, new_db, expected_snapshots=expected_snapshots, **virtualchain_args )
BlockstackDB.release_readwrite_instance( new_db, last_block )
new_db = BlockstackDB.borrow_readwrite_instance(working_dir, last_block, expected_snapshots=expected_snapshots)
rc = virtualchain.sync_virtualchain(bt_opts, last_block, new_db, expected_snapshots=expected_snapshots, **virtualchain_args)
BlockstackDB.release_readwrite_instance(new_db, last_block)
return rc