We no longer need user objects for datastores to work properly. Remove now-useless code

This commit is contained in:
Jude Nelson
2017-02-20 21:01:20 -05:00
parent bf8d43b537
commit 17588ebf27

View File

@@ -34,315 +34,12 @@ import re
import keylib
from .schemas import *
from .constants import BLOCKSTACK_TEST, CONFIG_PATH, BLOCKSTACK_DEBUG, USER_DIRNAME, USER_GLOBAL_SIGNING_KEY_INDEX, USER_LOCAL_SIGNING_KEY_INDEX
from .constants import BLOCKSTACK_TEST, CONFIG_PATH, BLOCKSTACK_DEBUG
from .keys import HDWallet, get_pubkey_hex
import scripts
log = config.get_logger()
USER_CACHE = {}
def user_dir(config_path=CONFIG_PATH):
"""
Get the path to the directory
that stores user information (like private keys).
"""
conf = config.get_config(path=config_path)
assert conf
dirp = conf['users']
if posixpath.normpath(os.path.abspath(dirp)) != posixpath.normpath(conf['users']):
# relative path; make absolute
dirp = posixpath.normpath( os.path.join(os.path.dirname(config_path), dirp) )
return dirp
def user_name(user_id):
"""
Get the on-disk name of a file that stores the user information
"""
return "{}.user".format(user_id.replace('/', '\\x2f'))
def user_path(user_id, config_path=CONFIG_PATH):
"""
Get the path to a user account state bundle
"""
dirp = user_dir(config_path=config_path)
return os.path.join(dirp, user_name(user_id))
def get_user_master_privkey( master_data_privkey, is_global=True, config_path=CONFIG_PATH ):
"""
Get the key we use to derive user keys
hdpath is MASTER_PRIVKEY/USER_GLOBAL_SIGNING_KEY_INDEX'/0' for global users
"""
parent_index = None
if is_global:
parent_index = USER_GLOBAL_SIGNING_KEY_INDEX
else:
parent_index = USER_LOCAL_SIGNING_KEY_INDEX
hdwallet_parent = HDWallet( hex_privkey=master_data_privkey, config_path=config_path )
master_user_privkey_parent = hdwallet_parent.get_child_privkey( index=parent_index )
hdwallet = HDWallet( hex_privkey=master_user_privkey_parent, config_path=config_path )
master_user_privkey = hdwallet.get_child_privkey( index=0 )
return master_user_privkey
def user_init( user_id, master_data_privkey_hex, is_global, blockchain_id=None, config_path=CONFIG_PATH ):
"""
Generate a new local user with the given user ID
Returns {'user': ..., 'user_token': ...} on success
Returns {'error': ... on error}
raises on fatal error
"""
privkey_index = None
from .data import next_privkey_index
next_privkey_index_info = next_privkey_index(master_data_privkey_hex, is_global,
blockchain_id=blockchain_id,
config_path=config_path)
if 'error' in next_privkey_index_info:
return next_privkey_index_info
privkey_index = next_privkey_index_info['index']
user_master_privkey = get_user_master_privkey( master_data_privkey_hex, config_path=config_path )
hdwallet = HDWallet( hex_privkey=user_master_privkey, config_path=config_path )
user_privkey = hdwallet.get_child_privkey( index=privkey_index )
info = {
'user_id': user_id,
'public_key': get_pubkey_hex(user_privkey),
'privkey_index': privkey_index,
'global': is_global,
}
if blockchain_id is not None:
info['blockchain_id'] = blockchain_id
res = user_serialize(info, user_master_privkey, config_path=config_path)
if 'error' in res:
return res
token = res['token']
return {'user': info, 'user_token': token}
def user_serialize( user_info, master_data_privkey_hex, config_path=CONFIG_PATH ):
"""
Sign and serialize a user into a JWT
Return {'status': True, 'token': ...} on success
Return {'error': ...} on failure
"""
try:
jsonschema.validate(user_info, USER_SCHEMA)
except ValidationError:
return {'error': 'Not a valid user'}
user_master_privkey = get_user_master_privkey( master_data_privkey_hex, config_path=config_path )
signer = jsontokens.TokenSigner()
token = signer.sign(user_info, user_master_privkey)
return {'status': True, 'token': token}
def user_store( token, config_path=CONFIG_PATH ):
"""
Store the user data locally.
@token must be a JWT encoded user data token
Verify it conforms to USER_SCHEMA
Returns {'status': True} on success
Returns {'error': ...} on error
"""
global USER_CACHE
# verify that this is a well-formed user
jwt = jsontokens.decode_token(token)
payload = jwt['payload']
jsonschema.validate(payload, USER_SCHEMA)
# store locally
user_id = payload['user_id']
path = user_path( user_id, config_path=config_path)
try:
pathdir = os.path.dirname(path)
if not os.path.exists(pathdir):
os.makedirs(pathdir)
with open(path, "w") as f:
f.write(token)
except:
log.error("Failed to store user {}".format(path))
return {'error': 'Failed to store user'}
name = user_name(user_id)
if USER_CACHE.has_key(name):
del USER_CACHE[name]
return {'status': True}
def user_delete( user_id, config_path=CONFIG_PATH ):
"""
Delete a user
Return {'status': True} on success
"""
global USER_CACHE
path = user_path(user_id, config_path=config_path)
if not os.path.exists(path):
return {'error': 'No such user'}
log.debug("delete user {} ({})".format(user_id, path))
try:
os.unlink(path)
except Exception, e:
log.exception(e)
return {'error': 'Failed to unlink'}
name = user_name(user_id)
if USER_CACHE.has_key(name):
del USER_CACHE[name]
return {'status': True}
def user_verify(user_jwt, data_pubkey_hex):
"""
Verify a user token with the given public key
Return True if valid
Return False if not
"""
verifier = jsontokens.TokenVerifier()
valid = verifier.verify( user_jwt, str(data_pubkey_hex) )
return valid
def user_parse(user_jwt):
"""
Parse and validate a user token
Return {'status': True, 'user': ...} on success
Return {'error': ...} on failure
"""
try:
data = jsontokens.decode_token(user_jwt)
jsonschema.validate(data['payload'], USER_SCHEMA)
return {'status': True, 'user': data['payload']}
except (ValueError, ValidationError) as ve:
return {'error': 'Failed to parse and validate'}
def user_is_local(user_id, config_path=CONFIG_PATH):
"""
Is a user owned by the local host?
"""
path = user_path( user_id, config_path=config_path)
return os.path.exists(path)
def _user_load_path(path, data_pubkey_hex, config_path=CONFIG_PATH):
"""
Load a user from a given path
Verify it conforms to the USER_SCHEMA, and (optionally) that it was signed by the given public key
Return {'user': ..., 'user_token': ...} on success
Return {'error': ...} on error
"""
jwt = None
try:
with open(path, "r") as f:
jwt = f.read()
except:
log.error("Failed to load {}".format(path))
return {'error': 'Failed to read user'}
# verify
if data_pubkey_hex is not None:
valid = user_verify(jwt, data_pubkey_hex)
if not valid:
return {'error': 'Failed to verify user JWT data'}
data = jsontokens.decode_token( jwt )
jsonschema.validate(data['payload'], USER_SCHEMA)
return {'user': data['payload'], 'user_token': jwt}
def user_load( user_id, master_data_privkey, config_path=CONFIG_PATH):
"""
Load the app account for the given (user_id, app owner name, appname) triple
Return {'user': jwt, 'user_token': token} on success
Return {'error': ...} on error
"""
global USER_CACHE
user_master_privkey = get_user_master_privkey( master_data_privkey, config_path=config_path )
user_master_pubkey = get_pubkey_hex(user_master_privkey)
name = user_name(user_id)
if USER_CACHE.has_key(name):
log.debug("User {} is cached".format(name))
return USER_CACHE[name]
path = user_path( user_id, config_path=config_path)
res = _user_load_path( path, user_master_pubkey, config_path=config_path )
if 'error' in res:
return res
USER_CACHE[name] = res
return res
def users_list(master_data_privkey, config_path=CONFIG_PATH):
"""
Get the list of all users
Return a list of USER_SCHEMA-formatted objects
"""
user_master_privkey = get_user_master_privkey(master_data_privkey, config_path=config_path)
user_master_pubkey = get_pubkey_hex(user_master_privkey)
dirp = user_dir(config_path=config_path)
if not os.path.exists(dirp) or not os.path.isdir(dirp):
log.error("No user directory")
return []
names = os.listdir(dirp)
names = filter(lambda n: n.endswith(".user"), names)
ret = []
for name in names:
path = os.path.join( dirp, name )
info = _user_load_path( path, user_master_pubkey, config_path=config_path )
if 'error' in info:
continue
ret.append(info['user'])
return ret
def user_get_privkey( master_privkey_hex, user_info, config_path=CONFIG_PATH ):
"""
Given the master data private key and a user structure, calculate the private key
for the user.
Return the private key
"""
user_master_privkey = get_user_master_privkey(master_privkey_hex, config_path=config_path)
user_privkey = HDWallet.get_privkey(user_master_privkey, user_info['privkey_index'])
return user_privkey
def is_user_zonefile(d):
"""
Is the given dict (or dict-like object) a user zonefile?