From 75694efdef589b2546900855da824ebf8bbdaefa Mon Sep 17 00:00:00 2001 From: Muneeb Ali Date: Sat, 12 Jul 2014 23:32:07 -0700 Subject: [PATCH] -- Added multi-threaded namecoind to registration daemon and fixed corner-cases where redundant name_updates were getting issued -- Cleaned up code --- .../blockdata/activate.py | 56 +++--- .../blockdata/namecoind_cluster.py | 14 +- .../blockdata/register.py | 180 +++++++++--------- .../blockstack_registrar/common.py | 82 ++++++++ .../blockstack_registrar/config.py | 6 +- .../blockstack_registrar/register_daemon.py | 16 -- 6 files changed, 206 insertions(+), 148 deletions(-) create mode 100644 blockstack_cli_0.14.1/blockstack_registrar/common.py diff --git a/blockstack_cli_0.14.1/blockstack_registrar/blockdata/activate.py b/blockstack_cli_0.14.1/blockstack_registrar/blockdata/activate.py index e37649818..2b72ee598 100755 --- a/blockstack_cli_0.14.1/blockstack_registrar/blockdata/activate.py +++ b/blockstack_cli_0.14.1/blockstack_registrar/blockdata/activate.py @@ -5,73 +5,63 @@ # All Rights Reserved #----------------------- -import os -import json -import requests +from coinrpc.namecoin.namecoind_server import NamecoindServer -from time import sleep -from coinrpc.namecoin.namecoind_wrapper import namecoind_blocks, namecoind_firstupdate +from config import NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, WALLET_PASSPHRASE, NAMECOIND_USE_HTTPS +from config import MAIN_SERVER, LOAD_SERVERS -from pymongo import MongoClient -client = MongoClient() -db = client['namecoin'] -queue = db.queue +namecoind = NamecoindServer(MAIN_SERVER, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE) -LOAD_BALANCER = os.environ['LOAD_BALANCER'] +from common import log, get_string +from common import register_queue -blocks = namecoind_blocks() +blocks = namecoind.blocks() #----------------------------------- def do_name_firstupdate(): #remove entries that are already active - queue.remove({"activated":True}) + register_queue.remove({"activated":True}) - print "Checking for new activations" - print '-' * 5 + log.debug("Checking for new activations") + log.debug('-' * 5) - for entry in queue.find(): + for entry in register_queue.find(): #entry is registered; but not activated if entry.get('activated') is not None and entry.get('activated') == False: - #print "Processing: " + entry['key'] + key = entry['key'] #compare the current block with 'wait_till_block' current_blocks = blocks['blocks'] - if current_blocks > entry['wait_till_block'] and entry['backend_server'] == int(LOAD_BALANCER): - #lets activate the entry - print "Activating: " + entry['key'] + if current_blocks > entry['wait_till_block']: - #check if 'value' is a json or not - try: - update_value = json.loads(entry['value']) - update_value = json.dumps(update_value) #no error while parsing; dump into json again - except: - update_value = entry['value'] #error: treat it as a string + update_value = get_string(entry['value']) + + log.debug("Activating entry: '%s' to point to '%s'" % (key, update_value)) - print "Activating entry: '%s' to point to '%s'" % (entry['key'], update_value) - - output = namecoind_firstupdate(entry['key'],entry['rand'],update_value,entry['longhex']) + namecoind = NamecoindServer(MAIN_SERVER, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE) - print "Transaction ID ", output + output = namecoind.firstupdate(key,entry['rand'],update_value,entry['longhex']) + log.debug("tx: %s", output) if 'message' in output and output['message'] == "this name is already active": entry['activated'] = True elif 'code' in output: entry['activated'] = False - print "Not activated. Try again." + log.debug("Not activated. Try again.") else: entry['activated'] = True entry['tx_id'] = output - queue.save(entry) + register_queue.save(entry) - print '----' + log.debug('-' * 5) else: - print "wait: " + str(entry['wait_till_block'] - current_blocks) + " blocks for: " + entry['key'] + log.debug("wait: %s block for: %s" % ((entry['wait_till_block'] - current_blocks + 1), entry['key'])) #----------------------------------- if __name__ == '__main__': diff --git a/blockstack_cli_0.14.1/blockstack_registrar/blockdata/namecoind_cluster.py b/blockstack_cli_0.14.1/blockstack_registrar/blockdata/namecoind_cluster.py index 1e74e9f8c..6238c0148 100755 --- a/blockstack_cli_0.14.1/blockstack_registrar/blockdata/namecoind_cluster.py +++ b/blockstack_cli_0.14.1/blockstack_registrar/blockdata/namecoind_cluster.py @@ -15,13 +15,15 @@ from config import MAIN_SERVER, LOAD_SERVERS from multiprocessing.pool import ThreadPool +reply = {} +reply["registered"] = False +reply["server"] = None +reply["ismine"] = False + #----------------------------------- def check_address(address): - reply = {} - - reply['ismine'] = False - reply['server'] = None + reply['registered'] = True #-------------------------- def check_address_inner(server): @@ -34,8 +36,8 @@ def check_address(address): return if info['ismine'] is True: - reply['ismine'] = True reply['server'] = server + reply['ismine'] = True #first check the main server check_address_inner(MAIN_SERVER) @@ -62,4 +64,4 @@ def get_server(key): if 'namecoin_address' in info: return check_address(info['namecoin_address']) else: - return json.dumps({}) + return reply diff --git a/blockstack_cli_0.14.1/blockstack_registrar/blockdata/register.py b/blockstack_cli_0.14.1/blockstack_registrar/blockdata/register.py index 9d0e623d0..8da3e0a41 100755 --- a/blockstack_cli_0.14.1/blockstack_registrar/blockdata/register.py +++ b/blockstack_cli_0.14.1/blockstack_registrar/blockdata/register.py @@ -6,66 +6,96 @@ #----------------------- #520 is the real limit +#hardcoded here instead of some config file VALUE_MAX_LIMIT = 512 -import requests import json -from coinrpc.namecoin.namecoind_wrapper import namecoind_blocks, namecoind_name_new, check_registration -from coinrpc.namecoin.namecoind_wrapper import namecoind_name_update, namecoind_name_show +from common import utf8len, log +from common import users, register_queue -from config import LOAD_BALANCER +from coinrpc.namecoin.namecoind_server import NamecoindServer +from blockdata.namecoind_cluster import get_server +from config import NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, WALLET_PASSPHRASE, NAMECOIND_USE_HTTPS +from config import MAIN_SERVER, LOAD_SERVERS +from config import DEFAULT_HOST, MEMCACHED_PORT, MEMCACHED_TIMEOUT + +namecoind = NamecoindServer(MAIN_SERVER, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE) + +import pylibmc +from time import time +mc = pylibmc.Client([DEFAULT_HOST + ':' + MEMCACHED_PORT],binary=True) + #----------------------------------- -from pymongo import MongoClient -client = MongoClient() -local_db = client['namecoin'] -queue = local_db.queue +def register_name(key,value,server=MAIN_SERVER): -from config import MONGODB_URI -remote_client = MongoClient(MONGODB_URI) -remote_db = remote_client.get_default_database() -users = remote_db.user -registrations = remote_db.user_registration - -#----------------------------------- -def utf8len(s): - - if type(s) == unicode: - return len(s) - else: - return len(s.encode('utf-8')) - -#----------------------------------- -def save_name_new_info(info,key,value): reply = {} - - try: + + #check if already in register queue (name_new) + check_queue = register_queue.find_one({"key":key}) + + if check_queue is not None: + reply['message'] = "ERROR: " + "already in register queue: " + str(key) + else: + + namecoind = NamecoindServer(server, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE) + + info = namecoind.name_new(key,json.dumps(value)) + + try: - reply['longhex'] = info[0] - reply['rand'] = info[1] - reply['key'] = key - reply['value'] = value - reply['backend_server'] = int(LOAD_BALANCER) + reply['longhex'] = info[0] + reply['rand'] = info[1] + reply['key'] = key + reply['value'] = json.dumps(value) - #get current block... - blocks = namecoind_blocks() + #get current block... + blocks = namecoind.blocks() - reply['current_block'] = blocks['blocks'] - reply['wait_till_block'] = blocks['blocks'] + 12 - reply['activated'] = False + reply['current_block'] = blocks['blocks'] + reply['wait_till_block'] = blocks['blocks'] + 12 + reply['activated'] = False + reply['server'] = server - #save this data to Mongodb... - queue.insert(reply) + #save this data to Mongodb... + register_queue.insert(reply) - reply['message'] = 'Your registration will be completed in roughly two hours' - del reply['_id'] #reply[_id] is causing a json encode error + #reply[_id] is causing a json encode error + del reply['_id'] - except Exception as e: - reply['message'] = "ERROR: " + str(e) + except Exception as e: + reply['message'] = "ERROR: " + str(e) + log.debug(reply) + log.debug('-' * 5) + return reply +#----------------------------------- +def update_name(key,value): + + reply = {} + + cache_reply = mc.get("name_update_" + str(key)) + + if cache_reply is None: + + server = get_server(key) + namecoind = NamecoindServer(server, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE) + + info = namecoind.name_update(key,json.dumps(value)) + + log.debug(value) + reply['tx'] = info + + mc.set("name_update_" + str(key),"in_memory",int(time() + MEMCACHED_TIMEOUT)) + else: + reply['message'] = "ERROR: " + "recently sent name_update: " + str(key) + + log.debug(reply) + log.debug('-' * 5) + #----------------------------------- def slice_profile(username, profile, old_keys=None): @@ -120,47 +150,13 @@ def slice_profile(username, profile, old_keys=None): return keys, values -#----------------------------------- -def register_name(key,value): - - info = namecoind_name_new(key,json.dumps(value)) - - reply = save_name_new_info(info,key,json.dumps(value)) - - print reply - print '---' - -#----------------------------------- -def update_name(key,value): - - reply = {} - - info = namecoind_name_update(key,json.dumps(value)) - - reply['key'] = key - reply['value'] = value - reply['activated'] = True - reply['backend_server'] = int(LOAD_BALANCER) - - #save this data to Mongodb... - check = queue.find_one({'key':key}) - - if check is None: - queue.insert(reply) - else: - queue.save(reply) - - print reply - print info - print '---' - #---------------------------------- def get_old_keys(username): #---------------------------------- def get_next_key(key): - check_profile = namecoind_name_show(key) + check_profile = namecoind.name_show(key) try: check_profile = check_profile['value'] @@ -187,7 +183,7 @@ def get_old_keys(username): return old_keys #----------------------------------- -def process_user(username,profile): +def process_user(username,profile,server=MAIN_SERVER): #old_keys = get_old_keys(username) @@ -197,23 +193,23 @@ def process_user(username,profile): key1 = keys[index] value1 = values[index] - print utf8len(json.dumps(value1)) - - if check_registration(key1): + if namecoind.check_registration(key1): #if name is registered - print "name update: " + key1 + log.debug("name update: %s", key1) + log.debug("size: %s", utf8len(json.dumps(value1))) update_name(key1,value1) else: #if not registered - print "name new: " + key1 - register_name(key1,value1) + log.debug("name new: %s", key1) + log.debug("size: %s", utf8len(json.dumps(value1))) + register_name(key1,value1,server) - process_additional_keys(keys, values) + process_additional_keys(keys, values,server) #----------------------------------- -def process_additional_keys(keys,values): +def process_additional_keys(keys,values,server): #register/update remaining keys size = len(keys) @@ -222,13 +218,13 @@ def process_additional_keys(keys,values): next_key = keys[index] next_value = values[index] - if check_registration(next_key): - print "name update: " + next_key - print utf8len(json.dumps(next_value)) + log.debug(utf8len(json.dumps(next_value))) + + if namecoind.check_registration(next_key): + log.debug("name update: " + next_key) update_name(next_key,next_value) else: - print "name new: " + next_key - print utf8len(json.dumps(next_value)) - register_name(next_key,next_value) + log.debug("name new: " + next_key) + register_name(next_key,next_value,server) index += 1 diff --git a/blockstack_cli_0.14.1/blockstack_registrar/common.py b/blockstack_cli_0.14.1/blockstack_registrar/common.py new file mode 100644 index 000000000..58aed04ea --- /dev/null +++ b/blockstack_cli_0.14.1/blockstack_registrar/common.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +#----------------------- +# Copyright 2013 Halfmoon Labs, Inc. +# All Rights Reserved +#----------------------- + +''' + This file contains common code +''' + +from config import DEBUG +import logging +import json + +#----------------------------------- +from pymongo import MongoClient +client = MongoClient() +local_db = client['namecoin'] +register_queue = local_db.queue + +from config import MONGODB_URI +remote_client = MongoClient(MONGODB_URI) +remote_db = remote_client.get_default_database() +users = remote_db.user + +#------------------------- +def get_logger(): + + if(DEBUG): + log = logging.getLogger() + log.setLevel(logging.DEBUG) + + formatter = logging.Formatter('[%(levelname)s] %(message)s') + handler_stream = logging.StreamHandler() + handler_stream.setFormatter(formatter) + log.addHandler(handler_stream) + + else: + log = None + + return log + +#------------------------- +#common logger +log = get_logger() + +#------------------------- +def pretty_dump(input): + + return json.dumps(input, cls=MongoEncoder, sort_keys=False, indent=4, separators=(',', ': ')) + +#------------------------- +def pretty_print(input): + log.debug(pretty_dump(input)) + +#----------------------------------- +def utf8len(s): + + if type(s) == unicode: + return len(s) + else: + return len(s.encode('utf-8')) + +#----------------------------------------- +def get_json(data): + + if isinstance(data,dict): + pass + else: + data = json.loads(data) + + return data + +#----------------------------------------- +def get_string(data): + + if isinstance(data,dict): + data = json.dumps(data) + else: + pass + + return data \ No newline at end of file diff --git a/blockstack_cli_0.14.1/blockstack_registrar/config.py b/blockstack_cli_0.14.1/blockstack_registrar/config.py index 4da883cce..b67febf3a 100644 --- a/blockstack_cli_0.14.1/blockstack_registrar/config.py +++ b/blockstack_cli_0.14.1/blockstack_registrar/config.py @@ -31,4 +31,8 @@ except: MONGODB_URI = os.environ['MONGODB_URI'] OLD_DB = os.environ['OLD_DB'] - LOAD_BALANCER = os.environ['LOAD_BALANCER'] \ No newline at end of file + LOAD_BALANCER = os.environ['LOAD_BALANCER'] + + DEFAULT_HOST = '127.0.0.1' + MEMCACHED_PORT = '11211' + MEMCACHED_TIMEOUT = 15 * 60 \ No newline at end of file diff --git a/blockstack_cli_0.14.1/blockstack_registrar/register_daemon.py b/blockstack_cli_0.14.1/blockstack_registrar/register_daemon.py index f1a865b1a..55e3c2902 100755 --- a/blockstack_cli_0.14.1/blockstack_registrar/register_daemon.py +++ b/blockstack_cli_0.14.1/blockstack_registrar/register_daemon.py @@ -37,8 +37,6 @@ old_users = old_db.user local_client = MongoClient() local_db = local_client['namecoin'] -queue_register = local_db.queue -queue_update = local_db.queue_update problem_users = ['madmoneymachine', 'drmox', 'emiljohansson','xfaure','megaz28','maxweiss','kh','patrickcines'] @@ -48,20 +46,6 @@ def process_profile(username,profile): if username in problem_users: return - #check if already in register queue (name_new) - check_queue = queue_register.find_one({"key":'u/' + username}) - - if check_queue is not None: - print "Already in register queue: " + str(username) - return - - #check if already in update queue (name_update) - check_queue = queue_update.find_one({"key":'u/' + username}) - - if check_queue is not None: - print "Already in update queue: " + str(username) - return - #check if load-balancer is correct old_user = old_users.find_one({"username":username})