add a fast-path for inserting subdomains that don't reorg their histories due to ownership change (O(1) DB operations instead of O(N) operations for N existing updates). Also, only process a zone file at most once

This commit is contained in:
Jude Nelson
2018-02-09 18:35:10 -05:00
parent bc9807e79a
commit e28438eed3

View File

@@ -265,7 +265,7 @@ class Subdomain(object):
raise ParseError("TXT entry too long for a missing zone file list")
try:
return [int(i) for i in txt_entry.split(',')]
return [int(i) for i in txt_entry.split(',')] if txt_entry is not None and len(txt_entry) > 0 else []
except ValueError:
raise ParseError('Invalid integers')
@@ -553,7 +553,7 @@ class SubdomainIndex(object):
subdomain_index[fqn].append(i)
return {'zonefile_info': subdomain_info, 'subdomains': subdomain_index}
def make_new_subdomain_history(self, cursor, subdomain_rec):
"""
@@ -635,6 +635,103 @@ class SubdomainIndex(object):
return fut
def get_subdomain_history_neighbors(self, cursor, subdomain_rec):
"""
Given a subdomain record, get its neighbors.
I.e. get all of the subdomain records with the previous sequence number,
and get all of the subdomain records with the next sequence number
Returns {'prev': [...blockchain order...], 'cur': [...blockchain order...], 'fut': [...blockchain order...]}
"""
# what's the subdomain's immediate prior history?
hist = self.subdomain_db.get_subdomain_history(subdomain_rec.get_fqn(), include_unaccepted=True, start_sequence=subdomain_rec.n-1, end_sequence=subdomain_rec.n, cur=cursor)
hist.sort(lambda h1, h2: -1 if h1.n < h2.n or (h1.n == h2.n and h1.parent_zonefile_index < h2.parent_zonefile_index) \
else 0 if h1.n == h2.n and h1.parent_zonefile_index == h2.parent_zonefile_index \
else 1)
# what's the subdomain's current and immediate future?
fut = self.subdomain_db.get_subdomain_history(subdomain_rec.get_fqn(), include_unaccepted=True, start_sequence=subdomain_rec.n, end_sequence=subdomain_rec.n+2, cur=cursor)
fut.sort(lambda h1, h2: -1 if h1.n < h2.n or (h1.n == h2.n and h1.parent_zonefile_index < h2.parent_zonefile_index) \
else 0 if h1.n == h2.n and h1.parent_zonefile_index == h2.parent_zonefile_index \
else 1)
# extract the current (conflicting) records from the future
cur = []
tmp_fut = []
for f in fut:
if f.n == subdomain_rec.n:
cur.append(f)
else:
tmp_fut.append(f)
fut = tmp_fut
ret = {'prev': hist, 'cur': cur, 'fut': fut}
print ret
return ret
def subdomain_try_insert(self, cursor, subdomain_rec, history_neighbors):
"""
Try to insert a subdomain record into its history neighbors.
This is an optimization that handles the "usual" case.
We can do this without having to rewrite this subdomain's past and future
if (1) we can find a previously-accepted subdomain record, and (2) the transition
from this subdomain record to a future subdomain record preserves its
acceptance as True. In this case, the "far" past and "far" future are already
consistent.
Return True if we succeed in doing so.
Return False if not.
"""
blockchain_order = history_neighbors['prev'] + history_neighbors['cur'] + history_neighbors['fut']
last_accepted = -1
for i in range(0, len(blockchain_order)):
if blockchain_order[i].accepted:
last_accepted = i
break
if blockchain_order[i].n > subdomain_rec.n or (blockchain_order[i].n == subdomain_rec.n and blockchain_order[i].parent_zonefile_index > subdomain_rec.parent_zonefile_index):
# can't cheaply insert this subdomain record,
# since none of its immediate ancestors are accepted.
log.debug("No immediate ancestors are accepted on {}".format(subdomain_rec))
return False
if last_accepted < 0:
log.debug("No immediate ancestors or successors are accepted on {}".format(subdomain_rec))
return False
# one ancestor was accepted.
# work from there.
chain_tip_status = blockchain_order[-1].accepted
dirty = [] # to be written
for i in range(last_accepted+1, len(blockchain_order)):
cur_accepted = blockchain_order[i].accepted
new_accepted = self.check_subdomain_transition(blockchain_order[last_accepted], blockchain_order[i])
if new_accepted != cur_accepted:
blockchain_order[i].accepted = new_accepted
log.debug("Changed from {} to {}: {}".format(cur_accepted, new_accepted, blockchain_order[i]))
dirty.append(blockchain_order[i])
if new_accepted:
last_accepted = i
if chain_tip_status != blockchain_order[-1].accepted and len(history_neighbors['fut']) > 0:
# deeper reorg
log.debug("Immediate history chain tip altered from {} to {}: {}".format(chain_tip_status, blockchain_order[-1].accepted, blockchain_order[-1]))
return False
# localized change. Just commit the dirty entries
for subrec in dirty:
log.debug("Update to accepted={}: {}".format(subrec.accepted, subrec))
self.subdomain_db.update_subdomain_entry(subrec, cur=cursor)
return True
def process_subdomains(self, zonefile_subdomain_info):
"""
@@ -663,6 +760,7 @@ class SubdomainIndex(object):
self.subdomain_db.update_subdomain_entry(subrec, cur=cursor)
db_query_execute(cursor, 'END', ())
self.subdomain_db.commit()
# at each zone file, find out if its subdomain creates/updates are valid
for subinfo in zonefile_subdomain_info:
@@ -683,6 +781,14 @@ class SubdomainIndex(object):
db_query_execute(cursor, 'BEGIN', ())
for fqn in new_subdomain_recs:
immediate_history = self.get_subdomain_history_neighbors(cursor, new_subdomain_recs[fqn])
inserted = self.subdomain_try_insert(cursor, new_subdomain_recs[fqn], immediate_history)
if inserted:
log.debug("Inserted {}".format(fqn))
continue
log.debug("Rewrite history of {}".format(fqn))
new_hist = self.make_new_subdomain_history(cursor, new_subdomain_recs[fqn])
for subrec in new_hist:
self.subdomain_db.update_subdomain_entry(subrec, cur=cursor)
@@ -702,7 +808,7 @@ class SubdomainIndex(object):
self.subdomain_db.update_subdomain_entry(subrec, cur=cursor)
db_query_execute(cursor, 'END', ())
self.subdomain_db.commit()
def enqueue_zonefile(self, zonefile_hash, block_height):
"""
@@ -790,12 +896,18 @@ class SubdomainIndex(object):
for i in subdomain_index[fqn]:
subdomain_zonefile_infos[fqn].append(zonefile_subdomain_info[i])
processed = []
for fqn in subdomain_zonefile_infos:
log.debug("Processing {} subdomain update(s) found for {}".format(len(subdomain_zonefile_infos[fqn]), fqn))
subseq = filter(lambda szi: szi['zonefile_hash'] not in processed, subdomain_zonefile_infos[fqn])
if len(subseq) == 0:
continue
subdomain_zonefile_infos[fqn].sort(cmp=lambda z1, z2: -1 if z1['block_height'] < z2['block_height'] else 0 if z1['block_height'] == z2['block_height'] else 1)
self.process_subdomains(subdomain_zonefile_infos[fqn])
log.debug("Processing {} zone file entries found for {} and others".format(len(subseq), fqn))
subseq.sort(cmp=lambda z1, z2: -1 if z1['block_height'] < z2['block_height'] else 0 if z1['block_height'] == z2['block_height'] else 1)
self.process_subdomains(subseq)
processed += [szi['zonefile_hash'] for szi in subseq]
# clear queue
queuedb_removeall(self.subdomain_queue_path, all_queued_zfinfos)
@@ -874,6 +986,13 @@ class SubdomainDB(object):
return d
def commit(self):
"""
Commit state
"""
self.conn.commit()
def cursor(self):
"""
@@ -1228,7 +1347,7 @@ class SubdomainDB(object):
except:
pass
known_missing = [int(i) for i in missing_str.split(',')]
known_missing = [int(i) for i in missing_str.split(',')] if missing_str is not None and len(missing_str) > 0 else []
num_missing = atlasdb_get_zonefiles_missing_count_by_name(domain, indexes_exclude=known_missing, path=atlasdb_path)
if num_missing > 0:
log.debug("Subdomain is missing {} zone files: {}".format(num_missing, subrec))
@@ -1261,10 +1380,11 @@ class SubdomainDB(object):
signature TEXT NOT NULL,
block_height INTEGER NOT NULL,
parent_zonefile_hash TEXT NOT NULL,
parent_zonefile_index INTEGER UNIQUE NOT NULL,
txid TEXT PRIMARY KEY,
parent_zonefile_index INTEGER NOT NULL,
txid TEXT NOT NULL,
missing TEXT NOT NULL,
accepted INTEGER NOT NULL);
accepted INTEGER NOT NULL,
PRIMARY KEY(fully_qualified_subdomain,parent_zonefile_index));
""".format(self.subdomain_table)
db_query_execute(cursor, create_cmd, ())