-- Added multi-threaded namecoind to registration daemon and fixed corner-cases where redundant name_updates were getting issued

-- Cleaned up code
This commit is contained in:
Muneeb Ali
2014-07-12 23:32:07 -07:00
parent 2d6520af0e
commit 75694efdef
6 changed files with 206 additions and 148 deletions

View File

@@ -5,73 +5,63 @@
# All Rights Reserved
#-----------------------
import os
import json
import requests
from coinrpc.namecoin.namecoind_server import NamecoindServer
from time import sleep
from coinrpc.namecoin.namecoind_wrapper import namecoind_blocks, namecoind_firstupdate
from config import NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, WALLET_PASSPHRASE, NAMECOIND_USE_HTTPS
from config import MAIN_SERVER, LOAD_SERVERS
from pymongo import MongoClient
client = MongoClient()
db = client['namecoin']
queue = db.queue
namecoind = NamecoindServer(MAIN_SERVER, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE)
LOAD_BALANCER = os.environ['LOAD_BALANCER']
from common import log, get_string
from common import register_queue
blocks = namecoind_blocks()
blocks = namecoind.blocks()
#-----------------------------------
def do_name_firstupdate():
#remove entries that are already active
queue.remove({"activated":True})
register_queue.remove({"activated":True})
print "Checking for new activations"
print '-' * 5
log.debug("Checking for new activations")
log.debug('-' * 5)
for entry in queue.find():
for entry in register_queue.find():
#entry is registered; but not activated
if entry.get('activated') is not None and entry.get('activated') == False:
#print "Processing: " + entry['key']
key = entry['key']
#compare the current block with 'wait_till_block'
current_blocks = blocks['blocks']
if current_blocks > entry['wait_till_block'] and entry['backend_server'] == int(LOAD_BALANCER):
#lets activate the entry
print "Activating: " + entry['key']
if current_blocks > entry['wait_till_block']:
#check if 'value' is a json or not
try:
update_value = json.loads(entry['value'])
update_value = json.dumps(update_value) #no error while parsing; dump into json again
except:
update_value = entry['value'] #error: treat it as a string
update_value = get_string(entry['value'])
log.debug("Activating entry: '%s' to point to '%s'" % (key, update_value))
print "Activating entry: '%s' to point to '%s'" % (entry['key'], update_value)
output = namecoind_firstupdate(entry['key'],entry['rand'],update_value,entry['longhex'])
namecoind = NamecoindServer(MAIN_SERVER, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE)
print "Transaction ID ", output
output = namecoind.firstupdate(key,entry['rand'],update_value,entry['longhex'])
log.debug("tx: %s", output)
if 'message' in output and output['message'] == "this name is already active":
entry['activated'] = True
elif 'code' in output:
entry['activated'] = False
print "Not activated. Try again."
log.debug("Not activated. Try again.")
else:
entry['activated'] = True
entry['tx_id'] = output
queue.save(entry)
register_queue.save(entry)
print '----'
log.debug('-' * 5)
else:
print "wait: " + str(entry['wait_till_block'] - current_blocks) + " blocks for: " + entry['key']
log.debug("wait: %s block for: %s" % ((entry['wait_till_block'] - current_blocks + 1), entry['key']))
#-----------------------------------
if __name__ == '__main__':

View File

@@ -15,13 +15,15 @@ from config import MAIN_SERVER, LOAD_SERVERS
from multiprocessing.pool import ThreadPool
reply = {}
reply["registered"] = False
reply["server"] = None
reply["ismine"] = False
#-----------------------------------
def check_address(address):
reply = {}
reply['ismine'] = False
reply['server'] = None
reply['registered'] = True
#--------------------------
def check_address_inner(server):
@@ -34,8 +36,8 @@ def check_address(address):
return
if info['ismine'] is True:
reply['ismine'] = True
reply['server'] = server
reply['ismine'] = True
#first check the main server
check_address_inner(MAIN_SERVER)
@@ -62,4 +64,4 @@ def get_server(key):
if 'namecoin_address' in info:
return check_address(info['namecoin_address'])
else:
return json.dumps({})
return reply

View File

