okay, first pass at a naive subdomain resolver, with some test cases too

This commit is contained in:
Aaron Blankstein
2017-06-16 16:49:35 -04:00
parent 1a9c2f5d07
commit acf910d38c
2 changed files with 305 additions and 20 deletions

View File

@@ -23,17 +23,60 @@
import base64
import ecdsa, hashlib
import keylib
from itertools import izip
from blockstack_client import data, zonefile
from blockstack_client.logger import get_logger
log = get_logger()
class ParseError(Exception):
pass
def parse_zonefile_subdomains(zonefile_json):
registrar_urls = [ x for x in zonefile_json["uri"] if x["name"] == "registrar" ]
class SubdomainNotFound(Exception):
pass
subdomains = [ parse_subdomain_record(x) for x in zonefile_json["txt"]
if x["name"].startswith("_subd.") ]
class SubdomainNotFound(Exception):
pass
return registrar_urls, subdomains
# aaron: I was hesitant to write these two functions. But I did because:
# 1> getting the sign + verify functions from virtualchain.ecdsa
# was tricky because of the hashfunc getting lost in translating from
# SK to PK
# 2> didn't want this code to necessarily depend on virtualchain
def sign(sk, plaintext):
signer = ecdsa.SigningKey.from_pem(sk.to_pem())
blob = signer.sign_deterministic(plaintext, hashfunc = hashlib.sha256)
return base64.b64encode(blob)
def verify(pk, plaintext, sigb64):
signature = base64.b64decode(sigb64)
verifier = ecdsa.VerifyingKey.from_pem(pk.to_pem())
return verifier.verify(signature, plaintext, hashfunc = hashlib.sha256)
def parse_zonefile_subdomains(zonefile_json, with_packed=False):
if "uri" in zonefile_json:
registrar_urls = [ x for x in zonefile_json["uri"] if x["name"] == "registrar" ]
else:
registrar_urls = []
if "txt" in zonefile_json:
subdomains = [ (parse_subdomain_record(x), x["txt"]) for x in zonefile_json["txt"]
if x["name"].startswith("_subd.") ]
else:
subdomains = []
if len(subdomains) > 0:
parsed, packed = zip(*subdomains)
else:
parsed, packed = ([], [])
if with_packed:
return registrar_urls, parsed, packed
else:
return registrar_urls, parsed
def parse_subdomain_record(subdomain_record):
parsed = {}
@@ -67,9 +110,21 @@ def parse_subdomain_record(subdomain_record):
sig_found = True
elif datum_label == "url":
parsed["urls"].append(datum_entry)
for must_have in ["n", "pubkey"]:
if must_have not in parsed:
raise ParseError("Subdomain entry must have {} setting".format(must_have))
if parsed["n"] != 0 and "sig" not in parsed:
raise ParseError("Subdomain entries (with n>0) must have signature".format(must_have))
return parsed
def make_zonefile_entry(subdomain_name, packed_subdomain, as_dict=False):
d = { "name" : "_subd." + subdomain_name,
"txt" : packed_subdomain }
if as_dict:
return d
return '{} TXT "{}"'.format(d["name"], d["txt"])
def pack_and_sign_subdomain_record(subdomain_record, key):
entries = []
for k, v in subdomain_record.items():
@@ -77,8 +132,8 @@ def pack_and_sign_subdomain_record(subdomain_record, key):
raise ParseError("Don't use commas.")
if k in ["n", "pubkey"]:
entries.append((k,v))
if k == "url":
entries.append((k,v))
if k == "urls":
entries.extend([("url",value) for value in v])
plaintext = ",".join([ "{}:{}".format(k, v) for k,v in entries ])
@@ -95,7 +150,12 @@ def verify_subdomain_record(subdomain_record, prior_pubkey_entry):
pk_header, pk_data = decode_pubkey_entry(prior_pubkey_entry)
if pk_header == "echex":
return verify(keylib.ECPublicKey(pk_data), plaintext, sig)
try:
return verify(keylib.ECPublicKey(pk_data), plaintext, sig)
except ecdsa.BadSignatureError as e:
log.error("Signature verification failed with BadSignature {} over {} by {}".format(
sig, plaintext, pk_data))
return False
else:
raise NotImplementedError("PubKey type ({}) not supported".format(pk_header))
@@ -122,12 +182,100 @@ def encode_pubkey_entry(key):
return "data:{}:{}".format(head, data)
def sign(sk, plaintext):
signer = ecdsa.SigningKey.from_pem(sk.to_pem())
blob = signer.sign_deterministic(plaintext, hashfunc = hashlib.sha256)
return base64.b64encode(blob)
def is_a_subdomain(fqa):
"""
Tests whether fqa is a subdomain.
If it isn't, returns False.
If it is, returns True and a tuple (subdomain_name, domain)
"""
if re.match(schemas.OP_NAME_PATTERN, fqa) == None:
return False
pieces = fqa.split(".")
if len(pieces) == 3:
return (True, (pieces[0], ("{}.{}".format(*pieces[1:]))))
return False
def verify(pk, plaintext, sigb64):
signature = base64.b64decode(sigb64)
verifier = ecdsa.VerifyingKey.from_pem(pk.to_pem())
return verifier.verify(signature, plaintext, hashfunc = hashlib.sha256)
def _transition_valid(from_sub_record, to_sub_record, packed_sub_record):
if from_sub_record["n"] + 1 != to_sub_record["n"]:
log.warn("Failed subdomain {} transition because of N:{}->{}".format(
to_sub_record["name"], from_sub_record["n"], to_sub_record["n"]))
return False
if not verify_subdomain_record(packed_sub_record, from_sub_record["pubkey"]):
log.warn("Failed subdomain {} transition because of signature failure".format(
to_sub_record["name"], from_sub_record["n"], to_sub_record["n"]))
return False
parsed_again = parse_subdomain_record(
make_zonefile_entry(to_sub_record["name"], packed_sub_record, as_dict = True))
for (k,v) in parsed_again.items():
if k not in to_sub_record:
log.warn("Parsed version does not match packed version")
raise ParseError()
if v != to_sub_record[k]:
log.warn("Parsed version does not match packed version")
raise ParseError()
for (k,v) in to_sub_record.items():
if k not in parsed_again:
log.warn("Parsed version does not match packed version")
raise ParseError()
if v != parsed_again[k]:
log.warn("Parsed version does not match packed version")
raise ParseError()
return True
def _build_subdomain_db(domain_fqa, zonefiles):
subdomain_db = {}
for zf in zonefiles:
zf_json = zonefile.decode_name_zonefile(domain_fqa, zf)
_, subdomain_ops, subdomain_packs = parse_zonefile_subdomains(
zf_json, with_packed = True)
if len(subdomain_ops) < 1:
print zf
for subdomain_op, packed in izip(subdomain_ops, subdomain_packs):
if subdomain_op["name"] in subdomain_db:
previous = subdomain_db[subdomain_op["name"]]
if _transition_valid(previous, subdomain_op, packed):
new_rec = dict(subdomain_op)
del new_rec["sig"]
del new_rec["name"]
subdomain_db[subdomain_op["name"]] = new_rec
else:
log.warn("Failed subdomain transition for {}.{} on N:{}->{}".format(
subdomain_op["name"], domain_fqa, previous["n"], subdomain_op["n"]))
else:
if subdomain_op["n"] != 0:
log.warn("First sight of subdomain {}.{} with N={}".format(
subdomain_op["name"], domain_fqa, subdomain_op["n"]))
continue
new_rec = dict(subdomain_op)
if "sig" in new_rec:
del new_rec["sig"]
del new_rec["name"]
subdomain_db[subdomain_op["name"]] = new_rec
return subdomain_db
def resolve_subdomain(subdomain, domain_fqa):
# step 1: fetch domain zonefiles.
zonefiles = data.list_zonefile_history(domain_fqa)
# step 2: for each zonefile, parse the subdomain
# operations.
subdomain_db = _build_subdomain_db(domain_fqa, zonefiles)
# step 3: find the subdomain.
if not subdomain in subdomain_db:
raise SubdomainNotFound(subdomain)
my_rec = subdomain_db[subdomain]
# step 4: resolve!
pubkey_type, user_data_pubkey = decode_pubkey_entry(my_rec["pubkey"])
if pubkey_type != "echex":
raise NotImplementedError(
"Pubkey type {} for subdomain {}.{} not supported by resolver.".format(
pubkey_type, subdomain, domain_fqa))
user_profile = storage.get_mutable_data(
None, user_data_pubkey, blockchain_id=None,
data_address=None, owner_address=None,
urls=urls, drivers=None, decode=True,
)

