bugfix: reindex a subdomain across different updates that arrive under different names. Also add the ability to reindex from scratch, methods to explicitly clear the subdomain db, and subdomain history query methods

This commit is contained in:
Jude Nelson
2018-01-30 17:42:49 -05:00
parent d773ce5bf1
commit d8e769577c

View File

@@ -32,7 +32,7 @@ import blockstack_zones
from virtualchain import bitcoin_blockchain
from .config import BLOCKSTACK_TESTNET, BLOCKSTACK_TEST, BLOCKSTACK_DEBUG, get_blockstack_opts, is_atlas_enabled, is_subdomains_enabled
from .config import BLOCKSTACK_TESTNET, BLOCKSTACK_TEST, BLOCKSTACK_DEBUG, SUBDOMAINS_FIRST_BLOCK, get_blockstack_opts, is_atlas_enabled, is_subdomains_enabled
from .atlas import atlasdb_open, atlasdb_get_zonefiles_by_block, atlasdb_get_zonefiles_by_name, atlas_node_add_callback, atlasdb_query_execute, atlasdb_get_zonefiles_by_hash
from .storage import get_atlas_zonefile_data, get_zonefile_data_hash, store_atlas_zonefile_data
from .scripts import is_name_valid, is_address_subdomain, is_subdomain
@@ -51,8 +51,6 @@ SUBDOMAIN_N = "seqn"
log = virtualchain.get_logger()
SUBDOMAINS_FIRST_BLOCK = 478872
class DomainNotOwned(Exception):
"""
Exception thrown when the stem name is not found
@@ -227,6 +225,7 @@ class Subdomain(object):
'txid': self.txid,
'value_hash': get_zonefile_data_hash(self.zonefile_str),
'zonefile': base64.b64encode(self.zonefile_str),
'name': self.get_fqn(),
}
return ret
@@ -309,18 +308,19 @@ class SubdomainIndex(object):
"""
Process zone files as they arrive for subdomain state, and as instructed by an external caller.
"""
def __init__(self, working_dir, subdomain_db_path):
def __init__(self, subdomain_db_path, blockstack_opts=None):
if blockstack_opts is None:
blockstack_opts = get_blockstack_opts()
blockstack_opts = get_blockstack_opts()
assert is_atlas_enabled(blockstack_opts), 'Cannot start subdomain indexer since Atlas is disabled'
self.subdomain_db_path = subdomain_db_path
self.subdomain_db = SubdomainDB(subdomain_db_path, blockstack_opts['zonefiles'])
self.working_dir = working_dir
self.atlasdb_path = blockstack_opts['atlasdb_path']
self.zonefiles_dir = blockstack_opts['zonefiles']
log.debug("SubdomainIndex: working_dir={}, db={}, atlasdb={}, zonefiles={}".format(working_dir, subdomain_db_path, self.atlasdb_path, self.zonefiles_dir))
log.debug("SubdomainIndex: db={}, atlasdb={}, zonefiles={}".format(subdomain_db_path, self.atlasdb_path, self.zonefiles_dir))
@classmethod
@@ -332,15 +332,15 @@ class SubdomainIndex(object):
Return False if not
"""
if existing_subrec.get_fqn() != new_subrec.get_fqn():
log.warn("Failed subdomain {} transition because fqn changed to {}".format(existing_subrec.get_fqn(), new_subrec.get_fqn()))
log.warn("Failed subdomain {} transition because fqn changed to {} (at block height {} zonefile index {})".format(existing_subrec.get_fqn(), new_subrec.get_fqn(), new_subrec.block_height, new_subrec.zonefile_index))
return False
if existing_subrec.n + 1 != new_subrec.n:
log.warn("Failed subdomain {} transition because of N:{}->{}".format(new_subrec.get_fqn(), existing_subrec.n + 1, new_subrec.n))
log.warn("Failed subdomain {} transition because of N:{}->{} (at block height {} zonefile index {})".format(new_subrec.get_fqn(), existing_subrec.n + 1, new_subrec.n, new_subrec.block_height, new_subrec.zonefile_index))
return False
if not new_subrec.verify_signature(existing_subrec.address):
log.warn("Failed subdomain {} transition because of signature failure".format(new_subrec.get_fqn()))
log.warn("Failed subdomain {} transition because of signature failure (at block height {} zonefile index {})".format(new_subrec.get_fqn(), new_subrec.block_height, new_subrec.zonefile_index))
return False
return True
@@ -371,7 +371,11 @@ class SubdomainIndex(object):
Optionally only finds zone file updates for a specific name
Returns a list of {'name':.., 'zonefile_hash':..., 'block_height':..., 'txid':..., 'subdomains': [...] or None}, in blockchain order
Returns {
'zonefile_info': [{'name':.., 'zonefile_hash':..., 'block_height':..., 'txid':..., 'subdomains': [...] or None}], # in blockchain order
'subdomains': {'fqn': [indexes into zonefile_info]}
}
'subdomains' will map to a list if we had the zone file and were able to parse it.
'subdomains' will map to None if we did not have the zone file, period (means we can't process anything for the given name beyond this point)
"""
@@ -410,10 +414,24 @@ class SubdomainIndex(object):
# have zone file, but no subdomains
subdomains = []
log.debug("Found {} subdomain record(s) for '{}' in zonefile {} at {}".format(len(subdomains), sdinfo['name'], sdinfo['zonefile_hash'], sdinfo['block_height']))
log.debug("Found {} subdomain record(s) for '{}' in zonefile {} at {} (index {})".format(len(subdomains), sdinfo['name'], sdinfo['zonefile_hash'], sdinfo['block_height'], sdinfo['inv_index']))
subdomain_info[i]['subdomains'] = subdomains
return subdomain_info
# group by subdomain name
subdomain_index = {}
for i, zfinfo in enumerate(subdomain_info):
if zfinfo['subdomains'] is None:
# no zone file
continue
for sd in zfinfo['subdomains']:
fqn = sd.get_fqn()
if fqn not in subdomain_index:
subdomain_index[fqn] = []
subdomain_index[fqn].append(i)
return {'zonefile_info': subdomain_info, 'subdomains': subdomain_index}
@classmethod
@@ -566,7 +584,16 @@ class SubdomainIndex(object):
if fqn not in current_state:
try:
existing_subrec = self.subdomain_db.get_subdomain_entry(fqn, cur=cur)
current_state[fqn] = existing_subrec
if subrec.zonefile_index < existing_subrec.zonefile_index:
# we're reorging
log.warning("Possibly reorganizing history for {} at sequence {}".format(fqn, subrec.n))
existing_subrec = self.subdomain_db.get_subdomain_entry_before(fqn, subrec.zonefile_index)
current_state[fqn] = existing_subrec
else:
current_state[fqn] = existing_subrec
except SubdomainNotFound:
current_state[fqn] = None
@@ -635,22 +662,24 @@ class SubdomainIndex(object):
def index_blockchain(self, block_start, block_end):
"""
Go through the sequence of zone files discovered in a block range, and reindex the names' subdomains.
Returns the list of zone file hashes processed
"""
log.debug("Processing subdomain updates for zonefiles in blocks {}-{}".format(block_start, block_end))
zonefile_subdomain_info = self.find_zonefile_subdomains(block_start, block_end)
res = self.find_zonefile_subdomains(block_start, block_end)
zonefile_subdomain_info = res['zonefile_info']
self.process_subdomains(zonefile_subdomain_info)
return list(set([zf['zonefile_hash'] for zf in zonefile_subdomain_info]))
def index_discovered_zonefiles(self, lastblock, skip_zonefiles=[]):
def index_discovered_zonefiles(self, lastblock):
"""
Go through the list of zone files we discovered via Atlas, grouped by name and ordered by block height.
Find all subsequent zone files for this name, and process all subdomain operations contained within them.
Optionally skip zone files if they are in skip_zonefiles
"""
zonefile_infos = {} # cached zone file infos
all_queued_zfinfos = [] # contents of the queue
subdomain_zonefile_infos = {} # map subdomain fqn to list of zonefile info bundles, for process_subdomains
offset = 0
name_blocks = {} # map domain name to the block at which we should reprocess its subsequent zone files
@@ -672,10 +701,6 @@ class SubdomainIndex(object):
zonefile_hash = zfinfo['zonefile_hash']
block_height = zfinfo['block_height']
if zonefile_hash in skip_zonefiles:
log.debug("Skipping zonefile {}".format(zonefile_hash))
continue
if zonefile_hash not in zonefile_infos:
# find out the names that sent this zone file
zfinfos = atlasdb_get_zonefiles_by_hash(zonefile_hash, block_height=block_height, path=self.atlasdb_path)
@@ -694,14 +719,31 @@ class SubdomainIndex(object):
name_blocks[zfi['name']] = block_height
else:
name_blocks[zfi['name']] = min(block_height, name_blocks[zfi['name']])
for name in name_blocks:
if name_blocks[name] >= lastblock:
continue
log.debug("Processing subdomain updates for {} starting at block {}".format(name, name_blocks[name]))
zonefile_subdomain_info = self.find_zonefile_subdomains(name_blocks[name], lastblock, name=name)
self.process_subdomains(zonefile_subdomain_info)
log.debug("Finding subdomain updates for {} starting at block {}".format(name, name_blocks[name]))
res = self.find_zonefile_subdomains(name_blocks[name], lastblock, name=name)
zonefile_subdomain_info = res['zonefile_info']
subdomain_index = res['subdomains']
# for each subdomain, find the list of zonefiles that possibly affected it
for fqn in subdomain_index:
if fqn not in subdomain_zonefile_infos:
subdomain_zonefile_infos[fqn] = []
for i in subdomain_index[fqn]:
subdomain_zonefile_infos[fqn].append(zonefile_subdomain_info[i])
for fqn in subdomain_zonefile_infos:
log.debug("Processing {} subdomain update(s) found for {}".format(len(subdomain_zonefile_infos[fqn]), fqn))
# make sure we give the list of zone files in blockchain order
subdomain_zonefile_infos[fqn].sort(cmp=lambda zf1, zf2: -1 if zf1['inv_index'] < zf2['inv_index'] else 0 if zf1['inv_index'] == zf2['inv_index'] else 1)
self.process_subdomains(subdomain_zonefile_infos[fqn])
# clear queue
queuedb_removeall(self.subdomain_db_path, all_queued_zfinfos)
@@ -714,18 +756,43 @@ class SubdomainIndex(object):
* scan the blockchain from start_block to end_block and make sure we're up-to-date
* process any newly-arrived zone files and re-index the affected subdomains
"""
processed_zonefile_hashes = []
'''
log.debug("BEGIN Processing zonefiles added in the last block")
processed_zonefile_hashes = self.index_blockchain(block_start, block_end)
log.debug("END Processing zonefiles added in the last block")
'''
log.debug("BEGIN Processing zonefiles discovered since last re-indexing")
self.index_discovered_zonefiles(block_end, skip_zonefiles=processed_zonefile_hashes)
self.index_discovered_zonefiles(block_end)
log.debug("END Processing zonefiles discovered since last re-indexing")
@classmethod
def reindex(cls, lastblock, opts=None):
    """
    Generate a subdomains db from scratch, using the names db and the atlas db and zone file collection.
    Best to do this in a one-off command (i.e. *not* in the blockstackd process)

    lastblock: last block height to scan up to
    opts: blockstack options dict; loaded via get_blockstack_opts() if None

    Raises Exception if Atlas or subdomain support is disabled, or if the
    Atlas database does not exist on disk.
    """
    if opts is None:
        opts = get_blockstack_opts()

    # both Atlas (zone file storage) and subdomain support must be enabled
    if not is_atlas_enabled(opts):
        raise Exception("Atlas is not enabled")

    if not is_subdomains_enabled(opts):
        raise Exception("Subdomain support is not enabled")

    subdomaindb_path = opts['subdomaindb_path']
    atlasdb_path = opts['atlasdb_path']

    if not os.path.exists(atlasdb_path):
        raise Exception("No Atlas database at {}".format(opts['atlasdb_path']))

    subdomain_indexer = SubdomainIndex(subdomaindb_path, blockstack_opts=opts)

    # drop and recreate the subdomain tables so we rebuild from a clean slate
    subdomain_indexer.subdomain_db.wipe()

    # SUBDOMAINS_FIRST_BLOCK is the earliest block that can contain subdomain records
    start_block = SUBDOMAINS_FIRST_BLOCK

    # re-scan the chain in 100-block batches
    for i in range(start_block, lastblock, 100):
        log.debug("Processing all subdomains in blocks {}-{}...".format(i, i+99))
        subdomain_indexer.index_blockchain(i, i+100)

    log.debug("Finished indexing subdomains in blocks {}-{}".format(start_block, lastblock))
class SubdomainDB(object):
"""
@@ -747,21 +814,12 @@ class SubdomainDB(object):
"""
return self.conn.cursor()
def get_subdomain_entry(self, fqn, cur=None):
def _extract_subdomain(self, fqn, cursor):
"""
Given a fully-qualified subdomain, get its (latest) subdomain record.
Raises SubdomainNotFound if there is no such subdomain
Extract a single subdomain from a DB cursor
Raise SubdomainNotFound if there are no valid rows
"""
get_cmd = "SELECT * FROM {} WHERE fully_qualified_subdomain=? ORDER BY sequence DESC LIMIT 1".format(self.subdomain_table)
cursor = None
if cur is None:
cursor = self.conn.cursor()
else:
cursor = cur
db_query_execute(cursor, get_cmd, (fqn,))
try:
(name, domain, n, encoded_pubkey, zonefile_hash, sig, block_height, zonefile_index, txid) = cursor.fetchone()
except:
@@ -784,6 +842,39 @@ class SubdomainDB(object):
return Subdomain(str(fqn), str(domain), str(encoded_pubkey), int(n), str(zonefile_str), sig, block_height, zonefile_index, txid)
def get_subdomain_entry(self, fqn, cur=None):
    """
    Given a fully-qualified subdomain, get its (latest) subdomain record.

    fqn: fully-qualified subdomain name
    cur: optional DB cursor to reuse (e.g. inside a larger transaction);
         a new cursor is opened from self.conn if None

    Raises SubdomainNotFound if there is no such subdomain
    """
    # "latest" means the row with the highest sequence number for this name
    get_cmd = "SELECT * FROM {} WHERE fully_qualified_subdomain=? ORDER BY sequence DESC LIMIT 1;".format(self.subdomain_table)
    cursor = None
    if cur is None:
        cursor = self.conn.cursor()
    else:
        cursor = cur

    db_query_execute(cursor, get_cmd, (fqn,))
    # _extract_subdomain raises SubdomainNotFound if no row matched
    return self._extract_subdomain(fqn, cursor)
def get_subdomain_entry_before(self, fqn, zonefile_index, cur=None):
    """
    Given a fully-qualified subdomain name and a zonefile index, get the
    subdomain entry that was processed just before that index.

    fqn: fully-qualified subdomain name
    zonefile_index: exclusive upper bound on the zonefile index to search
    cur: optional DB cursor to reuse; a new one is opened from self.conn if None

    Raises SubdomainNotFound if there is no such subdomain
    """
    sql = 'SELECT * FROM {} WHERE fully_qualified_subdomain=? AND zonefile_index < ? ORDER BY zonefile_index DESC LIMIT 1;'.format(self.subdomain_table)

    # reuse the caller's cursor when given, otherwise open a fresh one
    cursor = self.conn.cursor() if cur is None else cur

    db_query_execute(cursor, sql, (fqn,zonefile_index,))
    return self._extract_subdomain(fqn, cursor)
def get_subdomains_owned_by_address(self, owner, cur=None):
"""
Get the list of subdomain names that are owned by a given address.
@@ -843,24 +934,26 @@ class SubdomainDB(object):
if count is not None:
sql += ' LIMIT ?'
args += (count,)
sql += ';'
if cur is None:
cursor = self.conn.cursor()
else:
cursor = cur
rows = db_query_execute(cursor, sql, args)
rowcursor = db_query_execute(cursor, sql, args)
rows = []
for rowdata in rows:
for rowdata in rowcursor:
name, domain, n, encoded_pubkey, zonefile_hash, sig, block_height, zonefile_index, txid = rowdata
rows.append({
'address': encoded_pubkey,
'domain': domain,
'block_number': block_number,
'sequence': n,
'txid': txid,
'value_hash': zonefile_hash,
'address': str(encoded_pubkey),
'domain': str(domain),
'block_number': int(block_height),
'sequence': int(n),
'txid': str(txid),
'value_hash': str(zonefile_hash),
})
if cur is None:
@@ -872,7 +965,7 @@ class SubdomainDB(object):
ret[row['block_number']] = []
ret[row['block_number']].append(row)
return ret
@@ -897,7 +990,7 @@ class SubdomainDB(object):
if not rc:
raise Exception("Failed to store zone file {} from {}".format(zonefile_hash, subdomain_obj.get_fqn()))
write_cmd = 'INSERT OR REPLACE INTO {} VALUES (?,?,?,?,?,?,?,?,?)'.format(self.subdomain_table)
write_cmd = 'INSERT INTO {} VALUES (?,?,?,?,?,?,?,?,?)'.format(self.subdomain_table)
args = (fqn, subdomain_obj.domain, subdomain_obj.n, subdomain_obj.address, zonefile_hash, subdomain_obj.sig, subdomain_obj.block_height, subdomain_obj.zonefile_index, subdomain_obj.txid)
cursor = None
@@ -950,8 +1043,9 @@ class SubdomainDB(object):
Clear the subdomain db's tables
"""
drop_cmd = "DROP TABLE IF EXISTS {};"
cursor = self.conn.cursor()
db_query_execute(cur, drop_cmd.format(self.subdomain_table), ())
for table in [self.subdomain_table, 'queue']:
cursor = self.conn.cursor()
db_query_execute(cursor, drop_cmd.format(table), ())
def _create_tables(self):
@@ -959,15 +1053,15 @@ class SubdomainDB(object):
Set up the subdomain db's tables
"""
create_cmd = """CREATE TABLE IF NOT EXISTS {} (
fully_qualified_subdomain TEXT PRIMARY KEY,
fully_qualified_subdomain TEXT,
domain TEXT NOT NULL,
sequence INTEGER NOT NULL,
owner TEXT NOT NULL,
zonefile_hash TEXT,
signature TEXT,
zonefile_hash TEXT NOT NULL,
signature TEXT NOT NULL,
block_height INTEGER NOT NULL,
zonefile_index INTEGER NOT NULL,
txid TEXT NOT NULL);
txid TEXT PRIMARY KEY);
""".format(self.subdomain_table)
cursor = self.conn.cursor()
@@ -978,6 +1072,15 @@ class SubdomainDB(object):
queue_con.close()
def wipe(self):
    """
    Delete all the tables and recreate them.
    Leaves behind an empty subdomain DB with a fresh schema.
    """
    # drop everything first, then rebuild the schema from scratch
    self._drop_tables()
    self._create_tables()
def decode_zonefile_subdomains(domain, zonefile_txt, block_height, zonefile_index, txid):
"""
Decode a serialized zone file into a zonefile structure that could contain subdomain info.
@@ -1308,7 +1411,7 @@ def subdomains_init(blockstack_opts, working_dir, atlas_state):
if not is_subdomains_enabled(blockstack_opts):
return None
subdomain_state = SubdomainIndex(working_dir, blockstack_opts['subdomaindb_path'])
subdomain_state = SubdomainIndex(blockstack_opts['subdomaindb_path'], blockstack_opts=blockstack_opts)
atlas_node_add_callback(atlas_state, 'store_zonefile', subdomain_state.enqueue_zonefile)
return subdomain_state