mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-19 12:09:48 +08:00
enforce create/exists requirements for inodes; fix bugs found in testing with service-level faults
This commit is contained in:
@@ -546,11 +546,11 @@ def data_blob_serialize( data_blob ):
|
||||
|
||||
def get_mutable(data_id, raw=False, blockchain_id=None, data_pubkey=None, data_address=None, data_hash=None, storage_drivers=None,
|
||||
proxy=None, ver_min=None, ver_max=None, force=False, urls=None, device_ids=None,
|
||||
config_path=CONFIG_PATH, all_drivers=False):
|
||||
config_path=CONFIG_PATH):
|
||||
"""
|
||||
get_mutable
|
||||
|
||||
Fetch a piece of mutable data.
|
||||
Fetch a piece of mutable data from *all* drivers.
|
||||
|
||||
If @ver_min is given, ensure the data's version is greater or equal to it.
|
||||
If @ver_max is given, ensure the data's version is less than it.
|
||||
@@ -613,7 +613,7 @@ def get_mutable(data_id, raw=False, blockchain_id=None, data_pubkey=None, data_a
|
||||
log.debug("get_mutable({}, device_ids={}, blockchain_id={}, pubkey={} ({}), addr={}, hash={}, expected_version={}, storage_drivers={})".format(
|
||||
data_id, device_ids, blockchain_id, data_pubkey, lookup, data_address, data_hash, expected_version, ','.join(storage_drivers)
|
||||
))
|
||||
|
||||
|
||||
mutable_data = None
|
||||
mutable_drivers = []
|
||||
latest_version = expected_version
|
||||
@@ -667,25 +667,25 @@ def get_mutable(data_id, raw=False, blockchain_id=None, data_pubkey=None, data_a
|
||||
log.warn("Invalid (stale) data version from {} for {}: expected = {}, version = {}".format(driver, fq_data_id, expected_version, version))
|
||||
continue
|
||||
|
||||
if not all_drivers:
|
||||
# success!
|
||||
mutable_data = data
|
||||
mutable_drivers.append(driver)
|
||||
break
|
||||
|
||||
# keep searching
|
||||
if version < latest_version:
|
||||
log.warn("{} from {} is stale ({} < {})".format(fq_data_id, driver, version, latest_version))
|
||||
continue
|
||||
|
||||
# got a later version
|
||||
# discard all prior drivers; they gave stale data
|
||||
version = latest_version
|
||||
mutable_data = data
|
||||
mutable_drivers = [driver]
|
||||
elif version == latest_version:
|
||||
log.debug("{} from {} has the same version as latest ({}), available from {}".format(fq_data_id, driver, latest_version, ','.join(mutable_drivers)))
|
||||
mutable_data = data
|
||||
mutable_drivers.append(driver)
|
||||
continue
|
||||
|
||||
if mutable_data is not None:
|
||||
# success!
|
||||
break
|
||||
else:
|
||||
# got a later version
|
||||
# discard all prior drivers; they gave stale data
|
||||
latest_version = version
|
||||
mutable_data = data
|
||||
mutable_drivers = [driver]
|
||||
log.debug("Latest version of {} is now {}, vailable from {}".format(fq_data_id, version, driver))
|
||||
continue
|
||||
|
||||
if mutable_data is None:
|
||||
log.error("Failed to fetch mutable data for {}".format(data_id))
|
||||
@@ -1162,8 +1162,9 @@ def delete_mutable(data_id, signed_data_tombstones, proxy=None, storage_drivers=
|
||||
log.error("Failed to delete {} from storage providers".format(fq_data_id))
|
||||
worst_rc = False
|
||||
continue
|
||||
|
||||
if delete_version:
|
||||
|
||||
if worst_rc and delete_version:
|
||||
# only do this if we actually succeeded in deleting from all storage providers
|
||||
for device_id in device_ids:
|
||||
for fq_data_id in fq_data_ids:
|
||||
delete_mutable_data_version(conf, device_id, fq_data_id, config_path=config_path)
|
||||
@@ -1655,7 +1656,7 @@ def get_inode_data(datastore_id, inode_uuid, inode_type, data_pubkey_hex, driver
|
||||
# only wanted header
|
||||
return {'status': True, 'inode': inode_header, 'version': header_version}
|
||||
|
||||
log.debug("Get inode data for {}.{} from {}".format(datastore_id, inode_uuid, drivers_to_try))
|
||||
log.debug("Get inode data for {}.{} from {}, version {}".format(datastore_id, inode_uuid, drivers_to_try, header_version))
|
||||
|
||||
# get inode from only the driver(s) that gave back fresh information.
|
||||
# expect raw data. It will either be idata (for a file), or a dir listing (for a directory)
|
||||
@@ -1669,7 +1670,7 @@ def get_inode_data(datastore_id, inode_uuid, inode_type, data_pubkey_hex, driver
|
||||
res = None
|
||||
for driver_to_try in drivers_to_try:
|
||||
# try each driver, until we find one with the right hash
|
||||
res = get_mutable(data_id, ver_min=ver_min, device_ids=device_ids, force=force, raw=True, data_hash=data_hash, storage_drivers=drivers_to_try, proxy=proxy, config_path=config_path)
|
||||
res = get_mutable(data_id, ver_min=ver_min, device_ids=device_ids, raw=True, data_hash=data_hash, storage_drivers=drivers_to_try, proxy=proxy, config_path=config_path)
|
||||
if 'error' in res:
|
||||
log.error("Failed to get inode {} from {}: {}".format(inode_uuid, ','.join(drivers_to_try), res['error']))
|
||||
if res.get('stale'):
|
||||
@@ -1833,7 +1834,7 @@ def get_inode_header(datastore_id, inode_uuid, data_pubkey_hex, drivers, device_
|
||||
if force:
|
||||
ver_min = 0
|
||||
|
||||
res = get_mutable(data_id, ver_min=ver_min, force=force, data_pubkey=data_pubkey_hex, storage_drivers=drivers, device_ids=device_ids, proxy=proxy, config_path=config_path, all_drivers=True)
|
||||
res = get_mutable(data_id, ver_min=ver_min, force=force, data_pubkey=data_pubkey_hex, storage_drivers=drivers, device_ids=device_ids, proxy=proxy, config_path=config_path)
|
||||
if 'error' in res:
|
||||
log.error("Failed to get inode data {}: {}".format(inode_uuid, res['error']))
|
||||
errcode = errno.EREMOTEIO
|
||||
@@ -2020,9 +2021,9 @@ def put_inode_data( datastore, header_blob_str, header_blob_sig, idata_str, conf
|
||||
|
||||
# replicate best-effort to each driver
|
||||
# replicate (header, payload) per driver, instead of (headers) to driver and then (payloads) to driver.
|
||||
# this way, if some services are offline but others are not, this write can partially succeed to readers.
|
||||
# this way, if some services are offline but others are not, this write can partially succeed to readers,
|
||||
# and the user can retry the write (from this or another device) to "complete" it.
|
||||
driver_failed = False
|
||||
driver_succeeded = False
|
||||
for driver in drivers:
|
||||
|
||||
# store payload (no signature; we'll use the header's hash)
|
||||
@@ -2037,21 +2038,12 @@ def put_inode_data( datastore, header_blob_str, header_blob_sig, idata_str, conf
|
||||
if 'error' in res:
|
||||
log.error("Failed to replicate inode header for {}: {}".format(header_fqid, res['error']))
|
||||
driver_failed = True
|
||||
continue
|
||||
|
||||
# at least one driver succeeded
|
||||
driver_succeeded = True
|
||||
|
||||
res = {'status': True}
|
||||
|
||||
if driver_succeeded:
|
||||
# update consistency info, even if some driver failed,
|
||||
# so we at least use a later version next time.
|
||||
res = _put_inode_consistency_info(datastore_id, inode_uuid, version, datastore['device_ids'], config_path=config_path)
|
||||
|
||||
if driver_failed:
|
||||
return {'error': 'Failed to replicate inode data to at least one driver', 'errno': errno.EREMOTEIO}
|
||||
|
||||
|
||||
# save consistency info
|
||||
res = _put_inode_consistency_info(datastore_id, inode_uuid, version, datastore['device_ids'], config_path=config_path)
|
||||
return res
|
||||
|
||||
|
||||
@@ -2122,16 +2114,11 @@ def delete_inode_data( datastore, signed_tombstones, proxy=None, config_path=CON
|
||||
inode_tombstones[inode_uuid]['header_tombstones'].append(ts_data['id'])
|
||||
else:
|
||||
inode_tombstones[inode_uuid]['idata_tombstones'].append(ts_data['id'])
|
||||
|
||||
failed_driver = False
|
||||
|
||||
# delete each inode's header and idata
|
||||
# delete inode idata first
|
||||
for inode_uuid in inode_tombstones.keys():
|
||||
hdata_id = '{}.{}.hdr'.format(datastore_id, inode_uuid)
|
||||
res = delete_mutable(hdata_id, signed_data_tombstones=inode_tombstones[inode_uuid]['header_tombstones'],
|
||||
proxy=proxy, storage_drivers=drivers, storage_drivers_exclusive=True, device_ids=device_ids, config_path=config_path)
|
||||
|
||||
if 'error' in res:
|
||||
log.error("Faled to delete idata for {}: {}".format(inode_uuid, res['error']))
|
||||
return res
|
||||
|
||||
# delete inode
|
||||
data_id = '{}.{}'.format(datastore_id, inode_uuid)
|
||||
@@ -2140,6 +2127,19 @@ def delete_inode_data( datastore, signed_tombstones, proxy=None, config_path=CON
|
||||
|
||||
if 'error' in res:
|
||||
log.error("Failed to delete inode {}: {}".format(inode_uuid, res['error']))
|
||||
failed_driver = True
|
||||
|
||||
if failed_driver:
|
||||
return {'error': 'Failed to delete inode data', 'errno': errno.EREMOTEIO}
|
||||
|
||||
# delete inode headers once all idata is gone
|
||||
for inode_uuid in inode_tombstones.keys():
|
||||
hdata_id = '{}.{}.hdr'.format(datastore_id, inode_uuid)
|
||||
res = delete_mutable(hdata_id, signed_data_tombstones=inode_tombstones[inode_uuid]['header_tombstones'],
|
||||
proxy=proxy, storage_drivers=drivers, storage_drivers_exclusive=True, device_ids=device_ids, config_path=config_path)
|
||||
|
||||
if 'error' in res:
|
||||
log.error("Faled to delete idata for {}: {}".format(inode_uuid, res['error']))
|
||||
return res
|
||||
|
||||
return {'status': True}
|
||||
@@ -2430,17 +2430,25 @@ def inode_path_lookup(datastore, data_path, data_pubkey, get_idata=True, force=F
|
||||
return {'status': True, 'path_info': path_info, 'inode_info': inode_info}
|
||||
|
||||
|
||||
def datastore_inodes_check_consistent( datastore_id, inode_headers, device_ids, config_path=CONFIG_PATH ):
|
||||
def datastore_inodes_check_consistent( datastore_id, inode_headers, creates, exists, device_ids, config_path=CONFIG_PATH ):
|
||||
"""
|
||||
Given a list of signed, serialized inode headers, go and verify that they're at least as
|
||||
new as the local versioning information we have.
|
||||
new as the local versioning information we have. Also, if we expect to create an inode,
|
||||
verify that we haven't seen it yet.
|
||||
|
||||
This is the server-side method.
|
||||
|
||||
Return {'status': True} if everything is in order
|
||||
Return {'error': ..., 'errno': ...} otherwise
|
||||
"""
|
||||
for inode_header_str in inode_headers:
|
||||
assert len(creates) == len(inode_headers)
|
||||
assert len(exists) == len(inode_headers)
|
||||
|
||||
for i in xrange(0, len(inode_headers)):
|
||||
inode_header_str = inode_headers[i]
|
||||
create = creates[i]
|
||||
exist = exists[i]
|
||||
|
||||
# parse
|
||||
header_info = analyze_inode_header_blob(datastore_id, inode_header_str)
|
||||
if 'error' in header_info:
|
||||
@@ -2465,6 +2473,16 @@ def datastore_inodes_check_consistent( datastore_id, inode_headers, device_ids,
|
||||
log.error("Stale inode {}".format(header_id))
|
||||
return {'error': 'Stale inode', 'errno': errno.ESTALE}
|
||||
|
||||
if version_info['version'] > 0 and create:
|
||||
# exists but we expected to create
|
||||
log.error("Inode {} exists".format(header_id))
|
||||
return {'error': 'Inode exists', 'errno': errno.EEXIST}
|
||||
|
||||
if version_info['version'] == 0 and exist:
|
||||
# does not exist, but we expected it
|
||||
log.error("Inode {} does not exist".format(header_id))
|
||||
return {'error': 'Inode does not exist', 'errno': errno.ENOENT}
|
||||
|
||||
return {'status': True}
|
||||
|
||||
|
||||
@@ -2507,7 +2525,7 @@ def datastore_inodes_verify( datastore_pubkey, inode_headers, inode_payloads, in
|
||||
return {'status': True}
|
||||
|
||||
|
||||
def datastore_operation_check( datastore_pubkey, inode_headers, inode_payloads, inode_signatures, inode_tombstones, device_ids, config_path=CONFIG_PATH ):
|
||||
def datastore_operation_check( datastore_pubkey, inode_headers, inode_payloads, inode_signatures, inode_tombstones, creates, exists, device_ids, config_path=CONFIG_PATH ):
|
||||
"""
|
||||
Verify that each header and tombstone is signed, that each payload's hash is in the header, and that each
|
||||
inode header is a current or later version
|
||||
@@ -2523,7 +2541,7 @@ def datastore_operation_check( datastore_pubkey, inode_headers, inode_payloads,
|
||||
log.debug("Failed to verify inode and tombstone signatures")
|
||||
return res
|
||||
|
||||
res = datastore_inodes_check_consistent( datastore_id, inode_headers, device_ids, config_path=config_path )
|
||||
res = datastore_inodes_check_consistent( datastore_id, inode_headers, creates, exists, device_ids, config_path=config_path )
|
||||
if 'error' in res:
|
||||
log.debug("Failed to verify data is consistent")
|
||||
return res
|
||||
@@ -2551,6 +2569,13 @@ def datastore_do_inode_operation( datastore, inode_headers, inode_payloads, inod
|
||||
assert len(inode_headers) == len(inode_payloads)
|
||||
assert len(inode_payloads) == len(inode_signatures)
|
||||
|
||||
# process tombstones first
|
||||
if len(inode_tombstones) > 0:
|
||||
res = delete_inode_data( datastore, inode_tombstones, proxy=proxy, config_path=config_path)
|
||||
if 'error' in res:
|
||||
log.debug("Failed to delete inode with {}".format(','.join(inode_tombstones)))
|
||||
return res
|
||||
|
||||
# store data
|
||||
for i in xrange(0, len(inode_headers)):
|
||||
header_blob = inode_headers[i]
|
||||
@@ -2562,13 +2587,6 @@ def datastore_do_inode_operation( datastore, inode_headers, inode_payloads, inod
|
||||
log.debug("Failed to put inode {}".format(header_blob))
|
||||
return res
|
||||
|
||||
# process tombstones
|
||||
if len(inode_tombstones) > 0:
|
||||
res = delete_inode_data( datastore, inode_tombstones, proxy=proxy, config_path=config_path)
|
||||
if 'error' in res:
|
||||
log.debug("Failed to delete inode with {}".format(','.join(inode_tombstones)))
|
||||
return res
|
||||
|
||||
return {'status': True}
|
||||
|
||||
|
||||
@@ -2677,10 +2695,20 @@ def datastore_mkdir_put_inodes( datastore, data_path, header_blobs, payloads, si
|
||||
assert len(payloads) == 2
|
||||
assert len(signatures) == 2
|
||||
assert len(tombstones) == 0
|
||||
creates = [True, False] # create child
|
||||
exists = [False, True] # parent must exist
|
||||
|
||||
'''
|
||||
header_blobs = [header_blobs[1], header_blobs[0]]
|
||||
payloads = [payloads[1], payloads[0]]
|
||||
signatures = [signatures[1], signatures[0]]
|
||||
creates = [creates[1], creates[0]]
|
||||
exists = [exists[1], exists[0]]
|
||||
'''
|
||||
|
||||
device_ids = datastore['device_ids']
|
||||
data_pubkey = datastore['pubkey']
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, device_ids, config_path=config_path )
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, creates, exists, device_ids, config_path=config_path )
|
||||
if 'error' in res:
|
||||
log.debug("Failed to check operation: {}".format(res['error']))
|
||||
return res
|
||||
@@ -2737,6 +2765,7 @@ def datastore_rmdir_make_inodes(api_client, datastore, data_path, data_pubkey, p
|
||||
path_info = _parse_data_path( data_path )
|
||||
data_path = path_info['data_path']
|
||||
name = path_info['iname']
|
||||
creates = [False, False]
|
||||
|
||||
if data_path == '/':
|
||||
# can't do this
|
||||
@@ -2824,13 +2853,17 @@ def datastore_rmdir_put_inodes( datastore, data_path, header_blobs, payloads, si
|
||||
assert len(payloads) == 1, payloads
|
||||
assert len(signatures) == 1, signatures
|
||||
assert len(tombstones) >= 1, tombstones
|
||||
creates = [False]
|
||||
exists = [True]
|
||||
|
||||
if proxy is None:
|
||||
proxy = get_default_proxy(config_path=config_path)
|
||||
|
||||
# directory must actually be empty
|
||||
|
||||
device_ids = datastore['device_ids']
|
||||
data_pubkey = datastore['pubkey']
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, device_ids, config_path=config_path )
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, creates, exists, device_ids, config_path=config_path )
|
||||
if 'error' in res:
|
||||
log.debug("Failed to check operation: {}".format(res['error']))
|
||||
return res
|
||||
@@ -3037,7 +3070,7 @@ def datastore_putfile_make_inodes(api_client, datastore, data_path, file_data_ha
|
||||
return ret
|
||||
|
||||
|
||||
def datastore_putfile_put_inodes( datastore, data_path, header_blobs, payloads, signatures, tombstones, config_path=CONFIG_PATH, proxy=None ):
|
||||
def datastore_putfile_put_inodes( datastore, data_path, header_blobs, payloads, signatures, tombstones, create=False, exist=False, config_path=CONFIG_PATH, proxy=None ):
|
||||
"""
|
||||
Given the header blobs and payloads from datastore_putfile_make_inodes() and client-given signatures and the actual file data,
|
||||
go and store them all.
|
||||
@@ -3045,7 +3078,7 @@ def datastore_putfile_put_inodes( datastore, data_path, header_blobs, payloads,
|
||||
Order matters:
|
||||
header_blobs[0], payloads[0], and signatures[0] are for the parent directory
|
||||
header_blobs[1], payloads[1], and signatures[1] are for the child file.
|
||||
payloads[1] should be the client-supplied payload (
|
||||
payloads[1] should be the client-supplied payload
|
||||
tombstones[0] is for the child deleted
|
||||
|
||||
Return {'status': True} on success
|
||||
@@ -3055,13 +3088,15 @@ def datastore_putfile_put_inodes( datastore, data_path, header_blobs, payloads,
|
||||
assert len(payloads) == 2
|
||||
assert len(signatures) == 2
|
||||
assert len(tombstones) == 0
|
||||
creates = [create, False] # parent will not be created
|
||||
exists = [exist, True] # parent must exist
|
||||
|
||||
if proxy is None:
|
||||
proxy = get_default_proxy(config_path=config_path)
|
||||
|
||||
device_ids = datastore['device_ids']
|
||||
data_pubkey = datastore['pubkey']
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, device_ids, config_path=config_path )
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, creates, exists, device_ids, config_path=config_path )
|
||||
if 'error' in res:
|
||||
log.debug("Failed to check operation: {}".format(res['error']))
|
||||
return res
|
||||
@@ -3069,7 +3104,7 @@ def datastore_putfile_put_inodes( datastore, data_path, header_blobs, payloads,
|
||||
return datastore_do_inode_operation( datastore, header_blobs, payloads, signatures, tombstones, config_path=config_path, proxy=proxy )
|
||||
|
||||
|
||||
def datastore_putfile(api_client, datastore, data_path, file_data_bin, data_privkey_hex, create=False, force=False, config_path=CONFIG_PATH):
|
||||
def datastore_putfile(api_client, datastore, data_path, file_data_bin, data_privkey_hex, create=False, exist=False, force=False, config_path=CONFIG_PATH):
|
||||
"""
|
||||
Client-side method to store a file. MEANT FOR TESTING PURPOSES
|
||||
* generate the directory inodes
|
||||
@@ -3100,7 +3135,7 @@ def datastore_putfile(api_client, datastore, data_path, file_data_bin, data_priv
|
||||
inode_info['payloads'][0] = file_data_b64
|
||||
|
||||
datastore_info = datastore_serialize_and_sign(datastore, data_privkey_hex)
|
||||
res = api_client.backend_datastore_putfile( datastore_info['str'], datastore_info['sig'], data_path, inode_info['inodes'], inode_info['payloads'], inode_signatures, inode_info['tombstones'] )
|
||||
res = api_client.backend_datastore_putfile( datastore_info['str'], datastore_info['sig'], data_path, inode_info['inodes'], inode_info['payloads'], inode_signatures, inode_info['tombstones'], create=create, exist=exist )
|
||||
if 'error' in res:
|
||||
log.debug("Failed to put putfile inodes")
|
||||
return res
|
||||
@@ -3196,13 +3231,15 @@ def datastore_deletefile_put_inodes( datastore, data_path, header_blobs, payload
|
||||
assert len(payloads) == 1
|
||||
assert len(signatures) == 1
|
||||
assert len(tombstones) >= 1
|
||||
creates = [False]
|
||||
exists = [True]
|
||||
|
||||
if proxy is None:
|
||||
proxy = get_default_proxy(config_path=config_path)
|
||||
|
||||
device_ids = datastore['device_ids']
|
||||
data_pubkey = datastore['pubkey']
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, device_ids, config_path=config_path )
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, creates, exists, device_ids, config_path=config_path )
|
||||
if 'error' in res:
|
||||
log.debug("Failed to check operation: {}".format(res['error']))
|
||||
return res
|
||||
@@ -3474,20 +3511,23 @@ def datastore_rmtree_put_inodes( datastore, header_blobs, payloads, signatures,
|
||||
Return {'error': ..., 'errno': ...} on failure
|
||||
"""
|
||||
# only putting the now-empty directory
|
||||
assert len(header_blobs) <= 1, header_blobs
|
||||
assert len(payloads) <= 1, payloads
|
||||
assert len(header_blobs) == 1, header_blobs
|
||||
assert len(payloads) == 1, payloads
|
||||
assert len(signatures) <= 1
|
||||
assert len(tombstones) >= 0
|
||||
|
||||
assert len(header_blobs) == len(payloads)
|
||||
assert len(payloads) == len(signatures)
|
||||
|
||||
creates = [False]
|
||||
exists = [True]
|
||||
|
||||
if proxy is None:
|
||||
proxy = get_default_proxy(config_path=config_path)
|
||||
|
||||
device_ids = datastore['device_ids']
|
||||
data_pubkey = datastore['pubkey']
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, device_ids, config_path=config_path )
|
||||
res = datastore_operation_check( data_pubkey, header_blobs, payloads, signatures, tombstones, creates, exists, device_ids, config_path=config_path )
|
||||
if 'error' in res:
|
||||
log.debug("Failed to check operation: {}".format(res['error']))
|
||||
return res
|
||||
|
||||
Reference in New Issue
Block a user