View File

@@ -102,22 +102,159 @@ _subd.foo TXT "pubkey:data:echex:00000000000000000000000000000000000000000000000
"pubkey" : subdomains.encode_pubkey_entry(sk),
"name" : "foo",
"n" : 3,
"url":"https://foobar.com/profile",
"url":"https://dropbox.com/profile2"
"urls": ["https://foobar.com/profile",
"https://dropbox.com/profile2"]
}
packed_subdomain_record = subdomains.pack_and_sign_subdomain_record(subd_json, sk)
self.assertTrue(
subdomains.verify_subdomain_record(packed_subdomain_record,
subdomains.verify_subdomain_record(packed_subdomain_record,
subdomains.encode_pubkey_entry(sk)))
self.assertTrue(
subdomains.verify_subdomain_record(packed_subdomain_record,
subdomains.verify_subdomain_record(packed_subdomain_record,
subdomains.encode_pubkey_entry(pk)))
self.assertRaises( NotImplementedError, lambda : subdomains.encode_pubkey_entry( fake_privkey_hex ) )
self.assertRaises( NotImplementedError,
lambda : subdomains.verify_subdomain_record(packed_subdomain_record,
"data:pem:000"))
def test_signed_transition(self):
fake_privkey_hex = "5512612ed6ef10ea8c5f9839c63f62107c73db7306b98588a46d0cd2c3d15ea5"
sk = keylib.ECPrivateKey(fake_privkey_hex)
pk = sk.public_key()
start_json = {
"pubkey" : subdomains.encode_pubkey_entry(sk),
"n" : 3,
"urls":["https://foobar.com/profile",
"https://dropbox.com/profile2"]
}
next_json = {
"pubkey" : "data:echex:0",
"n" : 4,
"name" : "foo",
"urls": ["https://none.com"]
}
packed_record_next = subdomains.pack_and_sign_subdomain_record(next_json, sk)
parsed_record_next = subdomains.parse_subdomain_record(
subdomains.make_zonefile_entry("foo", packed_record_next, as_dict=True))
self.assertTrue(
subdomains._transition_valid(start_json, parsed_record_next, packed_record_next))
next_json["urls"] = ["https://different.com"]
packed_record_next_fail = subdomains.pack_and_sign_subdomain_record(next_json, sk)
parsed_record_next_fail = subdomains.parse_subdomain_record(
subdomains.make_zonefile_entry("foo", packed_record_next_fail, as_dict=True))
self.assertRaises(subdomains.ParseError,
lambda : subdomains._transition_valid(start_json, parsed_record_next_fail, packed_record_next))
next_json["n"] = "5"
packed_record_next_fail = subdomains.pack_and_sign_subdomain_record(next_json, sk)
parsed_record_next_fail = subdomains.parse_subdomain_record(
subdomains.make_zonefile_entry("foo", packed_record_next_fail, as_dict=True))
self.assertFalse(
subdomains._transition_valid(start_json, parsed_record_next_fail, packed_record_next_fail))
next_json["n"] = "4"
packed_record_next_good = subdomains.pack_and_sign_subdomain_record(next_json, sk)
parsed_record_next_good = subdomains.parse_subdomain_record(
subdomains.make_zonefile_entry("foo", packed_record_next_good, as_dict=True))
self.assertTrue(
subdomains._transition_valid(start_json, parsed_record_next_good, packed_record_next_good))
sk_bad = keylib.ECPrivateKey()
packed_record_next_fail = subdomains.pack_and_sign_subdomain_record(next_json, sk_bad)
parsed_record_next_fail = subdomains.parse_subdomain_record(
subdomains.make_zonefile_entry("foo", packed_record_next_fail, as_dict=True))
self.assertFalse(
subdomains._transition_valid(start_json, parsed_record_next_fail, packed_record_next_fail))
def test_db_builder(self):
history = [
"""$ORIGIN bar.id
$TTL 3600
pubkey TXT "pubkey:data:0"
registrar URI 10 1 "bsreg://foo.com:8234"
_subd.foo TXT "pubkey:{},N:0,url:https://foobar.com/profile,url:https://dropbox.com/profile2"
""",
"""$ORIGIN bar.id
$TTL 3600
pubkey TXT "pubkey:data:0"
registrar URI 10 1 "bsreg://foo.com:8234"
_subd.bar TXT "pubkey:{},N:0,url:https://foobar.com/profile,url:https://dropbox.com/profile2"
""",
"""$ORIGIN bar.id
$TTL 3600
pubkey TXT "pubkey:data:0"
registrar URI 10 1 "bsreg://foo.com:8234"
{}
""",
"""$ORIGIN bar.id
$TTL 3600
pubkey TXT "pubkey:data:0"
registrar URI 10 1 "bsreg://foo.com:8234"
{}
"""]
foo_bar_sk = keylib.ECPrivateKey()
bar_bar_sk = keylib.ECPrivateKey()
history[0] = history[0].format(subdomains.encode_pubkey_entry(foo_bar_sk))
history[1] = history[1].format(subdomains.encode_pubkey_entry(bar_bar_sk))
history[2] = history[2].format(
subdomains.make_zonefile_entry(
"bar",
subdomains.pack_and_sign_subdomain_record(
{"pubkey" : subdomains.encode_pubkey_entry(bar_bar_sk),
"n" : 1,
"urls" : ["https://foobar.com", "https://noodles.com"]},
bar_bar_sk)))
history[3] = history[3].format(
subdomains.make_zonefile_entry(
"foo",
subdomains.pack_and_sign_subdomain_record(
{"pubkey" : subdomains.encode_pubkey_entry(foo_bar_sk),
"n" : 1,
"urls" : ["https://foobar.com", "https://poodles.com"]},
foo_bar_sk)))
subdomain_db = subdomains._build_subdomain_db("bar.id", history[:1])
self.assertEqual(subdomain_db["foo"]["n"], 0)
self.assertIn("https://foobar.com/profile", subdomain_db["foo"]["urls"])
self.assertIn("https://dropbox.com/profile2", subdomain_db["foo"]["urls"])
self.assertNotIn("bar", subdomain_db)
subdomain_db = subdomains._build_subdomain_db("bar.id", history[:2])
self.assertIn("bar", subdomain_db)
self.assertEqual(subdomain_db["bar"]["n"], 0)
self.assertIn("https://foobar.com/profile", subdomain_db["bar"]["urls"])
self.assertIn("https://dropbox.com/profile2", subdomain_db["bar"]["urls"])
subdomain_db = subdomains._build_subdomain_db("bar.id", history[:3])
self.assertEqual(subdomain_db["foo"]["n"], 0)
self.assertEqual(subdomain_db["bar"]["n"], 1)
self.assertIn("https://foobar.com/profile", subdomain_db["foo"]["urls"])
self.assertIn("https://dropbox.com/profile2", subdomain_db["foo"]["urls"])
self.assertNotIn("https://foobar.com/profile", subdomain_db["bar"]["urls"])
self.assertNotIn("https://dropbox.com/profile2", subdomain_db["bar"]["urls"])
subdomain_db = subdomains._build_subdomain_db("bar.id", history)
self.assertEqual(subdomain_db["foo"]["n"], 1)
self.assertEqual(subdomain_db["bar"]["n"], 1)
self.assertNotIn("https://foobar.com/profile", subdomain_db["foo"]["urls"])
self.assertNotIn("https://dropbox.com/profile2", subdomain_db["foo"]["urls"])
self.assertIn("https://foobar.com", subdomain_db["bar"]["urls"])
self.assertIn("https://noodles.com", subdomain_db["bar"]["urls"])
self.assertIn("https://foobar.com", subdomain_db["foo"]["urls"])
self.assertIn("https://poodles.com", subdomain_db["foo"]["urls"])
def lets_resolve(self):
pass
if __name__ == '__main__':
unittest.main()