@@ -6,66 +6,96 @@
#-----------------------
#520 is the real limit
#hardcoded here instead of some config file
VALUE_MAX_LIMIT = 512
import requests
import json
from coinrpc.namecoin.namecoind_wrapper import namecoind_blocks, namecoind_name_new, check_registration
from coinrpc.namecoin.namecoind_wrapper import namecoind_name_update, namecoind_name_show
from common import utf8len, log
from common import users, register_queue
from config import LOAD_BALANCER
from coinrpc.namecoin.namecoind_server import NamecoindServer
from blockdata.namecoind_cluster import get_server
from config import NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, WALLET_PASSPHRASE, NAMECOIND_USE_HTTPS
from config import MAIN_SERVER, LOAD_SERVERS
from config import DEFAULT_HOST, MEMCACHED_PORT, MEMCACHED_TIMEOUT
namecoind = NamecoindServer(MAIN_SERVER, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE)
import pylibmc
from time import time
mc = pylibmc.Client([DEFAULT_HOST + ':' + MEMCACHED_PORT],binary=True)
#-----------------------------------
from pymongo import MongoClient
client = MongoClient()
local_db = client['namecoin']
queue = local_db.queue
def register_name(key,value,server=MAIN_SERVER):
from config import MONGODB_URI
remote_client = MongoClient(MONGODB_URI)
remote_db = remote_client.get_default_database()
users = remote_db.user
registrations = remote_db.user_registration
#-----------------------------------
def utf8len(s):
if type(s) == unicode:
return len(s)
else:
return len(s.encode('utf-8'))
#-----------------------------------
def save_name_new_info(info,key,value):
reply = {}
try:
#check if already in register queue (name_new)
check_queue = register_queue.find_one({"key":key})
if check_queue is not None:
reply['message'] = "ERROR: " + "already in register queue: " + str(key)
else:
namecoind = NamecoindServer(server, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE)
info = namecoind.name_new(key,json.dumps(value))
try:
reply['longhex'] = info[0]
reply['rand'] = info[1]
reply['key'] = key
reply['value'] = value
reply['backend_server'] = int(LOAD_BALANCER)
reply['longhex'] = info[0]
reply['rand'] = info[1]
reply['key'] = key
reply['value'] = json.dumps(value)
#get current block...
blocks = namecoind_blocks()
#get current block...
blocks = namecoind.blocks()
reply['current_block'] = blocks['blocks']
reply['wait_till_block'] = blocks['blocks'] + 12
reply['activated'] = False
reply['current_block'] = blocks['blocks']
reply['wait_till_block'] = blocks['blocks'] + 12
reply['activated'] = False
reply['server'] = server
#save this data to Mongodb...
queue.insert(reply)
#save this data to Mongodb...
register_queue.insert(reply)
reply['message'] = 'Your registration will be completed in roughly two hours'
del reply['_id'] #reply[_id] is causing a json encode error
#reply[_id] is causing a json encode error
del reply['_id']
except Exception as e:
reply['message'] = "ERROR: " + str(e)
except Exception as e:
reply['message'] = "ERROR: " + str(e)
log.debug(reply)
log.debug('-' * 5)
return reply
#-----------------------------------
def update_name(key,value):
reply = {}
cache_reply = mc.get("name_update_" + str(key))
if cache_reply is None:
server = get_server(key)
namecoind = NamecoindServer(server, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD, NAMECOIND_USE_HTTPS, WALLET_PASSPHRASE)
info = namecoind.name_update(key,json.dumps(value))
log.debug(value)
reply['tx'] = info
mc.set("name_update_" + str(key),"in_memory",int(time() + MEMCACHED_TIMEOUT))
else:
reply['message'] = "ERROR: " + "recently sent name_update: " + str(key)
log.debug(reply)
log.debug('-' * 5)
#-----------------------------------
def slice_profile(username, profile, old_keys=None):
@@ -120,47 +150,13 @@ def slice_profile(username, profile, old_keys=None):
return keys, values
#-----------------------------------
def register_name(key,value):
info = namecoind_name_new(key,json.dumps(value))
reply = save_name_new_info(info,key,json.dumps(value))
print reply
print '---'
#-----------------------------------
def update_name(key,value):
reply = {}
info = namecoind_name_update(key,json.dumps(value))
reply['key'] = key
reply['value'] = value
reply['activated'] = True
reply['backend_server'] = int(LOAD_BALANCER)
#save this data to Mongodb...
check = queue.find_one({'key':key})
if check is None:
queue.insert(reply)
else:
queue.save(reply)
print reply
print info
print '---'
#----------------------------------
def get_old_keys(username):
#----------------------------------
def get_next_key(key):
check_profile = namecoind_name_show(key)
check_profile = namecoind.name_show(key)
try:
check_profile = check_profile['value']
@@ -187,7 +183,7 @@ def get_old_keys(username):
return old_keys
#-----------------------------------
def process_user(username,profile):
def process_user(username,profile,server=MAIN_SERVER):
#old_keys = get_old_keys(username)
@@ -197,23 +193,23 @@ def process_user(username,profile):
key1 = keys[index]
value1 = values[index]
print utf8len(json.dumps(value1))
if check_registration(key1):
if namecoind.check_registration(key1):
#if name is registered
print "name update: " + key1
log.debug("name update: %s", key1)
log.debug("size: %s", utf8len(json.dumps(value1)))
update_name(key1,value1)
else:
#if not registered
print "name new: " + key1
register_name(key1,value1)
log.debug("name new: %s", key1)
log.debug("size: %s", utf8len(json.dumps(value1)))
register_name(key1,value1,server)
process_additional_keys(keys, values)
process_additional_keys(keys, values,server)
#-----------------------------------
def process_additional_keys(keys,values):
def process_additional_keys(keys,values,server):
#register/update remaining keys
size = len(keys)
@@ -222,13 +218,13 @@ def process_additional_keys(keys,values):
next_key = keys[index]
next_value = values[index]
if check_registration(next_key):
print "name update: " + next_key
print utf8len(json.dumps(next_value))
log.debug(utf8len(json.dumps(next_value)))
if namecoind.check_registration(next_key):
log.debug("name update: " + next_key)
update_name(next_key,next_value)
else:
print "name new: " + next_key
print utf8len(json.dumps(next_value))
register_name(next_key,next_value)
log.debug("name new: " + next_key)
register_name(next_key,next_value,server)
index += 1

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python
#-----------------------
# Copyright 2013 Halfmoon Labs, Inc.
# All Rights Reserved
#-----------------------
'''
This file contains common code
'''
from config import DEBUG
import logging
import json
#-----------------------------------
from pymongo import MongoClient
client = MongoClient()
local_db = client['namecoin']
register_queue = local_db.queue
from config import MONGODB_URI
remote_client = MongoClient(MONGODB_URI)
remote_db = remote_client.get_default_database()
users = remote_db.user
#-------------------------
def get_logger():
if(DEBUG):
log = logging.getLogger()
log.setLevel(logging.DEBUG)
formatter = logging.Formatter('[%(levelname)s] %(message)s')
handler_stream = logging.StreamHandler()
handler_stream.setFormatter(formatter)
log.addHandler(handler_stream)
else:
log = None
return log
#-------------------------
#common logger
log = get_logger()
#-------------------------
def pretty_dump(input):
return json.dumps(input, cls=MongoEncoder, sort_keys=False, indent=4, separators=(',', ': '))
#-------------------------
def pretty_print(input):
log.debug(pretty_dump(input))
#-----------------------------------
def utf8len(s):
if type(s) == unicode:
return len(s)
else:
return len(s.encode('utf-8'))
#-----------------------------------------
def get_json(data):
if isinstance(data,dict):
pass
else:
data = json.loads(data)
return data
#-----------------------------------------
def get_string(data):
if isinstance(data,dict):
data = json.dumps(data)
else:
pass
return data

