add "typed netstrings" to use to serialize and parse mutable data (so we don't rely on JSON, which has limits)

This commit is contained in:
Jude Nelson
2017-02-28 16:26:06 -05:00
parent 70c51389e1
commit ca9ef4324b

View File

@@ -513,6 +513,163 @@ def load_user_data_pubkey_addr( name, storage_drivers=None, proxy=None, config_p
return {'pubkey': data_pubkey, 'address': data_address}
def _data_blob_chomp( s ):
"""
Given "len(s):type:s,remainder", return (type, s, remainder)
"""
# grab length and remainder
parts = s.split(":", 1)
s_len = None
s_type = None
try:
s_len = int(parts[0])
assert parts[1][s_len] == ','
except:
raise ValueError("Invalid length field {} (from {})".format(parts[0], s))
type_payload = parts[1][:s_len]
s_remainder = ""
if s_len < len(parts[1]):
s_remainder = parts[1][s_len+1:]
# grab type and payload
parts = type_payload.split(':', 1)
s_type = parts[0]
try:
assert s_type in ['d', 's', 'i', 'l']
except:
raise ValueError("Invalid type field (from {})".format(s))
s_payload = parts[1]
return s_type, s_payload, s_remainder
def _data_blob_parse_work( data_blob_payload ):
"""
Parse a serialized data blob back into a structure
Return (parsed data, remainder)
"""
p_type, payload, remainder = _data_blob_chomp( data_blob_payload )
if p_type is None:
# empty
return '', ''
if p_type == 'i':
return int(payload), remainder
elif p_type == 's':
return str(payload), remainder
elif p_type in ['l', 'd']:
# payload is "blob,blob,blob..." string
parts = None
if p_type == 'l':
parts = []
else:
parts = {}
if len(payload) == 0:
# empty
return parts, remainder
if p_type == 'l':
# list
while True:
p_type, p_payload, p_remainder = _data_blob_chomp( payload )
# parse just this blob
part, r = _data_blob_parse_work('{}:{}:{},'.format(len(p_payload) + 2, p_type, p_payload))
assert len(r) == 0
parts.append(part)
if len(p_remainder) == 0:
break
payload = p_remainder
return parts, remainder
elif p_type == 'd':
# dict
while True:
k_type, k_payload, k_remainder = _data_blob_chomp( payload )
assert len(k_remainder) > 0, "dict underrun from '{}'".format(payload)
v_type, v_payload, v_remainder = _data_blob_chomp( k_remainder )
k_part, r = _data_blob_parse_work('{}:{}:{},'.format(len(k_payload) + 2, k_type, k_payload))
assert len(r) == 0
v_part, r = _data_blob_parse_work('{}:{}:{},'.format(len(v_payload) + 2, v_type, v_payload))
assert len(r) == 0
parts[k_part] = v_part
if len(v_remainder) == 0:
break
payload = v_remainder
return parts, remainder
else:
# unreachable
assert False
else:
raise ValueError("Invalid type {}".format(p_type))
def data_blob_parse( data_blob_payload ):
"""
Parse a serialized data structure
"""
data_blob, r = _data_blob_parse_work(data_blob_payload)
assert len(r) == 0, "Underrun while parsing"
return data_blob
def data_blob_serialize( data_blob ):
"""
Serialize a data blob (conformant to DATA_BLOB_SCHEMA) into a string
"""
if isinstance(data_blob, (int, long)):
data_blob = str(data_blob)
return '{}:i:{},'.format(len(data_blob) + 2, data_blob)
if isinstance(data_blob, (str, unicode)):
data_blob = str(data_blob)
return '{}:s:{},'.format(len(data_blob) + 2, data_blob)
if isinstance(data_blob, list):
data_blob_parts = [data_blob_serialize(x) for x in data_blob]
data_blob = ''.join(data_blob_parts)
data_blob = 'l:{}'.format(data_blob)
return '{}:{},'.format(len(data_blob), data_blob)
if isinstance(data_blob, dict):
payload_parts = []
for k in sorted(data_blob.keys()):
k_part = data_blob_serialize(k)
v_part = data_blob_serialize(data_blob[k])
payload_parts.append(k_part)
payload_parts.append(v_part)
values = ''.join(payload_parts)
payload = 'd:{}'.format(values)
return '{}:{},'.format(len(payload), payload)
raise ValueError('Unserializable type {}'.format(type(data_blob)))
def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None, data_hash=None, storage_drivers=None,
proxy=None, ver_min=None, ver_max=None, urls=None, device_ids=None, fully_qualified_data_id=False,
config_path=CONFIG_PATH, all_drivers=False):
@@ -589,12 +746,14 @@ def get_mutable(data_id, blockchain_id=None, data_pubkey=None, data_address=None
for driver in storage_drivers:
# get the mutable data itsef
data = storage.get_mutable_data(fq_data_id, data_pubkey, urls=urls, drivers=[driver], data_address=data_address, data_hash=data_hash, blockchain_id=blockchain_id)
if data is None:
data_str = storage.get_mutable_data(fq_data_id, data_pubkey, urls=urls, drivers=[driver], data_address=data_address, data_hash=data_hash, blockchain_id=blockchain_id)
if data_str is None:
log.error("Failed to get mutable datum {}".format(fq_data_id))
return {'error': 'Failed to look up mutable datum'}
data = None
try:
data = data_blob_parse(data_str)
jsonschema.validate(data, DATA_BLOB_SCHEMA)
except ValidationError as ve:
if BLOCKSTACK_DEBUG:
@@ -881,6 +1040,7 @@ def put_mutable(data_id, data_payload, blockchain_id=None, data_privkey=None, pr
version = 1 if version is None else version + 1
# put the mutable data record itself
# TODO: make this a binary string
data_json = {
'fq_data_id': fq_data_id,
'data': data_payload,
@@ -891,10 +1051,11 @@ def put_mutable(data_id, data_payload, blockchain_id=None, data_privkey=None, pr
if blockchain_id is not None:
data_json['blockchain_id'] = blockchain_id
data = data_blob_serialize(data_json)
result = {}
log.debug("put_mutable({}, blockchain_id={}, lookup_privkey={}, version={}, storage_drivers={})".format(fq_data_id, blockchain_id, lookup, version, ','.join(storage_drivers)))
rc = storage.put_mutable_data(fq_data_id, data_json, data_privkey, blockchain_id=blockchain_id, required=storage_drivers)
rc = storage.put_mutable_data(fq_data_id, data, data_privkey, blockchain_id=blockchain_id, required=storage_drivers)
if not rc:
log.error("failed to put mutable data {}".format(fq_data_id))
result['error'] = 'Failed to store mutable data'
@@ -1219,7 +1380,6 @@ def _make_datastore_info( datastore_type, datastore_privkey_hex, driver_names, d
datastore_pubkey = get_pubkey_hex(datastore_privkey_hex)
datastore_id = keylib.public_key_to_address(datastore_pubkey)
datastore_root = _mutable_data_make_dir( datastore_id, root_uuid, {} )
datastore_root['idata'] = {}
assert datastore_type in ['datastore', 'collection'], datastore_type
@@ -1257,7 +1417,6 @@ def get_datastore( datastore_id, config_path=CONFIG_PATH, proxy=None):
return {'error': 'Failed to load public datastore record', 'errno': errno.ENOENT}
datastore = datastore_info['data']
try:
jsonschema.validate(datastore, DATASTORE_SCHEMA)
except (AssertionError, ValidationError) as ve:
@@ -1459,7 +1618,15 @@ def _get_inode(datastore_id, inode_uuid, inode_type, data_pubkey_hex, drivers, d
return {'error': 'Failed to find fresh inode'}
# success!
inode_info = res['data']
inode_info_str = res['data']
try:
inode_info = data_blob_parse(inode_info_str)
except:
if BLOCKSTACK_TEST:
log.error("Unparseable inode: {}".format(inode_info_str))
return {'error': 'Unparseable inode data'}
inode_version = res['version']
# must be an inode
@@ -1630,7 +1797,15 @@ def _get_inode_header(datastore_id, inode_uuid, data_pubkey_hex, drivers, device
return {'error': 'Failed to get inode data'}
# validate
inode_hdr = res['data']
inode_hdr_str = res['data']
try:
inode_hdr = data_blob_parse(inode_hdr_str)
except:
if BLOCKSTACK_TEST:
log.error("Unparseable header: {}".format(inode_hdr_str))
return {'error': "Unparseable inode header"}
inode_hdr_version = res['version']
inode_drivers = res['drivers']
@@ -1664,7 +1839,9 @@ def _put_inode(datastore_id, _inode, data_privkey, drivers, device_ids, config_p
# separate data from metadata.
# put metadata as a separate record.
data_id = '{}.{}'.format(datastore_id, _inode['uuid'])
res = put_mutable(data_id, _inode, data_privkey=data_privkey, storage_drivers=drivers, config_path=config_path, proxy=proxy, create=create )
inode_data = data_blob_serialize(_inode)
res = put_mutable(data_id, inode_data, data_privkey=data_privkey, storage_drivers=drivers, config_path=config_path, proxy=proxy, create=create )
if 'error' in res:
log.error("Failed to replicate inode {}: {}".format(_inode['uuid'], res['error']))
return {'error': 'Failed to replicate inode'}
@@ -1678,17 +1855,18 @@ def _put_inode(datastore_id, _inode, data_privkey, drivers, device_ids, config_p
# what will get_mutable() return?
inode_payload = {
'data': _inode,
'data': inode_data,
'version': inode_version,
'fq_data_id': res['fq_data_id'],
'timestamp': res['timestamp']
}
# put hash of inode payload
inode_hdr['data_hash'] = _mutable_data_inode_hash(inode_payload)
inode_hdr['data_hash'] = _mutable_data_inode_hash( data_blob_serialize(inode_payload) )
data_hdr_id = '{}.{}.hdr'.format(datastore_id, inode_hdr['uuid'])
res = put_mutable(data_hdr_id, inode_hdr, data_privkey=data_privkey, storage_drivers=drivers, config_path=config_path, proxy=proxy, create=create )
inode_hdr_data = data_blob_serialize(inode_hdr)
res = put_mutable(data_hdr_id, inode_hdr_data, data_privkey=data_privkey, storage_drivers=drivers, config_path=config_path, proxy=proxy, create=create )
if 'error' in res:
log.error("Failed to replicate inode header for {}: {}".format(inode['uuid'], res['error']))
return {'error': 'Failed to replicate inode header'}
@@ -1934,7 +2112,7 @@ def _mutable_data_make_file( data_address, inode_uuid, data_payload ):
Set up inode state for a file
"""
inode_state = _mutable_data_make_inode( MUTABLE_DATUM_FILE_TYPE, data_address, inode_uuid )
inode_state['idata'] = data_payload.encode('utf-8')
inode_state['idata'] = data_payload
return inode_state