mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-23 19:31:00 +08:00
query zonefiles by txid to find the name to which they belong, and
forward both the name and txid along to the logic that stores zonefiles (so it can take driver-specific actions regarding e.g. how much was paid, who paid, etc.)
This commit is contained in:
@@ -154,6 +154,7 @@ ATLASDB_SQL = """
|
||||
CREATE TABLE zonefiles( inv_index INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name STRING NOT NULL,
|
||||
zonefile_hash TEXT NOT NULL,
|
||||
txid STRING UNIQUE NOT NULL,
|
||||
present INTEGER NOT NULL,
|
||||
tried_storage INTEGER NOT NULL,
|
||||
block_height INTEGER NOT NULL );
|
||||
@@ -438,7 +439,7 @@ def atlasdb_open( path ):
|
||||
return con
|
||||
|
||||
|
||||
def atlasdb_add_zonefile_info( name, zonefile_hash, present, block_height, con=None, path=None ):
|
||||
def atlasdb_add_zonefile_info( name, zonefile_hash, txid, present, block_height, con=None, path=None ):
|
||||
"""
|
||||
Add a zonefile to the database.
|
||||
Mark it as present or absent.
|
||||
@@ -455,8 +456,8 @@ def atlasdb_add_zonefile_info( name, zonefile_hash, present, block_height, con=N
|
||||
con = atlasdb_open( path )
|
||||
assert con is not None
|
||||
|
||||
sql = "INSERT INTO zonefiles (name, zonefile_hash, present, tried_storage, block_height) VALUES (?,?,?,?,?);"
|
||||
args = (name, zonefile_hash, present, 0, block_height)
|
||||
sql = "INSERT INTO zonefiles (name, zonefile_hash, txid, present, tried_storage, block_height) VALUES (?,?,?,?,?,?);"
|
||||
args = (name, zonefile_hash, txid, present, 0, block_height)
|
||||
|
||||
cur = con.cursor()
|
||||
atlasdb_query_execute( cur, sql, args )
|
||||
@@ -555,6 +556,40 @@ def atlasdb_get_zonefile( zonefile_hash, con=None, path=None ):
|
||||
return ret
|
||||
|
||||
|
||||
def atlasdb_find_zonefile_by_txid( txid, con=None, path=None ):
|
||||
"""
|
||||
Look up a zonefile by txid
|
||||
Returns {'zonefile_hash': ..., 'name': ..., etc.}
|
||||
Returns None if not found
|
||||
"""
|
||||
if path is None:
|
||||
path = atlasdb_path()
|
||||
|
||||
close = False
|
||||
if con is None:
|
||||
close = True
|
||||
con = atlasdb_open( path )
|
||||
assert con is not None
|
||||
|
||||
sql = "SELECT * FROM zonefiles WHERE txid = ?;"
|
||||
args = (txid,)
|
||||
|
||||
cur = con.cursor()
|
||||
res = atlasdb_query_execute( cur, sql, args )
|
||||
con.commit()
|
||||
|
||||
ret = None
|
||||
for zfinfo in res:
|
||||
ret = {}
|
||||
ret.update(zfinfo)
|
||||
break
|
||||
|
||||
if close:
|
||||
con.close()
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def atlasdb_set_zonefile_present( zonefile_hash, present, con=None, path=None ):
|
||||
"""
|
||||
Mark a zonefile as present (i.e. we stored it).
|
||||
@@ -748,14 +783,16 @@ def atlasdb_queue_zonefiles( con, db, start_block, zonefile_dir=None, validate=T
|
||||
total = 0
|
||||
for block_height in xrange(start_block, db.lastblock+1, 1):
|
||||
|
||||
names_and_zonefile_hashes = db.get_names_and_value_hashes_at( block_height )
|
||||
for name_zfhash in names_and_zonefile_hashes:
|
||||
name = str(name_zfhash['name'])
|
||||
zfhash = str(name_zfhash['value_hash'])
|
||||
zonefile_info = db.get_atlas_zonefile_info_at( block_height )
|
||||
for name_txid_zfhash in zonefile_info:
|
||||
name = str(name_txid_zfhash['name'])
|
||||
zfhash = str(name_txid_zfhash['value_hash'])
|
||||
txid = str(name_txid_zfhash['txid'])
|
||||
|
||||
present = is_zonefile_cached( zfhash, zonefile_dir=zonefile_dir, validate=validate )
|
||||
|
||||
log.debug("Add %s %s at %s (present: %s)" % (name, zfhash, block_height, present) )
|
||||
atlasdb_add_zonefile_info( name, zfhash, present, block_height, con=con )
|
||||
log.debug("Add %s %s %s at %s (present: %s)" % (name, zfhash, txid, block_height, present) )
|
||||
atlasdb_add_zonefile_info( name, zfhash, txid, present, block_height, con=con )
|
||||
total += 1
|
||||
|
||||
log.debug("Queued %s zonefiles from %s-%s" % (total, start_block, db.lastblock))
|
||||
@@ -2237,6 +2274,8 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
|
||||
Return a dict, structured as:
|
||||
{
|
||||
'zonefile hash': {
|
||||
'names': [name],
|
||||
'txid': txid,
|
||||
'indexes': [...],
|
||||
'popularity': ...,
|
||||
'peers': [...],
|
||||
@@ -2283,6 +2322,8 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
|
||||
|
||||
if not ret.has_key(zfinfo['zonefile_hash']):
|
||||
ret[zfinfo['zonefile_hash']] = {
|
||||
'names': [],
|
||||
'txid': zfinfo['txid'],
|
||||
'indexes': [],
|
||||
'popularity': 0,
|
||||
'peers': [],
|
||||
@@ -2303,6 +2344,7 @@ def atlas_find_missing_zonefile_availability( peer_table=None, con=None, path=No
|
||||
popularity += 1
|
||||
peers.append( peer_hostport )
|
||||
|
||||
ret[zfinfo['zonefile_hash']]['names'].append( zfinfo['name'] )
|
||||
ret[zfinfo['zonefile_hash']]['indexes'].append( zfinfo['inv_index']-1 )
|
||||
ret[zfinfo['zonefile_hash']]['popularity'] += popularity
|
||||
ret[zfinfo['zonefile_hash']]['peers'] += peers
|
||||
@@ -2510,12 +2552,12 @@ def atlas_get_zonefiles( my_hostport, peer_hostport, zonefile_hashes, timeout=No
|
||||
return zonefile_datas
|
||||
|
||||
|
||||
def atlas_get_zonefile_data_from_storage( zonefile_hash, storage_drivers ):
|
||||
def atlas_get_zonefile_data_from_storage( name, zonefile_hash, storage_drivers ):
|
||||
"""
|
||||
Go get a zonefile from storage drivers
|
||||
"""
|
||||
try:
|
||||
res = get_zonefile_data_from_storage( zonefile_hash, drivers=storage_drivers )
|
||||
res = get_zonefile_data_from_storage( name, zonefile_hash, drivers=storage_drivers )
|
||||
return {'status': True, 'zonefile_data': res}
|
||||
except Exception, e:
|
||||
if os.environ.get("BLOCKSTACK_TEST", None) == "1":
|
||||
@@ -3385,14 +3427,14 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
self.path = atlasdb_path()
|
||||
|
||||
|
||||
def store_zonefile_data( self, fetched_zfhash, zonefile_data, peer_hostport, con, path ):
|
||||
def store_zonefile_data( self, name, fetched_zfhash, txid, zonefile_data, peer_hostport, con, path ):
|
||||
"""
|
||||
Store the fetched zonefile (as a serialized string) to storage and cache it locally.
|
||||
Update internal state to mark it present
|
||||
Return True on success
|
||||
Return False on error
|
||||
"""
|
||||
rc = store_zonefile_data_to_storage( zonefile_data, required=self.zonefile_storage_drivers, cache=True, zonefile_dir=self.zonefile_dir, tx_required=False )
|
||||
rc = store_zonefile_data_to_storage( name, zonefile_data, txid, required=self.zonefile_storage_drivers, cache=True, zonefile_dir=self.zonefile_dir, tx_required=False )
|
||||
if not rc:
|
||||
log.error("%s: Failed to store zonefile %s" % (self.hostport, fetched_zfhash))
|
||||
|
||||
@@ -3406,7 +3448,7 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
return rc
|
||||
|
||||
|
||||
def store_zonefiles( self, zonefiles, peer_zonefile_hashes, peer_hostport, path, con=None ):
|
||||
def store_zonefiles( self, zonefile_names, zonefiles, zonefile_txids, peer_zonefile_hashes, peer_hostport, path, con=None ):
|
||||
"""
|
||||
Store a list of RPC-fetched zonefiles (but only ones in peer_zonefile_hashes) from the given peer_hostport
|
||||
Return the list of zonefile hashes stored.
|
||||
@@ -3426,7 +3468,28 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
log.warn("%s: Unsolicited zonefile %s" % (self.hostport, fetched_zfhash))
|
||||
continue
|
||||
|
||||
rc = self.store_zonefile_data( fetched_zfhash, zonefile_txt, peer_hostport, con, path )
|
||||
zfnames = zonefile_names.get(fetched_zfhash, None)
|
||||
if zfnames is None:
|
||||
# unsolicited
|
||||
log.warn("%s: Unknown zonefile %s" % (self.hostport, fetched_zfhash))
|
||||
continue
|
||||
|
||||
zftxid = zonefile_txids.get(fetched_zfhash, None)
|
||||
if zftxid is None:
|
||||
# not paid for
|
||||
log.warn("%s: Unpaid zonefile %s" % (self.hostport, fetched_zfhash))
|
||||
continue
|
||||
|
||||
# pick a name
|
||||
zfinfo = atlasdb_find_zonefile_by_txid( zftxid, path=path, con=con )
|
||||
if zfinfo is None:
|
||||
# don't know about this txid
|
||||
log.warn("%s: Unknown txid %s for %s" % (self.hostport, zftxid, fetched_zfhash))
|
||||
continue
|
||||
|
||||
zfname = zfinfo['name']
|
||||
|
||||
rc = self.store_zonefile_data( zfname, fetched_zfhash, zftxid, zonefile_txt, peer_hostport, con, path )
|
||||
if rc:
|
||||
# don't ask for it again
|
||||
ret.append( fetched_zfhash )
|
||||
@@ -3438,7 +3501,7 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
return ret
|
||||
|
||||
|
||||
def try_crawl_storage( self, zfhash, path, con=None ):
|
||||
def try_crawl_storage( self, name, zfhash, txid, path, con=None ):
|
||||
"""
|
||||
Try to get a zonefile from storage
|
||||
Record in the DB that we tried.
|
||||
@@ -3455,7 +3518,7 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
# is this zonefile available via storage?
|
||||
log.debug("Try loading %s from storage" % zfhash)
|
||||
|
||||
zonefile_info = atlas_get_zonefile_data_from_storage( zfhash, self.zonefile_storage_drivers )
|
||||
zonefile_info = atlas_get_zonefile_data_from_storage( name, zfhash, self.zonefile_storage_drivers )
|
||||
|
||||
# tried loading from storage
|
||||
atlasdb_set_zonefile_tried_storage( zfhash, True, con=con, path=path )
|
||||
@@ -3467,7 +3530,7 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
else:
|
||||
# got it! remember it
|
||||
log.debug("%s: got %s from storage" % (self.hostport, zfhash))
|
||||
rc = self.store_zonefile_data( zfhash, zonefile_info['zonefile_data'], "storage", con, path )
|
||||
rc = self.store_zonefile_data( name, zfhash, txid, zonefile_info['zonefile_data'], "storage", con, path )
|
||||
|
||||
if close:
|
||||
con.close()
|
||||
@@ -3532,6 +3595,8 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
zonefile_ranking = [ (missing_zfinfo[zfhash]['popularity'], zfhash) for zfhash in missing_zfinfo.keys() ]
|
||||
zonefile_ranking.sort()
|
||||
zonefile_hashes = list(set([zfhash for (_, zfhash) in zonefile_ranking]))
|
||||
zonefile_names = dict([(zfhash, missing_zfinfo[zfhash]['names']) for zfhash in zonefile_hashes])
|
||||
zonefile_txids = dict([(zfhash, missing_zfinfo[zfhash]['txid']) for zfhash in zonefile_hashes])
|
||||
zonefile_origins = self.find_zonefile_origins( missing_zfinfo, peer_hostports )
|
||||
|
||||
# filter out the ones that are already cached
|
||||
@@ -3550,14 +3615,23 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
while len(zonefile_hashes) > 0:
|
||||
|
||||
zfhash = zonefile_hashes[0]
|
||||
zfnames = zonefile_names[zfhash]
|
||||
zftxid = zonefile_txids[zfhash]
|
||||
peers = missing_zfinfo[zfhash]['peers']
|
||||
|
||||
zfinfo = atlasdb_find_zonefile_by_txid( zftxid, path=path )
|
||||
if zfinfo is None:
|
||||
# not known to us
|
||||
log.warn("%s: unknown zonefile %s" % (self.hostport, zfhash))
|
||||
|
||||
zfname = zfinfo['name']
|
||||
|
||||
# is this zonefile available via storage?
|
||||
if not missing_zfinfo[zfhash]['tried_storage']:
|
||||
|
||||
# this can be somewhat memory-intensive, so
|
||||
# invoke the gc immediately afterwards
|
||||
rc = self.try_crawl_storage( zfhash, path )
|
||||
rc = self.try_crawl_storage( zfname, zfhash, zftxid, path )
|
||||
gc.collect(2)
|
||||
|
||||
if rc:
|
||||
@@ -3598,7 +3672,7 @@ class AtlasZonefileCrawler( threading.Thread ):
|
||||
if zonefiles is not None:
|
||||
|
||||
# got zonefiles!
|
||||
stored_zfhashes = self.store_zonefiles( zonefiles, peer_zonefile_hashes, peer_hostport, path )
|
||||
stored_zfhashes = self.store_zonefiles( zonefile_names, zonefiles, zonefile_txids, peer_zonefile_hashes, peer_hostport, path )
|
||||
|
||||
# don't ask again
|
||||
log.debug("Stored %s zonefiles" % len(stored_zfhashes))
|
||||
|
||||
Reference in New Issue
Block a user