Refactor the db to make expensive queries pageable, so we don't
overwhelm the node with one bad RPC.  In particular, make
it so the caller can page through the list of operations that occurred
at a particular block (so the caller can fetch them in small batches),
and make it possible to query the history table directly.
This commit is contained in:
Jude Nelson
2016-10-24 03:18:47 -04:00
parent 002078c57e
commit 432d70d57b

View File

@@ -143,6 +143,7 @@ CREATE TABLE name_records( name STRING NOT NULL,
importer_address TEXT,
consensus_hash TEXT,
transfer_send_block_id INT,
last_creation_op STRING NOT NULL,
-- primary key includes block number, so an expired name can be re-registered
PRIMARY KEY(name,block_number),
@@ -152,6 +153,11 @@ CREATE TABLE name_records( name STRING NOT NULL,
);
"""
# Table holding one operations hash per block, keyed by block_id.
# NOTE(review): SQLite gives a column declared STRING numeric affinity, so an
# ops_hash made up only of digits may be stored as a number -- confirm whether
# TEXT was intended here.
BLOCKSTACK_DB_SCRIPT += """
CREATE TABLE ops_hashes( block_id INTEGER PRIMARY KEY NOT NULL,
ops_hash STRING NOT NULL );
"""
# Index over name_records on (name_hash128, name).
BLOCKSTACK_DB_SCRIPT += """
CREATE INDEX hash_names_index ON name_records( name_hash128, name );
"""
@@ -199,6 +205,7 @@ def namedb_open( path ):
# add user-defined functions
con.create_function("namespace_lifetime_multiplier", 2, namedb_get_namespace_lifetime_multiplier)
return con
@@ -1266,6 +1273,46 @@ def namedb_get_blocks_with_ops( cur, history_id, start_block_id, end_block_id ):
return ret
def namedb_get_history_rows( cur, history_id, offset=None, count=None ):
    """
    Get the rows of the history table for a name or namespace,
    ordered by block_id and then vtxindex.

    If count is given, return at most that many rows.
    If offset is given, skip that many rows first
    (with or without a count).

    Return a list of dicts, one per history row.
    """
    select_query = "SELECT * FROM history WHERE history_id = ? ORDER BY block_id, vtxindex ASC"
    args = (history_id,)

    if count is not None:
        select_query += " LIMIT ?"
        args += (count,)

    elif offset is not None:
        # SQLite only accepts OFFSET as part of a LIMIT clause.
        # A negative LIMIT means "no upper bound".
        select_query += " LIMIT -1"

    if offset is not None:
        select_query += " OFFSET ?"
        args += (offset,)

    select_query += ";"

    history_rows = namedb_query_execute( cur, select_query, args )
    return [dict(r) for r in history_rows]
def namedb_get_num_history_rows( cur, history_id ):
    """
    Get the number of rows in the history table
    for a name or namespace.

    Return the row count.
    """
    # no ORDER BY here: the query yields a single aggregate row
    select_query = "SELECT COUNT(*) FROM history WHERE history_id = ?;"
    args = (history_id,)

    return namedb_select_count_rows( cur, select_query, args )
def namedb_get_history( cur, history_id ):
"""
Get all of the history for a name or namespace.
@@ -1273,14 +1320,14 @@ def namedb_get_history( cur, history_id ):
"""
# get history in increasing order by block_id and then vtxindex
select_query = "SELECT * FROM history WHERE history_id = ? ORDER BY block_id, vtxindex ASC;"
history_rows = namedb_query_execute( cur, select_query, (history_id,) )
history_rows = namedb_get_history_rows( cur, history_id )
return namedb_history_extract( history_rows )
def namedb_history_extract( history_rows ):
"""
TODO: DRY up; moved to client
Given the rows of history for a name, collapse
them into a history dictionary.
Return a dict of:
@@ -1551,6 +1598,8 @@ def namedb_get_names_owned_by_address( cur, address, current_block ):
def namedb_restore_from_history( name_rec, block_id ):
"""
TODO: DRY UP; moved to client
Given a name or a namespace record, replay its
history diffs "back in time" to a particular block
number.
@@ -1563,7 +1612,9 @@ def namedb_restore_from_history( name_rec, block_id ):
The returned records will *not* have a 'history' key.
"""
return blockstack_client.operations.nameop_restore_from_history( name_rec, name_rec['history'], block_id )
'''
block_history = list( reversed( sorted( name_rec['history'].keys() ) ) )
historical_rec = copy.deepcopy( name_rec )
@@ -1655,7 +1706,7 @@ def namedb_restore_from_history( name_rec, block_id ):
updates.append( copy.deepcopy(historical_rec) )
return list( reversed( updates ) )
'''
def namedb_rec_restore( db, rows, history_id_key, block_id, include_history=False ):
@@ -1688,43 +1739,179 @@ def namedb_rec_restore( db, rows, history_id_key, block_id, include_history=Fals
return ret
def namedb_get_all_records_at( db, block_id, include_history=False ):
def namedb_offset_count_predicate( offset=None, count=None ):
    """
    Make the LIMIT/OFFSET clause for a paginated query.
    Works even if offset=None or count=None:
    * neither given: empty clause, no args
    * count only: LIMIT
    * offset only: unbounded LIMIT, plus OFFSET
    * both: LIMIT and OFFSET

    The clause starts and ends with a space, so it can be appended
    directly to a query that does not end in whitespace.

    Return (query, args)
    """
    if offset is None and count is None:
        return ("", ())

    # SQLite only accepts OFFSET as part of a LIMIT clause.
    # A negative LIMIT means "no upper bound".
    limit = -1 if count is None else count

    offset_count_query = " LIMIT ? "
    offset_count_args = (limit,)

    if offset is not None:
        offset_count_query += "OFFSET ? "
        offset_count_args += (offset,)

    return (offset_count_query, offset_count_args)
def namedb_select_count_rows( cur, query, args ):
    """
    Run a SELECT COUNT(*) ... query.
    Return the value of the 'COUNT(*)' column of the first
    row, or 0 if the query yields no rows.
    """
    for first_row in namedb_query_execute( cur, query, args ):
        # only the first row matters
        return first_row['COUNT(*)']

    return 0
def namedb_get_names_preordered_or_imported_at( db, block_id, include_history=False, offset=None, count=None, restore_history=True ):
    """
    Get the list of names preordered or imported at this block height.

    Return the list of name records on success.
    If offset is not None, and the offset exceeds the number of rows,
    return the number of rows (an int) instead.

    Note that offset/count affect db queries, not history restorations.
    If restore_history is True, then offset/count cannot be set
    (this is asserted).

    If restore_history is True, each row is restored with namedb_rec_restore(),
    and only the NAME_PREORDER and NAME_IMPORT states are kept.
    Otherwise, the rows are returned as dicts, unfiltered.
    """
    assert not (restore_history and (offset is not None or count is not None)), "restore_history is incompatible with offset/length"

    if offset is not None:
        # how many name records preordered for the first time at this block?
        cur = db.cursor()
        name_preorder_rows_count_query = "SELECT COUNT(*) FROM name_records " + \
                                         "WHERE (name_records.block_number = ? OR name_records.preorder_block_number = ?);"

        args = (block_id,block_id)
        num_rows = namedb_select_count_rows( cur, name_preorder_rows_count_query, args )
        if num_rows < offset:
            # the caller is paging past this result set; report its size
            log.debug("%s name-preorder states at %s" % (num_rows, block_id ))
            return num_rows

    # all name records preordered for the first time at this block
    cur = db.cursor()
    offset_count_query, offset_count_args = namedb_offset_count_predicate( offset=offset, count=count )

    name_preorder_rows_query = "SELECT * FROM name_records " + \
                               "WHERE (name_records.block_number = ? OR name_records.preorder_block_number = ?) " + offset_count_query + ";"

    # the paging args must be bound along with the block height
    args = (block_id,block_id) + offset_count_args
    name_preorder_rows = namedb_query_execute( cur, name_preorder_rows_query, args )

    if restore_history:
        restored_recs = namedb_rec_restore( db, name_preorder_rows, "name", block_id, include_history=include_history )

        # keep only the preorders/imports if we're returning fully-restored records
        # (in case register happens in the same block).
        restored_recs = [rec for rec in restored_recs if rec['op'] in (NAME_PREORDER, NAME_IMPORT)]

    else:
        # otherwise, return all of them.
        restored_recs = [dict(r) for r in name_preorder_rows]

    log.debug("%s name-preorder/import states at %s" % (len(restored_recs), block_id ))
    return restored_recs
def namedb_get_names_modified_at( db, block_id, include_history=False, offset=None, count=None, restore_history=True ):
    """
    Get the list of name-modification operations that occurred at the given block height.

    Return the list of name operations on success.
    If offset is not None, and offset exceeds the number of rows,
    then return the number of rows (an int) instead.

    Note that offset/count affect db queries, not history restorations.
    If restore_history is True, then offset/count cannot be set
    (this is asserted).

    If restore_history is True, each row is restored with namedb_rec_restore().
    Otherwise, the rows are returned as dicts.
    """
    assert not (restore_history and (offset is not None or count is not None)), "restore_history is incompatible with offset/length"

    if offset is not None:
        # how many name records affected by this block?
        cur = db.cursor()
        name_rows_count_query = "SELECT name_records.name FROM name_records JOIN history ON name_records.name = history.history_id " + \
                                "WHERE name_records.block_number < ? AND name_records.preorder_block_number != ? AND history.block_id = ? " + \
                                "GROUP BY name_records.name;"

        args = (block_id, block_id, block_id)
        name_rows = namedb_query_execute( cur, name_rows_count_query, args )

        # one row per group, so count the groups
        num_rows = sum( 1 for _ in name_rows )
        if num_rows < offset:
            # the caller is paging past this result set; report its size
            log.debug("%s name-change states at %s" % (num_rows, block_id))
            return num_rows

    # all name records affected at this height
    cur = db.cursor()
    offset_count_query, offset_count_args = namedb_offset_count_predicate( offset=offset, count=count )

    name_rows_query = "SELECT name_records.* FROM name_records JOIN history ON name_records.name = history.history_id " + \
                      "WHERE name_records.block_number < ? AND name_records.preorder_block_number != ? AND history.block_id = ? " + \
                      "GROUP BY name_records.name " + offset_count_query + ";"

    # the paging args must be bound along with the block height
    args = (block_id, block_id, block_id) + offset_count_args
    name_rows = namedb_query_execute( cur, name_rows_query, args )

    if restore_history:
        restored_recs = namedb_rec_restore( db, name_rows, "name", block_id, include_history=include_history )
    else:
        restored_recs = [dict(r) for r in name_rows]

    log.debug("%s name-change states at %s" % (len(restored_recs), block_id ))
    return restored_recs
def namedb_get_preorders_at( db, block_id, offset=None, count=None ):
"""
Get the list of outstanding preorders at this block height.
Return a list of preorders from the preorders table.
If offset is not None, and the offset exceeds the number of preorders,
then return the number of rows instead.
"""
ret = []
if offset is not None:
# how many preorders at this block?
cur = db.cursor()
preorder_rows_count_query = "SELECT COUNT(*) FROM preorders WHERE block_number = ?;"
args = (block_id,)
num_rows = namedb_select_count_rows( cur, preorder_rows_count_query, args )
if num_rows < offset:
log.debug("%s preorders created at %s" % (num_rows, block_id))
return num_rows
# all outstanding name/namespace preorders created at this block
cur = db.cursor()
preorder_rows_query = "SELECT * FROM preorders WHERE block_number = ?;"
preorder_rows = namedb_query_execute( cur, preorder_rows_query, (block_id,))
offset_count_query, offset_count_args = namedb_offset_count_predicate( offset=offset, count=count )
preorder_rows_query = "SELECT * FROM preorders WHERE block_number = ? " + " " + offset_count_query + ";"
args = (block_id,) + offset_count_args
# log.debug(namedb_format_query(preorder_rows_query, args))
preorder_rows = namedb_query_execute( cur, preorder_rows_query, args )
cnt = 0
for preorder in preorder_rows:
@@ -1736,31 +1923,234 @@ def namedb_get_all_records_at( db, block_id, include_history=False ):
cnt += 1
log.debug("%s preorders created at %s" % (cnt, block_id))
return ret
def namedb_get_namespaces_preordered_at( db, block_id, include_history=False, offset=None, count=None, restore_history=True ):
    """
    Get the namespace records whose block_number is the given block height
    (i.e. the namespaces preordered at this block).

    Return the list of namespace records.
    If offset is not None and the number of matching rows is less than offset,
    return that number of rows (an int) instead.

    offset/count only shape the db query; they do not affect history restoration.
    Paging and restore_history=True are mutually exclusive (asserted).
    """
    paging = (offset is not None or count is not None)
    assert not (restore_history and paging), "restore_history is incompatible with offset/length"

    if offset is not None:
        # find out how many namespace preorders this block has
        count_cur = db.cursor()
        count_query = "SELECT COUNT(*) FROM namespaces WHERE namespaces.block_number = ?;"
        total = namedb_select_count_rows( count_cur, count_query, (block_id,) )
        if total < offset:
            log.debug("%s namespace-preorders at at %s" % (total, block_id))
            return total

    cur = db.cursor()
    page_query, page_args = namedb_offset_count_predicate( offset=offset, count=count )

    rows_query = "SELECT * FROM namespaces WHERE namespaces.block_number = ? " + page_query + ";"
    rows = namedb_query_execute( cur, rows_query, (block_id,) + page_args )

    if not restore_history:
        restored_recs = [dict(row) for row in rows]
    else:
        restored_recs = namedb_rec_restore( db, rows, "namespace_id", block_id, include_history=include_history )

    log.debug("%s namespace-preorder states at %s" % (len(restored_recs), block_id ))
    return restored_recs
def namedb_get_namespaces_modified_at( db, block_id, include_history=False, offset=None, count=None, restore_history=True ):
    """
    Get the namespace operations that occurred at the given block height:
    namespaces in the NAMESPACE_REVEAL or NAMESPACE_READY state that have
    a history entry at this block.

    Return the list of namespace records.
    If offset is not None and the number of matching rows is less than offset,
    return that number of rows (an int) instead.

    offset/count only shape the db query; they do not affect history restoration.
    Paging and restore_history=True are mutually exclusive (asserted).
    """
    paging = (offset is not None or count is not None)
    assert not (restore_history and paging), "restore_history is incompatible with offset/length"

    # bound to the four placeholders shared by the count query and the row query
    match_args = (block_id, block_id, NAMESPACE_REVEAL, NAMESPACE_READY)

    if offset is not None:
        # find out how many namespaces were modified in this block
        count_cur = db.cursor()
        count_query = "SELECT COUNT(*) FROM namespaces JOIN history ON namespaces.namespace_id = history.history_id " + \
                      "WHERE namespaces.block_number <= ? AND history.block_id = ? AND (namespaces.op = ? OR namespaces.op = ?);"

        total = namedb_select_count_rows( count_cur, count_query, match_args )
        if total < offset:
            log.debug("%s namespace-change states at %s" % (total, block_id))
            return total

    cur = db.cursor()
    page_query, page_args = namedb_offset_count_predicate( offset=offset, count=count )

    rows_query = "SELECT namespaces.* FROM namespaces JOIN history ON namespaces.namespace_id = history.history_id " + \
                 "WHERE namespaces.block_number <= ? AND history.block_id = ? AND (namespaces.op = ? OR namespaces.op = ?) " + page_query + ";"

    rows = namedb_query_execute( cur, rows_query, match_args + page_args )

    if not restore_history:
        restored_recs = [dict(row) for row in rows]
    else:
        restored_recs = namedb_rec_restore( db, rows, "namespace_id", block_id, include_history=include_history )

    log.debug("%s namespace-change states at %s" % (len(restored_recs), block_id ))
    return restored_recs
def namedb_get_all_ops_countdown( restored_recs, rel_offset, remaining ):
    """
    Bookkeeping step between two paged queries.

    restored_recs is the result of the previous query: either a list of
    records, or an int row count if the query was skipped entirely.
    rel_offset is the offset that was passed to that query, and
    remaining is the number of rows still wanted (None means "all").

    Return (list of records, new relative offset, new remaining count)
    """
    if remaining is None:
        # not counting down; pass everything through untouched
        return (restored_recs, rel_offset, remaining)

    if type(restored_recs) in [int, long]:
        # the whole query was skipped: consume its rows from the offset
        rel_offset -= restored_recs
        rows = []

    else:
        # got data, so the next query starts from its beginning
        rel_offset = 0

        # trim any overshoot
        rows = restored_recs[:remaining] if len(restored_recs) > remaining else restored_recs
        remaining -= len(rows)

    log.debug("%s rows, rel_offset = %s, remaining = %s" % (len(rows), rel_offset, remaining))
    return (rows, rel_offset, remaining)
def namedb_get_all_ops_at( db, block_id, offset=None, count=None, include_history=False, restore_history=True ):
    """
    Get the states that each name and namespace record
    passed through in the given block.

    Return the list of prior record states, ordered by vtxindex.

    If offset is not None (i.e. we're paging through all operations),
    then the list will instead be ordered by:
    * name preorders that have been claimed but were created at this block
    * name operations that happened at this block
    * outstanding preorders that were written at this block
    * namespaces that were preordered at this block
    * namespaces that were modified at this block

    If we're paging, then we won't restore the name records to their historical points in time.
    This is an anti-DDoS measure.  Honest clients will need to fetch the name/namespace history
    separately (paginated) and re-assemble the history and current record state into the historic
    state client-side.

    No ordering within these lists is guaranteed during pagination.

    restore_history=True cannot be combined with offset/count (this is asserted).
    """
    assert not (restore_history and (offset is not None or count is not None)), "Invalid arguments: restore_history is incompatible with pagination"

    # one fetcher per category of operation, in pagination order.
    # each takes (relative offset, remaining count) and returns either
    # a list of records or the number of rows it skipped.
    fetchers = [
        # all name records preordered or imported for the first time at this block
        lambda off, cnt: namedb_get_names_preordered_or_imported_at( db, block_id, offset=off, count=cnt, include_history=include_history, restore_history=restore_history ),

        # all name records affected by this block
        lambda off, cnt: namedb_get_names_modified_at( db, block_id, offset=off, count=cnt, include_history=include_history, restore_history=restore_history ),

        # all outstanding name/namespace preorders created at this block
        lambda off, cnt: namedb_get_preorders_at( db, block_id, offset=off, count=cnt ),

        # all namespaces preordered at this block
        lambda off, cnt: namedb_get_namespaces_preordered_at( db, block_id, include_history=include_history, offset=off, count=cnt, restore_history=restore_history ),

        # all namespaces revealed/readied at this block
        lambda off, cnt: namedb_get_namespaces_modified_at( db, block_id, include_history=include_history, offset=off, count=cnt, restore_history=restore_history ),
    ]

    ret = []
    rel_offset = offset
    remaining = count

    for fetch in fetchers:
        res = fetch( rel_offset, remaining )
        restored_recs, rel_offset, remaining = namedb_get_all_ops_countdown( res, rel_offset, remaining )
        ret += restored_recs

        if remaining is not None and remaining <= 0:
            # page is full
            return ret

    # got everything.  put into block order.
    return sorted( ret, key=lambda n: n['vtxindex'] )
def namedb_get_num_ops_at( db, block_id ):
    """
    Get the number of operations that occurred at a particular block.

    Return the sum of the row counts reported by each of the
    per-category getters for this block.
    """
    # get just the counts: each getter returns its row count (instead of
    # its rows) when the requested offset exceeds the number of rows.
    # use an integer, since this is nominally a row offset.
    skip_all = 10**9

    count = 0
    count += namedb_get_names_preordered_or_imported_at( db, block_id, offset=skip_all, count=skip_all, restore_history=False )
    count += namedb_get_names_modified_at( db, block_id, offset=skip_all, count=skip_all, restore_history=False )
    count += namedb_get_preorders_at( db, block_id, offset=skip_all, count=skip_all )
    count += namedb_get_namespaces_preordered_at( db, block_id, offset=skip_all, count=skip_all, restore_history=False )
    count += namedb_get_namespaces_modified_at( db, block_id, offset=skip_all, count=skip_all, restore_history=False )

    return count
def namedb_get_last_nameops( db, offset=None, count=None ):
"""
Get the last $count records committed, starting at $offset
@@ -1812,7 +2202,6 @@ def namedb_get_last_nameops( db, offset=None, count=None ):
if r[col] not in block_map.keys():
block_map[r[col]] = 1
print json.dumps(block_map, indent=4, sort_keys=True)
# find the blocks to actually load by expanding
hist = []
@@ -1821,7 +2210,6 @@ def namedb_get_last_nameops( db, offset=None, count=None ):
hist.append((k, v))
hist.reverse()
print "reversed hist: %s" % hist
# get the blocks' operations that correspond to offset and offset+count
blocks_to_fetch = []
@@ -1829,11 +2217,11 @@ def namedb_get_last_nameops( db, offset=None, count=None ):
if not height in blocks_to_fetch:
blocks_to_fetch.append(height)
print "to fetch: %s" % blocks_to_fetch
# TODO: paginate
ret = []
for h in blocks_to_fetch:
ops = namedb_get_all_records_at( db, h )
ops = namedb_get_all_ops_at( db, h )
ops.reverse()
ret += ops
@@ -1847,7 +2235,6 @@ def namedb_get_last_nameops( db, offset=None, count=None ):
left_drop += 1
i -= 1
print "fetched %s; drop %s left" % (len(ret), left_drop)
ret = ret[left_drop:left_drop+count]
return ret
@@ -1860,16 +2247,12 @@ def namedb_get_all_names( cur, current_block, offset=None, count=None ):
unexpired_query, unexpired_args = namedb_select_where_unexpired_names( current_block )
query = "SELECT name FROM name_records JOIN namespaces ON name_records.namespace_id = namespaces.namespace_id WHERE " + unexpired_query + ";"
query = "SELECT name FROM name_records JOIN namespaces ON name_records.namespace_id = namespaces.namespace_id WHERE " + unexpired_query
args = unexpired_args
if count is not None:
query += " LIMIT ?"
args.append( count )
if offset is not None:
query += " OFFSET ?"
args.appnd( offset )
offset_count_query, offset_count_args = namedb_offset_count_predicate( offset=offset, count=count )
query += offset_count_query + ";"
args += offset_count_args
name_rows = namedb_query_execute( cur, query, tuple(args) )
ret = []
@@ -1889,16 +2272,12 @@ def namedb_get_names_in_namespace( cur, namespace_id, current_block, offset=None
unexpired_query, unexpired_args = namedb_select_where_unexpired_names( current_block )
query = "SELECT name FROM name_records JOIN namespaces ON name_records.namespace_id = namespaces.namespace_id WHERE name_records.namespace_id = ? AND " + unexpired_query + " ORDER BY name;"
query = "SELECT name FROM name_records JOIN namespaces ON name_records.namespace_id = namespaces.namespace_id WHERE name_records.namespace_id = ? AND " + unexpired_query + " ORDER BY name"
args = (namespace_id,) + unexpired_args
if count is not None:
query += " LIMIT ?"
args.append( count )
if offset is not None:
query += " OFFSET ?"
args.append( offset )
offset_count_query, offset_count_args = namedb_offset_count_predicate( offset=offset, count=count )
query += offset_count_query + ";"
args += offset_count_args
name_rows = namedb_query_execute( cur, query, tuple(args) )
ret = []
@@ -2193,6 +2572,33 @@ def namedb_get_num_block_vtxs( cur, block_number ):
return count
def namedb_set_block_ops_hash( cur, block_number, ops_hash ):
    """
    Record the operations hash for a block height,
    by inserting a (block_id, ops_hash) row into ops_hashes.
    """
    namedb_query_execute( cur, "INSERT INTO ops_hashes (block_id,ops_hash) VALUES (?,?);", (block_number, ops_hash) )
def namedb_get_block_ops_hash( cur, block_number ):
    """
    Look up the ops hash previously stored for this block number.
    Return the ops_hash of the first matching row.
    Return None if not set.
    """
    matches = namedb_query_execute( cur, "SELECT ops_hash FROM ops_hashes WHERE block_id = ?;", (block_number,) )
    for row in matches:
        # only the first row matters
        return row['ops_hash']

    return None
if __name__ == "__main__":
# basic unit tests
import random