rework a few things about inodes and inode data in a data store:

* make sure we check the inode header from all drivers before getting an inode, so we know the latest version
* have get_mutable, put_mutable, and delete_mutable update data consistency information across all write-devices
* have get_mutable remember which driver(s) succeeded, so subsequent calls (e.g. during inode resolution) don't needlessly query drivers that returned stale data
* remember data consistency information by device ID, as well as data ID.
This commit is contained in:
Jude Nelson
2017-02-06 15:11:30 -05:00
parent bad676e0e2
commit 7b17a5cdcb

View File

@@ -105,7 +105,7 @@ def serialize_mutable_data_id(data_id):
return urllib.quote(urllib.unquote(data_id).replace('\0', '\\0')).replace('/', r'\x2f')
def load_mutable_data_version(conf, fq_data_id, config_path=CONFIG_PATH):
def load_mutable_data_version(conf, device_id, data_id, config_path=CONFIG_PATH):
"""
Get the version field of a piece of mutable data from local cache.
"""
@@ -122,28 +122,29 @@ def load_mutable_data_version(conf, fq_data_id, config_path=CONFIG_PATH):
if metadata_dir is None or not os.path.isdir(metadata_dir):
return None
# find the version file for this data
serialized_data_id = serialize_mutable_data_id(fq_data_id)
version_file_path = os.path.join(metadata_dir, '{}.ver'.format(serialized_data_id))
dev_id = serialize_mutable_data_id(device_id)
d_id = serialize_mutable_data_id(data_id)
if not os.path.exists(version_file_path):
log.debug('No version path found at {}'.format(version_file_path))
ver_dir = os.path.join(metadata_dir, d_id)
if not os.path.exists(ver_dir):
log.debug("No version path found for {}:{}".format(device_id, data_id))
return None
ver_path = os.path.join(ver_dir, '{}.ver'.format(dev_id))
try:
with open(version_file_path, 'r') as f:
with open(ver_path, 'r') as f:
ver_txt = f.read()
# success!
return int(ver_txt.strip())
except ValueError as ve:
log.warn('Not an integer: "{}"'.format(version_file_path))
log.warn("Not an integer: {}".format(ver_path))
except Exception as e:
log.warn('Failed to read "{}"'.format(version_file_path))
log.warn("Failed to read; {}".format(ver_path))
return None
def store_mutable_data_version(conf, fq_data_id, ver, config_path=CONFIG_PATH):
def store_mutable_data_version(conf, device_id, data_id, ver, config_path=CONFIG_PATH):
"""
Locally store the version of a piece of mutable data,
so we can ensure that its version is incremented on
@@ -176,24 +177,34 @@ def store_mutable_data_version(conf, fq_data_id, ver, config_path=CONFIG_PATH):
log.warning(msg.format(fq_data_id))
return False
serialized_data_id = serialize_mutable_data_id(fq_data_id)
version_file_path = os.path.join(
metadata_dir, '{}.ver'.format(serialized_data_id)
)
d_id = serialize_mutable_data_id(data_id)
dev_id = serialize_mutable_data_id(device_id)
ver_dir = os.path.join(metadata_dir, d_id)
if not os.path.isdir(ver_dir):
try:
log.debug("Make metadata directory {}".format(ver_dir))
os.makedirs(ver_dir)
except Exception, e:
if BLOCKSTACK_DEBUG:
log.exception(e)
log.warning("No metadata directory created for {}:{}".format(device_id, data_id))
return False
ver_path = os.path.join(ver_dir, '{}.ver'.format(dev_id))
try:
with open(version_file_path, 'w') as f:
with open(ver_path, 'w') as f:
f.write(str(ver))
f.flush()
os.fsync(f.fileno())
return True
except Exception as e:
# failed for whatever reason
log.exception(e)
msg = 'Failed to store version of "{}" to "{}"'
log.warn(msg.format(fq_data_id, version_file_path))
if BLOCKSTACK_DEBUG:
log.exception(e)
log.warn("Failed to store version of {}:{}".format(device_id, data_id))
return False
@@ -350,6 +361,7 @@ def list_update_history(name, current_block=None, config_path=CONFIG_PATH, proxy
all_update_hashes = []
block_ids = name_history.keys()
block_ids.sort()
for block_id in block_ids:
history_items = name_history[block_id]
for history_item in history_items:
@@ -357,6 +369,9 @@ def list_update_history(name, current_block=None, config_path=CONFIG_PATH, proxy
if value_hash is None:
continue
if len(all_update_hashes) > 0 and all_update_hashes[-1] == value_hash:
continue
# changed
all_update_hashes.append(value_hash)
@@ -490,7 +505,9 @@ def load_user_data_pubkey_addr( name, storage_drivers=None, proxy=None, config_p
return {'pubkey': data_pubkey, 'address': data_address}
def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None, storage_drivers=None, proxy=None, ver_min=None, ver_max=None, urls=None, device_ids=None, fully_qualified_data_id=False, config_path=CONFIG_PATH):
def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None, storage_drivers=None,
proxy=None, ver_min=None, ver_max=None, urls=None, device_ids=None, fully_qualified_data_id=False,
config_path=CONFIG_PATH, all_drivers=False):
"""
get_mutable
@@ -501,7 +518,7 @@ def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None
If data_pubkey or data_address is given, then blockchain_id can be arbitrary (it will be passed as a hint to the drivers)
Return {'data': the data, 'version': the version, 'timestamp': ..., 'data_pubkey': ..., 'owner_pubkey_hash': ..., 'driver': driver name} on success
Return {'data': the data, 'version': the version, 'timestamp': ..., 'data_pubkey': ..., 'owner_pubkey_hash': ..., 'drivers': [driver name]} on success
Return {'error': ...} on error
"""
@@ -509,12 +526,11 @@ def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None
conf = get_config(path=config_path)
fq_data_ids = []
if device_ids is None:
device_ids = get_all_device_ids(config_path=config_path)
if not fully_qualified_data_id:
# v2 mutable data
if device_ids is None:
device_ids = get_all_device_ids(config_path=config_path)
for device_id in device_ids:
fq_data_ids.append( storage.make_fq_data_id(device_id, data_id) )
@@ -544,15 +560,17 @@ def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None
if storage_drivers is None:
storage_drivers = get_read_storage_drivers(config_path)
expected_version = None
for fq_data_id in fq_data_ids:
version = load_mutable_data_version(conf, fq_data_id, config_path=config_path)
expected_version = expected_version if version is None else max(version, expected_version)
version_info = _get_mutable_data_versions(data_id, device_ids, config_path=config_path)
if 'error' in version_info:
return {'error': 'Failed to load latest data version'}
log.debug("get_mutable({}, blockchain_id={}, pubkey={} ({}), addr={}, expected_version={}, storage_drivers={})".format(fq_data_id, blockchain_id, data_pubkey, lookup, data_address, expected_version, ','.join(storage_drivers)))
expected_version = version_info['version']
log.debug("get_mutable({}, blockchain_id={}, pubkey={} ({}), addr={}, expected_version={}, storage_drivers={})".format(data_id, blockchain_id, data_pubkey, lookup, data_address, expected_version, ','.join(storage_drivers)))
mutable_data = None
mutable_driver = None
mutable_drivers = []
latest_version = expected_version
for fq_data_id in fq_data_ids:
@@ -592,10 +610,21 @@ def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None
log.warn("Invalid (stale) data version from {} for {}: expected = {}, version = {}".format(driver, fq_data_id, expected_version, version))
continue
# success!
if not all_drivers:
# success!
mutable_data = data
mutable_drivers.append(driver)
break
# keep searching
if version < latest_version:
continue
# got a later version
# discard all prior drivers; they gave stale data
version = latest_version
mutable_data = data
mutable_driver = driver
break
mutable_drivers = [driver]
if mutable_data is not None:
# success!
@@ -605,11 +634,9 @@ def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None
log.error("Failed to fetch mutable data for {}".format(fq_data_id))
return {'error': 'Failed to fetch mutable data'}
for fq_data_id in fq_data_ids:
# update consistency information
rc = store_mutable_data_version(conf, fq_data_id, version, config_path=config_path)
if not rc:
return {'error': 'Failed to store consistency information'}
rc = _put_mutable_data_versions(data_id, version, device_ids, config_path=config_path)
if 'error' in rc:
return {'error': 'Failed to store consistency information'}
ret = {
'data': mutable_data['data'],
@@ -619,7 +646,7 @@ def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None
'data_pubkey': data_pubkey,
'owner_pubkey_hash': data_address,
'driver': mutable_driver
'drivers': mutable_drivers
}
return ret
@@ -803,6 +830,7 @@ def put_mutable(data_id, data_payload, blockchain_id=None, data_privkey=None, pr
conf = get_config(path=config_path)
fq_data_id = None
device_id = ''
if not fully_qualified_data_id:
# v2 mutable data
@@ -834,7 +862,7 @@ def put_mutable(data_id, data_payload, blockchain_id=None, data_privkey=None, pr
# get the version to use
if version is None:
version = load_mutable_data_version(conf, fq_data_id, config_path=config_path)
version = load_mutable_data_version(conf, device_id, data_id, config_path=config_path)
if version is not None and create:
log.error("Already exists: {}".format(fq_data_id))
return {'error': 'Data exists'}
@@ -861,7 +889,7 @@ def put_mutable(data_id, data_payload, blockchain_id=None, data_privkey=None, pr
return result
# remember which version this was
rc = store_mutable_data_version(conf, fq_data_id, version, config_path=config_path)
rc = store_mutable_data_version(conf, device_id, data_id, version, config_path=config_path)
if not rc:
result['error'] = 'Failed to store mutable data version'
return result
@@ -1422,6 +1450,7 @@ def put_datastore(user_id, datastore_name, datastore_info, datastore_privkey, pr
root = datastore_info['root']
drivers = datastore['drivers']
device_ids = datastore['device_ids']
all_device_ids = get_all_device_ids(config_path=config_path)
assert datastore_name == datastore['datastore_name']
assert re.match(OP_USER_ID_PATTERN, user_id), user_id
@@ -1446,12 +1475,20 @@ def put_datastore(user_id, datastore_name, datastore_info, datastore_privkey, pr
return {'error': 'Failed to replicate datastore metadata'}
datastore_rec_version = res['version']
# store local record
res = datastore_store(datastore_token, config_path=config_path)
if 'error' in res:
log.error("Failed to store local datastore record")
return {'error': 'Failed to store local datastore record'}
# advance version for all devices
res = _put_mutable_data_versions(data_id, datastore_rec_version, all_device_ids, config_path=config_path)
if 'error' in res:
log.error("Failed to advance consistency data for datastore record")
return res
return {'status': True}
@@ -1562,44 +1599,27 @@ def _get_inode(user_id, inode_uuid, inode_type, data_pubkey_hex, drivers, device
inode_info = None
inode_version = None
drivers_to_try = drivers[:]
# get latest header from all drivers
res = _get_inode_header(user_id, inode_uuid, data_pubkey_hex, drivers, device_ids, config_path=config_path, proxy=proxy, cache=cache)
if 'error' in res:
log.error("Failed to get inode header for {}: {}".format(inode_uuid, res['error']))
return res
while len(drivers_to_try) > 0:
# get latest header from all drivers we haven't tried yet
res = _get_inode_header(user_id, inode_uuid, data_pubkey_hex, drivers_to_try, device_ids, config_path=config_path, proxy=proxy, cache=cache)
if 'error' in res:
log.error("Failed to get inode header for {}: {}".format(inode_uuid, res['error']))
return res
header_version = res['version']
inode_header = res['inode']
drivers_to_try = res['drivers']
if header_version <= res['version']:
header_version = res['version']
inode_header = res['inode']
successful_driver = res['driver']
drivers_to_try.remove(successful_driver)
else:
# stale header; try again
log.error("Stale header for {} from {}".format(inode_uuid, successful_driver))
continue
# get the inode from the storage driver that served us the header
data_id = '{}.{}'.format(user_id, inode_uuid)
res = get_mutable(data_id, ver_min=header_version, data_pubkey=data_pubkey_hex, storage_drivers=[successful_driver], proxy=proxy, config_path=config_path)
if 'error' in res:
log.error("Failed to get inode {} from {}: {}".format(inode_uuid, successful_driver, res['error']))
continue
# success!
inode_info = res['data']
inode_version = res['version']
break
if inode_info is None:
# failed to find a fresh inode across all devices
log.error("Failed to find fresh inode {} across all devices".format(inode_uuid))
# get inode from only the driver(s) that gave back fresh information
data_id = '{}.{}'.format(user_id, inode_uuid)
res = get_mutable(data_id, ver_min=header_version, data_pubkey=data_pubkey_hex, storage_drivers=drivers_to_try, proxy=proxy, config_path=config_path)
if 'error' in res:
log.error("Failed to get inode {} from {}: {}".format(inode_uuid, successful_driver, res['error']))
return {'error': 'Failed to find fresh inode'}
# success!
inode_info = res['data']
inode_version = res['version']
# must be an inode
inode_schema = None
if inode_type == MUTABLE_DATUM_DIR_TYPE:
@@ -1649,8 +1669,7 @@ def _get_mutable_data_versions( data_id, device_ids, config_path=CONFIG_PATH ):
assert conf
for device_id in device_ids:
fq_data_id = storage.make_fq_data_id(device_id, data_id)
cur_ver = load_mutable_data_version(conf, fq_data_id, config_path=config_path)
cur_ver = load_mutable_data_version(conf, device_id, data_id, config_path=config_path)
if cur_ver is not None:
new_version = max(new_version, cur_ver)
@@ -1675,8 +1694,7 @@ def _put_mutable_data_versions( data_id, new_version, device_ids, config_path=CO
new_version = max(res['version'], new_version)
for device_id in device_ids:
fq_data_id = storage.make_fq_data_id(device_id, data_id)
rc = store_mutable_data_version(conf, fq_data_id, new_version, config_path=config_path)
rc = store_mutable_data_version(conf, device_id, data_id, new_version, config_path=config_path)
if not rc:
return {'error': 'Failed to advance mutable data version {} to {}'.format(data_id, new_version)}
@@ -1713,11 +1731,12 @@ def _put_inode_consistency_info(user_id, inode_uuid, new_version, device_ids, co
return {'status': True}
def _get_inode_header(user_id, inode_uuid, data_pubkey_hex, drivers, device_ids, config_path=CONFIG_PATH, proxy=None, cache=None):
def _get_inode_header(user_id, inode_uuid, data_pubkey_hex, drivers, device_ids, inode_hdr_version=None, config_path=CONFIG_PATH, proxy=None, cache=None):
"""
Get an inode's header data. Verify it matches the inode info.
Fetch the header from *all* drivers
Return {'status': True, 'inode': inode_full_info, 'version': version, 'driver': driver that was used} on success.
Return {'status': True, 'inode': inode_full_info, 'version': version, 'drivers': drivers that were used} on success.
Return {'error': ...} on error.
"""
@@ -1740,11 +1759,12 @@ def _get_inode_header(user_id, inode_uuid, data_pubkey_hex, drivers, device_ids,
inode_version = res['version']
res = _get_mutable_data_versions( inode_hdr_id, device_ids, config_path=CONFIG_PATH )
if 'error' in res:
return res
if inode_hdr_version is None:
res = _get_mutable_data_versions( inode_hdr_id, device_ids, config_path=CONFIG_PATH )
if 'error' in res:
return res
inode_hdr_version = res['version']
inode_hdr_version = res['version']
'''
if cache is not None:
@@ -1760,22 +1780,23 @@ def _get_inode_header(user_id, inode_uuid, data_pubkey_hex, drivers, device_ids,
return {'status': True, 'inode': inode_hdr}
'''
# get from *all* drivers so we know that if we succeed, we have a fresh version
data_id = '{}.{}.hdr'.format(user_id, inode_uuid)
res = get_mutable(data_id, ver_min=inode_version, data_pubkey=data_pubkey_hex, storage_drivers=drivers, device_ids=device_ids, proxy=proxy, config_path=config_path)
res = get_mutable(data_id, ver_min=max(inode_version, inode_hdr_version), data_pubkey=data_pubkey_hex, storage_drivers=drivers, device_ids=device_ids, proxy=proxy, config_path=config_path, all_drivers=True)
if 'error' in res:
log.error("Failed to get inode data {}: {}".format(inode_uuid, res['error']))
return {'error': 'Failed to get inode data'}
inode_hdr = res['data']
inode_hdr_version = res['version']
inode_driver = res['driver']
inode_drivers = res['drivers']
# advance header version and inode version
res = _put_inode_consistency_info(user_id, inode_uuid, max(inode_hdr_version, inode_version), device_ids, config_path=config_path)
if 'error' in res:
return res
return {'status': True, 'inode': inode_hdr, 'version': max(inode_hdr_version, inode_version), 'driver': inode_driver}
return {'status': True, 'inode': inode_hdr, 'version': max(inode_hdr_version, inode_version), 'drivers': inode_drivers}
def _put_inode(user_id, _inode, data_privkey, drivers, device_ids, config_path=CONFIG_PATH, proxy=None, create=False, cache=None ):
@@ -2784,14 +2805,7 @@ def get_user_list(master_data_pubkey, proxy=None, config_path=CONFIG_PATH):
device_ids = get_all_device_ids(config_path=config_path)
# get expected version across all devices
res = _get_mutable_data_versions(data_id, device_ids, config_path=CONFIG_PATH)
if 'error' in res:
return res
expected_version = res['version']
listing_info = get_mutable(data_id, data_pubkey=master_data_pubkey, proxy=proxy, config_path=config_path, storage_drivers=nonlocal_storage_drivers, ver_min=expected_version)
listing_info = get_mutable(data_id, data_pubkey=master_data_pubkey, proxy=proxy, config_path=config_path, storage_drivers=nonlocal_storage_drivers)
if 'error' in listing_info:
log.error("Failed to get user list")
return listing_info
@@ -2804,11 +2818,6 @@ def get_user_list(master_data_pubkey, proxy=None, config_path=CONFIG_PATH):
except ValidationError:
return {'error': 'Invalid user listing'}
# advance for all devices!
res = _put_mutable_data_versions(data_id, user_list_version, device_ids, config_path=config_path)
if 'error' in res:
return res
return {'status': True, 'user_ids': user_listing}
@@ -2832,15 +2841,8 @@ def put_user_list(master_data_privkey, user_listing, blockchain_id=None, proxy=N
data_id = '{}.{}'.format(addr, 'user_ids')
device_ids = get_all_device_ids(config_path=config_path)
# get expected version across all devices
res = _get_mutable_data_versions(data_id, device_ids, config_path=CONFIG_PATH)
if 'error' in res:
return res
min_version = res['version']
res = put_mutable(data_id, user_listing, blockchain_id=blockchain_id, data_privkey=master_data_privkey, proxy=proxy, config_path=config_path, version=min_version)
res = put_mutable(data_id, user_listing, blockchain_id=blockchain_id, data_privkey=master_data_privkey, proxy=proxy, config_path=config_path)
if 'error' in res:
return res
@@ -3017,16 +3019,9 @@ def next_privkey_index( data_privkey, blockchain_id=None, config_path=CONFIG_PAT
data_id = '{}.{}'.format(addr, 'privkey_index')
device_ids = get_all_device_ids(config_path=config_path)
# get expected version across all devices
res = _get_mutable_data_versions(data_id, device_ids, config_path=CONFIG_PATH)
if 'error' in res:
return res
expected_version = res['version']
nonlocal_storage_drivers = get_nonlocal_storage_drivers(config_path)
privkey_index_info = get_mutable(data_id, blockchain_id=blockchain_id, data_pubkey=data_pubkey, config_path=config_path, storage_drivers=nonlocal_storage_drivers, ver_min=expected_version)
privkey_index_info = get_mutable(data_id, blockchain_id=blockchain_id, data_pubkey=data_pubkey, config_path=config_path, storage_drivers=nonlocal_storage_drivers)
if 'error' in privkey_index_info:
if create:
# try to create
@@ -3098,17 +3093,18 @@ def have_seen( data_id, config_path=CONFIG_PATH ):
assert conf
device_id = get_local_device_id(config_dir=os.path.dirname(config_path))
fq_data_id = storage.make_fq_data_id(device_id, data_id)
expected_version = load_mutable_data_version(conf, fq_data_id, config_path=config_path)
expected_version = load_mutable_data_version(conf, device_id, data_id, config_path=config_path)
return (expected_version is not None)
def data_setup( blockchain_id, password, wallet_keys=None, config_path=CONFIG_PATH, proxy=None):
def data_setup( blockchain_id, password, wallet_keys=None, config_path=CONFIG_PATH, proxy=None, interactive=False):
"""
Do the one-time setup necessary for using the data functions with this name's key bundle.
The wallet must be set up first.
TODO: make interactive
Return {'status': True} on success
Return {'error': ...} on error
"""