View File

@@ -31,4 +31,8 @@ except:
MONGODB_URI = os.environ['MONGODB_URI']
OLD_DB = os.environ['OLD_DB']
LOAD_BALANCER = os.environ['LOAD_BALANCER']
LOAD_BALANCER = os.environ['LOAD_BALANCER']
DEFAULT_HOST = '127.0.0.1'
MEMCACHED_PORT = '11211'
MEMCACHED_TIMEOUT = 15 * 60

View File

@@ -37,8 +37,6 @@ old_users = old_db.user
local_client = MongoClient()
local_db = local_client['namecoin']
queue_register = local_db.queue
queue_update = local_db.queue_update
problem_users = ['madmoneymachine', 'drmox', 'emiljohansson','xfaure','megaz28','maxweiss','kh','patrickcines']
@@ -48,20 +46,6 @@ def process_profile(username,profile):
if username in problem_users:
return
#check if already in register queue (name_new)
check_queue = queue_register.find_one({"key":'u/' + username})
if check_queue is not None:
print "Already in register queue: " + str(username)
return
#check if already in update queue (name_update)
check_queue = queue_update.find_one({"key":'u/' + username})
if check_queue is not None:
print "Already in update queue: " + str(username)
return
#check if load-balancer is correct
old_user = old_users.find_one({"username":username})