refactoring: move history verification to lib/consensus; specialize

debug output when using the atlas network simulator; use get_records_*
instead of get_nameops_*
This commit is contained in:
Jude Nelson
2016-09-02 00:12:50 -04:00
parent 94fdfaa9d4
commit e93328c09a

View File

@@ -60,14 +60,15 @@ from ConfigParser import SafeConfigParser
import pybitcoin
from lib import nameset as blockstack_state_engine
from lib import get_db_state, invalidate_db_state
from lib.config import REINDEX_FREQUENCY, DEFAULT_DUST_FEE
from lib import get_db_state
from lib.config import REINDEX_FREQUENCY
from lib import *
from lib.storage import *
from .atlas import *
import lib.nameset.virtualchain_hooks as virtualchain_hooks
import lib.config as config
from lib.consensus import *
# global variables, for use with the RPC server
blockstack_opts = None
@@ -307,18 +308,23 @@ class BlockstackdRPCHandler(SimpleXMLRPCRequestHandler):
"client_host": "",
"client_port": 0
}
log.debug("Inbound RPC begin %s(%s) (from atlas simulator)" % ("rpc_" + str(method), params))
else:
log.debug("Inbound RPC begin %s(%s)" % ("rpc_" + str(method), params))
if os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION", None) == "1":
log.debug("Inbound RPC begin %s(%s)" % ("rpc_" + str(method), params))
else:
log.debug("RPC %s(%s)" % ("rpc_" + str(method), params))
res = self.server.funcs["rpc_" + str(method)](*params, **con_info)
# lol jsonrpc within xmlrpc
ret = json.dumps(res)
log.debug("Inbound RPC end %s(%s)" % ("rpc_" + str(method), params))
if os.environ.get("BLOCKSTACK_ATLAS_NETWORK_SIMULATION", None) == "1":
log.debug("Inbound RPC end %s(%s)" % ("rpc_" + str(method), params))
return ret
except Exception, e:
print >> sys.stderr, "\n\n%s(%s)\n%s\n\n" % ("rpc_" + str(method), params, traceback.format_exc())
@@ -441,47 +447,49 @@ class BlockstackdRPC( SimpleXMLRPCServer):
return name_history
def rpc_get_nameops_at( self, block_id, **con_info ):
def rpc_get_records_at( self, block_id, **con_info ):
"""
Get the sequence of names and namespaces altered at the given block.
Get the sequence of name and namespace records at the given block.
Returns the list of name operations to be fed into virtualchain.
Used by SNV clients.
"""
if type(block_id) not in [int, long]:
return {'error': 'invalid block ID'}
db = get_db_state()
db = get_state_engine()
all_ops = db.get_all_nameops_at( block_id )
prior_records = db.get_all_records_at( block_id, include_history=True )
ret = []
for op in all_ops:
restored_op = nameop_restore_consensus_fields( op, block_id )
ret.append( restored_op )
for rec in prior_records:
restored_rec = rec_restore_snv_consensus_fields( rec, block_id )
ret.append( restored_rec )
return ret
def rpc_get_nameops_hash_at( self, block_id, **con_info ):
def rpc_get_nameops_at( self, block_id, **con_info ):
    """
    Old name for rpc_get_records_at.
    Kept so clients that still call the old RPC name keep working.

    The connection info keyword arguments are forwarded unchanged, so the
    alias behaves exactly like a direct call to rpc_get_records_at.
    """
    return self.rpc_get_records_at( block_id, **con_info )
def rpc_get_records_hash_at( self, block_id, **con_info ):
"""
Get the hash over the sequence of names and namespaces altered at the given block.
Used by SNV clients.
"""
if type(block_id) not in [int, long]:
return {'error': 'invalid block ID'}
db = get_db_state()
db = get_state_engine()
prior_recs = db.get_all_records_at( block_id, include_history=True )
if prior_recs is None:
prior_recs = []
ops = db.get_all_nameops_at( block_id )
if ops is None:
ops = []
restored_ops = []
for op in ops:
restored_op = nameop_restore_consensus_fields( op, block_id )
restored_ops.append( restored_op )
restored_recs = []
for rec in prior_recs:
restored_rec = rec_restore_snv_consensus_fields( rec, block_id )
restored_recs.append( restored_rec )
# NOTE: extracts only the operation-given fields, and ignores ancillary record fields
serialized_ops = [ virtualchain.StateEngine.serialize_op( str(op['op'][0]), op, BlockstackDB.make_opfields(), verbose=False ) for op in restored_ops ]
serialized_ops = [ virtualchain.StateEngine.serialize_op( str(op['op'][0]), op, BlockstackDB.make_opfields(), verbose=False ) for op in restored_recs ]
for serialized_op in serialized_ops:
log.debug("SERIALIZED (%s): %s" % (block_id, serialized_op))
@@ -492,6 +500,13 @@ class BlockstackdRPC( SimpleXMLRPCServer):
return ops_hash
def rpc_get_nameops_hash_at( self, block_id, **con_info ):
    """
    Old name for rpc_get_records_hash_at.
    Kept so clients that still call the old RPC name keep working.

    The connection info keyword arguments are forwarded unchanged, so the
    alias behaves exactly like a direct call to rpc_get_records_hash_at.
    """
    return self.rpc_get_records_hash_at( block_id, **con_info )
