mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-22 10:36:57 +08:00
use exclusively the required storage drivers for data stores
This commit is contained in:
@@ -128,7 +128,7 @@ def load_mutable_data_version(conf, device_id, data_id, config_path=CONFIG_PATH)
|
||||
|
||||
if conf is None:
|
||||
msg = 'No config found; cannot load version for "{}"'
|
||||
log.debug(msg.format(fq_data_id))
|
||||
log.debug(msg.format(data_id))
|
||||
return None
|
||||
|
||||
metadata_dir = get_metadata_dir(conf)
|
||||
@@ -168,7 +168,7 @@ def store_mutable_data_version(conf, device_id, data_id, ver, config_path=CONFIG
|
||||
|
||||
if conf is None:
|
||||
msg = 'No config found; cannot store version for "{}"'
|
||||
log.warning(msg.format(fq_data_id))
|
||||
log.warning(msg.format(data_id))
|
||||
return False
|
||||
|
||||
metadata_dir = get_metadata_dir(conf)
|
||||
@@ -182,7 +182,7 @@ def store_mutable_data_version(conf, device_id, data_id, ver, config_path=CONFIG
|
||||
log.exception(e)
|
||||
|
||||
msg = 'No metadata directory created; cannot store version of "{}"'
|
||||
log.warning(msg.format(fq_data_id))
|
||||
log.warning(msg.format(data_id))
|
||||
return False
|
||||
|
||||
d_id = serialize_mutable_data_id(data_id)
|
||||
@@ -216,7 +216,7 @@ def store_mutable_data_version(conf, device_id, data_id, ver, config_path=CONFIG
|
||||
return False
|
||||
|
||||
|
||||
def delete_mutable_data_version(conf, data_id):
|
||||
def delete_mutable_data_version(conf, device_id, data_id, config_path=CONFIG_PATH):
|
||||
"""
|
||||
Locally delete the version of a piece of mutable data.
|
||||
|
||||
@@ -224,31 +224,33 @@ def delete_mutable_data_version(conf, data_id):
|
||||
Return False if not
|
||||
"""
|
||||
|
||||
conf = get_config() if conf is None else conf
|
||||
conf = get_config(path=config_path) if conf is None else conf
|
||||
|
||||
if conf is None:
|
||||
msg = 'No config found; cannot store version for "{}"'
|
||||
log.warning(msg.format(data_id))
|
||||
msg = 'No config found; cannot delete version for "{}"'
|
||||
return False
|
||||
|
||||
metadata_dir = get_metadata_dir(conf)
|
||||
|
||||
if not os.path.isdir(metadata_dir):
|
||||
msg = 'No metadata directory found; cannot store version of "{}"'
|
||||
log.warning(msg.format(data_id))
|
||||
return False
|
||||
|
||||
serialized_data_id = data_id.replace('/', '\x2f').replace('\0', '\\0')
|
||||
version_file_path = os.path.join(
|
||||
metadata_dir, '{}.ver'.format(serialized_data_id)
|
||||
)
|
||||
|
||||
try:
|
||||
os.unlink(version_file_path)
|
||||
return True
|
||||
|
||||
d_id = serialize_mutable_data_id(data_id)
|
||||
dev_id = serialize_mutable_data_id(device_id)
|
||||
|
||||
ver_dir = os.path.join(metadata_dir, d_id)
|
||||
if not os.path.isdir(ver_dir):
|
||||
return True
|
||||
|
||||
ver_path = os.path.join(ver_dir, '{}.ver'.format(dev_id))
|
||||
try:
|
||||
if os.path.exists(ver_path):
|
||||
os.unlink(ver_path)
|
||||
|
||||
except Exception as e:
|
||||
# failed for whatever reason
|
||||
msg = 'Failed to remove version file "{}"'
|
||||
log.warn(msg.format(version_file_path))
|
||||
log.warn(msg.format(ver_file_path))
|
||||
|
||||
return False
|
||||
|
||||
@@ -964,7 +966,7 @@ def load_user_data_privkey( blockchain_id, storage_drivers=None, proxy=None, con
|
||||
|
||||
|
||||
def put_mutable(data_id, data_payload, blockchain_id=None, data_privkey=None, proxy=None, fully_qualified_data_id=False,
|
||||
storage_drivers=None, zonefile_storage_drivers=None, version=None, wallet_keys=None,
|
||||
storage_drivers=None, storage_drivers_exclusive=False, zonefile_storage_drivers=None, version=None, wallet_keys=None,
|
||||
config_path=CONFIG_PATH, create=False):
|
||||
"""
|
||||
put_mutable.
|
||||
@@ -1055,8 +1057,8 @@ def put_mutable(data_id, data_payload, blockchain_id=None, data_privkey=None, pr
|
||||
data = data_blob_serialize(data_json)
|
||||
result = {}
|
||||
|
||||
log.debug("put_mutable({}, blockchain_id={}, lookup_privkey={}, version={}, storage_drivers={})".format(fq_data_id, blockchain_id, lookup, version, ','.join(storage_drivers)))
|
||||
rc = storage.put_mutable_data(fq_data_id, data, data_privkey, blockchain_id=blockchain_id, required=storage_drivers)
|
||||
log.debug("put_mutable({}, blockchain_id={}, lookup_privkey={}, version={}, storage_drivers={}, exclusive={})".format(fq_data_id, blockchain_id, lookup, version, ','.join(storage_drivers), storage_drivers_exclusive))
|
||||
rc = storage.put_mutable_data(fq_data_id, data, data_privkey, blockchain_id=blockchain_id, required=storage_drivers, required_exclusive=storage_drivers_exclusive)
|
||||
if not rc:
|
||||
log.error("failed to put mutable data {}".format(fq_data_id))
|
||||
result['error'] = 'Failed to store mutable data'
|
||||
@@ -1186,7 +1188,9 @@ def delete_immutable(blockchain_id, data_key, data_id=None, proxy=None, txid=Non
|
||||
return result
|
||||
|
||||
|
||||
def delete_mutable(data_id, data_privkey=None, proxy=None, storage_drivers=None, device_ids=None, wallet_keys=None, delete_version=True, fully_qualified_data_id=False, blockchain_id=None, config_path=CONFIG_PATH):
|
||||
def delete_mutable(data_id, data_privkey=None, proxy=None, storage_drivers=None, storage_drivers_exclusive=False,
|
||||
device_ids=None, wallet_keys=None, delete_version=True, fully_qualified_data_id=False,
|
||||
blockchain_id=None, config_path=CONFIG_PATH):
|
||||
"""
|
||||
delete_mutable
|
||||
|
||||
@@ -1236,14 +1240,15 @@ def delete_mutable(data_id, data_privkey=None, proxy=None, storage_drivers=None,
|
||||
|
||||
# remove the data itself
|
||||
for fq_data_id in fq_data_ids:
|
||||
rc = storage.delete_mutable_data(fq_data_id, data_privkey, blockchain_id=blockchain_id)
|
||||
rc = storage.delete_mutable_data(fq_data_id, data_privkey, required=storage_drivers, required_exclusive=storage_drivers_exclusive, blockchain_id=blockchain_id)
|
||||
if not rc:
|
||||
log.error("Failed to delete {} from storage providers".format(fq_data_id))
|
||||
worst_rc = False
|
||||
continue
|
||||
|
||||
if delete_version:
|
||||
delete_mutable_data_version(conf, fq_data_id)
|
||||
if delete_version:
|
||||
for device_id in device_ids:
|
||||
delete_mutable_data_version(conf, device_id, data_id, config_path=config_path)
|
||||
|
||||
if not worst_rc:
|
||||
return {'error': 'Failed to delete from all storage providers'}
|
||||
@@ -1485,7 +1490,7 @@ def put_datastore(datastore_info, datastore_privkey, proxy=None, config_path=CON
|
||||
return {'error': 'Failed to replicate datastore metadata', 'errno': errno.EREMOTEIO}
|
||||
|
||||
# replicate public datastore record
|
||||
res = put_mutable( data_id, datastore, data_privkey=datastore_privkey, storage_drivers=drivers, config_path=config_path, proxy=proxy )
|
||||
res = put_mutable( data_id, datastore, data_privkey=datastore_privkey, storage_drivers=drivers, storage_drivers_exclusive=True, config_path=config_path, proxy=proxy )
|
||||
if 'error' in res:
|
||||
log.error("Failed to put datastore metadata for {}".format(datastore_fq_id))
|
||||
|
||||
@@ -1556,7 +1561,7 @@ def delete_datastore( datastore_privkey, force=False, config_path=CONFIG_PATH, p
|
||||
|
||||
# remove public datastore record
|
||||
data_id = '{}.datastore'.format(datastore_id)
|
||||
res = delete_mutable(data_id, data_privkey=datastore_privkey, proxy=proxy, config_path=config_path )
|
||||
res = delete_mutable(data_id, data_privkey=datastore_privkey, storage_drivers=datastore['drivers'], storage_drivers_exclusive=True, proxy=proxy, config_path=config_path )
|
||||
if 'error' in res:
|
||||
log.error("Failed to delete public datastore record: {}".format(res['error']))
|
||||
return {'error': 'Failed to delete public datastore record', 'errno': errno.EREMOTEIO}
|
||||
@@ -1852,7 +1857,7 @@ def _put_inode(datastore_id, _inode, data_privkey, drivers, device_ids, config_p
|
||||
data_id = '{}.{}'.format(datastore_id, _inode['uuid'])
|
||||
inode_data = data_blob_serialize(_inode)
|
||||
|
||||
res = put_mutable(data_id, inode_data, data_privkey=data_privkey, storage_drivers=drivers, config_path=config_path, proxy=proxy, create=create )
|
||||
res = put_mutable(data_id, inode_data, data_privkey=data_privkey, storage_drivers=drivers, storage_drivers_exclusive=True, config_path=config_path, proxy=proxy, create=create )
|
||||
if 'error' in res:
|
||||
log.error("Failed to replicate inode {}: {}".format(_inode['uuid'], res['error']))
|
||||
return {'error': 'Failed to replicate inode'}
|
||||
@@ -1877,7 +1882,7 @@ def _put_inode(datastore_id, _inode, data_privkey, drivers, device_ids, config_p
|
||||
|
||||
data_hdr_id = '{}.{}.hdr'.format(datastore_id, inode_hdr['uuid'])
|
||||
inode_hdr_data = data_blob_serialize(inode_hdr)
|
||||
res = put_mutable(data_hdr_id, inode_hdr_data, data_privkey=data_privkey, storage_drivers=drivers, config_path=config_path, proxy=proxy, create=create )
|
||||
res = put_mutable(data_hdr_id, inode_hdr_data, data_privkey=data_privkey, storage_drivers=drivers, storage_drivers_exclusive=True, config_path=config_path, proxy=proxy, create=create )
|
||||
if 'error' in res:
|
||||
log.error("Failed to replicate inode header for {}: {}".format(inode['uuid'], res['error']))
|
||||
return {'error': 'Failed to replicate inode header'}
|
||||
@@ -1911,14 +1916,14 @@ def _delete_inode(datastore_id, inode_uuid, data_privkey, drivers, device_ids, c
|
||||
# delete inode header
|
||||
idata_id = '{}.hdr'.format(inode_uuid)
|
||||
hdata_id = '{}.{}'.format(datastore_id, idata_id)
|
||||
res = delete_mutable(hdata_id, data_privkey=data_privkey, proxy=proxy, storage_drivers=drivers, device_ids=device_ids, config_path=config_path)
|
||||
res = delete_mutable(hdata_id, data_privkey=data_privkey, proxy=proxy, storage_drivers=drivers, storage_drivers_exclusive=True, device_ids=device_ids, config_path=config_path)
|
||||
if 'error' in res:
|
||||
log.error("Faled to delete idata for {}: {}".format(inode_uuid, res['error']))
|
||||
return res
|
||||
|
||||
# delete inode
|
||||
data_id = '{}.{}'.format(datastore_id, inode_uuid)
|
||||
res = delete_mutable(data_id, data_privkey=data_privkey, proxy=proxy, storage_drivers=drivers, device_ids=device_ids, config_path=config_path )
|
||||
res = delete_mutable(data_id, data_privkey=data_privkey, proxy=proxy, storage_drivers=drivers, storage_drivers_exclusive=True, device_ids=device_ids, config_path=config_path )
|
||||
if 'error' in res:
|
||||
log.error("Failed to delete inode {}: {}".format(inode_uuid, res['error']))
|
||||
return res
|
||||
|
||||
Reference in New Issue
Block a user