def rpc_getinfo(self, **con_info):
"""
Get the number of blocks the
@@ -1362,7 +1377,7 @@ def stop_server( clean=False, kill=False ):
except Exception, e:
log.exception(e)
sys.exit(1)
os.abort()
if kill:
clean = True
@@ -1489,28 +1504,23 @@ def index_blockchain( expected_snapshots=GENESIS_SNAPSHOT ):
bt_opts = get_bitcoin_opts()
start_block, current_block = get_index_range()
db = get_state_engine()
old_lastblock = db.lastblock
if start_block is None and current_block is None:
log.error("Failed to find block range")
return
db = get_state_engine()
old_lastblock = db.lastblock
# bring us up to speed
# bring the db up to the chain tip
log.debug("Begin indexing (up to %s)" % current_block)
set_indexing( True )
db = get_state_engine()
virtualchain.sync_virtualchain( bt_opts, current_block, db, expected_snapshots=expected_snapshots, tx_filter=blockstack_tx_filter )
virtualchain_hooks.sync_blockchain( bt_opts, current_block, expected_snapshots=expected_snapshots, tx_filter=blockstack_tx_filter )
set_indexing( False )
# invalidate in-RAM copy, and reload eagerly
invalidate_db_state()
get_state_engine()
db = get_db_state()
# synchronize atlas db
blockstack_opts = get_blockstack_opts()
if blockstack_opts.get('atlas', False):
db = get_state_engine()
if old_lastblock < db.lastblock:
log.debug("Synchronize Atlas DB from %s to %s" % (old_lastblock+1, db.lastblock+1))
zonefile_dir = blockstack_opts.get('zonefiles', get_zonefile_dir())
@@ -1774,448 +1784,6 @@ def clean( confirm=True ):
sys.exit(exit_status)
def rec_to_virtualchain_op( name_rec, block_number, history_index, untrusted_db ):
    """
    Given a record from the blockstack database, convert it into a
    virtualchain operation to process.

    For each opcode, the operation's script is rebuilt with the matching
    build_*() helper, hex-decoded, stripped of its first three bytes
    (presumably the magic bytes and opcode byte -- confirm against the
    build_*() helpers), and parsed back with the matching parse_*() helper.

    Arguments:
    * name_rec: the name or namespace record (a dict).  NOTE: this dict is
      modified in place: NAME_TRANSFER and NAME_IMPORT rewrite its
      sender/recipient fields (NAME_TRANSFER also sets 'keep_data' and
      'consensus_hash'), and NAME_REGISTRATION / NAME_TRANSFER temporarily
      attach and then delete a 'history' key.
    * block_number: the block at which this record was written.
    * history_index: the relative order of this operation among the
      operations that changed this name within the block (0 for the first).
    * untrusted_db: the database used to look up the name's history and
      (for NAME_TRANSFER) the consensus hash at the transfer's send block.

    Returns a deep copy of name_rec updated with the parsed operation fields
    and the virtualchain op fields, or None if the record is marked expired.
    Aborts the process if a NAME_TRANSFER record has no
    'transfer_send_block_id' field.
    """

    # apply opcodes so we can consume them with virtualchain
    opcode_name = str(name_rec['opcode'])
    ret_op = {}

    if name_rec.get('expired'):
        # don't care
        return None

    if opcode_name == "NAME_PREORDER":
        name_rec_script = build_preorder( None, None, None, str(name_rec['consensus_hash']), name_hash=str(name_rec['preorder_name_hash']) )
        name_rec_payload = binascii.unhexlify( name_rec_script )[3:]
        ret_op = parse_preorder( name_rec_payload )

    elif opcode_name == "NAME_REGISTRATION":
        name_rec_script = build_registration( str(name_rec['name']) )
        name_rec_payload = binascii.unhexlify( name_rec_script )[3:]
        ret_op = parse_registration( name_rec_payload )

        # reconstruct the registration op...
        ret_op['recipient'] = str(name_rec['sender'])
        ret_op['recipient_address'] = str(name_rec['address'])

        # restore history to find previous sender and address
        untrusted_name_rec = untrusted_db.get_name( str(name_rec['name']) )
        name_rec['history'] = untrusted_name_rec['history']

        name_rec_prev = _restore_previous_name_state( name_rec, block_number, history_index )

        ret_op['sender'] = name_rec_prev['sender']
        ret_op['address'] = name_rec_prev['address']

        del name_rec['history']

    elif opcode_name == "NAME_UPDATE":
        data_hash = None
        if name_rec['value_hash'] is not None:
            data_hash = str(name_rec['value_hash'])

        name_rec_script = build_update( str(name_rec['name']), str(name_rec['consensus_hash']), data_hash=data_hash )
        name_rec_payload = binascii.unhexlify( name_rec_script )[3:]
        ret_op = parse_update( name_rec_payload )

    elif opcode_name == "NAME_TRANSFER":
        # reconstruct the transfer op...
        KEEPDATA_OP = "%s%s" % (NAME_TRANSFER, TRANSFER_KEEP_DATA)
        name_rec['keep_data'] = (name_rec['op'] == KEEPDATA_OP)

        # the record's current owner is the transfer's recipient
        recipient = str(name_rec['sender'])
        recipient_address = str(name_rec['address'])

        # restore history
        untrusted_name_rec = untrusted_db.get_name( str(name_rec['name']) )
        name_rec['history'] = untrusted_name_rec['history']

        # get previous owner
        name_rec_prev = _restore_previous_name_state( name_rec, block_number, history_index )

        if 'transfer_send_block_id' not in name_rec:
            log.error("FATAL: Obsolete or invalid database.  Missing 'transfer_send_block_id' field for NAME_TRANSFER at (%s, %s)" % (block_number, history_index))
            os.abort()

        send_block_id = name_rec['transfer_send_block_id']

        # reconstruct recipient and sender
        name_rec['recipient'] = recipient
        name_rec['recipient_address'] = recipient_address
        name_rec['sender'] = name_rec_prev['sender']
        name_rec['address'] = name_rec_prev['address']
        name_rec['consensus_hash'] = untrusted_db.get_consensus_at( send_block_id )

        name_rec_script = build_transfer( str(name_rec['name']), name_rec['keep_data'], str(name_rec['consensus_hash']) )
        name_rec_payload = binascii.unhexlify( name_rec_script )[3:]
        ret_op = parse_transfer( name_rec_payload, name_rec['recipient'] )

        del name_rec['history']

    elif opcode_name == "NAME_REVOKE":
        name_rec_script = build_revoke( str(name_rec['name']) )
        name_rec_payload = binascii.unhexlify( name_rec_script )[3:]
        ret_op = parse_revoke( name_rec_payload )

    elif opcode_name == "NAME_IMPORT":
        name_rec_script = build_name_import( str(name_rec['name']) )
        name_rec_payload = binascii.unhexlify( name_rec_script )[3:]

        # reconstruct recipient and importer
        name_rec['recipient'] = str(name_rec['sender'])
        name_rec['recipient_address'] = str(name_rec['address'])
        name_rec['sender'] = str(name_rec['importer'])
        name_rec['address'] = str(name_rec['importer_address'])

        ret_op = parse_name_import( name_rec_payload, str(name_rec['recipient']), str(name_rec['value_hash']) )

    elif opcode_name == "NAMESPACE_PREORDER":
        name_rec_script = build_namespace_preorder( None, None, None, str(name_rec['consensus_hash']), namespace_id_hash=str(name_rec['namespace_id_hash']) )
        name_rec_payload = binascii.unhexlify( name_rec_script )[3:]
        ret_op = parse_namespace_preorder( name_rec_payload )

    elif opcode_name == "NAMESPACE_REVEAL":
        name_rec_script = build_namespace_reveal( str(name_rec['namespace_id']), name_rec['version'], str(name_rec['recipient_address']),
                                                  name_rec['lifetime'], name_rec['coeff'], name_rec['base'], name_rec['buckets'],
                                                  name_rec['nonalpha_discount'], name_rec['no_vowel_discount'] )
        name_rec_payload = binascii.unhexlify( name_rec_script )[3:]
        ret_op = parse_namespace_reveal( name_rec_payload, str(name_rec['sender']), str(name_rec['recipient_address']) )

    elif opcode_name == "NAMESPACE_READY":
        name_rec_script = build_namespace_ready( str(name_rec['namespace_id']) )
        name_rec_payload = binascii.unhexlify( name_rec_script )[3:]
        ret_op = parse_namespace_ready( name_rec_payload )

    ret_op = virtualchain.virtualchain_set_opfields( ret_op, virtualchain_opcode=getattr( config, opcode_name ), virtualchain_txid=str(name_rec['txid']), virtualchain_txindex=int(name_rec['vtxindex']) )
    ret_op['opcode'] = opcode_name

    # parsed/virtualchain fields take precedence over the stored record's fields
    merged_ret_op = copy.deepcopy( name_rec )
    merged_ret_op.update( ret_op )
    return merged_ret_op


def _restore_previous_name_state( name_rec, block_number, history_index ):
    """
    Return the state of name_rec just before the operation at
    (block_number, history_index).  name_rec must carry its 'history'.

    If the operation is not the first one to touch the name in this block
    (history_index > 0), the previous state is entry history_index - 1 of the
    states restored at block_number.  Otherwise the states are restored at
    block_number - 1, and index -1 selects the last of them.
    """
    restore_block = block_number if history_index > 0 else block_number - 1
    log.debug("restore from %s" % restore_block)
    return BlockstackDB.restore_from_history( name_rec, restore_block )[ history_index - 1 ]
def find_last_transfer_consensus_hash( name_rec, block_id, vtxindex ):
    """
    Walk a name record's history backwards from (block_id, vtxindex) and
    return the consensus hash of the most recent state that is not a
    NAME_TRANSFER.

    name_rec must carry its 'history' dict, keyed by block ID.
    States later than (block_id, vtxindex) are ignored.

    Return None if a NAME_IMPORT or NAME_REGISTRATION is reached first,
    or if no state with a non-None consensus hash is found.
    """
    # newest history block first
    for hist_block in sorted( name_rec['history'].keys(), reverse=True ):
        if hist_block > block_id:
            continue

        states_at_block = BlockstackDB.restore_from_history( name_rec, hist_block )

        # newest state within the block first
        for state in reversed(states_at_block):
            if hist_block == block_id and state['vtxindex'] > vtxindex:
                # from the future
                continue

            opcode = state['op'][0]
            if opcode == NAME_TRANSFER:
                # skip NAME_TRANSFERS
                continue

            if opcode in [NAME_IMPORT, NAME_REGISTRATION]:
                # out of history
                return None

            found_hash = state.get('consensus_hash')
            if found_hash is not None:
                return found_hash

    return None
def nameop_restore_consensus_fields( name_rec, block_id ):
    """
    Given a nameop at a point in time, ensure that all of its consensus
    fields are present.  They can be reconstructed directly from the nameop,
    but they are not always stored in the db.

    Arguments:
    * name_rec: the name operation record (a dict).  For NAME_TRANSFER its
      'history' key is temporarily replaced with the name's full history
      while the consensus hash is looked up, then put back.
    * block_id: the block at which the operation occurred.

    Returns a deep copy of name_rec with the restored fields and the
    virtualchain op fields merged in.  If the result has a 'name_hash', it is
    also exposed as 'name_hash128'.

    Aborts the process if a NAME_TRANSFER record has no
    'transfer_send_block_id' field.
    """

    opcode_name = str(name_rec['opcode'])
    ret_op = {}

    if opcode_name in ("NAME_REGISTRATION", "NAME_IMPORT"):
        # reconstruct the recipient information
        ret_op['recipient'] = str(name_rec['sender'])
        ret_op['recipient_address'] = str(name_rec['address'])

    elif opcode_name == "NAME_TRANSFER":
        db = get_state_engine()

        if 'transfer_send_block_id' not in name_rec:
            # report the position using values that exist in this scope
            log.error("FATAL: Obsolete or invalid database.  Missing 'transfer_send_block_id' field for NAME_TRANSFER at (%s, %s)" % (block_id, name_rec.get('vtxindex')))
            os.abort()

        full_rec = db.get_name( name_rec['name'], include_expired=True )
        full_history = full_rec['history']

        # reconstruct the recipient information
        ret_op['recipient'] = str(name_rec['sender'])
        ret_op['recipient_address'] = str(name_rec['address'])

        # reconstruct name_hash, consensus_hash, keep_data
        keep_data = (name_rec['op'][-1] == TRANSFER_KEEP_DATA)

        # search the full history, but always put back whatever the caller had.
        # NOTE(review): if the record had no 'history' key, this leaves
        # 'history' set to None rather than absent, as before -- confirm
        # whether callers rely on that.
        old_history = name_rec.get('history', None)
        name_rec['history'] = full_history
        try:
            consensus_hash = find_last_transfer_consensus_hash( name_rec, block_id, name_rec['vtxindex'] )
        finally:
            name_rec['history'] = old_history

        ret_op['keep_data'] = keep_data
        if consensus_hash is not None:
            log.debug("restore consensus hash (%s,%s): %s" % (block_id, name_rec['vtxindex'], consensus_hash))
            ret_op['consensus_hash'] = consensus_hash
        else:
            # fall back to the consensus hash at the transfer's send block
            ret_op['consensus_hash'] = db.get_consensus_at( name_rec['transfer_send_block_id'] )
            log.debug("Use consensus hash from %s: %s" % (name_rec['transfer_send_block_id'], ret_op['consensus_hash']))

        ret_op['name_hash'] = hash256_trunc128( str(name_rec['name']) )

    elif opcode_name == "NAME_UPDATE":
        # reconstruct name_hash
        ret_op['name_hash'] = hash256_trunc128( str(name_rec['name']) + str(name_rec['consensus_hash']) )

    elif opcode_name == "NAME_REVOKE":
        ret_op['revoked'] = True

    ret_op = virtualchain.virtualchain_set_opfields( ret_op, virtualchain_opcode=getattr( config, opcode_name ), virtualchain_txid=str(name_rec['txid']), virtualchain_txindex=int(name_rec['vtxindex']) )
    ret_op['opcode'] = opcode_name

    # restored fields take precedence over the stored record's fields
    merged_op = copy.deepcopy( name_rec )
    merged_op.update( ret_op )

    if 'name_hash' in merged_op:
        merged_op['name_hash128'] = merged_op['name_hash']

    return merged_op
def block_to_virtualchain_ops( block_id, db ):
    """
    Convert a block's name ops to virtualchain ops.
    This is needed in order to recreate the virtualchain
    transactions that generated the block's name operations,
    such as for re-building the db or serving SNV clients.

    Arguments:
    * block_id: the block whose operations are converted.
    * db: the database to read the operations from; it is also passed to
      rec_to_virtualchain_op as the (untrusted) source of name histories.

    Returns the list of virtualchain ops, in vtxindex order.  Records for
    which rec_to_virtualchain_op returns None are left out.
    Raises Exception if an opcode has no entry in SERIALIZE_FIELDS, and
    re-raises any error from rec_to_virtualchain_op after logging the
    offending operation.
    """

    # all sequences of operations at this block, in tx order
    nameops = sorted( db.get_all_nameops_at( block_id ), key=lambda op: op['vtxindex'] )
    virtualchain_ops = []

    # each name record has its own history, and their interleaving in tx order
    # is what makes up nameops.  However, when restoring a name record to
    # a previous state, we need to know the *relative* order of operations
    # that changed it during this block.  This is called the history index:
    # it maps each name to a dict, which maps the operation's position in
    # nameops to integer h such that the operation is the hth update to the
    # name record in this block.
    history_index = {}
    for i, nameop in enumerate(nameops):
        if 'name' not in nameop:
            continue

        per_name = history_index.setdefault( str(nameop['name']), {} )
        per_name[i] = len(per_name)

    for i, op in enumerate(nameops):
        # only trusted fields
        opcode_name = op['opcode']
        consensus_fields = SERIALIZE_FIELDS.get( opcode_name, None )
        if consensus_fields is None:
            raise Exception("BUG: no consensus fields defined for '%s'" % opcode_name )

        # coerce string, not unicode
        for k, v in list(op.items()):
            if isinstance(v, unicode):
                op[k] = str(v)

        # remove virtualchain-specific fields--they won't be trusted
        op = db.sanitize_op( op )
        nameops[i] = op

        # snapshot the keys, since fields are deleted while we scan them
        for field in list(op.keys()):
            # remove untrusted fields, except for:
            # * 'opcode' (which will be fed into the consensus hash
            #   indirectly, once the fields are successfully processed and
            #   thus proven consistent with the fields),
            # * 'transfer_send_block_id' (which will be used to find the
            #   NAME_TRANSFER consensus hash, thus indirectly feeding this
            #   information into the consensus hash as well).
            if field not in consensus_fields and field not in ['opcode', 'transfer_send_block_id']:
                log.warning("OP '%s': Removing untrusted field '%s'" % (opcode_name, field))
                del op[field]

        # recover virtualchain op from name record
        h = 0
        if 'name' in op and op['name'] in history_index:
            h = history_index[ op['name'] ][i]

        try:
            virtualchain_op = rec_to_virtualchain_op( op, block_id, h, db )
        except Exception:
            # record which operation could not be converted, then propagate
            log.error("Failed to convert operation:\n%s" % json.dumps( op, indent=4 ))
            raise

        if virtualchain_op is not None:
            virtualchain_ops.append( virtualchain_op )

    return virtualchain_ops
def rebuild_database( target_block_id, untrusted_db_path, working_db_path=None, resume_dir=None, start_block=None ):
    """
    Replay every name operation stored in an (untrusted) db into a separate
    working db, block by block, up to and including target_block_id.

    If resume_dir is None, a fresh temporary directory is created and the
    replay starts at virtualchain's first block ID.  Otherwise resume_dir is
    used as the working directory and the replay starts at the last block
    recorded there, falling back to start_block if there is none.

    Return the consensus hash calculated at the target block.
    """

    # reconfigure the virtualchain to use a separate directory,
    # so we don't interfere with this instance's primary database
    resuming = resume_dir is not None
    if resuming:
        working_dir = resume_dir
    else:
        working_dir = tempfile.mkdtemp( prefix='blockstack-verify-database-' )

    blockstack_state_engine.working_dir = working_dir
    virtualchain.setup_virtualchain( blockstack_state_engine )

    if resuming:
        # pick up where the previous run stopped, if it got anywhere
        last_block = get_lastblock()
        if last_block is not None:
            start_block = last_block
    else:
        start_block = virtualchain.get_first_block_id()

    log.debug( "Rebuilding database from %s to %s" % (start_block, target_block_id) )

    # source of the operations to replay
    untrusted_db = BlockstackDB( untrusted_db_path )

    # destination: built up from the untrusted db's operations
    if working_db_path is None:
        working_db_path = virtualchain.get_db_filename()

    working_db = BlockstackDB( working_db_path )

    log.debug( "Working DB: %s" % working_db_path )
    log.debug( "Untrusted DB: %s" % untrusted_db_path )

    # block ID --> consensus hash
    consensus_hashes = {}

    for block_id in xrange( start_block, target_block_id+1 ):
        ops_at_block = block_to_virtualchain_ops( block_id, untrusted_db )

        # replaying the block's ops yields the consensus hash at that block
        block_hash = working_db.process_block( block_id, ops_at_block )
        log.debug("VERIFY CONSENSUS(%s): %s" % (block_id, block_hash))

        consensus_hashes[block_id] = block_hash

    # final consensus hash
    return consensus_hashes[ target_block_id ]
def verify_database( trusted_consensus_hash, consensus_block_id, untrusted_db_path, working_db_path=None, start_block=None ):
    """
    Check an untrusted database against a known-good consensus hash.

    The untrusted database's operations are replayed block-by-block into a
    new database (see rebuild_database).  The database is trusted only if
    the consensus hash derived at consensus_block_id equals
    trusted_consensus_hash.

    Return True if the hashes match; otherwise log an error and return False.
    """

    rebuilt_hash = rebuild_database( consensus_block_id, untrusted_db_path, working_db_path=working_db_path, start_block=start_block )

    # did we reach the consensus hash we expected?
    if rebuilt_hash != trusted_consensus_hash:
        log.error("Unverifiable database state stored in '%s'" % blockstack_state_engine.working_dir )
        return False

    return True
def restore( working_dir, block_number ):
"""
Restore the database from a backup in the backups/ directory.
@@ -2288,7 +1856,7 @@ def run_blockstackd():
argparser = setup( working_dir=working_dir, return_parser=True )
if argparser is None:
# fatal error
sys.exit(1)
os.abort()
# get RPC server options
subparsers = argparser.add_subparsers(